I’ve written the following Python script to stream video from my Jetson’s IMX477 CSI camera. It uses GStreamer to show the video locally and simultaneously stream
#!/usr/bin/env python3
import gi
import signal
import sys
import time
gi.require_version(‘Gst’, ‘1.0’)
gi.require_version(‘GstVideo’, ‘1.0’)
from gi.repository import Gst, GstVideo, GLib
class CameraStreamer:
def init(self):
Gst.init(None)
self.loop = GLib.MainLoop()
self.pipeline = None
self.window_id = None
self.pipeline_str = (
"nvarguscamerasrc sensor-id=0 ! "
"queue max-size-buffers=3 leaky=downstream ! "
"video/x-raw(memory:NVMM), width=1280, height=720, framerate=30/1, format=NV12 ! "
"nvvidconv flip-method=0 ! "
"tee name=t ! "
"queue ! xvimagesink sync=false async=false name=local_display "
"t. ! queue ! videoconvert ! video/x-raw,format=RGB ! "
"queue ! jpegenc quality=85 ! "
"rtpjpegpay ! udpsink host=127.0.0.1 port=5000 sync=false name=network_stream"
)
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, sig, frame):
print(f"\nSignal {sig} received, stopping pipeline...")
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
self.loop.quit()
def run(self):
self.pipeline = Gst.parse_launch(self.pipeline_str)
local_sink = self.pipeline.get_by_name("local_display")
if local_sink:
local_sink.set_property("force-aspect-ratio", True)
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.on_bus_message)
print("Starting pipeline...")
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Failed to start pipeline")
return
print("""
📹 Camera Stream Active:
- Local display: X11 window
- Network stream: udp://127.0.0.1:5000
Press Ctrl+C to stop
""")
try:
self.loop.run()
except Exception as e:
print(f"Error: {e}")
finally:
print(" Pipeline stopped")
self.pipeline.set_state(Gst.State.NULL)
def on_bus_message(self, bus, message):
mtype = message.type
if mtype == Gst.MessageType.EOS:
print(" End of stream")
self.loop.quit()
elif mtype == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f" Error: {err}\n🔧 Debug: {debug}")
self.loop.quit()
elif mtype == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old, new, pending = message.parse_state_changed()
print(f"Pipeline state changed: {old.value_nick} → {new.value_nick}")
return True
if name == “main”:
print(“=== 🛠 Jetson IMX477 Camera Streamer ===”)
streamer = CameraStreamer()
streamer.run()
When I manually start the script (or the service), everything works perfectly. The camera is initialized, and both local display and network streaming function as expected.
However, the problem is: the script does NOT work at boot when launched via systemd
service.
The pipeline fails to start, and the camera doesn’t activate properly. Here’s my systemd
service file:
[Unit]
Description=Jetson Camera Streamer
After=graphical.target nvargus-daemon.service
Requires=nvargus-daemon.service
[Service]
Type=simple
User=jetson
Group=video
Environment=“DISPLAY=:0”
Environment=“XAUTHORITY=/home/jetson/.Xauthority”
WorkingDirectory=/home/jetson
ExecStartPre=/bin/sleep 10
ExecStart=/usr/bin/python3 /path/to/streamer.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
The camera works fine with:
argus_camera
gst-launch-1.0 nvarguscamerasrc ! nvoverlaysink