Please provide complete information as applicable to your setup.
**• Orin AGX 64GB
**• Deepstream 6.2
**• Jetpack 5.1.1-b56
**• Question / bug(?)
Hi!
I’m getting
PosixMemMap:84 mmap failed : Cannot allocate memory
errors after running the pipeline for some time. The script has an auto-restart mechanism, but it would not let me restart, the output at the end is as follows:
PosixMemMap:84 mmap failed : Cannot allocate memory
PosixMemMap:84 mmap failed : Cannot allocate memory
PosixMemMap:84 mmap failed : Cannot allocate memory
PosixMemMap:84 mmap failed : Cannot allocate memory
PosixMemMap:84 mmap failed : Cannot allocate memory
WARNING: [TRT]: Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.
3:16:27.651114553 81162 0x4da7f60 INFO nvinfer gstnvinfer.cpp:680:gst_nvinfer_logger: NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1909> [UID = 1]: deserialized trt engine from :/home/nvidia/Documents/DeepStream-Yolo/maritimo.engine
WARNING: [TRT]: The getMaxBatchSize() function should not be used with an engine built from a network created with NetworkDefinitionCreationFlag::kEXPLICIT_BATCH flag. This function will always return 1.
INFO: [Implicit Engine Info]: layers num: 4
0 INPUT kFLOAT input 3x736x1280
1 OUTPUT kFLOAT boxes 57960x4
2 OUTPUT kFLOAT scores 57960x1
3 OUTPUT kFLOAT classes 57960x13:16:27.796425358 81162 0x4da7f60 INFO nvinfer gstnvinfer.cpp:680:gst_nvinfer_logger: NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2012> [UID = 1]: Use deserialized engine model: /home/nvidia/Documents/DeepStream-Yolo/maritimo.engine
3:16:27.800331348 81162 0x4da7f60 INFO nvinfer gstnvinfer_impl.cpp:328:notifyLoadModelStatus: [UID 1]: Load new model:yolov7config.txt sucessfully
Pipeline changed state from null to ready
Pipeline changed state from ready to paused
NvMMLiteOpen : Block : BlockType = 277
NVMEDIA: Reading vendor.tegra.display-size : status: 6
NvMMLiteBlockCreate : Block : BlockType = 277
PosixMemMap:84 mmap failed : Cannot allocate memory
JPEGEncAllocRingEntry: 449: NvVideoAllocMem Failed for pic setup buffer
JPEGEncCreate: 638: ring buffers allocation failed
NvMediaIJPCreate: Unable to create NvVideo JPEG encoder
Segmentation fault
The same pipeline runs fine on an Orin AGX 64GB devkit with slightly different versions (Jetpack 5.1.2-b104).
Could you point me into the right direction to debug this?
Thanks!
Find the code here:
Essentially, it takes in an MJPEG stream, runs it through a yolov7 and tracker, and sends it out over appsink to a websocket (and optionally saves it as well but this was turned off for the above bug).
import sys
import gi
from threading import Thread
from collections import defaultdict, deque
import threading
import queue
import asyncio
import websockets
import pickle
import os
import socket
import psutil
from urllib.parse import urlparse, urlunparse
sys.path.append(‘…/’)
gi.require_version(‘Gst’, ‘1.0’)
from gi.repository import GLib, Gst
from ctypes import *
import time
import math
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.FPS import PERF_DATA
import numpy as np
import pyds
import json
import configparser
import requests
log_file_handle = None
perf_data = None
frame_count = {}
saved_count = {}
global PGIE_CLASS_ID_VEHICLE
PGIE_CLASS_ID_VEHICLE = 0
global PGIE_CLASS_ID_PERSON
PGIE_CLASS_ID_PERSON = 2
MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1920
TILED_OUTPUT_HEIGHT = 1080
GST_CAPS_FEATURES_NVMM = “memory:NVMM”
global custom_frame_counter
custom_frame_counter = 0
id_to_cat_str = {0: ‘boat’, 1: ‘other’}
#mount_point = “/home/nvidia/Downloads”
mount_point = “/mnt/data”
video_base_filename = “vid”
video_file_extension = “.mp4”
from datetime import datetime
from shutil import disk_usage
def get_unique_filename(base_path, base_name, extension):
# Get the current date and time formatted as “YYYY-MM-DD_HH-MM-SS-FFF”
timestamp = datetime.now().strftime(“%Y-%m-%d_%H-%M-%S-%f”)[:-3] # Remove last three digits to get milliseconds
# Create the full filename with the timestamp
filename = f"{base_name}{timestamp}{extension}"
# Check if the file already exists; if it does, add a counter
counter = 1
while os.path.exists(os.path.join(base_path, filename)):
filename = f"{base_name}{timestamp}_{counter}{extension}"
counter += 1
return filename
def check_disk_space(path):
total, used, free = disk_usage(path)
return free
def is_http_server_running(url, timeout=5):
“”“Check if the HTTP server is running at the specified URL.”“”
try:
# Send a GET request to the server
response = requests.get(url, timeout=timeout)
# Check if the response status code is 200 (OK)
if response.status_code == 200:
print(f"Server at {url} is running.“)
return True
else:
print(f"Server at {url} responded with status code: {response.status_code}”)
return False
except requests.exceptions.RequestException as e:
print(f"Cannot connect to server at {url}: {e}")
return False
def remove_img_from_url(url):
# Parse the URL into its components
parsed_url = urlparse(url)
# Check if the path ends with '/img' and remove it if present
if parsed_url.path.endswith('/img'):
# Remove '/img' from the path
new_path = parsed_url.path[:-4] # Remove the last 4 characters ('/img')
# Construct the new URL without '/img'
new_url = urlunparse(parsed_url._replace(path=new_path))
return new_url
else:
# Return the original URL if '/img' is not present
return url
def log_worker():
with open(“frame_data_log.json”, “a”) as log_file:
while True:
frame_data = log_queue.get()
if frame_data is None: # Signal to exit
break
log_file.write(json.dumps(frame_data) + “\n”)
log_queue.task_done()
frame_queue = queue.Queue()
clients = set()
async def send_frame_to_clients(data):
start_time = time.time()
disconnected_clients = set()
for client in clients:
try:
await client.send(data)
except websockets.exceptions.ConnectionClosed:
disconnected_clients.add(client)
for client in disconnected_clients:
clients.remove(client)
if elapsed_time > 0.1:
# Log timestamp and elapsed time
print(datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), 'send_frame_to_clients:', elapsed_time)
# Log memory and CPU usage
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
cpu_percent = process.cpu_percent(interval=0.1) # Non-blocking CPU percentage usage
print(f"Memory usage: {memory_info.rss / (1024 * 1024):.2f} MB") # Resident Set Size in MB
print(f"CPU usage: {cpu_percent:.2f}%")
# Optionally, log the system-wide metrics
virtual_memory = psutil.virtual_memory()
print(f"Total system memory: {virtual_memory.total / (1024 * 1024):.2f} MB")
print(f"Available system memory: {virtual_memory.available / (1024 * 1024):.2f} MB")
print(f"System-wide CPU usage: {psutil.cpu_percent()}%")
def new_sample_callback(appsink, user_data):
start_time = time.time()
global custom_frame_counter
sample = appsink.emit(“pull-sample”)
if not sample:
return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
if not buffer:
print("Unable to get GstBuffer")
return Gst.FlowReturn.ERROR
# Extract the encoded frame from the buffer
buffer_size = buffer.get_size()
encoded_frame = buffer.extract_dup(0, buffer_size)
# Extract metadata
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buffer))
frame_meta = None
if batch_meta:
l_frame = batch_meta.frame_meta_list
if l_frame is not None:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
if frame_meta:
# Extract timestamp
pts = buffer.pts
timestamp = pts / Gst.SECOND # Convert from nanoseconds to seconds
objects = []
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
rect_params = obj_meta.rect_params
objects.append({
"obj_id": obj_meta.object_id,
"category": id_to_cat_str.get(obj_meta.class_id, 'unknown'),
"confidence": obj_meta.confidence,
"x": rect_params.left,
"y": rect_params.top,
"width": rect_params.width,
"height": rect_params.height,
"alertstate": -1,
"x_world": -1,
"y_world": -1,
"distance": -1,
})
l_obj = l_obj.next
except StopIteration:
break
# Convert metadata to JSON
json_to_send = {"frame_number": custom_frame_counter, "timestamp": timestamp, "bboxes": objects}
metadata_json = json.dumps(json_to_send).encode('utf-8')
# print([entry['category'] for entry in objects])
message = len(metadata_json).to_bytes(4, byteorder='big') + metadata_json + encoded_frame
frame_data = {
"timestamp": timestamp,
"frame_number": custom_frame_counter,
"metadata": json_to_send,
}
# log_queue.put(frame_data)
# Send the data asynchronously
asyncio.run_coroutine_threadsafe(send_frame_to_clients(message), loop)
custom_frame_counter += 1
if time.time() - start_time > 0.1:
print(datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), 'new_sample_callback:', time.time() - start_time)
return Gst.FlowReturn.OK
async def websocket_handler(websocket, path):
clients.add(websocket)
try:
while True:
await websocket.recv() # Keep connection open
except websockets.exceptions.ConnectionClosed:
clients.remove(websocket)
def start_websocket_server():
global loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(websocket_handler, ‘0.0.0.0’, 5001)
loop.run_until_complete(start_server)
print(“WebSocket server started on ws://0.0.0.0:5001”)
loop.run_forever()
Start the WebSocket server in a separate thread
websocket_thread = threading.Thread(target=start_websocket_server)
websocket_thread.start()
def cb_newpad(decodebin, decoder_src_pad, data):
print(“In cb_newpad\n”)
caps = decoder_src_pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
if (gstname.find("video") != -1):
if features.contains("memory:NVMM"):
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
print(“Decodebin child added:”, name, “\n”)
if name.find(“decodebin”) != -1:
Object.connect(“child-added”, decodebin_child_added, user_data)
if not is_aarch64() and name.find("nvv4l2decoder") != -1:
Object.set_property("cudadec-memtype", 2)
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') != None:
Object.set_property("drop-on-latency", True)
def create_source_bin(index, uri):
print(“Creating source bin”)
bin_name = "source-bin-%02d" % index
print(bin_name)
nbin = Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(" Unable to create source bin \n")
return None
# Create source element for HTTP stream
source = Gst.ElementFactory.make("souphttpsrc", "http-source")
source.set_property("location", uri)
source.set_property("do-timestamp", True)
source.set_property("blocksize", 4096)
source.set_property("timeout", 5000000) # Timeout in microseconds
source.set_property("is-live", True)
if not source:
sys.stderr.write(" Unable to create http source \n")
return None
source.set_property("location", uri)
# Create and link parser and decoder for MJPEG stream
jpegparse = Gst.ElementFactory.make("jpegparse", "jpeg-parser")
if not jpegparse:
sys.stderr.write(" Unable to create jpegparse \n")
return None
decoder = Gst.ElementFactory.make("nvv4l2decoder", "jpeg-decoder")
if not decoder:
sys.stderr.write(" Unable to create JPEG decoder \n")
return None
decoder.set_property('mjpeg', 1)
# Create and link video converter
nvvidconvsrc = Gst.ElementFactory.make("nvvideoconvert", "convertor_src2")
if not nvvidconvsrc:
sys.stderr.write(" Unable to create nvvideoconvert \n")
return None
# Add elements to the bin
nbin.add(source)
nbin.add(jpegparse)
nbin.add(decoder)
nbin.add(nvvidconvsrc)
# Link elements
if not source.link(jpegparse):
sys.stderr.write(" Unable to link source to jpegparse \n")
return None
if not jpegparse.link(decoder):
sys.stderr.write(" Unable to link jpegparse to decoder \n")
return None
if not decoder.link(nvvidconvsrc):
sys.stderr.write(" Unable to link decoder to nvvidconvsrc \n")
return None
# Create and add ghost pad to bin
bin_pad = Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
if not bin_pad:
sys.stderr.write(" Failed to create ghost pad in source bin \n")
return None
nbin.add_pad(bin_pad)
# Get the static pad from the video converter and check if it's valid
nvvidconvsrc_src_pad = nvvidconvsrc.get_static_pad("src")
if not nvvidconvsrc_src_pad:
sys.stderr.write(" Failed to get static pad from nvvidconvsrc \n")
return None
# Link the converter source pad to the bin's ghost pad
if not bin_pad.set_target(nvvidconvsrc_src_pad):
sys.stderr.write(" Failed to link ghost pad to nvvidconvsrc's src pad \n")
return None
return nbin
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error received from element {message.src.get_name()}: {err}“)
print(f"Debugging information: {debug}”)
loop.quit()
elif t == Gst.MessageType.EOS:
print(“End-Of-Stream reached.”)
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
print(f"Warning received from element {message.src.get_name()}: {err}“)
print(f"Debugging information: {debug}”)
elif t == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
print(f"Pipeline changed state from {old_state.value_nick} to {new_state.value_nick}")
return True
def main(args):
recording = True
while True:
try:
if len(args) < 1:
sys.stderr.write(“usage: %s [uri2] … [uriN] \n” % args[0])
sys.exit(1)
# Check if the HTTP server is running before proceeding
while True:
if not is_http_server_running(remove_img_from_url(args[1])):
print(args[1], "HTTP server is not running..")
time.sleep(1)
else:
print('cam server running, continue..')
break
global perf_data
perf_data = PERF_DATA(len(args) - 2)
number_sources = 1
Gst.init(None)
if recording:
video_filename = get_unique_filename(mount_point, video_base_filename, video_file_extension)
video_path = os.path.join(mount_point, video_filename)
if not os.path.exists(mount_point):
recording = False
print('No video saving.. Directory for saving does not exist:', mount_point)
else:
if check_disk_space(mount_point) < 100 * 1024 * 1024: # Check if free space is less than 100MB
print("Less than 100MB free space remaining.")
recording = False
else:
print('Enough disk space. Saving video.')
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)
for i in range(number_sources):
frame_count["stream_" + str(i)] = 0
saved_count["stream_" + str(i)] = 0
print("Creating source_bin ", i, " \n ")
uri_name = args[i + 1]
# Check if the source is a live HTTP stream
if uri_name.find("http://") == 0 or uri_name.find("https://") == 0:
is_live = True
source_bin = create_source_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Unable to create source bin \n")
pipeline.add(source_bin)
padname = "sink_%u" % i
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad = source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
# Log capabilities before linking pads
src_caps = srcpad.query_caps(None)
sink_caps = sinkpad.query_caps(None)
print(f"Source pad caps: {src_caps.to_string()}")
print(f"Sink pad caps: {sink_caps.to_string()}")
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
sys.stderr.write("Error linking source_bin to streammux \n")
print("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie \n")
print("Creating Tracker \n")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
sys.stderr.write(" Unable to create tracker \n")
config = configparser.ConfigParser()
config.read('dstest2_tracker_config.txt')
for key in config['tracker']:
if key in ['tracker-width', 'tracker-height', 'gpu-id']:
tracker.set_property(key, config.getint('tracker', key))
else:
tracker.set_property(key, config.get('tracker', key))
print("Creating nvvidconv1 \n ")
nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
if not nvvidconv1:
sys.stderr.write(" Unable to create nvvidconv1 \n")
print("Creating filter1 \n ")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1:
sys.stderr.write(" Unable to get the caps filter1 \n")
filter1.set_property("caps", caps1)
print("Creating tiler \n ")
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
sys.stderr.write(" Unable to create tiler \n")
print("Creating nvvidconv \n ")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv \n")
print("Creating JPEG Encoder \n")
jpegenc = Gst.ElementFactory.make("nvjpegenc", "jpeg-encoder")
if not jpegenc:
sys.stderr.write(" Unable to create jpeg encoder \n")
print("Creating appsink \n")
appsink = Gst.ElementFactory.make("appsink", "app-sink")
if not appsink:
sys.stderr.write(" Unable to create app sink \n")
appsink.set_property("emit-signals", True)
appsink.set_property("sync", False)
appsink.set_property("drop", True)
appsink.set_property("max-buffers", 1)
appsink.connect("new-sample", new_sample_callback, None)
print("Creating nv3dsink \n")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
if not sink:
sys.stderr.write(" Unable to create nv3dsink \n")
if is_live:
print("At least one of the sources is live")
streammux.set_property('live-source', 1)
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', 4000000)
pgie.set_property('config-file-path', "yolov7config.txt")
pgie_batch_size = pgie.get_property("batch-size")
if (pgie_batch_size != number_sources):
print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
number_sources, " \n")
pgie.set_property("batch-size", number_sources)
tiler_rows = int(math.sqrt(number_sources))
tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
tiler.set_property("rows", tiler_rows)
tiler.set_property("columns", tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)
sink.set_property("sync", 0)
sink.set_property("qos", 0)
if not is_aarch64():
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", mem_type)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
nvvidconv1.set_property("nvbuf-memory-type", mem_type)
tiler.set_property("nvbuf-memory-type", mem_type)
if recording:
h264enc = Gst.ElementFactory.make("nvv4l2h264enc", "h264-encoder")
h264enc.set_property("insert-sps-pps", 1)
h264enc.set_property("bitrate", 500000) # 0.5 Mbps
# Create h264parse element
h264parse = Gst.ElementFactory.make("h264parse", "h264-parse")
if not h264parse:
sys.stderr.write("Unable to create h264parse \n")
# Create mp4mux element
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4-muxer")
#mp4mux.set_property('faststart', True)
if not mp4mux:
sys.stderr.write("Unable to create mp4mux \n")
filesink = Gst.ElementFactory.make("filesink", "file-sink")
filesink.set_property("location", video_path)
# Create the tee element
tee = Gst.ElementFactory.make("tee", "tee")
pipeline.add(tee)
# Create queues for both branches
queue1 = Gst.ElementFactory.make("queue", "queue1")
queue2 = Gst.ElementFactory.make("queue", "queue2")
# Set properties on queue1 to make it leaky
queue1.set_property('leaky', 2) # Downstream leaky
queue1.set_property('max-size-buffers', 5) # Adjust as needed
# Add queues to the pipeline
pipeline.add(queue1)
pipeline.add(queue2)
else:
# Create a leaky queue before appsink even if not recording
queue1 = Gst.ElementFactory.make("queue", "queue1")
queue1.set_property('leaky', 2) # Downstream leaky
queue1.set_property('max-size-buffers', 5) # Adjust as needed
pipeline.add(queue1)
nvvidconv_flip_v = Gst.ElementFactory.make("nvvideoconvert", "nvvidconv_flip_v")
if not nvvidconv_flip_v:
sys.stderr.write(" Unable to create nvvideoconvert for flipping \n")
nvvidconv_flip_v.set_property("flip-method", 2) # Vertical flip
pipeline.add(nvvidconv_flip_v)
print("Adding elements to Pipeline \n")
pipeline.add(pgie)
pipeline.add(tracker)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(filter1)
pipeline.add(nvvidconv1)
pipeline.add(jpegenc)
pipeline.add(appsink)
if recording:
pipeline.add(h264enc)
pipeline.add(h264parse)
pipeline.add(mp4mux)
pipeline.add(filesink)
print("Linking elements in the Pipeline \n")
streammux.link(nvvidconv_flip_v)
nvvidconv_flip_v.link(pgie)
pgie.link(tracker)
tracker.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(tiler)
tiler.link(nvvidconv)
if recording:
nvvidconv.link(tee)
# Branch 1: Link tee -> queue1 -> jpegenc -> appsink
tee.link(queue1)
queue1.link(jpegenc)
jpegenc.link(appsink)
# Branch 2: Link tee -> queue2 -> h264enc -> filesink
tee.link(queue2)
queue2.link(h264enc)
h264enc.link(h264parse)
h264parse.link(mp4mux)
mp4mux.link(filesink)
else:
nvvidconv.link(queue1)
queue1.link(jpegenc)
jpegenc.link(appsink)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
print("Now playing...")
for i, source in enumerate(args[:-1]):
if i != 0:
print(i, ": ", source)
print("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
print("some exit signal")
# pass
# Send EOS event to the pipeline
pipeline.send_event(Gst.Event.new_eos())
# Wait until the EOS is received and processed
bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS)
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)
frame_queue.put((None, None))
except Exception as e:
print("Error occurred", e)
print("Restarting...")
time.sleep(2)
if name == ‘main’:
sys.exit(main(sys.argv))