Please provide complete information as applicable to your setup.
• Hardware Platform (Jetson / GPU)
Quadro RTX 6000 * 2
• DeepStream Version
DeepStream 6.4
• NVIDIA GPU Driver Version (valid for GPU only)
When I added the first source:
Warning: gst-stream-error-quark: No decoder available for type ‘application/x-rtp, media=(string)application, payload=(int)107, clock-rate=(int)90000, encoding-name=(string)VND.ONVIF.METADATA, decodertag=(string)“h3c-v3\ RTCP=0”, a-recvonly=(string)“”, ssrc=(uint)31766572, npt-start=(guint64)0, play-speed=(double)1, play-scale=(double)1, onvif-mode=(boolean)false’. (6): …/gst/playback/gsturidecodebin.c(960): unknown_type_cb (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01
When I added the second source:
Warning: gst-stream-error-quark: No decoder available for type ‘application/x-rtp, media=(string)application, payload=(int)107, clock-rate=(int)90000, encoding-name=(string)VND.ONVIF.METADATA, decodertag=(string)“h3c-v3\ RTCP=0”, a-recvonly=(string)“”, ssrc=(uint)154311474, npt-start=(guint64)0, play-speed=(double)1, play-scale=(double)1, onvif-mode=(boolean)false’. (6): …/gst/playback/gsturidecodebin.c(960): unknown_type_cb (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-02
At this point, in addition to the initial stream, there are two more streams in the pipeline, and all of them are being analyzed normally.
Then, when I deleted one of the streams, the following error occurred,
although I think the pipeline did in fact remove that stream successfully:
(python3:4739): GStreamer-CRITICAL **: 14:39:57.109: Element Stream-muxer already has a pad named sink_2, the behaviour of gst_element_get_request_pad() for existing pads is undefined!
stop sikpad= <gi.GstNvStreamPad object at 0x7fdc95a6d480 (GstNvStreamPad at 0x7fda2c011180)>
Error: gst-resource-error-quark: Unhandled error (9): …/gst/rtsp/gstrtspsrc.c(6795): gst_rtspsrc_send (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-02/GstRTSPSrc:source:
ServerInternal (500)
After that, I can no longer add a source to the pipeline.
When I try to add a source, the following error occurs. The new source is not added, but the sources already in the pipeline are still being analyzed normally:
Error: gst-resource-error-quark: Could not write to resource. (10): …/gst/rtsp/gstrtspsrc.c(8346): gst_rtspsrc_close (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-02/GstRTSPSrc:source:
Could not send message. (Generic error)
Warning: gst-resource-error-quark: Could not read from resource. (9): …/gst/rtsp/gstrtspsrc.c(5819): gst_rtspsrc_loop_udp (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-03/GstRTSPSrc:source:
The server closed the connection.
My add_source and stop_source functions look like this:
def _stop_source(self, task_data, content=None):
    """Stop one source bin and detach it from the stream muxer.

    The source bin registered under ``task_data["source_id"]`` is set to
    ``Gst.State.NULL``; its ``sink_<id>`` pad on ``self.streammux`` is then
    flushed and released, and the bin is removed from ``self.pipeline`` and
    from ``self.g_source_bins``.

    Args:
        task_data: Mapping that provides ``"source_id"`` and ``"task_id"``;
            ``"datas"`` is additionally read when the state change fails.
        content: Free-form description of the triggering action; it is only
            embedded in diagnostic messages.

    Returns:
        ``None`` when ``source_id`` is 0 or the muxer has no pad for the
        source, ``False`` when the state change reports FAILURE, ``True``
        otherwise.
    """
    source_id = task_data["source_id"]
    # Source 0 is never torn down by this method.
    if source_id == 0:
        return

    pad_name = "sink_%u" % source_id
    # Only look the pad up. Requesting a pad under a name the muxer already
    # owns triggers the GStreamer-CRITICAL "already has a pad named sink_N"
    # message and is documented as undefined behaviour, so no request is made.
    sinkpad = self.streammux.get_static_pad(pad_name)
    print("stop sikpad=", sinkpad)
    if not sinkpad:
        print('cant not get sinkpad,stop error:{0}'.format(content), sinkpad)
        return

    source_bin = self.g_source_bins[source_id]
    state_return = source_bin.set_state(Gst.State.NULL)

    if state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
        content = "STATE CHANGE FAILURE, occur action: {}".format(content)
        # NOTE(review): the status is reported under the 'add' action although
        # this is the removal path - confirm what the consumer expects.
        self.out_queue.put(FrameCommon.response_status(task_data["datas"], 'add', -1, content))
        return False

    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
        self.logger.info("task_id: {} STATE CHANGE SUCCESS".format(task_data['task_id']))
        released_message = "streammux release SUCCESS\n"
    elif state_return == Gst.StateChangeReturn.ASYNC:
        self.logger.info("task_id: {} state change async".format(task_data['task_id']))
        # Block until the asynchronous transition to NULL has completed.
        source_bin.get_state(Gst.CLOCK_TIME_NONE)
        released_message = "STATE CHANGE ASYNC\n"
    else:
        # Any other result: nothing is released, the call still reports True.
        return True

    print(pad_name)
    # Send flush stop event to the sink pad, then release it from the streammux.
    sinkpad.send_event(Gst.Event.new_flush_stop(False))
    self.logger.info("task_id: {} sinkpad send event success".format(task_data['task_id']))
    self.streammux.release_request_pad(sinkpad)
    print(released_message)
    self.logger.info("task_id: {} streammux release success".format(task_data['task_id']))

    # Remove the source bin from the pipeline and forget it.
    self.pipeline.remove(source_bin)
    del self.g_source_bins[source_id]
    return True
def _add_source(self, source_id, video_url):
    """Create a source bin for ``video_url``, add it to the pipeline and start it.

    The bin is built with ``self._create_uridecode_bin``, registered in
    ``self.g_source_bins`` under ``source_id``, added to ``self.pipeline`` and
    set to ``Gst.State.PLAYING``.

    Args:
        source_id: Key used for ``self.g_source_bins`` and ``self.stream_data``.
        video_url: Stream URL handed to ``self._create_uridecode_bin``.

    Returns:
        ``None`` when the bin cannot be created or the state change reports
        FAILURE (the ``self.stream_data`` entry is deleted in both cases),
        ``True`` otherwise.
    """
    source_name = 'source-' + str(source_id)
    print('adding source ', source_name)

    # Create a uridecode bin with the chosen source id.
    source_bin = self._create_uridecode_bin(source_id, video_url)
    if not source_bin:
        self.logger.error("task_id: {}, stream_url: {}, Failed to create source bin. Exiting.".format(self.stream_data[source_id]['task_id'], video_url))
        del self.stream_data[source_id]
        return

    # Add source bin to our list and to the pipeline.
    self.g_source_bins[source_id] = source_bin
    self.pipeline.add(source_bin)
    print("source_bin=", source_bin)

    # Set state of source bin to playing.
    state_return = source_bin.set_state(Gst.State.PLAYING)
    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
        self.logger.error("task_id: {}, stream_url: {}, STATE CHANGE FAILURE.".format(
            self.stream_data[source_id]['task_id'], video_url))
        self.out_queue.put(FrameCommon.response_status(self.stream_data[source_id], 'add', -1,
                                                       'STATE CHANGE FAILURE:{0}'.format(video_url),
                                                       target_type="video"))
        # The bin was already added to the pipeline above: shut it down and
        # take it out again, otherwise a dead element stays in the pipeline
        # after its bookkeeping entries are gone.
        source_bin.set_state(Gst.State.NULL)
        self.pipeline.remove(source_bin)
        del self.stream_data[source_id]
        del self.g_source_bins[source_id]
        return
    elif state_return == Gst.StateChangeReturn.ASYNC:
        print("STATE CHANGE ASYNC\n")
        # Blocks without a timeout until the asynchronous state change settles.
        source_bin.get_state(Gst.CLOCK_TIME_NONE)
    elif state_return == Gst.StateChangeReturn.NO_PREROLL:
        print("STATE CHANGE NO PREROLL\n")
        self.logger.error("task_id: {}, stream_url: {}, STATE CHANGE NO PREROLL.".format(
            self.stream_data[source_id]['task_id'], video_url))
    return True
Can you give me some ideas to solve the problem?