Media Server with GStreamer: Python-based Video Streaming, Recording, and Snapshot Solution
Keywords: gstreamer, media server, tee, python, video streaming, video recording, snapshot
Description
This project demonstrates a versatile media server implemented in Python using the powerful GStreamer multimedia framework. It handles live video streaming, on-demand video recording, and snapshot capture — all in one efficient application.
This solution provides a solid foundation for custom media handling needs and highlights how modern Python bindings for GStreamer can be leveraged for flexible, hardware-accelerated media pipelines.
Test it yourself!
You can get the full code snippet right below:
Full code snippet
#!/usr/bin/env python3
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GObject
import sys
import threading
import time
from datetime import datetime
import os
Gst.init(None)
class GStreamerApp:
    """Media server offering live preview, on-demand recording and snapshots.

    Two GStreamer pipelines are built and started by the constructor:

    * a source pipeline that encodes a test pattern and sends it as
      RTP/H.264 over UDP, and
    * a receiving pipeline that decodes that stream and splits it with a
      ``tee`` into a preview branch, a snapshot branch (``appsink``) and a
      recording branch (``splitmuxsink``).

    The snapshot and recording branches each sit behind a ``valve`` element
    whose ``drop`` property is toggled at runtime.
    """

    def __init__(self):
        """Build and start both pipelines, then run the GLib loop in a thread."""
        # Control flags consulted by the command handlers and signal callbacks.
        self.recording = False        # True while a recording is in progress
        self.snapshot_taking = False  # True while a snapshot is being captured
        self.recording_ready = True   # True when the record branch may start a new file

        self.loop = GLib.MainLoop()

        # The source pipeline is started before the receiver.
        self.build_main_pipeline()
        self.build_receiving_pipeline()

        # Run the GLib main loop on its own thread so the caller is not blocked.
        self.loop_thread = threading.Thread(target=self.loop.run)
        self.loop_thread.start()

    def build_main_pipeline(self):
        """Create and start the source pipeline.

        The pipeline generates a live test pattern, encodes it to H.264,
        payloads it as RTP and sends it to a hard-coded UDP host and port.

        Raises:
            Exception: If ``Gst.parse_launch`` yields no pipeline.
        """
        main_pipeline_description = """
            videotestsrc is-live=true pattern=ball !
            video/x-raw,width=1280,height=1024,framerate=30/1 !
            nvvidconv !
            nvv4l2h264enc bitrate=4000000 !
            h264parse !
            rtph264pay config-interval=1 pt=96 !
            udpsink host=10.42.0.141 port=5000
        """
        source = Gst.parse_launch(main_pipeline_description)
        self.pipeline = source
        if not source:
            raise Exception("Failed to create main pipeline")
        source.set_state(Gst.State.PLAYING)

    def build_receiving_pipeline(self):
        """Create and start the receiving pipeline and wire up its signals.

        The pipeline receives RTP/H.264 over UDP, decodes it and fans it out
        through a ``tee`` into preview, snapshot and recording branches. The
        named valves and sinks are stored on the instance for later control.

        Raises:
            Exception: If the pipeline or any of the named elements is missing.
        """
        snapshot_pipeline_description = """
            udpsrc name=udpsrc port=5000 caps=application/x-rtp,encoding-name=H264,payload=96 !
            rtph264depay name=depay ! queue ! h264parse ! queue ! avdec_h264 ! videoconvert !
            tee name=t
            t. ! queue ! autovideosink sync=false
            t. ! queue ! valve name=snapshot_valve drop=false ! nvjpegenc !
            appsink name=snapshot_sink emit-signals=true sync=false
            t. ! queue ! valve name=record_valve drop=false ! nvvidconv !
            nvv4l2h264enc bitrate=4000000 ! h264parse ! queue !
            splitmuxsink name="record_sink" max-size-time=0 max-size-bytes=0 max-files=0
        """
        self.recv_pipeline = Gst.parse_launch(snapshot_pipeline_description)
        if not self.recv_pipeline:
            raise Exception("Failed to create receiving pipeline")

        # Keep handles to the elements that are controlled at runtime.
        find = self.recv_pipeline.get_by_name
        self.snapshot_valve = find("snapshot_valve")
        self.snapshot_sink = find("snapshot_sink")
        self.record_valve = find("record_valve")
        self.record_sink = find("record_sink")

        if not (self.snapshot_valve and self.snapshot_sink):
            raise Exception("Could not find snapshot valve or sink in receiving pipeline")
        if not (self.record_valve and self.record_sink):
            raise Exception("Recording elements not found")

        # new-sample delivers encoded JPEG frames; format-location asks for
        # the name of the next recording file.
        self.snapshot_sink.connect("new-sample", self._on_new_sample)
        self.record_sink.connect("format-location", self._format_location)

        self.recv_pipeline.set_state(Gst.State.PLAYING)
        print("Pipeline started.")
        # Both valves start open; the snapshot valve is closed after 10 s.
        GLib.timeout_add(10000, self._close_snapshot_valve_after_start)  # FIXME: Adjust timeout as needed. Maybe use a pad probe to trigger close.

    def _close_snapshot_valve_after_start(self):
        """Close the snapshot valve once the warm-up period has elapsed.

        Returns:
            False, so the GLib timeout that invoked this is not rescheduled.
        """
        valve = self.snapshot_valve
        valve.set_property("drop", True)
        print("Snapshot branch warmed up and valve closed.")
        return False

    def take_snapshot(self):
        """Request a snapshot by opening the snapshot valve for 200 ms.

        Does nothing except print a notice when a snapshot is already pending.
        """
        if not self.snapshot_taking:
            print("Triggering snapshot...")
            self.snapshot_taking = True
            # Let frames through to the JPEG encoder and appsink ...
            self.snapshot_valve.set_property("drop", False)
            # ... and close the valve again after 200 ms.
            GLib.timeout_add(200, self._stop_snapshot)
        else:
            print("Snapshot already in progress.")

    def _on_new_sample(self, sink):
        """Handle the appsink ``new-sample`` signal and store one snapshot.

        A frame is written to disk only while a snapshot has been requested
        (``self.snapshot_taking``). Frames that arrive while the valve is open
        for warm-up, or after the requested frame has been handled, are pulled
        and discarded instead of being written as unrequested JPEG files.

        Args:
            sink: The appsink that emitted the signal.

        Returns:
            ``Gst.FlowReturn.OK`` in every case.
        """
        sample = sink.emit("pull-sample")
        if not sample:
            return Gst.FlowReturn.OK
        if not self.snapshot_taking:
            # Nobody asked for this frame; drop it.
            return Gst.FlowReturn.OK

        # Cleared up front so a failure is never reported with an old filename.
        self.snapshot_filename = None
        buffer = sample.get_buffer()
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            print("Failed to map buffer")
            self.snapshot_taking = False
            return Gst.FlowReturn.OK
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"snapshot_{timestamp}.jpg"
            with open(filename, "wb") as f:
                f.write(map_info.data)
            self.snapshot_filename = filename
        except OSError as err:
            print(f"Failed to write snapshot: {err}")
        finally:
            # Always release the mapping, even when the write failed.
            buffer.unmap(map_info)
            # One frame per request: later frames in the window are discarded.
            self.snapshot_taking = False
        return Gst.FlowReturn.OK

    def _stop_snapshot(self):
        """Close the snapshot valve and report the outcome of the request.

        If no frame was handled while the valve was open, the pending flag is
        reset here so later snapshot requests are not rejected forever.

        Returns:
            False, so the GLib timeout that invoked this is not rescheduled.
        """
        self.snapshot_valve.set_property("drop", True)
        if self.snapshot_taking:
            # The flag is still set, so no sample was handled in the window.
            self.snapshot_taking = False
            print("Snapshot failed: no frame was captured.")
            return False
        # The attribute only exists once a frame has been processed.
        filename = getattr(self, "snapshot_filename", None)
        if filename is None:
            print("Snapshot failed: the frame could not be saved.")
        else:
            print("Snapshot complete.")
            print(f"📸 Snapshot saved as: {filename}")
        return False

    def start_recording(self):
        """Start a new recording file on the recording branch.

        The ``recording`` flag is raised before the split is requested, so the
        ``format-location`` callback already sees it when the sink asks for
        the next filename. Setting it only after reopening the valve leaves a
        window in which the new fragment could be sent to ``/dev/null``.
        """
        if self.recording:
            print("Already recording.")
            return
        self.recording = True
        if self.recording_ready:
            self.recording_ready = False
            # Pause the branch, then ask the sink to begin a new file.
            self.record_valve.set_property("drop", True)
            self.record_sink.emit("split-now")
        print("🎥 Starting recording...")
        self.record_valve.set_property("drop", False)

    def stop_recording(self):
        """Stop the current recording and re-arm the branch for the next one."""
        if self.recording:
            print("Stopping recording...")
            print("Emitting split-now to finalize MP4...")
            # Flags are lowered first so the next filename request yields /dev/null.
            self.recording = False
            self.recording_ready = True
            valve = self.record_valve
            valve.set_property("drop", True)
            self.record_sink.emit("split-now")
            valve.set_property("drop", False)
        else:
            print("Not currently recording...")

    def _format_location(self, splitmux, fragment_id):
        """Return the path for the next recording fragment.

        Args:
            splitmux: The element that emitted ``format-location`` (unused).
            fragment_id: Fragment index supplied with the signal (unused).

        Returns:
            A timestamped ``record_*.mp4`` name while recording, otherwise
            ``/dev/null`` so the data is discarded.
        """
        if not self.recording:
            return "/dev/null"
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target = f"record_{stamp}.mp4"
        print(f"Generating filename: {target}")
        return target

    def shutdown(self):
        """Stop both pipelines and the GLib main loop."""
        self.pipeline.set_state(Gst.State.NULL)
        if self.recording:
            self.stop_recording()
        if self.snapshot_taking:
            self.snapshot_valve.set_property("drop", True)
        # The receiving pipeline was previously left running on exit.
        # NOTE(review): no EOS is sent first, so a recording that is still open
        # at this point may not be finalized - confirm whether that matters.
        self.recv_pipeline.set_state(Gst.State.NULL)
        self.loop.quit()
        self.loop_thread.join()
        print("Shutdown complete.")
def main():
    """Run the interactive command loop for the media server.

    Commands read from standard input: ``r`` starts recording, ``s`` stops
    recording, ``p`` takes a snapshot and ``q`` quits. The application is
    shut down on exit, on Ctrl-C (including during the start-up wait) and
    when standard input is closed.
    """
    app = GStreamerApp()
    try:
        print("WORKING VERSION: 1.0")
        # Inside the try block so Ctrl-C during the wait still reaches shutdown().
        time.sleep(15)  # Allow time for pipeline to start before enabling commands.
        print("Commands: 'r' = start recording, 's' = stop recording, 'p' = snapshot, 'q' = quit")
        actions = {
            'r': app.start_recording,
            's': app.stop_recording,
            'p': app.take_snapshot,
        }
        while True:
            cmd = input("Enter command: ").strip().lower()
            if cmd == 'q':
                break
            action = actions.get(cmd)
            if action is None:
                print("Unknown command.")
            else:
                action()
    except (KeyboardInterrupt, EOFError):
        # EOFError: standard input was closed; treat it like a quit request.
        pass
    finally:
        app.shutdown()


if __name__ == "__main__":
    main()
To run it, execute the following command on console:
python3 main.py
You should see the following log messages when starting the application:
CURRENT DIRECTORY: /home/nico/MP4_SS_Freeze
libEGL warning: DRI3: failed to query the version
Opening in BLOCKING MODE
NvMMLiteOpen : Block : BlockType = 4
===== NvVideo: NVENC =====
NvMMLiteBlockCreate : Block : BlockType = 4
H264: Profile = 66 Level = 0
NVMEDIA: Need to set EMC bandwidth : 846000
NvVideo: bBlitMode is set to TRUE
Opening in BLOCKING MODE
Pipeline started.
WORKING VERSION: 1.0
NvMMLiteOpen : Block : BlockType = 4
===== NvVideo: NVENC =====
NvMMLiteBlockCreate : Block : BlockType = 4
NvMMLiteBlockCreate : Block : BlockType = 1
H264: Profile = 66 Level = 0
NVMEDIA: Need to set EMC bandwidth : 846000
NvVideo: bBlitMode is set to TRUE
Snapshot branch warmed up and valve closed.
Give the application time to start the pipeline. When the application is ready for use it will display the following message:
Commands: 'r' = start recording, 's' = stop recording, 'p' = snapshot, 'q' = quit
Enter command:
Usage:
- r: Starts recording.
- s: Stops recording.
- p: Take a snapshot.
- q: Quit the application.
Now you can start testing on-demand video recording and snapshots!
Basics
GStreamer
GStreamer is an open-source multimedia framework ideal for creating complex media-handling pipelines. It supports video capture, encoding, streaming, and recording with hardware acceleration on many platforms.
- Modular architecture with reusable elements (sources, filters, encoders, sinks)
- Pipeline design enables flexible data routing and processing
- Support for hardware-accelerated encoding (e.g., NVIDIA’s NVENC)
- Built-in elements like tee to branch streams efficiently
For more information about GStreamer visit this page: GStreamer Application development.
GObject Introspection and Python Bindings
This project uses GObject Introspection via the gi.repository Python package to interact with GStreamer and related GLib libraries. This provides dynamic access to C libraries from Python, enabling powerful multimedia and event-driven programming capabilities.
gi.repository is part of the GObject Introspection infrastructure. It allows Python programs to dynamically use libraries written in C that follow the GObject system, such as GStreamer (Gst), GLib, and GObject itself. Unlike running GStreamer on the command line, it enables:
Dynamic bindings: No need for manual wrapper generation; you get real-time access to the C APIs.
Access to GStreamer and GLib: Enables building and controlling multimedia pipelines and event loops directly from Python.
Tee element
In GStreamer, the tee element is used to split a single incoming data stream into multiple pipelines. The tee duplicates the stream to multiple outputs without the need to decode and re-encode each branch separately. This enables simultaneous processing of the same incoming data stream on different branches — allowing for video recording, streaming, snapshots and more.
Take, for example, the following pipelines, which use the tee element on the command line to separate the incoming data stream into three branches:
- Video stream source: This pipeline uses videotestsrc to create a live video stream and then sends it via UDP to a port.
gst-launch-1.0 videotestsrc is-live=true ! nvvidconv ! nvv4l2h264enc bitrate=4000000 ! h264parse ! rtph264pay config-interval=1 pt=96 ! udpsink host=10.42.0.141 port=5000
- Receiving pipeline: This pipeline receives the incoming data stream from UDP and separates it into three branches:
- Video preview: Using autovideosink.
- Saving snapshots: Using nvjpegenc.
- Video recording: Using filesink.
gst-launch-1.0 -v udpsrc port=5000 caps="application/x-rtp,media=video,encoding-name=H264,payload=96" ! rtph264depay ! tee name=t t. ! queue ! avdec_h264 ! videoconvert ! autovideosink sync=false t. ! queue ! h264parse ! avdec_h264 ! videoconvert ! nvjpegenc ! multifilesink location="snapshot_%d.jpg" max-files=1 t. ! queue ! h264parse ! mp4mux streamable=true fragment-duration=1000 ! filesink location=record.mp4
With these two pipelines we have a simple use of the tee element: receiving an incoming video stream, separating it into branches, and handling the data in each branch separately but simultaneously. But there is a catch! When running GStreamer pipelines on the command line it is not possible to manipulate the pipeline dynamically, which is why we need to create an application to unlock the full power of the tee element. Refer to the next section for a more detailed explanation of how to use the tee element dynamically.
Code architecture
The following section dives deep into the implementation of each important code block needed to get a basic media server application with snapshots, video recording and preview video streaming. It is divided into these sections:
- Building the source pipeline.
- Building the receiving pipeline.
- Building On-demand Snapshot functionality.
- Building On-demand video recording.
- Closing the application.
- Building User interface.
Section 1: Building the source pipeline
- Import necessary libraries:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GObject # Import GStreamer python libraries for media handling.
import sys
import threading
import time
from datetime import datetime
import os
Gst.init(None) # Initialize Gst
- The purpose of the GStreamerApp class is to handle everything related to the media server functionality. In its __init__ method we initialize the main pipeline (also referred to as the source pipeline), the receiving pipeline, and the control variables for the main routines in the application.
def __init__(self):
    """Build and start both pipelines, then run the GLib loop in a thread."""
    # Control flags consulted by the command handlers and signal callbacks.
    self.recording = False        # True while a recording is in progress
    self.snapshot_taking = False  # True while a snapshot is being captured
    self.recording_ready = True   # True when the record branch may start a new file

    self.loop = GLib.MainLoop()

    # The source pipeline is started before the receiver.
    self.build_main_pipeline()
    self.build_receiving_pipeline()

    # Run the GLib main loop on its own thread so the caller is not blocked.
    self.loop_thread = threading.Thread(target=self.loop.run)
    self.loop_thread.start()
- Define build_main_pipeline method.
def build_main_pipeline(self):
    """Create and start the source pipeline.

    The pipeline generates a live test pattern, encodes it to H.264,
    payloads it as RTP and sends it to a hard-coded UDP host and port.

    Raises:
        Exception: If ``Gst.parse_launch`` yields no pipeline.
    """
    main_pipeline_description = """
        videotestsrc is-live=true pattern=ball !
        video/x-raw,width=1280,height=1024,framerate=30/1 !
        nvvidconv !
        nvv4l2h264enc bitrate=4000000 !
        h264parse !
        rtph264pay config-interval=1 pt=96 !
        udpsink host=10.42.0.141 port=5000
    """
    source = Gst.parse_launch(main_pipeline_description)
    self.pipeline = source
    if not source:
        raise Exception("Failed to create main pipeline")
    source.set_state(Gst.State.PLAYING)
Section 2: Building the receiving pipeline
- Define build_receiving_pipeline method.
def build_receiving_pipeline(self):
    """Create and start the receiving pipeline and wire up its signals.

    The pipeline receives RTP/H.264 over UDP, decodes it and fans it out
    through a ``tee`` into preview, snapshot and recording branches. The
    named valves and sinks are stored on the instance for later control.

    Raises:
        Exception: If the pipeline or any of the named elements is missing.
    """
    snapshot_pipeline_description = """
        udpsrc name=udpsrc port=5000 caps=application/x-rtp,encoding-name=H264,payload=96 !
        rtph264depay name=depay ! queue ! h264parse ! queue ! avdec_h264 ! videoconvert !
        tee name=t
        t. ! queue ! autovideosink sync=false
        t. ! queue ! valve name=snapshot_valve drop=false ! nvjpegenc !
        appsink name=snapshot_sink emit-signals=true sync=false
        t. ! queue ! valve name=record_valve drop=false ! nvvidconv !
        nvv4l2h264enc bitrate=4000000 ! h264parse ! queue !
        splitmuxsink name="record_sink" max-size-time=0 max-size-bytes=0 max-files=0
    """
    self.recv_pipeline = Gst.parse_launch(snapshot_pipeline_description)
    if not self.recv_pipeline:
        raise Exception("Failed to create receiving pipeline")

    # Keep handles to the elements that are controlled at runtime.
    find = self.recv_pipeline.get_by_name
    self.snapshot_valve = find("snapshot_valve")
    self.snapshot_sink = find("snapshot_sink")
    self.record_valve = find("record_valve")
    self.record_sink = find("record_sink")

    if not (self.snapshot_valve and self.snapshot_sink):
        raise Exception("Could not find snapshot valve or sink in receiving pipeline")
    if not (self.record_valve and self.record_sink):
        raise Exception("Recording elements not found")

    # new-sample delivers encoded JPEG frames; format-location asks for
    # the name of the next recording file. The handler is _format_location:
    # no method called _format_location_cb exists on the class.
    self.snapshot_sink.connect("new-sample", self._on_new_sample)
    self.record_sink.connect("format-location", self._format_location)

    self.recv_pipeline.set_state(Gst.State.PLAYING)
    print("Pipeline started.")
    # Both valves start open; the snapshot valve is closed after 10 s.
    GLib.timeout_add(10000, self._close_snapshot_valve_after_start)  # FIXME: Adjust timeout as needed. Maybe use a pad probe to trigger close.
Key Aspects:
- The new-sample and format-location signals allow us to manipulate GStreamer elements dynamically at runtime.
- The GLib.timeout_add(10000, self._close_snapshot_valve_after_start) call gives the pipeline time to start and negotiate caps before the media server functionality is exposed to the user.
- Implement the _close_snapshot_valve_after_start helper function. It closes the valve on the snapshot branch to prevent more buffers from flowing into the appsink element. Note: The recording branch has its own way of initializing; see the recording branch implementation.
def _close_snapshot_valve_after_start(self):
self.snapshot_valve.set_property("drop", True) # Close the snapshot valve to stop buffer flow to snapshot branch after warming up
print("Snapshot branch warmed up and valve closed.")
return False
Section 3: Build On-demand snapshots functionality
- Implement take_snapshot method for taking snapshots on demand.
def take_snapshot(self):
    """Request a snapshot by opening the snapshot valve for 200 ms.

    Does nothing except print a notice when a snapshot is already pending.
    """
    if not self.snapshot_taking:
        print("Triggering snapshot...")
        self.snapshot_taking = True
        # Let frames through to the JPEG encoder and appsink ...
        self.snapshot_valve.set_property("drop", False)
        # ... and close the valve again after 200 ms.
        GLib.timeout_add(200, self._stop_snapshot)
    else:
        print("Snapshot already in progress.")
- Implement _on_new_sample method to pull the buffer and save it into a file. This method is the one triggered when a new buffer reaches the snapshot sink and the new-sample signal is sent.
def _on_new_sample(self, sink):
    """Handle the appsink ``new-sample`` signal and store one snapshot.

    A frame is written to disk only while a snapshot has been requested
    (``self.snapshot_taking``). Frames that arrive while the valve is open
    for warm-up, or after the requested frame has been handled, are pulled
    and discarded instead of being written as unrequested JPEG files.

    Args:
        sink: The appsink that emitted the signal.

    Returns:
        ``Gst.FlowReturn.OK`` in every case.
    """
    sample = sink.emit("pull-sample")
    if not sample:
        return Gst.FlowReturn.OK
    if not self.snapshot_taking:
        # Nobody asked for this frame; drop it.
        return Gst.FlowReturn.OK

    # Cleared up front so a failure is never reported with an old filename.
    self.snapshot_filename = None
    buffer = sample.get_buffer()
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        print("Failed to map buffer")
        self.snapshot_taking = False
        return Gst.FlowReturn.OK
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"snapshot_{timestamp}.jpg"
        with open(filename, "wb") as f:
            f.write(map_info.data)
        self.snapshot_filename = filename
    except OSError as err:
        print(f"Failed to write snapshot: {err}")
    finally:
        # Always release the mapping, even when the write failed.
        buffer.unmap(map_info)
        # One frame per request: later frames in the window are discarded.
        self.snapshot_taking = False
    return Gst.FlowReturn.OK
- Implement the _stop_snapshot method.
def _stop_snapshot(self):
self.snapshot_valve.set_property("drop", True) # Close the snapshot valve to stop passing data to snapshot branch
print("Snapshot complete.")
print(f"📸 Snapshot saved as: {self.snapshot_filename}")
return False
Section 4: Build On-demand video recording functionality
- Implement start_recording method.
def start_recording(self):
if self.recording: # If already recording, do not start again
print("Already recording.")
return
if self.recording_ready: # If recording branch is ready, reset the recording state to adjust for new recording file
self.recording_ready = False # Reset recording ready state
self.record_valve.set_property("drop", True) # Close the recording valve to stop passing data to recording branch and allow for location change
self.record_sink.emit("split-now") # Emit split-now to finalize any previous recording and dynamically generate a new filename
print("🎥 Starting recording...")
self.record_valve.set_property("drop", False) # Open the recording valve to allow recording branch to pass data
self.recording = True # Set recording flag to indicate recording is in progress
- Implement stop_recording method.
def stop_recording(self):
if not self.recording: # If not currently recording, do not stop
print("Not currently recording...")
return
print("Stopping recording...")
print("Emitting split-now to finalize MP4...")
self.recording = False # Reset recording flag to indicate recording has stopped
self.recording_ready = True # Set recording ready state to allow for new recording
self.record_valve.set_property("drop", True) # Close the recording valve to stop passing data to recording branch
self.record_sink.emit("split-now") # Emit split-now to finalize the current recording and prepare for a new one
self.record_valve.set_property("drop", False) # Reopen the recording valve to allow for new recordings
- Implement the _format_location helper function. This method is called when the record sink emits the format-location signal, that is, when it needs a filename for a new file. A new file can also be requested manually by emitting the split-now action signal. Switching state variables such as self.recording and then requesting a split is how the file location for the video recording is changed dynamically, which is what allows on-demand video recording.
def _format_location(self, splitmux, fragment_id):
if self.recording: # If recording is in progress, generate a dynamic filename if not, dump to /dev/null
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"record_{timestamp}.mp4" # Format timestamp for filename
print(f"Generating filename: {filename}")
return filename
else:
return "/dev/null" # Dump to /dev/null if not recording
Section 5: Shutdown application
After using the application, it must be closed gracefully to avoid crashes.
- Implement shutdown method.
def shutdown(self):
    """Stop both pipelines and the GLib main loop."""
    self.pipeline.set_state(Gst.State.NULL)
    if self.recording:
        self.stop_recording()
    if self.snapshot_taking:
        self.snapshot_valve.set_property("drop", True)
    # The receiving pipeline was previously left running on exit.
    # NOTE(review): no EOS is sent first, so a recording that is still open
    # at this point may not be finalized - confirm whether that matters.
    self.recv_pipeline.set_state(Gst.State.NULL)
    self.loop.quit()
    self.loop_thread.join()
    print("Shutdown complete.")
Section 6: Building user interface
A simple menu is created for triggering snapshots and recordings on demand, as well as closing the application gracefully.
- Implement main function to expose the media server functionalities.
def main():
    """Run the interactive command loop for the media server.

    Commands read from standard input: ``r`` starts recording, ``s`` stops
    recording, ``p`` takes a snapshot and ``q`` quits. The application is
    shut down on exit, on Ctrl-C (including during the start-up wait) and
    when standard input is closed.
    """
    app = GStreamerApp()
    try:
        print("WORKING VERSION: 1.0")
        # Inside the try block so Ctrl-C during the wait still reaches shutdown().
        time.sleep(15)  # Allow time for pipeline to start before enabling commands.
        print("Commands: 'r' = start recording, 's' = stop recording, 'p' = snapshot, 'q' = quit")
        actions = {
            'r': app.start_recording,
            's': app.stop_recording,
            'p': app.take_snapshot,
        }
        while True:
            cmd = input("Enter command: ").strip().lower()
            if cmd == 'q':
                break
            action = actions.get(cmd)
            if action is None:
                print("Unknown command.")
            else:
                action()
    except (KeyboardInterrupt, EOFError):
        # EOFError: standard input was closed; treat it like a quit request.
        pass
    finally:
        app.shutdown()


if __name__ == "__main__":
    main()
🏗 Need a Solution for Your Project?
Are you looking for ways to:
✅ Optimize your embedded system for better performance?
✅ Integrate AI and computer vision into your products?
✅ Improve multimedia processing for real-time applications?
✅ Develop a robust and scalable web platform?
Our team has helped businesses across multiple industries solve these challenges.
📩 Let’s collaborate! Contact us at [support@proventusnova.com](mailto:support@proventusnova.com) or visit [ProventusNova.com](https://proventusnova.com) to discuss your project.