Why does the Kafka `partition-key` setting have no effect?

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) 3090
• DeepStream Version 7.1 docker

cfg_kafka.txt

[message-broker]
consumer-group-id = card
proto-cfg = "message.timeout.ms=2000;retries=3"
partition-key = sensorId
#share-connection = 1

The message is:

{
  "messageid": "6d5be90f-c41a-465c-b7a6-9b669bd9b013",
  "mdsversion": "1.0",
  "@timestamp": "2024-12-05T03:36:06.754Z",
  "sensorId": 2,
  "otherAttrs": "xxxxx"
}

# Create the nvmsgbroker element and point it at the Kafka adaptor and config file.
msgbroker = Gst.ElementFactory.make("nvmsgbroker", "msgbroker")
if not msgbroker:
    logger.error("Unable to create msgbroker\n")
msgbroker.set_property('proto-lib', args.proto_lib)
msgbroker.set_property('conn-str', args.conn_str)
msgbroker.set_property('sync', False)
msgbroker.set_property('topic', args.topic)
msgbroker.set_property('async', False)
msgbroker.set_property('config', f"{script_dir}/{args.config}")

Why do all messages go to the same partition, Partition 0?

Log:

Dec  5 04:05:48 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: cfg setting proto_cfg = message.timeout.ms=2000;retries=3#012
Dec  5 04:05:48 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: kafka partition key field name = sensorId#012
Dec  5 04:05:48 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: Kafka Consumer group = card #012
Dec  5 04:05:48 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: set config setting message.timeout.ms to 2000#012
Dec  5 04:05:48 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: set config setting retries to 3#012
Dec  5 04:05:48 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: Kafka connection successful#012
Dec  5 04:05:50 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: no matching json field found                 based on kafka key config; using default partition#012
Dec  5 04:05:50 a5b81c3ab586 python3: DSLOG:NVDS_KAFKA_PROTO: no matching json field found                 based on kafka key config; using default partition#012

After changing partition-key = sensorId to partition-key = deviceId, the log no longer shows "DSLOG:NVDS_KAFKA_PROTO: no matching json field found".

However, messages with id = 1 and id = 5 are both sent to partition 5.

The nvmsgbroker plugin and the low-level library are open source in DS 7.1. Please add logging to nvds_msgapi_send_async in /opt/nvidia/deepstream/deepstream-7.1/sources/libs/kafka_protocol_adaptor/nvds_kafka_proto.cpp to check why “no matching json field found” was printed.

When I change partition-key = sensorId to partition-key = deviceId, the log no longer shows "DSLOG:NVDS_KAFKA_PROTO: no matching json field found".

Now I want to know why each deviceId is not sent to its corresponding partition. I expected:

deviceId 0 -> Partition 0
deviceId 1 -> Partition 1
deviceId 2 -> Partition 2
deviceId 3 -> Partition 3
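
A note on the mapping expected above: Kafka producers generally do not send key N to partition N. The key is hashed and the hash is reduced modulo the partition count, so the same key always lands on the same partition, but two different keys (such as 1 and 5) can share one. Below is a minimal sketch of that idea, assuming a CRC32-based scheme similar to librdkafka's `consistent` partitioner. The function name `pick_partition` and the partition count of 6 are hypothetical, and how the DeepStream adaptor actually passes the key should be checked in nvds_kafka_proto.cpp.

```python
import zlib


def pick_partition(key: str, partition_count: int) -> int:
    """Hash-based partition choice: CRC32 of the key bytes modulo the partition count.

    Illustrative only; this is not the DeepStream or librdkafka implementation.
    """
    if partition_count <= 0:
        raise ValueError("partition_count must be positive")
    return zlib.crc32(key.encode("utf-8")) % partition_count


if __name__ == "__main__":
    # Hypothetical topic with 6 partitions; keys are the device ids as strings.
    for device_id in ["0", "1", "2", "3", "5"]:
        print("deviceId", device_id, "-> partition", pick_partition(device_id, 6))
```

Under a hash-based scheme like this, the key alone does not guarantee a strict deviceId N to partition N mapping; it only guarantees that one key consistently goes to one partition.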

Please add logging to nvds_msgapi_send_async to check whether “protomsg.sensorid()” is passed to nvds_kafka_client_send.

There is no sensorid in protomsg. The printed protomsg is:

{
  "messageid": "aea86cf3-29ea-4257-9f2f-02196e6d7a6f",
  "mdsversion": "1.0",
  "@timestamp": "2024-12-07T11:32:17.432Z",
  "sensor": {
    "id": "1"
  },
  "otherAttrs": ""
}
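
The message printed above carries the id nested under "sensor" rather than as a top-level "sensorId" field, which would be consistent with the "no matching json field found" log line. Below is a small sketch for checking which key names resolve against a payload. `lookup_field` is a hypothetical helper, not DeepStream's json_get_key_value, and whether the adaptor accepts a dotted path such as sensor.id is an assumption to verify in nvds_kafka_proto.cpp.

```python
import json
from typing import Any, Optional


def lookup_field(payload: str, path: str) -> Optional[Any]:
    """Resolve a dotted path (e.g. "sensor.id") in a JSON payload; return None if absent."""
    node: Any = json.loads(payload)
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


# Payload copied from the message printed in this thread.
PAYLOAD = """{
  "messageid": "aea86cf3-29ea-4257-9f2f-02196e6d7a6f",
  "mdsversion": "1.0",
  "@timestamp": "2024-12-07T11:32:17.432Z",
  "sensor": {"id": "1"},
  "otherAttrs": ""
}"""

if __name__ == "__main__":
    for key in ("sensorId", "sensor.id"):
        print(key, "->", lookup_field(PAYLOAD, key))
```

Running a check like this against the real payload shows whether the configured partition-key name exists in the message at all before looking at partition assignment.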

I’ve tested it, and the partition-key doesn’t work.

In the function nvds_msgapi_send_async, the code calls json_get_key_value to parse partition_key_field from the payload. Can you please print the payload to check why “no matching json field found” is printed when setting “partition-key = sensorId”?

At line 628 of nvds_kafka_proto.cpp:
protomsg.ParseFromString(string((char *) payload, nbuf));

When I print protomsg, I get:

{
  "messageid": "aea86cf3-29ea-4257-9f2f-02196e6d7a6f",
  "mdsversion": "1.0",
  "@timestamp": "2024-12-07T11:32:17.432Z",
  "sensor": {
    "id": "1"
  },
  "otherAttrs": ""
}

Isn’t this the payload?

Could you share the content of the payload?

msgbroker.set_property('sync', False)
msgbroker.set_property('topic', args.topic)
msgbroker.set_property('async', False)

Is this right?

Yes. The payload is generated in the nvmsgconv plugin. These nvmsgbroker property settings do not modify the payload. If async is true, the code will call nvds_msgapi_send_async.

Why is there no nvds_msgapi_send_async function in nvidia/deepstream/deepstream-7.1/sources/libs/kafka_protocol_adaptor/nvds_kafka_proto.cpp?

I can find nvds_msgapi_send_async at line 599 in nvds_kafka_proto.cpp.

When I set msgbroker.set_property('async', True), no messages reach the broker:

**PERF:  {'stream0': 0.0, 'stream1': 0.0}

Why?

First, could you check why the fps became 0? Could you share the media pipeline?

There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.
