How to save frames with USB camera Pipeline?

Please provide complete information as applicable to your setup.

• Hardware Platform (RTX3050)
• DeepStream Version: 7.0
• TensorRT Version: 8.6
• NVIDIA GPU Driver Version (550.120)
• Issue Type( questions)
• How to reproduce the issue?

Hello Nvs,
My pipeline is a USB camera inference YOLOV8-POSE + Tracker pipeline,

source_bin is specifically composed of
source → caps_v4l2src → jpegdec → vidconvsrc → nvvidconv → caps_vidconvsrc

Then my overall pipeline is source_bin → streammux → pgie → tracker → tiler → converter → osd → fakesink (each source bin feeds into a streammux sink pad)

I can now infer my pipeline smoothly, but I want to save the original image of each frame during the inference process, and then send it to other ports for app development. The result of model inference is easy to make, but I don’t know how to store the original image together, because this is a USB camera that can only be accessed by one application, not multiple devices like RTSP.

Below is my pipeline:

def create_usb_source_bin(stream_id, uri, streammux):
    """Build a source bin for a USB (V4L2) MJPEG camera.

    Chain inside the bin:
        v4l2src -> capsfilter(image/jpeg 1920x1080@30fps) -> jpegdec
        -> videoconvert -> nvvideoconvert -> capsfilter(NVMM NV12)

    The final capsfilter's src pad is exposed as the bin's ghost 'src' pad;
    the caller links that pad to an nvstreammux sink pad.

    Args:
        stream_id: Zero-based camera index, used to build unique element names.
        uri: V4L2 device node, e.g. '/dev/video0'.
        streammux: Unused here; kept for signature parity with
            create_uridecode_bin (the caller performs the mux linking).

    Returns:
        The assembled Gst.Bin, or None on any failure.
    """
    bin_name = f'source-bin-{stream_id:04d}'

    # Named 'source_bin' (not 'bin') so the Python builtin bin() is not shadowed.
    source_bin = Gst.Bin.new(bin_name)
    if not source_bin:
        logger.error('Unable to create source bin\n')
        return None

    # create v4l2src
    source = Gst.ElementFactory.make('v4l2src', f'usb-cam-source-{stream_id}')
    if not source:
        logger.error('Unable to create v4l2src\n')
        return None
    source.set_property('device', uri)

    # create capsfilter
    caps_v4l2src = Gst.ElementFactory.make('capsfilter', f'v4l2src-caps-{stream_id}')
    if not caps_v4l2src:
        logger.error('Unable to create v4l2src capsfilter\n')
        return None

    # Ask the camera for MJPEG; it is decoded on the CPU (jpegdec) below and
    # then uploaded to NVMM memory by nvvideoconvert for DeepStream.
    caps_v4l2src.set_property('caps',
        Gst.Caps.from_string('image/jpeg, width=1920, height=1080, framerate=30/1'))

    jpegdec = Gst.ElementFactory.make('jpegdec', f'jpegdec-{stream_id}')
    if not jpegdec:
        logger.error('Unable to create jpegdec\n')
        return None

    vidconvsrc = Gst.ElementFactory.make('videoconvert', f'convertor-{stream_id}')
    if not vidconvsrc:
        logger.error('Unable to create videoconvert\n')
        return None

    nvvidconv = Gst.ElementFactory.make('nvvideoconvert', f'nvvidconv-{stream_id}')
    if not nvvidconv:
        logger.error('Unable to create nvvideoconvert\n')
        return None

    caps_vidconvsrc = Gst.ElementFactory.make("capsfilter", f"nvmm_caps_{stream_id}")
    if not caps_vidconvsrc:
        logger.error("ERROR: Unable to create capsfilter\n")
        return None
    caps_vidconvsrc.set_property('caps',
        Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12"))

    elements = (source, caps_v4l2src, jpegdec, vidconvsrc, nvvidconv, caps_vidconvsrc)
    for element in elements:
        source_bin.add(element)

    # Link the chain pairwise, checking every link() result — the original
    # ignored failures, which yields a silently broken bin on caps mismatch.
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            logger.error(f'Unable to link {upstream.name} -> {downstream.name}\n')
            return None

    pad = caps_vidconvsrc.get_static_pad('src')
    if not pad:
        logger.error('Unable to get src pad of final capsfilter\n')
        return None
    ghost_pad = Gst.GhostPad.new('src', pad)
    if not ghost_pad:
        logger.error('Unable to create ghost src pad\n')
        return None
    ghost_pad.set_active(True)
    source_bin.add_pad(ghost_pad)

    # Register an FPS counter for this stream (globals defined elsewhere).
    fps_streams[f'stream{stream_id}'] = GETFPS(stream_id)
    return source_bin

def main():
    """Assemble and run the DeepStream inference pipeline.

    Topology:
        [source bins] -> nvstreammux -> nvinfer (pgie) -> nvtracker
        -> nvmultistreamtiler -> nvvideoconvert -> nvdsosd -> fakesink

    Reads module-level configuration (SOURCE, CONFIG_INFER, STREAMMUX_WIDTH,
    STREAMMUX_HEIGHT, GPU_ID, PERF_MEASUREMENT_INTERVAL_SEC) and blocks in a
    GLib main loop until interrupted; exits the process on any setup failure.
    """
    global logger
    logger = setup_logger()

    def _make(factory, name):
        # Create a mandatory GStreamer element or abort the process.
        element = Gst.ElementFactory.make(factory, name)
        if not element:
            logger.error(f'Failed to create {factory} ({name})\n')
            sys.exit(1)
        return element

    def _log_config():
        # Single place for the startup configuration dump (was duplicated
        # twice in the original, with slightly diverging labels).
        logger.info('\n=== Configuration Info ===')
        logger.info(f'SOURCE: {SOURCE}')
        logger.info(f'CONFIG_INFER: {CONFIG_INFER}')
        logger.info(f'STREAMMUX_WIDTH: {STREAMMUX_WIDTH}')
        logger.info(f'STREAMMUX_HEIGHT: {STREAMMUX_HEIGHT}')
        logger.info(f'GPU_ID: {GPU_ID}')
        logger.info(f'PERF_MEASUREMENT_INTERVAL_SEC: {PERF_MEASUREMENT_INTERVAL_SEC}')
        logger.info(f'Platform: {"Jetson" if is_aarch64() else "x86"}')
        logger.info('========================\n')

    def _tiler_grid(tiles):
        # Smallest near-square grid that fits `tiles` streams. Generalizes the
        # original hard-coded ladder (identical results for 1-9 sources) and
        # removes its 4x4 cap, so >16 sources are no longer dropped from view.
        from math import ceil, sqrt
        if tiles <= 1:
            return 1, 1
        cols = ceil(sqrt(tiles))
        rows = ceil(tiles / cols)
        return rows, cols

    logger.info("DeepStream Pipeline Starting...")
    Gst.init(None)

    sources = SOURCE.split(";")
    num_sources = len(sources)
    _log_config()

    loop = GLib.MainLoop()
    pipeline = Gst.Pipeline()
    if not pipeline:
        logger.error('Failed to create pipeline')
        sys.exit(1)

    # --- streammux -----------------------------------------------------------
    streammux = _make('nvstreammux', 'nvstreammux')
    streammux.set_property('batch-size', num_sources)
    streammux.set_property('width', STREAMMUX_WIDTH)
    streammux.set_property('height', STREAMMUX_HEIGHT)
    streammux.set_property('batched-push-timeout', 4000000)
    # A USB camera is a live source; telling the muxer so keeps its
    # timestamping/batching behaviour correct (the original always passed 0).
    is_live = any(s.startswith('/dev/video') for s in sources)
    streammux.set_property('live-source', 1 if is_live else 0)
    streammux.set_property('attach-sys-ts', 1)
    pipeline.add(streammux)

    # --- source bins ---------------------------------------------------------
    for i, source_uri in enumerate(sources):
        if source_uri.startswith('/dev/video'):
            source_bin = create_usb_source_bin(i, source_uri, streammux)
        else:
            source_bin = create_uridecode_bin(i, source_uri, streammux)

        if not source_bin:
            logger.error(f'Failed to create source_bin {i}')
            sys.exit(1)
        pipeline.add(source_bin)

        # USB bins expose a plain ghost 'src' pad that must be linked to the
        # muxer by hand (uridecode bins link themselves on pad-added).
        if source_uri.startswith('/dev/video'):
            srcpad = source_bin.get_static_pad('src')
            # request_pad_simple() replaces the deprecated get_request_pad()
            # (GStreamer >= 1.20); fall back for older bindings.
            request_pad = getattr(streammux, 'request_pad_simple', streammux.get_request_pad)
            sinkpad = request_pad(f'sink_{i}')
            if not srcpad or not sinkpad or srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
                logger.error(f'Failed to link source {i} to streammux')
                sys.exit(1)

    # --- downstream elements -------------------------------------------------
    tiler = _make('nvmultistreamtiler', 'nvtiler')
    rows, cols = _tiler_grid(num_sources)
    tiler.set_property('rows', rows)
    tiler.set_property('columns', cols)
    tiler.set_property('width', STREAMMUX_WIDTH)
    tiler.set_property('height', STREAMMUX_HEIGHT)

    pgie = _make('nvinfer', 'pgie')
    tracker = _make('nvtracker', 'nvtracker')
    converter = _make('nvvideoconvert', 'nvvideoconvert')
    osd = _make('nvdsosd', 'nvdsosd')
    # fakesink: results are consumed by the pad probe; nothing is rendered.
    sink = _make('fakesink', 'sink')

    pgie.set_property('config-file-path', CONFIG_INFER)
    pgie.set_property('qos', 0)

    tracker.set_property('tracker-width', 640)
    tracker.set_property('tracker-height', 384)
    tracker.set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
    tracker.set_property('ll-config-file',
                         '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml')
    tracker.set_property('display-tracking-id', 1)
    tracker.set_property('qos', 0)

    osd.set_property('process-mode', int(pyds.MODE_GPU))
    osd.set_property('qos', 0)

    sink.set_property('async', 0)
    sink.set_property('sync', 1)
    sink.set_property('qos', 0)

    # Optional tracker knobs: only present in some tracker library versions.
    for optional_prop in ('enable_batch_process', 'enable_past_frame'):
        if tracker.find_property(optional_prop) is not None:
            tracker.set_property(optional_prop, 1)

    if not is_aarch64():
        # dGPU-only properties (absent / no effect on Jetson).
        streammux.set_property('nvbuf-memory-type', 0)
        streammux.set_property('gpu_id', GPU_ID)
        pgie.set_property('gpu_id', GPU_ID)
        tracker.set_property('gpu_id', GPU_ID)
        converter.set_property('nvbuf-memory-type', 0)
        converter.set_property('gpu_id', GPU_ID)
        osd.set_property('gpu_id', GPU_ID)

    for element in (pgie, tracker, tiler, converter, osd, sink):
        pipeline.add(element)

    # Link the static chain, checking every link() result — the original
    # ignored failures, which produces a silent hang at PLAYING.
    chain = (streammux, pgie, tracker, tiler, converter, osd, sink)
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            logger.error(f'Failed to link {upstream.name} -> {downstream.name}')
            sys.exit(1)

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', bus_call, loop)

    # Attach the metadata probe on the tracker's src pad, so per-frame
    # detections + track IDs are read before the tiler flattens the batch.
    tracker_src_pad = tracker.get_static_pad('src')
    if not tracker_src_pad:
        logger.error('Failed to get tracker src pad\n')
        sys.exit(1)
    tracker_src_pad.add_probe(Gst.PadProbeType.BUFFER, tracker_src_pad_buffer_probe, 0)

    pipeline.set_state(Gst.State.PLAYING)
    logger.info('\n')

    try:
        loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Make sure the data sender shuts down cleanly (defined elsewhere).
        data_sender.close()
        pipeline.set_state(Gst.State.NULL)

Can you give me some advice to achieve that?

Many Many Thanks!!

You can use a tee element as shown below. You can save the original image on one branch after the tee, and run inference on the other branch.

source_bin -> tee -> (process/save the original image)
                  -> nvstreammux -> ...