This problem has nothing to do with deepstream, just add signal processing to the python program
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
import signal
is_running = True
def sigint_handler(signum, frame):
global is_running
print("\nReceived interrupt signal. Shutting down gracefully...")
is_running = False
main_loop.quit()
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print("End of stream")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error: {err}")
print(f"Debug info: {debug}")
loop.quit()
return True
def main():
global main_loop
# Initialize GStreamer
Gst.init(None)
# Create the main loop
main_loop = GLib.MainLoop()
# Create a simple DeepStream pipeline
pipeline = Gst.Pipeline.new("deepstream-signal-example")
# Create pipeline elements (example with videotestsrc)
source = Gst.ElementFactory.make("videotestsrc", "test-source")
converter = Gst.ElementFactory.make("nvvideoconvert", "converter")
sink = Gst.ElementFactory.make("autovideosink", "video-output")
if not pipeline or not source or not converter or not sink:
print("Not all elements could be created.")
return
# Add elements to the pipeline
pipeline.add(source)
pipeline.add(converter)
pipeline.add(sink)
# Link the elements
source.link(converter)
converter.link(sink)
# Get the bus and add a watch
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, main_loop)
# Start the pipeline
pipeline.set_state(Gst.State.PLAYING)
print("Pipeline started. Press Ctrl+C to exit.")
# Set up signal handling
signal.signal(signal.SIGINT, sigint_handler)
# Run the main loop
try:
main_loop.run()
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Clean up
pipeline.set_state(Gst.State.NULL)
print("Pipeline stopped.")
if __name__ == "__main__":
main()
There is no update from you for a period, assuming this is not an issue anymore. Hence we are closing this topic. If need further support, please open a new one. Thanks