Please provide complete information as applicable to your setup.
• Hardware Platform (Jetson / GPU) Jetson orin NX
• DeepStream Version 7.0
I need to use multiple appsrc to push images for inference, and metadata must be attached to each frame. However, after I push the frame and attach metadata successfully through the signal callback function of appsrc, I cannot find this metadata by adding probes to the src or sink of any subsequent element in the pipeline. I am confused, even if I add probes to the exit of appsrc, I cannot find the metadata.
My pipeline is as follows,
char pipeline_str[4096];
snprintf(pipeline_str, sizeof(pipeline_str),
"appsrc name=appsrc0 ! jpegdec name=jpegdec0 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_0 "
"appsrc name=appsrc1 ! jpegdec name=jpegdec1 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_1 "
"appsrc name=appsrc2 ! jpegdec name=jpegdec2 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_2 "
"appsrc name=appsrc3 ! jpegdec name=jpegdec3 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_3 "
"appsrc name=appsrc4 ! jpegdec name=jpegdec4 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_4 "
"appsrc name=appsrc5 ! jpegdec name=jpegdec5 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_5 "
"appsrc name=appsrc6 ! jpegdec name=jpegdec6 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_6 "
"appsrc name=appsrc7 ! jpegdec name=jpegdec7 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_7 "
"appsrc name=appsrc8 ! jpegdec name=jpegdec8 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_8 "
"appsrc name=appsrc9 ! jpegdec name=jpegdec9 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_9 "
"appsrc name=appsrc10 ! jpegdec name=jpegdec10 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_10 "
"appsrc name=appsrc11 ! jpegdec name=jpegdec11 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_11 "
"appsrc name=appsrc12 ! jpegdec name=jpegdec12 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_12 "
"appsrc name=appsrc13 ! jpegdec name=jpegdec13 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_13 "
"appsrc name=appsrc14 ! jpegdec name=jpegdec14 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_14 "
"appsrc name=appsrc15 ! jpegdec name=jpegdec15 ! videoconvert ! video/x-raw,format=RGBA ! "
"nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! queue ! mux.sink_15 "
"nvstreammux name=mux batch-size=8 width=640 height=640 live-source=0 ! "
// "nvinfer name=infer config-file-path=%s ! nvdsosd name=osd ! nvvideoconvert ! video/x-raw,format=RGBA ! appsink name=sink",
"nvinfer name=infer config-file-path=%s ! "
// "fakesink name=sink sync=false",
"nvvideoconvert ! "
"nvdsosd name=osd display-text=1 ! "
"nvv4l2h264enc bitrate=4000000 preset-level=1 ! "
"h264parse ! "
"matroskamux ! "
"filesink location=output11.mkv",
CONFIG_PATH);
GError *err = NULL;
pipeline = gst_parse_launch(pipeline_str, &err);
if (!pipeline)
{
g_printerr("Pipeline creation failed: %s\n", err->message);
return -1;
}
printf("使用的GStreamer管道: %s\n", pipeline_str);
appsrc0 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc0");
appsrc1 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc1");
appsrc2 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc2");
appsrc3 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc3");
appsrc4 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc4");
appsrc5 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc5");
appsrc6 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc6");
appsrc7 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc7");
appsrc8 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc8");
appsrc9 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc9");
appsrc10 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc10");
appsrc11 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc11");
appsrc12 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc12");
appsrc13 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc13");
appsrc14 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc14");
appsrc15 = gst_bin_get_by_name(GST_BIN(pipeline), "appsrc15");
// appsink = gst_bin_get_by_name(GST_BIN(pipeline), "sink");
GstElement *infer = gst_bin_get_by_name(GST_BIN(pipeline), "infer");
for (int i = 0; i < 4; i++)
{
gchar *jpegdec_name = g_strdup_printf("jpegdec%d", i);
GstElement *jpegdec = gst_bin_get_by_name(GST_BIN(pipeline), jpegdec_name);
if (jpegdec)
{
GstPad *jpegdec_src_pad = gst_element_get_static_pad(jpegdec, "src");
if (jpegdec_src_pad)
{
gst_pad_add_probe(jpegdec_src_pad, GST_PAD_PROBE_TYPE_BUFFER,
jpegdec_src_probe, NULL, NULL);
gst_object_unref(jpegdec_src_pad);
}
gst_object_unref(jpegdec);
}
g_free(jpegdec_name);
}
// 添加探针到 nvinfer 的 src pad
GstPad *infer_src_pad = gst_element_get_static_pad(infer, "src");
if (!infer_src_pad)
{
g_printerr("Unable to get src pad from nvinfer element\n");
}
else
{
gst_pad_add_probe(infer_src_pad, GST_PAD_PROBE_TYPE_BUFFER,
osd_sink_pad_buffer_probe, NULL, NULL);
gst_object_unref(infer_src_pad);
}
// g_object_set(appsink, "emit-signals", TRUE, NULL);
// g_signal_connect(appsink, "new-sample", G_CALLBACK(on_new_sample), NULL);
g_signal_connect(appsrc0, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc1, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc2, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc3, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc4, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc5, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc6, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc7, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc8, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc9, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc10, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc11, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc12, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc13, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc14, "need-data", G_CALLBACK(push_sample), NULL);
g_signal_connect(appsrc15, "need-data", G_CALLBACK(push_sample), NULL);
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
gst_bus_add_signal_watch(bus);
g_signal_connect(bus, "message", G_CALLBACK(on_bus_message), NULL);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(loop);
My probe and signal callback appsrc push frame function is as follows:
void push_sample(GstElement *src, guint length, gpointer user_data)
{
int source_id = -1;
if (src == appsrc0)
source_id = 0;
else if (src == appsrc1)
source_id = 1;
else if (src == appsrc2)
source_id = 2;
else if (src == appsrc3)
source_id = 3;
else if (src == appsrc4)
source_id = 4;
else if (src == appsrc5)
source_id = 5;
else if (src == appsrc6)
source_id = 6;
else if (src == appsrc7)
source_id = 7;
else if (src == appsrc8)
source_id = 8;
else if (src == appsrc9)
source_id = 9;
else if (src == appsrc10)
source_id = 10;
else if (src == appsrc11)
source_id = 11;
else if (src == appsrc12)
source_id = 12;
else if (src == appsrc13)
source_id = 13;
else if (src == appsrc14)
source_id = 14;
else if (src == appsrc15)
source_id = 15;
else
{
g_printerr("未知 appsrc 地址\n");
return;
}
g_mutex_lock(&prepared_queue_mutex);
// 等待预处理的数据,但设置超时避免死锁
while (g_queue_is_empty(prepared_tile_queue) && !preprocessing_finished)
{
gint64 end_time = g_get_monotonic_time() + 10 * G_TIME_SPAN_MILLISECOND; // 10ms超时
if (!g_cond_wait_until(&prepared_queue_cond, &prepared_queue_mutex, end_time))
{
printf("等待预处理数据超时\n");
continue;
}
}
PreparedTile *prepared = g_queue_pop_head(prepared_tile_queue);
g_mutex_unlock(&prepared_queue_mutex);
if (!prepared)
{
// 没有更多数据,结束流
static gboolean eos_sent = FALSE;
if (!eos_sent && preprocessing_finished)
{
printf("所有瓦片处理完成,发送EOS\n");
gst_app_src_end_of_stream(GST_APP_SRC(src));
eos_sent = TRUE;
}
return;
}
// 创建GstBuffer - 使用gst_buffer_fill而不是map/unmap
GstBuffer *buffer = gst_buffer_new_allocate(NULL, prepared->jpeg_size, NULL);
if (!buffer)
{
g_print("Source %d: Failed to allocate buffer\n", source_id);
free(prepared->jpeg_data);
free(prepared);
return;
}
gst_buffer_fill(buffer, 0, prepared->jpeg_data, prepared->jpeg_size);
// 直接将PreparedTile中的TileInfo放入队列 保证绝对的一一对应
TileInfo *info = malloc(sizeof(TileInfo));
*info = prepared->tile_info; // 从prepared中取出,保证100%对应
g_queue_push_tail(tile_info_queue, info);
// 添加时间戳元数据
struct timeval precise_time;
gchar *timestamp_str = get_precise_timestamp(&precise_time);
if (timestamp_str)
{
printf("push_sample Timestamp: source_id=%d, timestamp=%s, num=%d\n",
source_id, timestamp_str, q++);
// 分配时间戳数据结构
CustomTimestampData *timestamp_data = g_malloc0(sizeof(CustomTimestampData));
if (timestamp_data)
{
// 安全地复制字符串,确保以null结尾
g_strlcpy(timestamp_data->timestamp_str, timestamp_str, sizeof(timestamp_data->timestamp_str));
timestamp_data->source_id = source_id;
timestamp_data->precise_time = precise_time;
// 添加DeepStream元数据
NvDsMeta *dsmeta = gst_buffer_add_nvds_meta(buffer, timestamp_data, NULL,
copy_timestamp_meta, release_timestamp_meta);
if (dsmeta)
{
dsmeta->meta_type = NVDS_USER_META;
printf("Source %d: Successfully added timestamp metadata\n", source_id);
}
else
{
g_print("Source %d: Failed to add timestamp metadata to frame\n",
source_id);
g_free(timestamp_data);
}
}
else
{
g_print("Source %d: Failed to allocate timestamp data for frame\n",
source_id);
}
g_free(timestamp_str);
}
else
{
g_print("Source %d: Failed to get timestamp for frame\n",
source_id);
}
// 推送buffer到pipeline - 使用g_signal_emit_by_name而不是gst_app_src_push_buffer
GstFlowReturn ret;
g_signal_emit_by_name(src, "push-buffer", buffer, &ret);
if (ret == GST_FLOW_OK)
{
ImageInfo *current_img = &images_info[prepared->tile_info.image_index];
if ((info->image_index + 1) % 10 == 0 && info->tile_index + 1 == current_img->total_tiles)
{
printf("Appsrc推送第 %d 张图像(共 %d 张)的第 %d 个瓦片(共 %d 个),位置 (%d, %d);推送队列长度: %d\n",
info->image_index + 1, total_images,
info->tile_index + 1, current_img->total_tiles,
info->tile_x, info->tile_y, g_queue_get_length(tile_info_queue));
}
}
else if (ret != GST_FLOW_FLUSHING)
{
g_print("Source %d: Push buffer failed: %s\n", source_id, gst_flow_get_name(ret));
}
gst_buffer_unref(buffer);
// 清理预处理数据
free(prepared->jpeg_data);
free(prepared);
}
static GstFlowReturn on_new_sample(GstAppSink *sink, gpointer user_data)
{
GstSample *sample = gst_app_sink_pull_sample(sink);
if (sample)
{
gst_sample_unref(sample); // 不处理,释放即可
}
return GST_FLOW_OK;
}
// 在 appsrc 的 src pad 添加探针
static GstPadProbeReturn appsrc_src_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER(info);
g_print("appsrc src buffer address: %p\n", buffer);
// 检查元数据
NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buffer);
if (batch_meta)
{
g_print("Found batch meta in appsrc src!\n");
}
else
{
g_print("No batch meta in appsrc src\n");
}
return GST_PAD_PROBE_OK;
}
GstPadProbeReturn osd_sink_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
GstBuffer *buf = (GstBuffer *)info->data;
NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);
if (!batch_meta)
return GST_PAD_PROBE_OK;
// g_print("=== OSD Probe: Processing batch with %d frames ===\n", batch_meta->num_frames_in_batch);
for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
{
NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
if (!frame_meta)
continue;
for (NvDsMetaList *l_user = frame_meta->frame_user_meta_list; l_user != NULL; l_user = l_user->next)
{
NvDsUserMeta *user_meta = (NvDsUserMeta *)(l_user->data);
if (!user_meta || user_meta->base_meta.meta_type != NVDS_USER_META)
continue;
CustomTimestampData *timestamp_data = (CustomTimestampData *)user_meta->user_meta_data;
if (timestamp_data)
{
g_print(" OSD Timestamp: source_id=%d, timestamp=%s\n",
timestamp_data->source_id, timestamp_data->timestamp_str);
}
}
}
for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
{
NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
TileInfo *tile_info = (TileInfo *)g_queue_pop_head(tile_info_queue);
if (!tile_info)
{
g_printerr("Tile queue underflow!\n");
continue;
}
int img_idx = tile_info->image_index;
int tile_x = tile_info->tile_x;
int tile_y = tile_info->tile_y;
// 原有逻辑改为用 tile_info 来更新检测信息
int count = 0;
for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj; l_obj = l_obj->next)
{
NvDsObjectMeta *obj = (NvDsObjectMeta *)(l_obj->data);
if (obj->class_id < 0 || obj->class_id > 4)
continue;
count++;
}
if (count > 0)
{
int prev = detection_counts[img_idx];
detection_counts[img_idx] += count;
all_detections[img_idx] = realloc(all_detections[img_idx], detection_counts[img_idx] * sizeof(DetectionResult));
int idx = prev;
for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj; l_obj = l_obj->next)
{
NvDsObjectMeta *obj = (NvDsObjectMeta *)(l_obj->data);
if (obj->class_id < 0 || obj->class_id > 4)
continue;
all_detections[img_idx][idx].label = labels[obj->class_id];
all_detections[img_idx][idx].confidence = obj->confidence;
all_detections[img_idx][idx].x = tile_x + obj->rect_params.left;
all_detections[img_idx][idx].y = tile_y + obj->rect_params.top;
all_detections[img_idx][idx].width = obj->rect_params.width;
all_detections[img_idx][idx].height = obj->rect_params.height;
idx++;
}
}
free(tile_info); // 用完释放
}
return GST_PAD_PROBE_OK;
}
static GstPadProbeReturn jpegdec_src_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER(info);
if (!buffer)
return GST_PAD_PROBE_OK;
NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buffer);
if (batch_meta)
{
g_print("=== JPEGDec Src Probe: Batch meta found ===\n");
}
else
{
g_print("=== JPEGDec Src Probe: No batch meta found ===\n");
}
return GST_PAD_PROBE_OK;
}
clear
Qídài nǐ de huífù xièxiè
8 / 5,000
翻译结果
翻译结果
Looking forward to your reply, thanks