I have an application where I need to receive multiple IPv4/UDP flows that have separate multicast group destination addresses but a common destination port number. I am using the ibverbs API, and I would like to receive them on a single receive queue so that I get all of the packets in the order they were transmitted by the data source (an external piece of custom hardware). This works well: I create a flow matcher that matches only on the destination UDP port number, and I receive the packets from the data source with no ordering issues.
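For concreteness, the steering rule is nothing more than a destination-port match. A minimal sketch is below (it uses the same names and values as the full test program further down, which contains the equivalent code; the `flow_rule`/`rule`/`flow` identifiers are just for illustration):

// Match only the UDP destination port, so every multicast group sharing that port
// is steered to the single receive queue (rx_qp).
struct flow_rule
{
ibv_flow_attr attr;
ibv_flow_spec_tcp_udp udp;
} rule = {};
rule.attr.type = IBV_FLOW_ATTR_NORMAL;
rule.attr.size = sizeof(rule.attr);
rule.attr.num_of_specs = 1;
rule.attr.port = rx_port_num;
rule.udp.type = IBV_FLOW_SPEC_UDP;
rule.udp.size = sizeof(rule.udp);
rule.udp.val.dst_port = htons(dest_udp_port);
rule.udp.mask.dst_port = 0xffff;
ibv_flow *flow = ibv_create_flow(rx_qp, &rule.attr);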
However, I have created a simulator for the custom hardware to aid in development. It uses an mlx5 NIC (I have used both ConnectX-5 and ConnectX-6 Dx) on an Ubuntu 22.04 server to transmit the UDP flows in the same way that the custom hardware would. I then loop the transmit connection back to the receiver (I have done so both via a 100-GbE switch and by a direct DAC cable loopback).
When I do this, I observe that packets are occasionally received out of order. Typically just two packets have swapped places in the receive queue, but occasionally there are larger jumps where a packet arrives 3-4 positions away from where it should be. Note that packets are not dropped, only reordered from where they are expected.
The key observation I have made is that this only seems to occur when the stream of packets contains more than one separate multicast flow. If the input packets all have the same destination IP address, then my test always completes with no packet order problems. However, if the stream of packets contains multiple interleaved IPv4/UDP flows, then the ordering issue is observed.
Here is the source code for a C++ test program that demonstrates the issue on my system. Instructions on how to build/run it are shown in the comments at the top. To run it, simply connect the two ports together on an mlx5 device (e.g. ConnectX-6 Dx) using a cable. You can observe that running it with one UDP flow completes the test successfully (sending 1 billion packets with no ordering issues), while increasing to more than one flow fails the test within a second or two.
// Example program that demonstrates unpredictable transmit ordering across separate IPv4/UDP flows on mlx5 devices.
// It uses the ibverbs API to create a single send queue on one mlx5 device and a single receive queue on another.
// The two ports should be wired directly to one another via a cable. A flow matcher is created to route all of the
// test traffic to the receive queue. All of the transmitted packets are routed through the single send queue.
//
// The test can be done with a configurable number of separate IPv4/UDP flows. The test will continually send packets
// from one device and check whether they are received in the same order on the other device. If any packets are
// observed out of order at the receiver, the test will abort. Otherwise, the test will run for a duration of one
// billion packets.
//
// Building:
//
// This program can be built using the following command line (tested on CentOS 8.5 with gcc 8.5.0):
//
// g++ tx-order.cc -o tx-order -libverbs
//
// Usage:
//
// ./tx-order [number of UDP flows]
//
// This will run a loopback test with the specified number of UDP flows. Each flow shares a common port number,
// but has a separate IPv4 multicast group destination address. `[number of UDP flows]` specifies how many
// separate destination multicast groups the test should cycle through. It defaults to 1; that is, the
// test will only use a single UDP flow destined to `224.224.224.0` by default.
//
// Example: `./tx-order 4` will cause the test to cycle through four destination addresses on a per-packet basis:
//
// Packet #0 will have `224.224.224.0` as a destination address
// Packet #1 will have `224.224.224.1` as a destination address
// Packet #2 will have `224.224.224.2` as a destination address
// Packet #3 will have `224.224.224.3` as a destination address
// Packet #4 will have `224.224.224.0` as a destination address
// ...
//
// NOTE: as this is just a proof of concept, all MLX5 device parameters are hard-coded in the source code below;
// they may need to be adjusted for the system under test.
//
#include <arpa/inet.h>
#include <infiniband/verbs.h>
#include <linux/if_ether.h>
#include <linux/in.h>
#include <linux/ip.h>
#include <linux/udp.h>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <stdexcept>
#include <string>
#include <time.h>
#include <vector>
#include <chrono>
#include <thread>
ibv_device *find_device_by_ibv_name(const char *dev_name)
{
// Iterate over all of the ibverbs devices that were found.
int num_devices = 0;
auto dev_list = ibv_get_device_list(&num_devices);
ibv_device *device = nullptr;
for (int i = 0; i < num_devices; ++i)
{
if (!strcmp(dev_name, ibv_get_device_name(dev_list[i])))
{
device = dev_list[i];
break;
}
}
// Clean up the device list.
if (dev_list) ibv_free_device_list(dev_list);
// Return the device we found, if any.
return device;
}
uint32_t ipv4_from_string(const char *s)
{
in_addr addr;
auto ret = inet_pton(AF_INET, s, &addr);
if (ret != 1) throw std::runtime_error("invalid IPv4 address");
return addr.s_addr;
}
// Constants used below.
size_t cq_len = 32768;
size_t cq_vec = 0;
const char *rx_ibv_name = "mlx5_1";
size_t rx_port_num = 1;
uint8_t dest_mac_addr[] = { 0x0c, 0x42, 0xa1, 0x41, 0x8a, 0x8a };
const char *dest_ipv4_addr_start_str = "224.224.224.0";
uint16_t dest_udp_port = 12345;
const char *tx_ibv_name = "mlx5_0";
size_t tx_port_num = 1;
uint8_t src_mac_addr[] = { 0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa };
const char *src_ipv4_addr_str = "55.55.55.55";
uint32_t src_ipv4_addr = ipv4_from_string(src_ipv4_addr_str);
uint16_t src_udp_port = 12345;
size_t eth_hdr_len = 14;
size_t ip_hdr_len = 20;
size_t udp_hdr_len = 8;
size_t udp_payload_len = 64;
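// Together these give a frame of 14 + 20 + 8 + 64 = 106 bytes on the wire (not counting the Ethernet FCS).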
size_t packet_len = eth_hdr_len + ip_hdr_len + udp_hdr_len + udp_payload_len;
int main(int argc, const char *argv[])
{
// Get how many separate UDP flows to send from the command line.
int num_dest_ipv4_addr = 1;
if (argc > 1) num_dest_ipv4_addr = std::atoi(argv[1]);
printf("setting up IBV TX device '%s'; source MAC: %02x:%02x:%02x:%02x:%02x:%02x; source UDP endpoint: %s:%u\n",
tx_ibv_name, src_mac_addr[0], src_mac_addr[1], src_mac_addr[2], src_mac_addr[3], src_mac_addr[4], src_mac_addr[5],
src_ipv4_addr_str, src_udp_port);
// Make a list of the destination IP addresses that we will use.
printf("destination UDP endpoints:\n");
std::vector<uint32_t> dest_ipv4_addr_list;
for (size_t i = 0; i < num_dest_ipv4_addr; ++i)
{
uint32_t addr = ntohl(ipv4_from_string(dest_ipv4_addr_start_str)) + uint32_t(i);
printf(" %u.%u.%u.%u:%u\n", (addr >> 24) & 0xff, (addr >> 16) & 0xff, (addr >> 8) & 0xff, (addr >> 0) & 0xff, dest_udp_port);
dest_ipv4_addr_list.push_back(addr);
}
// Set up the transmit interface.
auto tx_device = find_device_by_ibv_name(tx_ibv_name);
if (!tx_device) throw std::runtime_error("error finding TX device");
auto tx_context = ibv_open_device(tx_device);
if (!tx_context) throw std::runtime_error("error creating TX device context");
auto tx_pd = ibv_alloc_pd(tx_context);
if (!tx_pd) throw std::runtime_error("error creating TX protection domain");
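// Use an extended CQ so that the byte length and wall-clock completion timestamp can be read for each completion.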
ibv_cq_init_attr_ex tx_cq_attr;
memset(&tx_cq_attr, 0, sizeof(tx_cq_attr));
tx_cq_attr.cqe = cq_len;
tx_cq_attr.wc_flags = IBV_WC_EX_WITH_BYTE_LEN | IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK;
tx_cq_attr.comp_mask = IBV_CQ_INIT_ATTR_MASK_FLAGS;
tx_cq_attr.flags = IBV_CREATE_CQ_ATTR_SINGLE_THREADED;
auto tx_cq = ibv_create_cq_ex(tx_context, &tx_cq_attr);
if (!tx_cq) throw std::runtime_error("error creating TX completion queue");
size_t num_packets = cq_len;
ibv_qp_init_attr_ex tx_qp_init_attr;
memset(&tx_qp_init_attr, 0, sizeof(tx_qp_init_attr));
tx_qp_init_attr.cap.max_send_wr = num_packets;
tx_qp_init_attr.cap.max_send_sge = 1;
tx_qp_init_attr.send_cq = ibv_cq_ex_to_cq(tx_cq);
tx_qp_init_attr.recv_cq = ibv_cq_ex_to_cq(tx_cq);
tx_qp_init_attr.cap.max_inline_data = 0;
tx_qp_init_attr.qp_type = IBV_QPT_RAW_PACKET;
tx_qp_init_attr.sq_sig_all = 1;
tx_qp_init_attr.comp_mask = IBV_QP_INIT_ATTR_PD | IBV_QP_INIT_ATTR_CREATE_FLAGS;
tx_qp_init_attr.pd = tx_pd;
auto tx_qp = ibv_create_qp_ex(tx_context, &tx_qp_init_attr);
if (!tx_qp) throw std::runtime_error("error creating TX queue pair");
// Initialize its state machine as described in the documentation for `ibv_modify_qp()` for raw Ethernet operation.
ibv_qp_attr tx_qp_attr;
memset(&tx_qp_attr, 0, sizeof(tx_qp_attr));
tx_qp_attr.qp_state = IBV_QPS_INIT;
tx_qp_attr.port_num = tx_port_num;
if (auto err = ibv_modify_qp(tx_qp, &tx_qp_attr, IBV_QP_STATE | IBV_QP_PORT))
throw std::runtime_error("error initializing state of TX queue pair");
memset(&tx_qp_attr, 0, sizeof(tx_qp_attr));
tx_qp_attr.qp_state = IBV_QPS_RTR;
if (auto err = ibv_modify_qp(tx_qp, &tx_qp_attr, IBV_QP_STATE))
throw std::runtime_error("error setting ready to read on TX queue pair");
memset(&tx_qp_attr, 0, sizeof(tx_qp_attr));
tx_qp_attr.qp_state = IBV_QPS_RTS;
if (auto err = ibv_modify_qp(tx_qp, &tx_qp_attr, IBV_QP_STATE))
throw std::runtime_error("error setting ready to send on TX queue pair");
// Allocate memory that we will send packets from.
std::vector<uint8_t> tx_buf(packet_len * num_packets);
// Register the memory region.
auto tx_mr = ibv_reg_mr(tx_pd, &tx_buf[0], tx_buf.size(), IBV_ACCESS_LOCAL_WRITE);
if (!tx_mr) throw std::runtime_error("couldn't register TX memory region");
// Fill each of the packets in the buffer.
for (size_t i = 0; i < num_packets; ++i)
{
// Start filling the packet buffer with the Ethernet header.
auto eh = reinterpret_cast<ethhdr *>(&tx_buf[i * packet_len]);
// Derive the destination MAC address from the destination IPv4 multicast group: IPv4 multicast frames use
// 01:00:5e followed by the low 23 bits of the group address (the address list holds host-byte-order values).
auto dest_ipv4_addr = dest_ipv4_addr_list[i % num_dest_ipv4_addr];
uint8_t mc_dest_mac_addr[6] = { 0x01, 0x00, 0x5e, uint8_t((dest_ipv4_addr >> 16) & 0x7f), uint8_t((dest_ipv4_addr >> 8) & 0xff), uint8_t(dest_ipv4_addr & 0xff) };
memcpy(eh->h_dest, mc_dest_mac_addr, sizeof(eh->h_dest));
memcpy(eh->h_source, src_mac_addr, sizeof(eh->h_source));
eh->h_proto = htons(ETH_P_IP);
// Insert the IPv4 header.
auto ip = reinterpret_cast<iphdr *>(eh + 1);
ip->ihl = 5;
ip->version = 4;
ip->tos = 0;
ip->tot_len = htons(packet_len - ETH_HLEN);
ip->id = 0;
ip->frag_off = 0;
ip->ttl = 255;
ip->protocol = IPPROTO_UDP;
// The IPv4 checksum should be calculated by the hardware for us.
ip->check = 0;
ip->saddr = src_ipv4_addr;
// Cycle through destination addresses in order: the first packet goes to the first destination address, the second to the second, and so on.
ip->daddr = htonl(dest_ipv4_addr_list[i % num_dest_ipv4_addr]);
// Insert the UDP header.
auto udp = reinterpret_cast<udphdr *>(ip + 1);
udp->source = htons(src_udp_port);
udp->dest = htons(dest_udp_port);
udp->len = htons(packet_len - ETH_HLEN - ip_hdr_len);
// The checksum should be filled in by the hardware.
udp->check = 0;
// Fill the payload with bytes according to its position in the sequence.
auto payload = reinterpret_cast<uint16_t *>(udp + 1);
for (int j = 0; j < udp_payload_len / sizeof(uint16_t); ++j) payload[j] = i;
}
printf("setting up IBV RX device '%s'\n", rx_ibv_name);
// First, set up the receiving interface.
auto rx_device = find_device_by_ibv_name(rx_ibv_name);
if (!rx_device) throw std::runtime_error("error finding RX device");
auto rx_context = ibv_open_device(rx_device);
if (!rx_context) throw std::runtime_error("error creating RX device context");
auto rx_pd = ibv_alloc_pd(rx_context);
if (!rx_pd) throw std::runtime_error("error creating RX protection domain");
ibv_cq_init_attr_ex rx_cq_attr;
memset(&rx_cq_attr, 0, sizeof(rx_cq_attr));
rx_cq_attr.cqe = cq_len;
rx_cq_attr.wc_flags = IBV_WC_EX_WITH_BYTE_LEN | IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK;
rx_cq_attr.comp_mask = IBV_CQ_INIT_ATTR_MASK_FLAGS;
rx_cq_attr.flags = IBV_CREATE_CQ_ATTR_SINGLE_THREADED;
auto rx_cq = ibv_create_cq_ex(rx_context, &rx_cq_attr);
if (!rx_cq) throw std::runtime_error("error creating TX completion queue");
ibv_qp_init_attr_ex rx_qp_init_attr;
memset(&rx_qp_init_attr, 0, sizeof(rx_qp_init_attr));
rx_qp_init_attr.cap.max_recv_wr = num_packets;
rx_qp_init_attr.cap.max_recv_sge = 1;
rx_qp_init_attr.send_cq = ibv_cq_ex_to_cq(rx_cq);
rx_qp_init_attr.recv_cq = ibv_cq_ex_to_cq(rx_cq);
rx_qp_init_attr.cap.max_inline_data = 0;
rx_qp_init_attr.qp_type = IBV_QPT_RAW_PACKET;
rx_qp_init_attr.sq_sig_all = 1;
rx_qp_init_attr.comp_mask = IBV_QP_INIT_ATTR_PD | IBV_QP_INIT_ATTR_CREATE_FLAGS;
rx_qp_init_attr.pd = rx_pd;
auto rx_qp = ibv_create_qp_ex(rx_context, &rx_qp_init_attr);
if (!rx_qp) throw std::runtime_error("error creating RX queue pair");
// Initialize its state machine as described in the documentation for `ibv_modify_qp()` for raw Ethernet operation.
ibv_qp_attr rx_qp_attr;
memset(&rx_qp_attr, 0, sizeof(rx_qp_attr));
rx_qp_attr.qp_state = IBV_QPS_INIT;
rx_qp_attr.port_num = rx_port_num;
if (auto err = ibv_modify_qp(rx_qp, &rx_qp_attr, IBV_QP_STATE | IBV_QP_PORT))
throw std::runtime_error("error initializing state of RX queue pair");
memset(&rx_qp_attr, 0, sizeof(rx_qp_attr));
rx_qp_attr.qp_state = IBV_QPS_RTR;
if (auto err = ibv_modify_qp(rx_qp, &rx_qp_attr, IBV_QP_STATE))
throw std::runtime_error("error setting ready to read on RX queue pair");
// Create a flow that will steer the requested packets to this queue. We need a buffer to encode all of the data that we pass to the API.
auto buf_size = sizeof(ibv_flow_attr) + sizeof(ibv_flow_spec_tcp_udp);
std::vector<uint8_t> buf(buf_size);
// Populate the header.
auto fa = reinterpret_cast<ibv_flow_attr *>(&buf[0]);
memset(fa, 0, sizeof(ibv_flow_attr));
fa->type = IBV_FLOW_ATTR_NORMAL;
fa->size = sizeof(ibv_flow_attr);
fa->num_of_specs = 1;
fa->port = rx_port_num;
// Add the UDP port steering rule.
auto ua = reinterpret_cast<ibv_flow_spec_tcp_udp *>(fa + 1);
memset(ua, 0, sizeof(ibv_flow_spec_tcp_udp));
ua->type = IBV_FLOW_SPEC_UDP;
ua->size = sizeof(ibv_flow_spec_tcp_udp);
ua->val.dst_port = htons(dest_udp_port);
ua->mask.dst_port = 0xffff;
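// Only the destination port is matched, so every multicast group that shares this port is steered to the single receive queue.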
// Create the flow.
auto flow = ibv_create_flow(rx_qp, fa);
if (!flow) throw std::runtime_error("couldn't create steering rule for UDP flow");
// Allocate memory that we will receive packets into.
std::vector<uint8_t> rx_buf(packet_len * num_packets);
memset(rx_buf.data(), 0, rx_buf.size());
printf("registering host memory region for RX...\n");
// Register the memory region.
auto rx_mr = ibv_reg_mr(rx_pd, rx_buf.data(), rx_buf.size(), IBV_ACCESS_LOCAL_WRITE);
if (!rx_mr) throw std::runtime_error("couldn't register RX memory region");
// Function used to post a WR to the receive queue.
auto post_recv_wr = [&](size_t packet_ind)
{
// Wrap the index into the appropriate range for the buffer.
auto packet_ind_wrapped = packet_ind % num_packets;
// Post a work request to receive a packet.
ibv_sge rx_sge;
rx_sge.addr = reinterpret_cast<uint64_t>(rx_buf.data() + packet_ind_wrapped * packet_len);
rx_sge.length = packet_len;
rx_sge.lkey = rx_mr->lkey;
ibv_recv_wr rx_wr;
memset(&rx_wr, 0, sizeof(rx_wr));
rx_wr.wr_id = packet_ind;
rx_wr.next = nullptr;
rx_wr.sg_list = &rx_sge;
rx_wr.num_sge = 1;
ibv_recv_wr *bad_rx_wr;
if (ibv_post_recv(rx_qp, &rx_wr, &bad_rx_wr)) throw std::runtime_error("error posting to RX queue: error: " + std::string(strerror(errno)));
};
// Function used to post a WR to the send queue.
auto post_send_wr = [&](size_t packet_ind)
{
// Wrap the index into the appropriate range for the buffer.
auto packet_ind_wrapped = packet_ind % num_packets;
// Post a work request to send the packet.
ibv_sge tx_sge;
tx_sge.addr = reinterpret_cast<uint64_t>(tx_buf.data() + packet_ind_wrapped * packet_len);
tx_sge.length = packet_len;
tx_sge.lkey = tx_mr->lkey;
ibv_send_wr tx_wr;
memset(&tx_wr, 0, sizeof(tx_wr));
tx_wr.wr_id = packet_ind;
tx_wr.next = nullptr;
tx_wr.sg_list = &tx_sge;
tx_wr.num_sge = 1;
tx_wr.opcode = IBV_WR_SEND;
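// IBV_SEND_IP_CSUM requests checksum offload so the NIC fills in the IPv4/UDP checksums that were left as zero above.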
tx_wr.send_flags = IBV_SEND_IP_CSUM;
ibv_send_wr *bad_tx_wr;
if (ibv_post_send(tx_qp, &tx_wr, &bad_tx_wr)) throw std::runtime_error("error posting to TX queue: error: " + std::string(strerror(errno)));
};
// Read the payload value from the `packet_ind`-th packet in the transmit queue.
auto read_tx_payload = [&](size_t packet_ind)
{
// Wrap this to the position in the receive buffer where it would land.
auto packet_ind_wrapped = packet_ind % num_packets;
return *reinterpret_cast<const uint16_t *>(tx_buf.data() + packet_ind_wrapped * packet_len + eth_hdr_len + ip_hdr_len + udp_hdr_len);
};
// Read the payload value from the `packet_ind`-th packet in the receive queue.
auto read_rx_payload = [&](size_t packet_ind)
{
// Wrap this to the position in the receive buffer where it would land.
auto packet_ind_wrapped = packet_ind % num_packets;
return *reinterpret_cast<const uint16_t *>(rx_buf.data() + packet_ind_wrapped * packet_len + eth_hdr_len + ip_hdr_len + udp_hdr_len);
};
// Format the completion wallclock timestamp from the current WR in `cq` into a `std::string`.
auto format_timestamp = [](ibv_cq_ex *cq)
{
auto time_nsec = ibv_wc_read_completion_wallclock_ns(cq);
time_t time_sec = time_nsec / 1000000000;
time_nsec %= 1000000000;
struct tm t;
gmtime_r(&time_sec, &t);
char buf[1024];
auto len = strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &t);
snprintf(buf + len, sizeof(buf) - len, ".%09llu", (unsigned long long)time_nsec);
return std::string(buf);
};
// Initially post WRs for each of the receive queue entries, then post WRs for each of the transmit queue entries.
for (size_t i = 0; i < num_packets; ++i) post_recv_wr(i);
for (size_t i = 0; i < num_packets; ++i) post_send_wr(i);
// Run the loopback test until we have received the desired number of packets.
size_t test_count = 1000000000, test_ind = 0;
ibv_poll_cq_attr poll_cq_attr;
memset(&poll_cq_attr, 0, sizeof(poll_cq_attr));
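// The extended polling API (ibv_start_poll/ibv_next_poll/ibv_end_poll) is used so that per-completion fields
// such as the byte length and wall-clock timestamp can be read directly from the CQs.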
while (test_ind < test_count)
{
// Check to see if there's anything in the receive completion queue.
int num_rx = 0;
if (ibv_start_poll(rx_cq, &poll_cq_attr) == 0)
{
// Make sure the transmit completion queue has elements in it too.
if (ibv_start_poll(tx_cq, &poll_cq_attr) != 0) throw std::runtime_error("TX queue wasn't ready when expected");
do
{
// Make sure the received packet is correct.
if (rx_cq->status == IBV_WC_SUCCESS)
{
// If we got the right length...
if (ibv_wc_read_byte_len(rx_cq) == packet_len)
{
// Make sure the indices of the transmitted and received packets match.
if (rx_cq->wr_id != tx_cq->wr_id) throw std::runtime_error("index mismatch");
// Get the received payload value.
auto rx_payload = read_rx_payload(rx_cq->wr_id);
// The payload portion of the packet should contain the packet index in the receive buffer if the data was transmitted
// in the order it was expected to be. Make sure this is the case.
uint16_t expected_payload = rx_cq->wr_id % num_packets;
if (rx_payload != expected_payload)
{
printf("packet data starting at error:\n");
auto packet_ind = rx_cq->wr_id;
for (int i = 0; i < 10; ++i)
{
printf("tx slot %5u value %5u ts %s -> rx slot %5u value %5u ts %s\n",
tx_cq->wr_id % num_packets, read_tx_payload(tx_cq->wr_id), format_timestamp(tx_cq).c_str(),
rx_cq->wr_id % num_packets, read_rx_payload(rx_cq->wr_id), format_timestamp(rx_cq).c_str());
if (ibv_next_poll(rx_cq) || ibv_next_poll(tx_cq)) break;
}
printf("\n");
throw std::runtime_error("rx packet index " + std::to_string(packet_ind) + " expected payload: " + std::to_string(expected_payload) +
" but got payload: " + std::to_string(rx_payload));
}
}
else throw std::runtime_error("packet received with wrong length");
}
else throw std::runtime_error("packet received with error");
++num_rx;
}
while (!ibv_next_poll(rx_cq) && !ibv_next_poll(tx_cq));
ibv_end_poll(rx_cq);
ibv_end_poll(tx_cq);
// Queue new receive/send entries to replace the packets that were processed above.
for (size_t i = 0; i < num_rx; ++i)
{
post_recv_wr(test_ind + i + num_packets);
post_send_wr(test_ind + i + num_packets);
}
// Increment the number of packets received.
test_ind += num_rx;
}
}
}
I have seen some similar topics where people reported out-of-order reception in some cases while using only a single queue, but I haven't found any useful information in them.
I believe this to be a problem on the transmit side, because I have not yet observed this problem when using the separate hardware device as the data source.
Are there any firmware/API settings that I can use to ensure that ordering is always observed in the single-queue case?