How to compress the size of a streaming image

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)
GPU
• DeepStream Version
6.3.0
• JetPack Version (valid for Jetson only)
• TensorRT Version
8.5
• NVIDIA GPU Driver Version (valid for GPU only)
535.54.03
• Issue Type( questions, new requirements, bugs)
questions
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

I use a DeepStream program written in Python to connect to multi-channel monitoring cameras over RTSP. The frames I pull out are very large: a 1920*1080 image takes about 7 MB in memory (roughly what an uncompressed RGBA frame of that size occupies: 1920 × 1080 × 4 bytes ≈ 7.9 MB). Is there any way to compress the image without reducing its resolution?

My current processing logic is shown below. Because the original image is so large, the cv2.imencode('.jpg', frame_new) call takes a long time, about 25 ms, which seriously affects the subsequent processing flow. Is there any good way to deal with this?

# Map the NvBufSurface of this frame into a NumPy array (no copy),
# convert RGBA -> BGR for OpenCV, then JPEG-encode on the CPU.
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
frame_new = np.array(n_frame, copy=False, order='C')
frame_new = cv2.cvtColor(frame_new, cv2.COLOR_RGBA2BGR)
success, buffer = cv2.imencode('.jpg', frame_new)

There is a hardware image encoding sample in C/C++: /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-image-meta-test

The RTSP streams are already compressed video, so why do you need to pull raw images out of them? What is your pipeline?

OK, thank you for your reply. The order of the element connections, the log, and part of the code are below. I calculated the size of the images in memory and found that it is very large:

streammux->uridecodebin->queue->nvvideoconvert->capsfilter->fakesink

info.txt (2.8 KB)

code.txt (2.9 KB)

Did you connect uridecodebin and nvstreammux in the wrong order?

If you just want to compress to jpeg, you can use the following pipeline

                                                  |--> nvvideoconvert->nvjpegenc->filesink
uridecodebin->streammux->queue->nvstreamdemux --> |--> nvvideoconvert->nvjpegenc->filesink
                                                  |--> nvvideoconvert->nvjpegenc->filesink
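
A minimal Python sketch of that layout, assuming two RTSP sources and writing one JPEG file per frame with multifilesink (the URIs, resolution, and output locations are placeholders and would need to match your setup):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

# Two sources batched by nvstreammux, split back out per channel by
# nvstreamdemux, then encoded with the GPU JPEG encoder (nvjpegenc).
pipeline = Gst.parse_launch(
    "nvstreammux name=m batch-size=2 width=1920 height=1080 batched-push-timeout=40000 ! "
    "queue ! nvstreamdemux name=d "
    "uridecodebin uri=rtsp://xx.xx.xxx.xxx/media/video1 ! m.sink_0 "
    "uridecodebin uri=rtsp://xx.xx.xxx.xxx/media/video2 ! m.sink_1 "
    "d.src_0 ! nvvideoconvert ! nvjpegenc ! multifilesink location=cam0-%05d.jpg "
    "d.src_1 ! nvvideoconvert ! nvjpegenc ! multifilesink location=cam1-%05d.jpg")
pipeline.set_state(Gst.State.PLAYING)
try:
    GLib.MainLoop().run()
finally:
    pipeline.set_state(Gst.State.NULL)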


Your code just encodes and then sends the result to fakesink. Can you share the reason and the usage scenario?

In addition, I would like to know your GPU model. I am not sure whether it can support 80-channel simultaneous encoding.

The limit on the number of encoder instances is explained in this table

Finally, you can use the following command to check whether the GPU is fully loaded, and use the top command to check the CPU load.

 nvidia-smi dmon  

Thank you for your reply. My GPU is an A40, so the hardware should be able to support it; I suspect there is a problem with my code and the stream is blocked. The scenario is that I need to connect to 100 channels of monitoring through RTSP, capture the key frames, and put them into Redis for subsequent recognition by other AI tasks.



Before we check the code problem: is the issue that nvinfer cannot provide inference results in real time?

Set the skip-frames property of nvv4l2decoder to 2, then only key frames will be decoded.

The interval property of nvinfer specifies the number of consecutive batches to skip for inference.

You can view them via gst-inspect-1.0.
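
For example, the two properties could be set from Python like this (a sketch only; the element names dec and pgie are placeholders for whatever names you give the elements in your own pipeline):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# standalone elements just to illustrate the two properties
dec = Gst.ElementFactory.make("nvv4l2decoder", "dec")
dec.set_property("skip-frames", 2)   # 2 = decode key frames only

pgie = Gst.ElementFactory.make("nvinfer", "pgie")
pgie.set_property("interval", 4)     # skip inference on 4 consecutive batches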

In addition, if you don't need real-time processing and just want to decode and store the images to Redis, then nvstreammux and nvstreamdemux are unnecessary; you can refer to the pipeline below.

rtspsrc location=xxx -> application/x-rtp, media=video, encoding-name=H265 -> rtph265depay -> h265parse -> nvv4l2decoder -> nvvideoconvert --> nvjpegenc --> filesink

Because the business needs to be decoupled, I have to separate the streaming and the detection here, so I can't use the nvinfer component. I don't need to process every monitoring frame, I just need to extract the key frames. I tried the pipeline uridecodebin->streammux->queue->nvvideoconvert->nvjpegenc->fakesink. With this pipeline the image is indeed compressed at the source of the stream, which is good, but later I found that there were many damaged images, so I referred to this post and to the pipeline structure with nvstreamdemux that you mentioned before. However, the current status is that the stream is blocked.

If you are sure that you only need to save key frames, configuring the skip-frames property of nvv4l2decoder can still greatly improve performance.

nvstreammux is used to construct batches; it is not needed here.

It is recommended that each rtsp camera channel independently decodes and then stores.

Thank you for your suggestion. How should I modify the code? Are there any examples that I can refer to?

Use gst-launch-1.0 or python code to build this pipeline
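
For example, the single-channel pipeline above maps roughly to this command (a sketch: the URI is a placeholder, the caps/depay elements assume an H265 stream as shown earlier, skip-frames=2 keeps only key frames, and multifilesink is used instead of filesink so each frame is written to its own file):

gst-launch-1.0 rtspsrc location=rtsp://xx.xx.xxx.xxx/media/video1 ! \
  "application/x-rtp, media=video, encoding-name=H265" ! \
  rtph265depay ! h265parse ! nvv4l2decoder skip-frames=2 ! \
  nvvideoconvert ! nvjpegenc ! multifilesink location=frame-%05d.jpg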

OK, I will try it. Thanks again for your help, and I wish you all the best.

Sorry to bother you again, but I am having problems using the pipeline suggestion you provided before. I have looked through many official Python examples, but I still don't know how to make them work correctly. Can you provide a simple demo? I know you are not obligated to help write the code, but this has troubled me for a long time and I can't write it myself. Please help me; thanks again.

Another question: are you sure I don't need to use the nvstreammux plugin? I need to use Python to access 100 channels at the same time over RTSP and extract their key frames. The I-frame interval of the monitored streams is 50 and the FPS is 25, so about 100 images will be extracted in each cycle of roughly 2 seconds. In theory, should I push them as a batch?

This is a multi-process (to avoid the GIL), high-performance way to encode data into JPEG images:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import signal
import time
import multiprocessing
import sys

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib


def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True


frame_number = 0
def fake_sink_pad_buffer_probe(pad, info, u_data):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    # the buffer arriving at the fakesink already contains a complete JPEG image
    jpeg_data = gst_buffer.extract_dup(0, gst_buffer.get_size())

    # just for debug, save image every 300 frames
    global frame_number
    frame_number += 1
    if frame_number % 300 == 0:
        index = u_data
        print(f"save image to redis {index}")
        with open(f"captured_frame-{time.time()}.jpg", "wb") as f:
            f.write(jpeg_data)

    return Gst.PadProbeReturn.OK


def run_pipeline(index, pipeline_str):
    pipeline = Gst.parse_launch(pipeline_str)
    print(f"{pipeline_str}")
    fake_sink = pipeline.get_by_name(f"fsink_{index}")
    if not fake_sink:
        sys.stderr.write(f"Unable to get fake sink fsink_{index}\n")
        return
    fake_sink_pad = fake_sink.get_static_pad("sink")
    if not fake_sink_pad:
        sys.stderr.write("Unable to get sink pad of fakesink \n")
    fake_sink_pad.add_probe(Gst.PadProbeType.BUFFER,
                            fake_sink_pad_buffer_probe, index)
    pipeline.set_state(Gst.State.PLAYING)
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    try:
        loop.run()
    except:
        pass
    pipeline.set_state(Gst.State.NULL)


def pool_initializer():
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def main(uris):
    # Standard GStreamer initialization
    Gst.init(None)
    index = 0
    results = []
    pool = multiprocessing.Pool(initializer=pool_initializer)
    try:
        for uri in uris:
            # Only decode key frames and reconnect if unable to connect within 20 seconds.
            pipeline_str = f"nvurisrcbin uri={uri} dec-skip-frames=2 rtsp-reconnect-interval=20 ! \
                nvvideoconvert ! nvjpegenc ! fakesink name=fsink_{index}"
            results.append(pool.apply_async(run_pipeline, args=(index, pipeline_str)))
            index += 1
        for result in results:
            result.get()
    except KeyboardInterrupt:
        pool.terminate()


if __name__ == '__main__':
    uris = ["rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1",
            "rtsp://xx.xx.xxx.xxx/media/video1"]
    sys.exit(main(uris))


Thank you very much for your reply, you are awesome. I will study the code. Thanks again.

The program did not behave as expected: some streams produced no output for a long time. Anyway, thank you for your help. I am studying this example; maybe it can give me ideas.

You'd better check the I-frame interval of the RTSP stream and whether the data can actually be read from the RTSP camera normally.

I think decoding only the key frames may not meet your needs.

The above example may not be the best choice.

Yes, you are right. It may have been due to network problems on the test server or other reasons. I switched to a different server and the streaming output is now normal. It's great. You're absolutely amazing.

But I still have a question, could you please answer it? The processing allocates an asynchronous task to each RTSP stream. For example, if I connect 100 RTSP streams at the same time, will there be 100 processes in this process pool? My CPU only has 26 cores, so in theory shouldn't it only be possible to create up to 26 processes? Will the program crash? Currently I have connected 70 channels of RTSP, and when I checked the resource usage I found that only 35% of the CPU was used.
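
For reference, this is how the pool is created in the example above; if I read the multiprocessing documentation correctly, Pool defaults to os.cpu_count() worker processes when the processes argument is not given, which is what my question is about:

import multiprocessing
import os
import signal

def pool_initializer():
    # same as in the example above: let only the parent process handle Ctrl+C
    signal.signal(signal.SIGINT, signal.SIG_IGN)

# No 'processes' argument is passed, so the pool size defaults to
# os.cpu_count() worker processes, and the apply_async tasks are
# queued onto those workers.
pool = multiprocessing.Pool(initializer=pool_initializer)
print("default pool size:", os.cpu_count())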