How to image frame input deepstream pipeline

how to use opencv to process image frame( format:array) → input deepstream pipeline → output rtsp stream
my code below:

def main(args):

# Standard GStreamer initialization
GObject.threads_init()
Gst.init(None)

# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()

if not pipeline:
    sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")

# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
    sys.stderr.write(" Unable to create NvStreamMux \n")

pipeline.add(streammux)
for i in range(number_sources):
    print("Creating source_bin ",i," \n ")

appsource = Gst.ElementFactory.make("appsrc", "numpy-source")

padname="sink_%u" %i
sinkpad= streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")

print("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
    sys.stderr.write(" Unable to create pgie \n")
print("Creating tiler \n ")
tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
    sys.stderr.write(" Unable to create tiler \n")
print("Creating nvvidconv \n ")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
    sys.stderr.write(" Unable to create nvvidconv \n")
print("Creating nvosd \n ")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
    sys.stderr.write(" Unable to create nvosd \n")
nvosd.set_property('process-mode',OSD_PROCESS_MODE)
nvosd.set_property('display-text',OSD_DISPLAY_TEXT)
if(is_aarch64()):
    print("Creating transform \n ")
    transform=Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
    if not transform:
        sys.stderr.write(" Unable to create transform \n")
transform=Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
print("Creating EGLSink \n")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")

#===========rtsp=============#
nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "nv-videoconv")
if not nvvidconv_postosd:
    sys.stderr.write(" Unable to create nvvidconv_postosd \n")
caps = Gst.ElementFactory.make("capsfilter", "filter")
caps_in=Gst.Caps.from_string("video/x-raw,format=RGBA,width=640,height=480,framerate=30/1")
appsource.set_property('caps',caps_in)
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12,width=640,height=480,framerate=30/1"))
encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
encoder.set_property('bitrate', 4000000)
encoder.set_property('preset-level', 1)
encoder.set_property('insert-sps-pps', 1)
encoder.set_property('bufapi-version', 1)
rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
updsink_port_num = 5400
udpsink = Gst.ElementFactory.make("udpsink", "udpsink")
udpsink.set_property('host', '224.224.255.255')
udpsink.set_property('port', updsink_port_num)
udpsink.set_property('async', False)
udpsink.set_property('sync', 1)

if not sink:
    sys.stderr.write(" Unable to create egl sink \n")

if is_live:
    print("Atleast one of the sources is live")
    streammux.set_property('live-source', 1)

streammux.set_property('width', 640)
streammux.set_property('height', 480)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', 4000000)
pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != number_sources):
    print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
    pgie.set_property("batch-size",number_sources)
tiler_rows=int(math.sqrt(number_sources))
tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
tiler.set_property("rows",tiler_rows)
tiler.set_property("columns",tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)
sink.set_property("qos",0)

print("Adding elements to Pipeline \n")
pipeline.add(appsource)
pipeline.add(pgie)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(nvvidconv_postosd)#rtsp
pipeline.add(caps)
pipeline.add(encoder)

appsource.link(nvvidconv_postosd)
nvvidconv_postosd.link(caps)
#caps.link(transform)
#transform.link(sink)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(udpsink)

# create an event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)
tiler_src_pad=pgie.get_static_pad("src")
#if not tiler_src_pad:
    #sys.stderr.write(" Unable to get src pad \n")
#else:
    #tiler_src_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_src_pad_buffer_probe, 0)
rtsp_port_num = 8554
server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_port_num
server.attach(None)
factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch( "( udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, "H264"))
factory.set_shared(True)
server.get_mount_points().add_factory("/ds-test", factory)
print("\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test ***\n\n" % rtsp_port_num)
#osdsinkpad = nvosd.get_static_pad("udpsink")
#osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

pipeline.set_state(Gst.State.PLAYING)
for _ in range(5):
    arr=np.random.randint(low=0,high=255,size=(480,640,3),dtype=np.uint8)
    img=cv2.cvtColor(arr, cv2.COLOR_BGR2RGBA)
    appsource.emit("push-buffer",ndarray_to_gst_buffer(img))
    time.sleep(1)
appsource.emit("end-of-stream")
try:
    loop.run()
except:
    pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)

if name == ‘main’:
sys.exit(main(sys.argv))

I try img into buffer , and hope output rtsp stream , and use another camera catch rtsp stream to show image
I run the result below:

jetson2021030510@jetson2021030510:/opt/nvidia/deepstream/deepstream-5.0/sources/deepstream_python_apps/apps/deepstream-test3$ python3 deepstream_test_3_rtsp.py ‘rtsp://admin:abc541287@192.168.0.106:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif’
deepstream_test_3_rtsp.py:30: PyGIWarning: GstRtspServer was imported without specifying a version first. Use gi.require_version(‘GstRtspServer’, ‘1.0’) before import to ensure that the right version gets loaded.
from gi.repository import GObject, Gst, GstRtspServer
2021-09-03 12:11:47.787169: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.2
Creating Pipeline

Creating streamux

Creating source_bin 0

Creating Pgie

Creating tiler

Creating nvvidconv

Creating nvosd

Creating transform

Creating EGLSink

Atleast one of the sources is live
Adding elements to Pipeline

Linking elements in the Pipeline

*** DeepStream: Launched RTSP Streaming at rtsp://localhost:8554/ds-test ***

Now playing…
Starting pipeline

Opening in BLOCKING MODE
NvMMLiteOpen : Block : BlockType = 4
===== NVMEDIA: NVENC =====
NvMMLiteBlockCreate : Block : BlockType = 4
0:00:14.312013008 16322 0x392730c0 INFO nvinfer gstnvinfer.cpp:619:gst_nvinfer_logger: NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1701> [UID = 1]: deserialized trt engine from :/opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine
INFO: [Implicit Engine Info]: layers num: 3
0 INPUT kFLOAT input_1 3x368x640
1 OUTPUT kFLOAT conv2d_bbox 16x23x40
2 OUTPUT kFLOAT conv2d_cov/Sigmoid 4x23x40

0:00:14.312475642 16322 0x392730c0 INFO nvinfer gstnvinfer.cpp:619:gst_nvinfer_logger: NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:1805> [UID = 1]: Use deserialized engine model: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine
0:00:14.370511384 16322 0x392730c0 INFO nvinfer gstnvinfer_impl.cpp:313:notifyLoadModelStatus: [UID 1]: Load new model:dstest3_pgie_config.txt sucessfully
H264: Profile = 66, Level = 0

(python3:16322): GStreamer-CRITICAL **: 12:12:00.599: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:00.599: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:00.600: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:00.601: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:00.602: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:01.569: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:02.576: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:03.584: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

(python3:16322): GStreamer-CRITICAL **: 12:12:04.591: gst_segment_to_running_time: assertion ‘segment->format == format’ failed
End-of-stream
Exiting app

jetson2021030510@jetson2021030510:/opt/nvidia/deepstream/deepstream-5.0/sources/deepstream_python_apps/apps/deepstream-test3$

how to solve the problem? or what methods to arrive my target

thanks

Hi,
For appsrc with numpy, please refer to this sample:
Appsrc with numpy input in Python - #8 by gautampt6ul

For your use-case, you may integrate it with
https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/deepstream-test1-rtsp-out

thank you your reply, but I want image array input deepstream pipeline, and result output rtsp, if is it connect problem?

my code :

appsource.link(nvvidconv_postosd)
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(udpsink)

how do the problem :
appsrc–> nvvidconv_postosd—> streammux ?

If you have python example ?

thank you

Hi,
There is no existing python sample for this use-case and you would need to do customization. In deepstream-test1-rtsp-out, it is

filesrc ! h264parse ! nvv4l2decoder ! nvstreammux ! ...

And you need to customize it to

appsrc ! nvvideoconvert ! nvstreammux ! ...

may customize to videotestsrc first to make sure it works:

videotestsrc ! nvvideoconvert ! nvstreammux ! ...

And then appsrc.

thank you your reply, please look my code , relationship appsrc ! nvvideoconvert ! nvstreammux
connect problem

Create nvstreammux instance to form batches from one or more sources.

streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
    sys.stderr.write(" Unable to create NvStreamMux \n")

appsource = Gst.ElementFactory.make(“appsrc”, “numpy-source”)
print(“Creating Pgie \n “)
pgie = Gst.ElementFactory.make(“nvinfer”, “primary-inference”)
if not pgie:
sys.stderr.write(” Unable to create pgie \n”)
print(“Creating tiler \n “)
tiler=Gst.ElementFactory.make(“nvmultistreamtiler”, “nvtiler”)
if not tiler:
sys.stderr.write(” Unable to create tiler \n”)
print(“Creating nvvidconv \n “)
nvvidconv = Gst.ElementFactory.make(“nvvideoconvert”, “nv-videoconv”)
if not nvvidconv:
sys.stderr.write(” Unable to create nvvidconv \n”)

 #===========rtsp=============#
nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
if not nvvidconv_postosd:
    sys.stderr.write(" Unable to create nvvidconv_postosd \n")
caps1 = Gst.ElementFactory.make("capsfilter", "capsfilter1")
caps_in=Gst.Caps.from_string("video/x-raw,format=RGBA")
appsource.set_property('caps',caps_in)
caps1.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12"))
caps = Gst.ElementFactory.make("capsfilter", "filter")
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
encoder.set_property('bitrate', 4000000)
encoder.set_property('preset-level', 1)
encoder.set_property('insert-sps-pps', 1)
encoder.set_property('bufapi-version', 1)
rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
updsink_port_num = 5400
udpsink = Gst.ElementFactory.make("udpsink", "udpsink")
udpsink.set_property('host', '224.224.255.255')

udpsink.set_property('port', updsink_port_num)
udpsink.set_property('async', False)
udpsink.set_property('sync', 1)

streammux.set_property('width', 640)
streammux.set_property('height', 480)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', 4000000)
pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != number_sources):
    print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
    pgie.set_property("batch-size",number_sources)

 print("Adding elements to Pipeline \n")
pipeline.add(appsource)
pipeline.add(pgie)
pipeline.add(h264parser)
pipeline.add(decoder)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(nvvidconv_postosd)#rtsp
pipeline.add(caps)
pipeline.add(caps1)
pipeline.add(encoder)
pipeline.add(rtppay)
pipeline.add(udpsink)
pipeline.add(transform)
pipeline.add(sink)

appsource.link(nvvidconv)
nvvidconv.link(caps1)
sinkpad = streammux.get_request_pad("sink_0")
if not sinkpad:
    sys.stderr.write(" Unable to get the sink pad of streammux \n")

srcpad = caps1.get_static_pad("src")
if not srcpad:
    sys.stderr.write(" Unable to get source pad of decoder \n")
srcpad.link(sinkpad)
streammux.link(pgie)
pgie.link(nvvidconv_postosd)
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(udpsink)
# create an event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)

rtsp_port_num = 8554
server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_port_num
server.attach(None)
factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch( "( udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, "H264"))
factory.set_shared(True)
server.get_mount_points().add_factory("/ds-test001", factory)
print("\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test001 ***\n\n" % rtsp_port_num)

pipeline.set_state(Gst.State.PLAYING)
for _ in range(5):
    arr=np.random.randint(low=0,high=255,size=(480,640,3),dtype=np.uint8)
    img=cv2.cvtColor(arr, cv2.COLOR_BGR2RGBA)
    appsource.emit("push-buffer",ndarray_to_gst_buffer(img))
    time.sleep(1)
appsource.emit("end-of-stream")
try:
    loop.run()
except:
    pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)

if name == ‘main’:
sys.exit(main(sys.argv))

run result below :

user123@user123-desktop:/opt/nvidia/deepstream/deepstream-5.0/sources/deepstream_python_apps-1.0/apps/deepstream-test3$ python3 alien_try.py u’rtsp://admin:abc541287@192.168.0.106:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif’
alien_try.py:30: PyGIWarning: GstRtspServer was imported without specifying a version first. Use gi.require_version(‘GstRtspServer’, ‘1.0’) before import to ensure that the right version gets loaded.
from gi.repository import GObject, Gst, GstRtspServer
Creating Pipeline

Creating streamux

Creating source_bin 0

Creating Pgie

Creating tiler

Creating nvvidconv

Creating nvosd

Creating transform

Creating EGLSink

Adding elements to Pipeline

Linking elements in the Pipeline

*** DeepStream: Launched RTSP Streaming at rtsp://localhost:8554/ds-test001 ***

Now playing…
Starting pipeline

Using winsys: x11
Opening in BLOCKING MODE
Opening in BLOCKING MODE
ERROR: Deserialize engine failed because file path: /opt/nvidia/deepstream/deepstream-5.0/sources/deepstream_python_apps-1.0/apps/deepstream-test3/…/…/…/…/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine open error
0:00:04.269091034 9997 0xe8f46c0 WARN nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger: NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1690> [UID = 1]: deserialize engine from file :/opt/nvidia/deepstream/deepstream-5.0/sources/deepstream_python_apps-1.0/apps/deepstream-test3/…/…/…/…/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine failed
0:00:04.269284280 9997 0xe8f46c0 WARN nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger: NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:1797> [UID = 1]: deserialize backend context from engine from file :/opt/nvidia/deepstream/deepstream-5.0/sources/deepstream_python_apps-1.0/apps/deepstream-test3/…/…/…/…/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine failed, try rebuild
0:00:04.269325272 9997 0xe8f46c0 INFO nvinfer gstnvinfer.cpp:619:gst_nvinfer_logger: NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:1715> [UID = 1]: Trying to create engine from model files
INFO: [TRT]: Reading Calibration Cache for calibrator: EntropyCalibration2
INFO: [TRT]: Generated calibration scales using calibration cache. Make sure that calibration cache has latest scales.
INFO: [TRT]: To regenerate calibration cache, please delete the existing one. TensorRT will generate a new calibration cache.
INFO: [TRT]:
INFO: [TRT]: --------------- Layers running on DLA:
INFO: [TRT]:
INFO: [TRT]: --------------- Layers running on GPU:
INFO: [TRT]: conv1 + activation_1/Relu, block_1a_conv_1 + activation_2/Relu, block_1a_conv_2, block_1a_conv_shortcut + add_1 + activation_3/Relu, block_2a_conv_1 + activation_4/Relu, block_2a_conv_2, block_2a_conv_shortcut + add_2 + activation_5/Relu, block_3a_conv_1 + activation_6/Relu, block_3a_conv_2, block_3a_conv_shortcut + add_3 + activation_7/Relu, block_4a_conv_1 + activation_8/Relu, block_4a_conv_2, block_4a_conv_shortcut + add_4 + activation_9/Relu, conv2d_cov, conv2d_cov/Sigmoid, conv2d_bbox,
INFO: [TRT]: Detected 1 inputs and 2 output network tensors.
ERROR: Serialize engine failed because of file path: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine opened error
0:00:26.892163844 9997 0xe8f46c0 WARN nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger: NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:1743> [UID = 1]: failed to serialize cude engine to file: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine
INFO: [Implicit Engine Info]: layers num: 3
0 INPUT kFLOAT input_1 3x368x640
1 OUTPUT kFLOAT conv2d_bbox 16x23x40
2 OUTPUT kFLOAT conv2d_cov/Sigmoid 4x23x40

0:00:26.909249485 9997 0xe8f46c0 INFO nvinfer gstnvinfer_impl.cpp:313:notifyLoadModelStatus: [UID 1]: Load new model:dstest3_pgie_config.txt sucessfully
Error: gst-stream-error-quark: Internal data stream error. (1): gstbasesrc.c(3055): gst_base_src_loop (): /GstPipeline:pipeline0/GstAppSrc:numpy-source:
streaming stopped, reason not-negotiated (-4)
Exiting app

what error mean ? how to solve it ?

thank you

Hi,
Doe it work with videotestsrc? Would need to have the plugin working first.

Hi, not use videotestsrc,I use appsrc to rtsp, I run the code ,output : rtsp://localhost:%d/ds-test001

I run another code :
cap1=cv2.VideoCapture(‘rtsp://localhost:8556/ds-test001’)
while cap1.isOpened():
ret1, frame1 = cap1.read()
print(ret1)
if (ret1)==True:
cv2.imshow(‘frame1’,frame1)
if cv2.waitKey(1) == ord(‘q’):
break
else:
break
image is not show, run result below:

**user123@user123-desktop:~/Downloads$ python3 test.py **
[rtsp @ 0x2894f5a0] method DESCRIBE failed: 503 Service Unavailable
[ WARN:0] global /home/nvidia/host/build_opencv/nv_opencv/modules/videoio/src/cap_gstreamer.cpp (1757) handleMessage OpenCV | GStreamer warning: Embedded video playback halted; module source reported: Unhandled error
[ WARN:0] global /home/nvidia/host/build_opencv/nv_opencv/modules/videoio/src/cap_gstreamer.cpp (886) open OpenCV | GStreamer warning: unable to start pipeline
[ WARN:0] global /home/nvidia/host/build_opencv/nv_opencv/modules/videoio/src/cap_gstreamer.cpp (480) isPipelinePlaying OpenCV | GStreamer warning: GStreamer: pipeline have not been created
ashfjdsgnjdnjbdkbnjdnfkbjn

how to solve the problem?

reference appsrc deepstream rtsp code :

def main(args):
# Check input arguments
if len(args) < 2:
sys.stderr.write(“usage: %s [uri2] … [uriN]\n” % args[0])
sys.exit(1)

for i in range(0,len(args)-1):
    fps_streams["stream{0}".format(i)]=GETFPS(i)
number_sources=len(args)-1

# Standard GStreamer initialization
GObject.threads_init()
Gst.init(None)

# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
is_live = False

if not pipeline:
    sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")

# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
    sys.stderr.write(" Unable to create NvStreamMux \n")

pipeline.add(streammux)
for i in range(number_sources):
    print("Creating source_bin ",i," \n ")
    uri_name=args[i+1]
    if uri_name.find("rtsp://") == 0 :
        is_live = True
queue1=Gst.ElementFactory.make("queue","queue1")
queue2=Gst.ElementFactory.make("queue","queue2")
queue3=Gst.ElementFactory.make("queue","queue3")
queue4=Gst.ElementFactory.make("queue","queue4")
queue5=Gst.ElementFactory.make("queue","queue5")

queue6=Gst.ElementFactory.make(“queue”,“queue6”)
queue7=Gst.ElementFactory.make(“queue”,“queue7”)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(queue3)
pipeline.add(queue4)
pipeline.add(queue5)
pipeline.add(queue6)
pipeline.add(queue7)
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property(“nvbuf-memory-type”, 4)
appsource = Gst.ElementFactory.make(“appsrc”, “numpy-source”)
appsource.set_property(‘stream-type’, ‘stream’)
print(“Creating Pgie \n “)
pgie = Gst.ElementFactory.make(“nvinfer”, “primary-inference”)
if not pgie:
sys.stderr.write(” Unable to create pgie \n”)
print(“Creating tiler \n “)
tiler=Gst.ElementFactory.make(“nvmultistreamtiler”, “nvtiler”)
if not tiler:
sys.stderr.write(” Unable to create tiler \n”)
print(“Creating nvvidconv \n “)
nvvidconv = Gst.ElementFactory.make(“nvvideoconvert”, “nv-videoconv”)
nvvidconv1 = Gst.ElementFactory.make(“nvvideoconvert”, “convertor”)
nvvidconv.set_property(‘nvbuf-memory-type’,4)
if not nvvidconv:
sys.stderr.write(” Unable to create nvvidconv \n”)
print(“Creating nvosd \n “)
nvosd = Gst.ElementFactory.make(“nvdsosd”, “onscreendisplay”)
if not nvosd:
sys.stderr.write(” Unable to create nvosd \n”)
nvosd.set_property(‘process-mode’,OSD_PROCESS_MODE)
nvosd.set_property(‘display-text’,OSD_DISPLAY_TEXT)
if(is_aarch64()):
print(“Creating transform \n “)
transform=Gst.ElementFactory.make(“nvegltransform”, “nvegl-transform”)
if not transform:
sys.stderr.write(” Unable to create transform \n”)
transform=Gst.ElementFactory.make(“nvegltransform”, “nvegl-transform”)
h264parser = Gst.ElementFactory.make(“h264parse”, “h264-parser”)

decoder = Gst.ElementFactory.make(“nvv4l2decoder”, “nvv4l2-decoder”)
print(“Creating EGLSink \n”)
caps_v4l2src = Gst.ElementFactory.make(“capsfilter”, “v4l2src_caps”)
vidconvsrc = Gst.ElementFactory.make(“videoconvert”, “convertor_src1”)
sink = Gst.ElementFactory.make(“nveglglessink”, “nvvideo-renderer”)
#===========rtsp=============#
nvvidconv_postosd = Gst.ElementFactory.make(“nvvideoconvert”, “convertor_postosd”)
if not nvvidconv_postosd:
sys.stderr.write(" Unable to create nvvidconv_postosd \n")
caps_v4l2src.set_property(‘caps’, Gst.Caps.from_string(“video/x-raw, framerate=30/1”))
caps1 = Gst.ElementFactory.make(“capsfilter”, “capsfilter1”)
caps1.set_property(“caps”, Gst.Caps.from_string(“video/x-raw(memory:NVMM)”))
caps_in=Gst.Caps.from_string(“video/x-raw,format=RGBA,width=640,height=480,framerate=30/1”)
appsource.set_property(‘caps’,caps_in)
caps = Gst.ElementFactory.make(“capsfilter”, “filter”)
caps.set_property(“caps”, Gst.Caps.from_string(“video/x-raw(memory:NVMM), format=I420”))
encoder = Gst.ElementFactory.make(“nvv4l2h264enc”, “encoder”)
encoder.set_property(‘bitrate’, 4000000)
encoder.set_property(‘preset-level’, 1)
encoder.set_property(‘insert-sps-pps’, 1)
encoder.set_property(‘bufapi-version’, 1)
rtppay = Gst.ElementFactory.make(“rtph264pay”, “rtppay”)
updsink_port_num = 5400
udpsink = Gst.ElementFactory.make(“udpsink”, “udpsink”)
udpsink.set_property(‘host’, ‘224.224.255.255’)
udpsink.set_property(‘port’, updsink_port_num)
udpsink.set_property(‘async’, False)
udpsink.set_property(‘sync’, 1)
streammux.set_property(‘width’, 640)
streammux.set_property(‘height’, 480)
streammux.set_property(‘batch-size’, number_sources)
streammux.set_property(‘batched-push-timeout’, 4000000)
pgie.set_property(‘config-file-path’, “dstest1_pgie_config.txt”)
pgie_batch_size=pgie.get_property(“batch-size”)
if(pgie_batch_size != number_sources):
print(“WARNING: Overriding infer-config batch-size”,pgie_batch_size," with number of sources “, number_sources,” \n")
pgie.set_property(“batch-size”,number_sources)
tiler_rows=int(math.sqrt(number_sources))
tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
tiler.set_property(“rows”,tiler_rows)
tiler.set_property(“columns”,tiler_columns)
tiler.set_property(“width”, TILED_OUTPUT_WIDTH)
tiler.set_property(“height”, TILED_OUTPUT_HEIGHT)
sink.set_property(“qos”,0)

print("Adding elements to Pipeline \n")
pipeline.add(appsource)
pipeline.add(pgie)
pipeline.add(h264parser)
pipeline.add(decoder)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvvidconv1)
pipeline.add(caps_v4l2src)
pipeline.add(vidconvsrc)
pipeline.add(nvosd)
pipeline.add(nvvidconv_postosd)#rtsp
pipeline.add(caps)
pipeline.add(caps1)
pipeline.add(encoder)
pipeline.add(rtppay)
pipeline.add(udpsink)
pipeline.add(transform)
pipeline.add(sink)

appsource.link(nvvidconv)
nvvidconv.link(caps1)
sinkpad = streammux.get_request_pad(“sink_0”)
if not sinkpad:
sys.stderr.write(" Unable to get the sink pad of streammux \n")

srcpad = caps1.get_static_pad("src")
if not srcpad:
    sys.stderr.write(" Unable to get source pad of decoder \n")
srcpad.link(sinkpad)
streammux.link(pgie)
'''if is_aarch64():
    pgie.link(tiler)
    tiler.link(nvvidconv1)
    nvvidconv1.link(nvosd)
    nvosd.link(transform)
    transform.link(sink)'''
pgie.link(nvvidconv_postosd)
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(udpsink)
print(udpsink)
# create an event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)

#tiler_src_pad=pgie.get_static_pad("src")
#if not tiler_src_pad:
    #sys.stderr.write(" Unable to get src pad \n")
#else:
    #tiler_src_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_src_pad_buffer_probe, 0)
rtsp_port_num = 8556
server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_port_num
server.attach(None)
 factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch( "( udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, 
clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, "H264"))
factory.set_shared(True)
server.get_mount_points().add_factory("/ds-test001", factory)
print("\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test001 ***\n\n" % rtsp_port_num)

start play back and listed to events

pipeline.set_state(Gst.State.PLAYING)
for _ in range(10):
    arr=np.random.randint(low=0,high=255,size=(480,640,3),dtype=np.uint8)
    img=cv2.cvtColor(arr, cv2.COLOR_BGR2RGBA)
    appsource.emit("push-buffer",ndarray_to_gst_buffer(img))
    time.sleep(1)
appsource.emit("end-of-stream")

try:
    loop.run()
except:
    pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)

if name == ‘main’:
sys.exit(main(sys.argv))