From ProventusNova DeveloperWiki

How to Build a Video Capture Application Using GStreamer and OpenCV in Python

This guide explains how to create a basic video capture application using **GStreamer** and **OpenCV** in Python. The application captures video from a camera using GStreamer and processes it with OpenCV.

Full Script

For those who just want the code, here is the complete script:

import gi
import cv2
import numpy as np
import sys

gi.require_version("Gst", "1.0")
gi.require_version("GLib", "2.0")
from gi.repository import Gst, GLib

class Camera:
    def __init__(self, width=640, height=480, fps=30):
        """Initialize the GStreamer pipeline and elements."""
        Gst.init(None)  # Initialize GStreamer

        self.width = width
        self.height = height
        self.fps = fps
        self.frame = None  # Placeholder for the latest frame
        self.running = True  # Flag to track if the loop should run

        # Define GStreamer pipeline
        self.pipeline_str = (
            f"v4l2src device=/dev/video6 ! "
            f"image/jpeg, width={self.width}, height={self.height}, framerate={self.fps}/1 ! "
            "jpegdec ! "
            "videoconvert ! "
            "videoscale ! "
            f"video/x-raw,format=BGR,width={self.width},height={self.height} ! "
            "appsink name=sink emit-signals=true max-buffers=1 drop=true"
        )

        # Create pipeline
        self.pipeline = Gst.parse_launch(self.pipeline_str)
        self.appsink = self.pipeline.get_by_name("sink")

        if not self.appsink:
            print("Error: Could not find appsink element.")
            sys.exit(1)

        # Connect new-sample signal
        self.appsink.connect("new-sample", self.on_new_sample)

        # Start pipeline
        self.pipeline.set_state(Gst.State.PLAYING)

        # Start GLib main loop
        self.loop = GLib.MainLoop()

        # Add a timer to update OpenCV display
        GLib.timeout_add(30, self.update_display)

    def on_new_sample(self, sink):
        """Callback function for processing new samples."""
        sample = sink.emit("pull-sample")
        if sample is None:
            return Gst.FlowReturn.ERROR

        buffer = sample.get_buffer()
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            return Gst.FlowReturn.ERROR

        # Convert buffer to NumPy array
        frame_data = np.frombuffer(map_info.data, dtype=np.uint8)
        buffer.unmap(map_info)

        try:
            self.frame = frame_data.reshape((self.height, self.width, 3))
        except ValueError:
            print("Frame size mismatch, skipping frame.")

        return Gst.FlowReturn.OK

    def update_display(self):
        """Update OpenCV display and check if the window is closed."""
        if self.frame is not None:
            cv2.imshow("GStreamer Video", self.frame)
            
            # Check if OpenCV window is closed
            if cv2.getWindowProperty("GStreamer Video", cv2.WND_PROP_VISIBLE) < 1:
                self.stop()
                return False  # Stop updating

            if cv2.waitKey(1) & 0xFF == ord("q"):
                self.stop()
                return False  # Stop updating

        return True  # Continue updating

    def start(self):
        """Run the GStreamer pipeline loop."""
        try:
            self.loop.run()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stop the GStreamer pipeline and cleanup."""
        if self.running:
            self.running = False
            print("Stopping camera...")
            self.pipeline.set_state(Gst.State.NULL)
            cv2.destroyAllWindows()
            self.loop.quit()

if __name__ == "__main__":
    camera = Camera(width=640, height=480, fps=30)
    camera.start()

Step-by-Step Guide

1. Install Dependencies

Before running the script, make sure you have the required dependencies installed.

Run the following command to install them:

pip install opencv-python numpy pygobject
sudo apt install gstreamer1.0-tools gstreamer1.0-plugins-base gstreamer1.0-plugins-good

2. Understand the GStreamer Pipeline

The script uses the following GStreamer pipeline:

v4l2src device=/dev/video6 ! image/jpeg, width=640, height=480, framerate=30/1 ! 
jpegdec ! videoconvert ! videoscale ! video/x-raw,format=BGR,width=640,height=480 ! appsink

- **v4l2src** → Captures video from a device (e.g., `/dev/video6`). - **jpegdec** → Decodes JPEG frames. - **videoconvert** → Converts to a format compatible with OpenCV. - **appsink** → Sends frames to OpenCV for further processing.

3. Run the Script

Save the script as `camera.py` and run it:

python camera.py

Press **'q'** or close the OpenCV window to stop the program.

4. Modify for Your Camera

- To use a different camera, change the `device=/dev/video6` in the script. - Modify `width`, `height`, and `fps` in the `Camera` class.

FAQ

How can I change the camera source?

Modify the `v4l2src` device in the pipeline:

self.pipeline_str = "v4l2src device=/dev/video0 ! ..."

Use `/dev/video0`, `/dev/video1`, etc., depending on your setup.

Why is the video not displaying?

Possible issues: 1. **Camera is in use** → Close other applications using the camera. 2. **Wrong device path** → Check available devices using:

   v4l2-ctl --list-devices

3. **Missing dependencies** → Ensure all required GStreamer plugins are installed.

How do I exit the application?

Press **'q'** or close the OpenCV window to stop the script.

How can I improve performance?

- Reduce `width` and `height` for better speed. - Set `drop=true` in `appsink` to avoid frame buffering delays.

Conclusion

You've now built a basic **video capture application** using **GStreamer** and **OpenCV** in Python! 🚀 Modify the script to suit your needs, such as adding **filters**, **recording**, or **streaming**.

If you have any questions, feel free to ask in the comments. Happy coding! 🎥🖥️