How to Build a Video Capture Application Using GStreamer and OpenCV in Python
This guide explains how to create a basic video capture application using **GStreamer** and **OpenCV** in Python. The application captures video from a camera using GStreamer and processes it with OpenCV.
Full Script
For those who just want the code, here is the complete script:
import gi
import cv2
import numpy as np
import sys
gi.require_version("Gst", "1.0")
gi.require_version("GLib", "2.0")
from gi.repository import Gst, GLib
class Camera:
def __init__(self, width=640, height=480, fps=30):
"""Initialize the GStreamer pipeline and elements."""
Gst.init(None) # Initialize GStreamer
self.width = width
self.height = height
self.fps = fps
self.frame = None # Placeholder for the latest frame
self.running = True # Flag to track if the loop should run
# Define GStreamer pipeline
self.pipeline_str = (
f"v4l2src device=/dev/video6 ! "
f"image/jpeg, width={self.width}, height={self.height}, framerate={self.fps}/1 ! "
"jpegdec ! "
"videoconvert ! "
"videoscale ! "
f"video/x-raw,format=BGR,width={self.width},height={self.height} ! "
"appsink name=sink emit-signals=true max-buffers=1 drop=true"
)
# Create pipeline
self.pipeline = Gst.parse_launch(self.pipeline_str)
self.appsink = self.pipeline.get_by_name("sink")
if not self.appsink:
print("Error: Could not find appsink element.")
sys.exit(1)
# Connect new-sample signal
self.appsink.connect("new-sample", self.on_new_sample)
# Start pipeline
self.pipeline.set_state(Gst.State.PLAYING)
# Start GLib main loop
self.loop = GLib.MainLoop()
# Add a timer to update OpenCV display
GLib.timeout_add(30, self.update_display)
def on_new_sample(self, sink):
"""Callback function for processing new samples."""
sample = sink.emit("pull-sample")
if sample is None:
return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success:
return Gst.FlowReturn.ERROR
# Convert buffer to NumPy array
frame_data = np.frombuffer(map_info.data, dtype=np.uint8)
buffer.unmap(map_info)
try:
self.frame = frame_data.reshape((self.height, self.width, 3))
except ValueError:
print("Frame size mismatch, skipping frame.")
return Gst.FlowReturn.OK
def update_display(self):
"""Update OpenCV display and check if the window is closed."""
if self.frame is not None:
cv2.imshow("GStreamer Video", self.frame)
# Check if OpenCV window is closed
if cv2.getWindowProperty("GStreamer Video", cv2.WND_PROP_VISIBLE) < 1:
self.stop()
return False # Stop updating
if cv2.waitKey(1) & 0xFF == ord("q"):
self.stop()
return False # Stop updating
return True # Continue updating
def start(self):
"""Run the GStreamer pipeline loop."""
try:
self.loop.run()
except KeyboardInterrupt:
self.stop()
def stop(self):
"""Stop the GStreamer pipeline and cleanup."""
if self.running:
self.running = False
print("Stopping camera...")
self.pipeline.set_state(Gst.State.NULL)
cv2.destroyAllWindows()
self.loop.quit()
if __name__ == "__main__":
camera = Camera(width=640, height=480, fps=30)
camera.start()
Step-by-Step Guide
1. Install Dependencies
Before running the script, make sure you have the required dependencies installed.
Run the following command to install them:
pip install opencv-python numpy pygobject
sudo apt install gstreamer1.0-tools gstreamer1.0-plugins-base gstreamer1.0-plugins-good
2. Understand the GStreamer Pipeline
The script uses the following GStreamer pipeline:
v4l2src device=/dev/video6 ! image/jpeg, width=640, height=480, framerate=30/1 !
jpegdec ! videoconvert ! videoscale ! video/x-raw,format=BGR,width=640,height=480 ! appsink
- **v4l2src** → Captures video from a device (e.g., `/dev/video6`). - **jpegdec** → Decodes JPEG frames. - **videoconvert** → Converts to a format compatible with OpenCV. - **appsink** → Sends frames to OpenCV for further processing.
3. Run the Script
Save the script as `camera.py` and run it:
python camera.py
Press **'q'** or close the OpenCV window to stop the program.
4. Modify for Your Camera
- To use a different camera, change the `device=/dev/video6` in the script. - Modify `width`, `height`, and `fps` in the `Camera` class.
FAQ
How can I change the camera source?
Modify the `v4l2src` device in the pipeline:
self.pipeline_str = "v4l2src device=/dev/video0 ! ..."
Use `/dev/video0`, `/dev/video1`, etc., depending on your setup.
Why is the video not displaying?
Possible issues: 1. **Camera is in use** → Close other applications using the camera. 2. **Wrong device path** → Check available devices using:
v4l2-ctl --list-devices
3. **Missing dependencies** → Ensure all required GStreamer plugins are installed.
How do I exit the application?
Press **'q'** or close the OpenCV window to stop the script.
How can I improve performance?
- Reduce `width` and `height` for better speed. - Set `drop=true` in `appsink` to avoid frame buffering delays.
Conclusion
You've now built a basic **video capture application** using **GStreamer** and **OpenCV** in Python! 🚀 Modify the script to suit your needs, such as adding **filters**, **recording**, or **streaming**.
If you have any questions, feel free to ask in the comments. Happy coding! 🎥🖥️