Handling Media Files

PySpark provides several APIs for working with image, audio, and video files. In this article we discuss some ways to handle these files in PySpark.

Basic Features

These are just basic ways to handle these files. Depending on the specific use case, you may need to perform additional operations such as resizing images, extracting audio features, or processing video frames.

Image Files

from pyspark.ml.image import ImageSchema
from PIL import Image

# Read the image file locally with Pillow (e.g. for inspection or preprocessing)
image = Image.open("path/to/image.jpg")

# Read the image into a PySpark DataFrame
# (ImageSchema.readImages works on Spark 2.x; it was removed in Spark 3.0)
df = ImageSchema.readImages("path/to/image.jpg")
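
Note that ImageSchema.readImages was removed in Spark 3.0. On Spark 2.4 and later, the built-in image data source can be used instead; a minimal sketch:

# Read images with the built-in image data source (Spark 2.4+)
df = spark.read.format("image").load("path/to/image.jpg")

# Each row contains an "image" struct with origin, height, width, nChannels, mode, and data
df.select("image.origin", "image.height", "image.width").show(truncate=False)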

Audio Files

import io

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
from pydub import AudioSegment

# Define a UDF that decodes the raw file bytes and re-encodes them as WAV
@udf(returnType=BinaryType())
def read_audio_file(content):
    audio = AudioSegment.from_file(io.BytesIO(content), format="mp3")
    return audio.export(format="wav").read()

# Read the audio file as binary and convert it with the UDF
df = (
    spark.read.format("binaryFile")
        .load("path/to/audio.mp3")
        .select("path", read_audio_file("content").alias("audio_data"))
)

Video Files

import tempfile

import cv2
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, BinaryType

# Define a UDF that decodes the raw video bytes into a list of frames
@udf(returnType=ArrayType(BinaryType()))
def read_video_file(content):
    # OpenCV needs a file path, so write the bytes to a temporary file first
    with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp:
        tmp.write(content)
        tmp.flush()
        cap = cv2.VideoCapture(tmp.name)
        frames = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame.tobytes())
        cap.release()
    return frames

# Read the video file as binary and extract its frames with the UDF
df = (
    spark.read.format("binaryFile")
        .load("path/to/video.mp4")
        .select("path", read_video_file("content").alias("video_data"))
)

Additional Features Provided by the PySpark API

In addition to reading and converting image, audio, and video files to PySpark DataFrames, there are several other operations that you can perform on these files in PySpark.

Image Files

  • Resize images
from pyspark.ml.image import ImageSchema
from PIL import Image

# Read image file
image = Image.open("path/to/image.jpg")

# Resize image and save it
resized_image = image.resize((224, 224))
resized_image.save("path/to/image_resized.jpg")

# Convert the resized image to a PySpark DataFrame
df = ImageSchema.readImages("path/to/image_resized.jpg")
  • Convert images to different formats
from pyspark.ml.image import ImageSchema
from PIL import Image

# Read image file
image = Image.open("path/to/image.jpg")

# Convert to PNG format
image.save("path/to/image.png")

# Convert to PySpark DataFrame
df = ImageSchema.readImages("path/to/image.png")
  • Extract image features
import numpy as np
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType
from keras.applications.vgg16 import VGG16, preprocess_input

# Read image file
df = ImageSchema.readImages("path/to/image.jpg")

# Load pre-trained VGG16 model
# (on a real cluster, consider loading the model inside the UDF or broadcasting its weights)
model = VGG16(weights="imagenet", include_top=False, pooling="avg")

# Define a UDF that preprocesses the raw image bytes and runs them through VGG16
@udf(returnType=ArrayType(FloatType()))
def extract_features(data, height, width, n_channels):
    img = np.frombuffer(data, dtype=np.uint8).reshape((height, width, n_channels))
    img = preprocess_input(img.astype("float32")[np.newaxis, ...])
    return model.predict(img).flatten().tolist()

# Extract image features
df = df.select(
    "image.origin",
    extract_features("image.data", "image.height", "image.width", "image.nChannels").alias("features"),
)

Audio Files

  • Extract audio features
import io

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pydub import AudioSegment
from pyAudioAnalysis import audioFeatureExtraction

# Define a UDF to extract MFCC features from the raw audio bytes
@udf(returnType=ArrayType(DoubleType()))
def extract_mfcc_features(audio_data):
    # Decode the WAV bytes, downmix to mono, and get the sample array
    audio = AudioSegment.from_file(io.BytesIO(audio_data), format="wav").set_channels(1)
    signal = np.array(audio.get_array_of_samples())
    fs = audio.frame_rate
    # Short-term feature extraction (50 ms window, 25 ms step)
    features, _ = audioFeatureExtraction.stFeatureExtraction(
        signal, fs, int(0.050 * fs), int(0.025 * fs)
    )
    # Average the 13 MFCC rows (indices 8-20) over all frames
    return features[8:21].mean(axis=1).tolist()

# Read audio file and convert to PySpark DataFrame
df = (
    spark.read.format("binaryFile")
        .load("path/to/audio.wav")
        .selectExpr("path", "content")
)

# Extract MFCC features
df = df.select("path", extract_mfcc_features("content").alias("features"))
  • Convert audio files to different formats
import io

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
from pydub import AudioSegment

# Define a UDF to convert raw WAV bytes to MP3 format
@udf(returnType=BinaryType())
def convert_to_mp3(audio_data):
    audio = AudioSegment.from_file(io.BytesIO(audio_data), format="wav")
    return audio.export(format="mp3").read()

# Read audio file and convert to PySpark DataFrame
df = spark.read.format("binaryFile").load("path/to/audio.wav").selectExpr("path", "content")

# Convert to MP3 format
df = df.select("path", convert_to_mp3("content").alias("audio_data"))
  • Remove noise from audio files

You can use techniques such as bandpass filtering, low-pass filtering, or high-pass filtering to remove noise from audio files.
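
For example, a minimal sketch (assuming WAV input and pydub's built-in high_pass_filter / low_pass_filter methods) that applies a simple band-pass style cleanup:

import io

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
from pydub import AudioSegment

# Define a UDF that applies a high-pass filter followed by a low-pass filter
@udf(returnType=BinaryType())
def remove_noise(audio_data):
    audio = AudioSegment.from_file(io.BytesIO(audio_data), format="wav")
    # Cut-off frequencies are illustrative; tune them to the noise profile
    filtered = audio.high_pass_filter(100).low_pass_filter(8000)
    return filtered.export(format="wav").read()

# Read audio file and apply the noise filter
df = spark.read.format("binaryFile").load("path/to/audio.wav").selectExpr("path", "content")
df = df.select("path", remove_noise("content").alias("audio_data"))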

Video Files

  • Extract video frames
import tempfile

import cv2
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, BinaryType

# Define a UDF to extract video frames from the raw video bytes
@udf(returnType=ArrayType(BinaryType()))
def extract_video_frames(video_data):
    # OpenCV needs a file path, so write the bytes to a temporary file first
    with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp:
        tmp.write(video_data)
        tmp.flush()
        cap = cv2.VideoCapture(tmp.name)
        frames = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame.tobytes())
        cap.release()
    return frames

# Read video file and convert to PySpark DataFrame
df = spark.read.format("binaryFile").load("path/to/video.mp4").selectExpr("path", "content")

# Extract video frames
df = df.select("path", extract_video_frames("content").alias("frames"))
  • Apply video filters
import numpy as np
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import BinaryType
from PIL import Image, ImageFilter

# Define a UDF to apply a Gaussian blur filter to a single raw RGB frame
# (assumes 640x480 frames, i.e. the byte layout produced by extract_video_frames above)
@udf(returnType=BinaryType())
def apply_gaussian_blur(frame_data):
    # Convert bytes to NumPy array
    frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((480, 640, 3))

    # Apply Gaussian blur filter
    img = Image.fromarray(frame)
    img = img.filter(ImageFilter.GaussianBlur(radius=5))
    frame = np.asarray(img)

    # Convert back to bytes
    return frame.tobytes()

# Read video file and extract its frames (one row per frame),
# reusing the extract_video_frames UDF defined above
df = spark.read.format("binaryFile").load("path/to/video.mp4").selectExpr("path", "content")
df = df.select("path", explode(extract_video_frames("content")).alias("frame"))

# Apply Gaussian blur filter to video frames
df = df.select("path", apply_gaussian_blur("frame").alias("frame_data"))
  • Perform object detection
import cv2
import numpy as np
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import BinaryType
from tensorflow.keras.models import load_model

# Load pre-trained object detection model
model = load_model("path/to/object_detection_model.h5")

# Define a UDF to perform object detection on a single raw RGB frame
# (assumes 640x480 frames, as produced by extract_video_frames above)
@udf(returnType=BinaryType())
def perform_object_detection(frame_data):
    # Convert bytes to a writable NumPy array
    frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((480, 640, 3)).copy()

    # Perform object detection (the exact inference call and result format depend
    # on the model; a detect() helper returning {"box": ...} dicts is assumed here)
    detections = model.detect(frame)

    # Draw bounding boxes on the frame
    for detection in detections:
        x, y, w, h = detection["box"]
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

    # Convert back to bytes
    return frame.tobytes()

# Read video file and extract its frames (one row per frame),
# reusing the extract_video_frames UDF defined above
df = spark.read.format("binaryFile").load("path/to/video.mp4").selectExpr("path", "content")
df = df.select("path", explode(extract_video_frames("content")).alias("frame"))

# Perform object detection on video frames
df = df.select("path", perform_object_detection("frame").alias("frame_data"))

Read More

  • https://blog.devgenius.io/handling-media-files-in-pyspark-image-audio-video-files-8e3bcd7a5c4e