Segfault on Removing the Last Stream From the Pipeline

• Hardware Platform (Jetson / GPU): GPU
• DeepStream Version: 6.4
• JetPack Version (valid for Jetson only): -
• TensorRT Version: 8.6.1.6-1+cuda12.0
• NVIDIA GPU Driver Version (valid for GPU only): 535.183.01
• Issue Type (questions, new requirements, bugs): questions, bugs
• How to reproduce the issue? (This is for bugs. Including which sample app is used, the configuration files content, the command line used, and other details for reproducing): runtime_source_add_delete and deepstream_test_4
• Requirement details (This is for the new requirement. Including the module name which plugin or for which sample application, and the function description): uridecodebin → nvstreammux → nvinfer → nvtracker → nvinfer → nvinfer → nvvideoconvert → capsfilter → nvmsgconv → nvmsgbroker

I have a pipeline implemented in Python that combines the DeepStream samples runtime_source_add_delete and deepstream_test_4. It adds multiple sources to the pipeline, attaches event metadata to each buffer in a probe function on the source pad of the capsfilter, and finally publishes the event messages to Redis through the nvmsgconv and nvmsgbroker elements. The code is below:

# ruff: noqa: E402
# fmt: off
import sys

import gi  # type: ignore

gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst  # type: ignore

sys.path.append("../")
Gst.init(None)
# fmt: on

import configparser
import logging
import time
from enum import IntEnum
from threading import Thread

import cv2  # type: ignore
import numpy as np  # type: ignore
import pyds  # type: ignore
from common.platform_info import PlatformInfo  # type: ignore
from common.utils import long_to_uint64  # type: ignore
from numpy.typing import NDArray  # type: ignore

# GPU index written to the "gpu_id" property of the pipeline elements.
GPU_ID = 0
# Size of the per-source bookkeeping lists below; also used as the batch size
# of the primary inference element.
MAX_NUM_SOURCES = 4
# Sentinel compared against an object's tracking id to detect untracked objects.
UNTRACKED_OBJECT_ID = -1
# Length passed to pyds.generate_ts_rfc3339; the timestamp buffer is allocated
# with one extra unit on top of this.
MAX_TIMESTAMP_LENGTH = 32

# (width, height) set on the stream muxer and reused to reshape frames in
# get_frame_array.
STREAM_MUXER_RESOLUTION = (1920, 1080)
# Values for the stream muxer "batch-size" and "batched-push-timeout" properties.
STREAM_MUXER_BATCH_SIZE = 30
STREAM_MUXER_BATCHED_PUSH_TIMEOUT = 25000

# "config-file-path" of the primary nvinfer element.
PRIMARY_INFERENCE_CONFIG_FILE = "dstest_pgie_config.txt"

# INI file whose [tracker] section is copied onto the nvtracker element.
TRACKER_CONFIG_FILE = "dstest_tracker_config.txt"

# "config-file-path" of the two secondary nvinfer elements.
SECONDARY_INFERENCE_1_CONFIG_FILE = "dstest_sgie1_config.txt"
SECONDARY_INFERENCE_2_CONFIG_FILE = "dstest_sgie2_config.txt"

# "config" and "payload-type" properties of the nvmsgconv element.
MESSAGE_CONVERTER_CONFIG_FILE = "dstest4_msgconv_config.txt"
MESSAGE_CONVERTER_SCHEMA_TYPE = 0

# "proto-lib", "config", "conn-str" and "topic" properties of the nvmsgbroker
# element.
MESSAGE_BROKER_PROTO_LIB = (
    "/opt/nvidia/deepstream/deepstream/lib/libnvds_redis_proto.so"
)
MESSAGE_BROKER_CONFIG = "/opt/nvidia/deepstream/deepstream/sources/libs/redis_protocol_adaptor/cfg_redis.txt"
MESSAGE_BROKER_CONN_STR = "redis;6379;test"
MESSAGE_BROKER_TOPIC = "test"

# Per-source bookkeeping, indexed by source id (0 .. MAX_NUM_SOURCES - 1).
# Number of sources currently added to the pipeline.
g_num_sources = 0
# Slot i holds i while source i is active and -1 otherwise.
g_source_id_list = [-1] * MAX_NUM_SOURCES
# Slot i becomes True once a "stream-eos" message was received for stream i.
g_eos_list = [False] * MAX_NUM_SOURCES
# Slot i is True while source i is active.
g_source_enabled = [False] * MAX_NUM_SOURCES
# Slot i holds the uridecodebin element of source i, or None.
g_source_bin_list = [None] * MAX_NUM_SOURCES
# URI played by every source that add_source creates.
g_uri = "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4"

# Shared pipeline state; all four are assigned in create_pipeline().
platform_info = None
loop = None
pipeline = None
stream_muxer = None

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class ClassID(IntEnum):
    """Integer class ids matched against ``NvDsObjectMeta.class_id``.

    Only ``VEHICLE`` and ``PERSON`` get an event-specific payload in
    ``generate_event_metadata``.
    """

    # NOTE(review): presumably the label order of the primary detector model;
    # confirm against the primary inference configuration.
    VEHICLE = 0
    BICYCLE = 1
    PERSON = 2
    ROADSIGN = 3


def generate_vehicle_meta() -> pyds.NvDsVehicleObject:
    """Allocate a vehicle object and fill it with fixed placeholder values.

    Returns:
        The object obtained from ``pyds.alloc_nvds_vehicle_object`` after it
        has been cast to ``pyds.NvDsVehicleObject`` and populated.
    """
    vehicle = pyds.NvDsVehicleObject.cast(pyds.alloc_nvds_vehicle_object())

    # Static sample attributes; none of them is derived from a detection.
    placeholder_fields = (
        ("type", "sedan"),
        ("color", "blue"),
        ("make", "Bugatti"),
        ("model", "M"),
        ("license", "XX1234"),
        ("region", "CA"),
    )
    for field_name, field_value in placeholder_fields:
        setattr(vehicle, field_name, field_value)

    return vehicle


def generate_person_meta() -> pyds.NvDsPersonObject:
    """Allocate a person object and fill it with fixed placeholder values.

    Returns:
        The object obtained from ``pyds.alloc_nvds_person_object`` after it
        has been cast to ``pyds.NvDsPersonObject`` and populated.
    """
    person = pyds.NvDsPersonObject.cast(pyds.alloc_nvds_person_object())

    # Static sample attributes; none of them is derived from a detection.
    placeholder_fields = (
        ("age", 45),
        ("cap", "none"),
        ("hair", "black"),
        ("gender", "male"),
        ("apparel", "formal"),
    )
    for field_name, field_value in placeholder_fields:
        setattr(person, field_name, field_value)

    return person


def get_frame_array(gst_buffer: Gst.Buffer, batch_id: int) -> NDArray:
    """Return one frame of a batched buffer as a ``(height, width, 3)`` array.

    Args:
        gst_buffer: Buffer whose surface is read with
            ``pyds.get_nvds_buf_surface``.
        batch_id: Index of the frame inside the batch.

    Returns:
        The frame reshaped with ``STREAM_MUXER_RESOLUTION`` and passed
        through ``cv2.COLOR_BGR2RGB`` (first and third channel swapped).
    """
    surface = pyds.get_nvds_buf_surface(hash(gst_buffer), batch_id)
    # Force a contiguous layout, then view the pixels as a flat uint8 vector.
    flat_pixels = np.frombuffer(np.ascontiguousarray(surface), dtype=np.uint8)

    # The reshape assumes a 3-channel frame at the stream muxer resolution.
    frame_width, frame_height = STREAM_MUXER_RESOLUTION
    image = flat_pixels.reshape(frame_height, frame_width, 3)

    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)


def generate_event_metadata(
    source_id: int,
    frame_id: int,
    user_meta: pyds.NvDsUserMeta,
    object_meta: pyds.NvDsObjectMeta,
) -> pyds.NvDsEventMsgMeta:
    """Build the event message metadata describing one detected object.

    Args:
        source_id: Id of the stream the object belongs to (currently not
            written into the metadata).
        frame_id: Frame number stored in ``frameId``.
        user_meta: User meta handed to ``pyds.alloc_nvds_event_msg_meta``.
        object_meta: Object whose bounding box, tracking id, confidence and
            class id are copied into the event.

    Returns:
        The populated ``pyds.NvDsEventMsgMeta``. Vehicles and persons also
        get a placeholder ``extMsg`` payload; other classes get none.
    """
    event = pyds.NvDsEventMsgMeta.cast(pyds.alloc_nvds_event_msg_meta(user_meta))

    # Fixed sensor / place / module identification.
    event.sensorId = 0
    event.placeId = 0
    event.moduleId = 0
    event.sensorStr = "sensor-0"

    # Timestamp buffer is one larger than the length given to the generator.
    event.ts = pyds.alloc_buffer(MAX_TIMESTAMP_LENGTH + 1)
    pyds.generate_ts_rfc3339(event.ts, MAX_TIMESTAMP_LENGTH)

    rect = object_meta.rect_params
    event.bbox.top = rect.top
    event.bbox.left = rect.left
    event.bbox.width = rect.width
    event.bbox.height = rect.height

    event.frameId = frame_id
    event.trackingId = long_to_uint64(object_meta.object_id)
    event.confidence = object_meta.confidence

    class_id = object_meta.class_id
    if class_id == ClassID.VEHICLE:
        event.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
        event.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE
        event.objClassId = ClassID.VEHICLE
        event.extMsg = generate_vehicle_meta()
        event.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
    elif class_id == ClassID.PERSON:
        event.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY
        event.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON
        event.objClassId = ClassID.PERSON
        event.extMsg = generate_person_meta()
        event.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)

    return event


def attach_metadata(
    frame_meta: pyds.NvDsFrameMeta,
    user_meta: pyds.NvDsUserMeta,
    object_meta: pyds.NvDsObjectMeta,
    frame_array: NDArray,
    source_id: int,
) -> None:
    """Attach an event message for ``object_meta`` to ``frame_meta``.

    Args:
        frame_meta: Frame that receives the user meta; its ``frame_num`` is
            used as the event's frame id.
        user_meta: User meta that carries the event.
        object_meta: Detected object described by the event.
        frame_array: Decoded frame (currently unused here).
        source_id: Id of the stream the frame belongs to.
    """
    event = generate_event_metadata(
        source_id, frame_meta.frame_num, user_meta, object_meta
    )

    # Tag the user meta as an event message before handing it to the frame.
    user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
    user_meta.user_meta_data = event
    pyds.nvds_add_user_meta_to_frame(frame_meta, user_meta)


def iterate_object_meta_list(
    batch_meta: pyds.NvDsBatchMeta,
    frame_meta: pyds.NvDsFrameMeta,
    frame_array: NDArray,
    source_id: int,
) -> None:
    """Attach an event message to the frame for every tracked object.

    Walks ``frame_meta.obj_meta_list``. Objects whose tracking id equals the
    untracked sentinel are skipped; for every other object a user meta is
    acquired from the batch pool and passed to ``attach_metadata``.

    Args:
        batch_meta: Batch meta whose pool provides the user meta objects.
        frame_meta: Frame whose object list is iterated.
        frame_array: Decoded frame, forwarded to ``attach_metadata``.
        source_id: Id of the stream the frame belongs to.
    """
    # The tracking id is compared after long_to_uint64(), so the signed
    # sentinel has to go through the same conversion; comparing against the
    # raw -1 could never match an unsigned value.
    untracked_id = long_to_uint64(UNTRACKED_OBJECT_ID)

    l_obj = frame_meta.obj_meta_list
    while l_obj is not None:
        try:
            object_meta = pyds.NvDsObjectMeta.cast(l_obj.data)

            tracking_id = long_to_uint64(object_meta.object_id)
            if tracking_id == untracked_id:
                logger.debug("No tracking ID found for object")
                raise ValueError

            user_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            if user_meta:
                attach_metadata(
                    frame_meta, user_meta, object_meta, frame_array, source_id
                )
            else:
                logger.error("Failed to acquire user meta")
        except StopIteration:
            logger.debug("Iterating object meta list is over")
        except ValueError:
            logger.debug("Skipping an invalid object meta")
        finally:
            # Always advance, even when the current object was skipped.
            try:
                l_obj = l_obj.next
            except StopIteration:
                break


def extract_metadata(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn:
    """Buffer probe that attaches event metadata to every frame of a batch.

    Args:
        pad: Pad the probe is installed on (unused).
        info: Probe info that provides the buffer.

    Returns:
        ``Gst.PadProbeReturn.DROP`` when the probe info carries no buffer,
        otherwise ``Gst.PadProbeReturn.OK``.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        logger.warning("Unable to get buffer from pad")
        return Gst.PadProbeReturn.DROP

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK

    frame_node = batch_meta.frame_meta_list
    while frame_node is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(frame_node.data)
            pad_index = frame_meta.pad_index

            if pad_index in g_source_id_list:
                source_id = g_source_id_list[pad_index]
                logger.debug(
                    f"Extracting metadata for stream '{source_id}' at frame {frame_meta.frame_num}"
                )

                frame_array = get_frame_array(gst_buffer, frame_meta.batch_id)
                iterate_object_meta_list(
                    batch_meta, frame_meta, frame_array, source_id
                )
            else:
                # Frame of a source that is not (or no longer) registered.
                logger.warning(f"No source found for pad index {pad_index}")
                logger.debug("Iterating frame metadata list is over")
        except StopIteration:
            logger.debug("Iterating frame metadata list is over")
        finally:
            # Always advance to the next frame of the batch.
            try:
                frame_node = frame_node.next
            except StopIteration:
                break

    return Gst.PadProbeReturn.OK


def source_bin_child_added(
    child_proxy: Gst.ChildProxy,
    emitter_object: Gst.Object,
    element_name: str,
    source_id: int,
) -> None:
    """Configure elements as they appear inside a source bin.

    Connected to the "child-added" signal. The element name prefix selects
    the handling: nested decodebins are watched recursively, the hardware
    decoder gets its GPU-related properties, and source elements get their
    latency / RTSP settings.

    Args:
        child_proxy: Object that emitted "child-added".
        emitter_object: The element that was added.
        element_name: Name of the added element.
        source_id: Id of the stream the source bin belongs to.
    """
    logger.debug(f"Element '{element_name}' added for source '{source_id}'")

    # decodebin: keep listening for the children it creates itself
    if element_name.startswith("decodebin"):
        emitter_object.connect("child-added", source_bin_child_added, source_id)
        return

    # nvv4l2decoder
    if element_name.startswith("nvv4l2decoder"):
        if not platform_info.is_integrated_gpu():  # type: ignore
            emitter_object.set_property("gpu_id", GPU_ID)
            return
        integrated_gpu_settings = (
            ("enable-max-performance", True),
            ("drop-frame-interval", 0),
            ("num-extra-surfaces", 0),
        )
        for property_name, property_value in integrated_gpu_settings:
            emitter_object.set_property(property_name, property_value)
        return

    # source
    if not element_name.startswith("source"):
        return

    source_element = child_proxy.get_by_name(element_name)
    if source_element.find_property("drop-on-latency") is not None:
        emitter_object.set_property("drop-on-latency", True)

    # rtspsrc
    if source_element.get_factory().get_name() == "rtspsrc":
        emitter_object.set_property("protocols", "tcp")
        emitter_object.set_property("tcp-timeout", 5000000)
        emitter_object.set_property("buffer-mode", "buffer")


def source_bin_pad_added(
    source_bin: Gst.Element,
    source_pad: Gst.Pad,
    source_id: int,
) -> None:
    """Link a newly exposed video pad of a source bin to the stream muxer.

    Connected to the "pad-added" signal. Pads whose caps name does not start
    with "video" are ignored.

    Args:
        source_bin: Source bin that exposed the pad (unused).
        source_pad: The new pad.
        source_id: Id of the stream; selects the muxer pad ``sink_<id>``.

    Raises:
        ValueError: If the muxer pad cannot be requested or linking fails.
    """
    caps: Gst.Caps = source_pad.get_current_caps()
    media_type: str = caps.get_structure(0).get_name()
    if not media_type.startswith("video"):
        logger.debug(f"Skipped adding non-video pad [{media_type}]")
        return

    mux_pad_name = f"sink_{source_id}"
    mux_pad = stream_muxer.request_pad_simple(mux_pad_name)  # type: ignore
    if not mux_pad:
        raise ValueError("Unable to create sink pad")

    if source_pad.link(mux_pad) != Gst.PadLinkReturn.OK:
        raise ValueError(f"Failed to link pad to stream muxer for stream '{source_id}'")

    logger.debug(f"Added new '{mux_pad_name}' pad for stream '{source_id}'")


def remove_source(source_id: int) -> bool:
    """Stop a source bin, release its muxer pad and drop it from the pipeline.

    Args:
        source_id: Slot index of the source to remove.

    Returns:
        ``True`` when the source was removed. ``False`` when the id is out of
        range, the slot holds no source, the state change to NULL failed, or
        the muxer pad ``sink_<id>`` could not be found.
    """
    global g_num_sources

    logger.info(f"Removing source '{source_id}'")

    # Reject ids outside the bookkeeping lists; a negative id would otherwise
    # silently address a slot from the end of the list.
    if not 0 <= source_id < len(g_source_bin_list):
        logger.error(f"Source id '{source_id}' is out of range")
        return False

    # Removal is requested both by the "stream-eos" handler and by the main
    # script, so the slot may already be empty when this runs.
    source_bin = g_source_bin_list[source_id]
    if source_bin is None:
        logger.warning(f"Source '{source_id}' is not active; nothing to remove")
        return False

    result = source_bin.set_state(Gst.State.NULL)  # type: ignore
    if result == Gst.StateChangeReturn.FAILURE:
        logger.error(f"Failed to set source state to NULL for stream '{source_id}'")
        return False

    sink_name = f"sink_{source_id}"
    sink_pad = stream_muxer.get_static_pad(sink_name)  # type: ignore
    if sink_pad is None:
        logger.error(f"Sink pad '{sink_name}' for source '{source_id}' not found")
        return False

    sink_pad.send_event(Gst.Event.new_flush_stop(False))
    stream_muxer.release_request_pad(sink_pad)  # type: ignore
    pipeline.remove(source_bin)  # type: ignore

    # Mark the slot as free again.
    g_source_enabled[source_id] = False
    g_source_id_list[source_id] = -1
    g_source_bin_list[source_id] = None
    g_num_sources -= 1

    return True


def add_source(source_id: int) -> bool:
    """Create a uridecodebin for ``g_uri`` and start it inside the pipeline.

    Args:
        source_id: Slot index for the new source; also used in the element
            name ``source-<id>`` and the muxer pad name.

    Returns:
        ``True`` when the source was added and set to PLAYING. ``False`` when
        the id is out of range, the slot is already occupied, or the state
        change failed.

    Raises:
        ValueError: If the uridecodebin element cannot be created.
    """
    global g_num_sources

    logger.info(f"Adding source '{source_id}'")

    if not 0 <= source_id < len(g_source_bin_list):
        logger.error(f"Source id '{source_id}' is out of range")
        return False

    # A second bin for the same slot would reuse the element name
    # "source-<id>" and overwrite the bookkeeping of the running source.
    if g_source_bin_list[source_id] is not None:
        logger.error(f"Source '{source_id}' is already active")
        return False

    logger.debug(f"Creating source bin for stream '{source_id}' [{g_uri}]")
    source_bin = Gst.ElementFactory.make("uridecodebin", f"source-{source_id}")
    if source_bin is None:
        raise ValueError(
            f"Unable to create source bin for stream '{source_id}' [{g_uri}]"
        )
    source_bin.set_property("uri", g_uri)
    source_bin.connect("pad-added", source_bin_pad_added, source_id)
    source_bin.connect("child-added", source_bin_child_added, source_id)

    pipeline.add(source_bin)  # type: ignore
    result = source_bin.set_state(Gst.State.PLAYING)
    if result == Gst.StateChangeReturn.FAILURE:
        logger.error(f"Failed to set source state to PLAYING for stream '{source_id}'")
        # Roll back so that a later retry does not collide with a stale bin
        # of the same name still sitting in the pipeline.
        source_bin.set_state(Gst.State.NULL)
        pipeline.remove(source_bin)  # type: ignore
        return False

    g_source_enabled[source_id] = True
    g_source_id_list[source_id] = source_id
    g_source_bin_list[source_id] = source_bin
    g_num_sources += 1

    return True


def run() -> None:
    """Start the pipeline and block in the GLib main loop until it quits.

    The pipeline is moved to PAUSED and then PLAYING before the loop runs.
    It is set back to NULL when the loop returns, including when the loop
    exits with an error.
    """
    pipeline.set_state(Gst.State.PAUSED)  # type: ignore
    pipeline.set_state(Gst.State.PLAYING)  # type: ignore

    try:
        loop.run()  # type: ignore
    except Exception:
        # Keep the traceback in the log instead of printing a bare message.
        logger.exception("Error happened while processing pipeline")
    finally:
        pipeline.set_state(Gst.State.NULL)  # type: ignore


def start() -> Thread:
    """Run ``run`` in a daemon thread and return that thread."""
    worker = Thread(target=run, daemon=True)
    worker.start()
    return worker


def stop() -> None:
    """Ask the module-level GLib main loop to quit."""
    loop.quit()  # type: ignore


def handle_message(bus: Gst.Bus, message: Gst.Message) -> bool:
    """Bus "message" callback: log warnings and react to EOS and errors.

    A pipeline EOS or an error stops the main loop. An element message named
    "stream-eos" marks the stream as finished and removes its source.

    Args:
        bus: Bus that delivered the message (unused).
        message: The message to handle.

    Returns:
        Always ``True``.
    """
    logger.debug("Handle received message")

    message_type = message.type

    if message_type == Gst.MessageType.EOS:
        logger.info("Pipeline recieved EOS! Stopping pipeline...")
        stop()
    elif message_type == Gst.MessageType.WARNING:
        warning, debug = message.parse_warning()
        logger.warning(f"Pipeline Warning: {warning}")
        logger.debug(f"Pipeline Warning Debug: {debug}")
    elif message_type == Gst.MessageType.ERROR:
        error, debug = message.parse_error()
        logger.error(f"Pipeline Error: {error}")
        logger.debug(f"Pipeline Error Debug: {debug}")
        logger.info("Pipeline recieved an error! Stopping pipeline...")
        stop()
    elif message_type == Gst.MessageType.ELEMENT:
        struct = message.get_structure()
        # Only the per-stream EOS notification is of interest here.
        if struct is None or not struct.has_name("stream-eos"):
            return True
        parsed, stream_id = struct.get_uint("stream-id")
        if parsed:
            logger.info(f"Received EOS for stream {stream_id}")
            g_eos_list[stream_id] = True
            remove_source(stream_id)

    return True


def create_pipeline():
    """Create the main loop, the pipeline and its static element chain.

    Builds and links
    nvstreammux -> nvinfer -> nvtracker -> nvinfer -> nvinfer ->
    nvvideoconvert -> capsfilter -> nvmsgconv -> nvmsgbroker,
    installs ``handle_message`` on the bus and ``extract_metadata`` as a
    buffer probe on the capsfilter's src pad. Sources are added later with
    ``add_source``.

    Assigns the module-level ``loop``, ``platform_info``, ``pipeline`` and
    ``stream_muxer``.

    Raises:
        ValueError: If the pipeline or an element cannot be created, the
            capsfilter has no src pad, or two neighbouring elements cannot
            be linked.
    """
    global platform_info
    global loop
    global pipeline
    global stream_muxer

    loop = GLib.MainLoop()
    platform_info = PlatformInfo()

    pipeline = Gst.Pipeline()
    if not pipeline:
        raise ValueError("Unable to create Pipeline")
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", handle_message)

    logger.debug("Creating stream muxer")
    stream_muxer = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
    if stream_muxer is None:
        raise ValueError("Unable to create stream muxer")
    width, height = STREAM_MUXER_RESOLUTION
    stream_muxer.set_property("gpu_id", GPU_ID)
    stream_muxer.set_property("batch-size", STREAM_MUXER_BATCH_SIZE)
    stream_muxer.set_property("batched-push-timeout", STREAM_MUXER_BATCHED_PUSH_TIMEOUT)
    stream_muxer.set_property("live-source", 1)
    stream_muxer.set_property("width", width)
    stream_muxer.set_property("height", height)
    stream_muxer.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_UNIFIED))

    logger.debug("Creating primary inference")
    primary_inference = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if primary_inference is None:
        raise ValueError("Unable to create primary inference")
    primary_inference.set_property("config-file-path", PRIMARY_INFERENCE_CONFIG_FILE)
    primary_inference.set_property("gpu_id", GPU_ID)
    batch_size = primary_inference.get_property("batch-size")
    if batch_size < MAX_NUM_SOURCES:
        logger.warning(
            f"Overriding infer-config batch-size {batch_size} with number of sources {MAX_NUM_SOURCES}"
        )
    # The batch size is set to the number of sources in every case; the
    # warning above only fires when the configured value was smaller.
    primary_inference.set_property("batch-size", MAX_NUM_SOURCES)

    logger.debug("Creating tracker")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if tracker is None:
        raise ValueError("Unable to create tracker")
    tracker_config = configparser.ConfigParser()
    tracker_config.read(TRACKER_CONFIG_FILE)
    # Config key -> element property name, grouped by how the value is read.
    tracker_int_properties = {
        "tracker-width": "tracker-width",
        "tracker-height": "tracker-height",
        "gpu-id": "gpu_id",
        "enable-batch-process": "enable_batch_process",
    }
    tracker_str_properties = {
        "ll-lib-file": "ll-lib-file",
        "ll-config-file": "ll-config-file",
    }
    for key in tracker_config["tracker"]:
        if key in tracker_int_properties:
            tracker.set_property(
                tracker_int_properties[key], tracker_config.getint("tracker", key)
            )
        elif key in tracker_str_properties:
            tracker.set_property(
                tracker_str_properties[key], tracker_config.get("tracker", key)
            )

    logger.debug("Creating secondary inference 1")
    secondary_inference_1 = Gst.ElementFactory.make(
        "nvinfer", "secondary1-nvinference-engine"
    )
    if secondary_inference_1 is None:
        raise ValueError("Unable to create secondary inference 1")
    secondary_inference_1.set_property("gpu_id", GPU_ID)
    secondary_inference_1.set_property(
        "config-file-path", SECONDARY_INFERENCE_1_CONFIG_FILE
    )

    logger.debug("Creating secondary inference 2")
    secondary_inference_2 = Gst.ElementFactory.make(
        "nvinfer", "secondary2-nvinference-engine"
    )
    if secondary_inference_2 is None:
        raise ValueError("Unable to create secondary inference 2")
    secondary_inference_2.set_property("gpu_id", GPU_ID)
    secondary_inference_2.set_property(
        "config-file-path", SECONDARY_INFERENCE_2_CONFIG_FILE
    )

    logger.debug("Creating video converter")
    video_converter = Gst.ElementFactory.make("nvvideoconvert", "video-convertor")
    if video_converter is None:
        raise ValueError("Unable to create video converter")
    video_converter.set_property("gpu_id", GPU_ID)
    video_converter.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_UNIFIED))

    logger.debug("Creating filter")
    caps_filter = Gst.ElementFactory.make("capsfilter", "filter")
    if caps_filter is None:
        raise ValueError("Unable to create filter")
    filter_caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGB")
    caps_filter.set_property("caps", filter_caps)
    src_pad = caps_filter.get_static_pad("src")
    if src_pad is None:
        raise ValueError("Unable to get src pad of filter")
    # Event metadata is attached to the buffers leaving the capsfilter.
    src_pad.add_probe(Gst.PadProbeType.BUFFER, extract_metadata)

    logger.debug("Creating message converter")
    message_converter = Gst.ElementFactory.make("nvmsgconv", "message-converter")
    if message_converter is None:
        raise ValueError("Unable to create message converter")
    message_converter.set_property("config", MESSAGE_CONVERTER_CONFIG_FILE)
    message_converter.set_property("payload-type", MESSAGE_CONVERTER_SCHEMA_TYPE)
    message_converter.set_property("msg2p-newapi", False)

    logger.debug("Creating message broker")
    message_broker = Gst.ElementFactory.make("nvmsgbroker", "message-broker")
    if message_broker is None:
        raise ValueError("Unable to create message broker")
    message_broker.set_property("sync", False)
    message_broker.set_property("proto-lib", MESSAGE_BROKER_PROTO_LIB)
    message_broker.set_property("conn-str", MESSAGE_BROKER_CONN_STR)
    message_broker.set_property("topic", MESSAGE_BROKER_TOPIC)
    if MESSAGE_BROKER_CONFIG:
        message_broker.set_property("config", MESSAGE_BROKER_CONFIG)

    # Elements in upstream-to-downstream order; used for adding and linking.
    element_chain = [
        stream_muxer,
        primary_inference,
        tracker,
        secondary_inference_1,
        secondary_inference_2,
        video_converter,
        caps_filter,
        message_converter,
        message_broker,
    ]

    logger.info("Adding elements to Pipeline")
    for element in element_chain:
        pipeline.add(element)

    logger.info("Linking elements in the Pipeline")
    for upstream, downstream in zip(element_chain, element_chain[1:]):
        # A failed link would otherwise leave a silently broken pipeline.
        if not upstream.link(downstream):
            raise ValueError(
                f"Unable to link '{upstream.get_name()}' to '{downstream.get_name()}'"
            )


if __name__ == "__main__":
    # Reproduction script: start the pipeline, add every source one by one,
    # then remove them again one by one.
    create_pipeline()
    thread = start()
    # NOTE(review): the pipeline runs for 120 s without any source before the
    # first one is added - confirm that this delay is intended.
    time.sleep(120)

    # Iterate over the same bound that sizes the per-source bookkeeping lists
    # instead of repeating the literal 4.
    for source_id in range(MAX_NUM_SOURCES):
        add_source(source_id)
        time.sleep(10)

    for source_id in range(MAX_NUM_SOURCES):
        remove_source(source_id)
        time.sleep(5)

    stop()
    # Wait for the pipeline thread to finish shutting down.
    thread.join()

In this sample, I add 4 streams to the pipeline and then remove them one by one. However, when the last source-bin is removed from the pipeline by calling pipeline.remove(source_bin), I receive a segfault as follows:

WARNING:__main__:Pipeline Warning: gst-resource-error-quark: No Sources found at the input of muxer. Waiting for sources. (3)
DEBUG:__main__:Pipeline Warning Debug: gstnvstreammux.cpp(2825): gst_nvstreammux_src_push_loop (): /GstPipeline:pipeline0/GstNvStreamMux:stream-muxer
Segmentation fault (core dumped)

Furthermore, according to the core dump, it seems the Nvvideoconvert is problematic. Here is the parsed core dump of my script:

[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/x86_64-linux-gnu/libthread_db.so.1".
--Type <RET> for more, q to quit, c to continue without paging--
Core was generated by `python3 sample.py'.

Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x00007f725c7569e5 in ?? () from /usr/lib/x86_64-linux-gnu/gstreamer-1.0/deepstream/libgstnvvideoconvert.so
[Current thread is 1 (Thread 0x7f71edfff640 (LWP 783))]

Any suggestions or guidance would be greatly appreciated!

Let’s narrow down the problem first. Will there still be a crash issue if you use fakesink, like filter.link(fakesink)?

1 Like

Per your suggestion, I replaced the Nvmsgconv and Nvmsgbroker elements with a Fakesink, but the pipeline crashed again after removing the last source for the same reason.

Furthermore, I suspected the probe function installed on the source pad of the capsfilter to extract the metadata, so I tried removing it, but that didn’t help either.

Here is the core dump after this try:

[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/x86_64-linux-gnu/libthread_db.so.1".
--Type <RET> for more, q to quit, c to continue without paging--
Core was generated by `python3 sample.py'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x00007f4cb86bc9e5 in ?? () from /usr/lib/x86_64-linux-gnu/gstreamer-1.0/deepstream/libgstnvvideoconvert.so
[Current thread is 1 (Thread 0x7f4c45fff640 (LWP 1494))]

I also tried something else: I changed the format of the capsfilter to NV12, and the segfault no longer occurred. However, the pipeline still crashes when I use the RGB or RGBA formats. I need this conversion so that I can get the frame as a NumPy array using get_nvds_buf_surface and eventually store it as a JPEG image.

Here is my snippet to create the Capsfilter element:

filter = Gst.ElementFactory.make("capsfilter", "filter")
if filter is None:
    raise ValueError("Unable to create filter")
filter_caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=(string)RGB")
filter.set_property("caps", filter_caps)