Connection errors when multiple threads connect to the same NVMe-oF target using ConnectX-6 NVMe-oF offload

Environment

  • OS: Ubuntu 22.04 LTS
  • NIC: Mellanox Technologies MT28908 Family [ConnectX-6]

I have three machines, referred to here as 190, 192 and 68. I used the NVMe-oF offload function on 190 and 192 to expose 8 NVMe-oF targets to 68. When I use 16 threads on 68 to connect to these targets concurrently, I encounter connection problems, but if I use locks to serialize the connection attempts, the problem does not occur. Does this mean that the NVMe-oF offload function of ConnectX-6 does not support highly concurrent connection establishment?

Also, if I use fewer threads, e.g. 4 threads, I don't have this problem either.

Error message:

[2024-05-22 16:17:59.806912] nvme_fabric.c: 598:nvme_fabric_qpair_connect_poll: *ERROR*: Connect command failed, rc -125, trtype:RDMA adrfam:IPv4 traddr:192.168.3.160 trsvcid:4414 subnqn:testsubsystem4
[2024-05-22 16:17:59.806999] nvme_rdma.c:1412:nvme_rdma_ctrlr_connect_qpair_poll: *ERROR*: Failed to poll NVMe-oF Fabric CONNECT command
[2024-05-22 16:17:59.807012] nvme_rdma.c:2737:nvme_rdma_qpair_process_completions: *ERROR*: Failed to connect rqpair=0x200000c00540
[2024-05-22 16:17:59.809016] nvme_qpair.c: 797:spdk_nvme_qpair_process_completions: *ERROR*: CQ transport error -6 (No such device or address) on qpair id 0
[2024-05-22 16:17:59.809034] nvme_ctrlr.c:1028:nvme_ctrlr_fail: *ERROR*: [testsubsystem4] in failed state.
[2024-05-22 16:17:59.809044] nvme_ctrlr.c:4003:nvme_ctrlr_process_init: *ERROR*: [testsubsystem4] Ctrlr is in error state
[2024-05-22 16:17:59.809054] nvme.c: 702:nvme_ctrlr_poll_internal: *ERROR*: Failed to initialize SSD: 192.168.3.160
no NVMe controllers found

Sometimes I will also get this error message:

[2024-05-22 16:37:47.590280] nvme_fabric.c: 609:nvme_fabric_qpair_connect_poll: *ERROR*: Connect command completed with error: sct 0, sc 6
[2024-05-22 16:37:47.590291] nvme_rdma.c:1412:nvme_rdma_ctrlr_connect_qpair_poll: *ERROR*: Failed to poll NVMe-oF Fabric CONNECT command
[2024-05-22 16:37:47.590301] nvme_rdma.c:2737:nvme_rdma_qpair_process_completions: *ERROR*: Failed to connect rqpair=0x200001733540
[2024-05-22 16:37:47.592269] nvme_qpair.c: 797:spdk_nvme_qpair_process_completions: *ERROR*: CQ transport error -6 (No such device or address) on qpair id 1
[2024-05-22 16:37:47.592287] nvme_ctrlr.c: 497:spdk_nvme_ctrlr_alloc_io_qpair: *ERROR*: [testsubsystem6] nvme_transport_ctrlr_connect_io_qpair() failed
ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed
[2024-05-22 16:37:47.719190] nvme_fabric.c: 598:nvme_fabric_qpair_connect_poll: *ERROR*: Connect command failed, rc -5, trtype:RDMA adrfam:IPv4 traddr:192.168.3.162 trsvcid:4416 subnqn:testsubsystem6

Here are my code snippets:

#include "spdk/stdinc.h"
#include "spdk/nvme.h"
#include "spdk/vmd.h"
#include "spdk/nvme_zns.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/log.h"

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>

// Bookkeeping record for one attached controller.
struct ctrlr_entry {
	// Controller handle received in attach_cb.
	struct spdk_nvme_ctrlr *ctrlr;
	// Display name; attach_cb formats it from the controller's model and serial number.
	char name[1024];
};

// Bookkeeping record for one active namespace and the I/O queue pair used with it.
struct ns_entry {
	// Controller that exposes the namespace.
	struct spdk_nvme_ctrlr *ctrlr;
	// Namespace handle obtained from the controller.
	struct spdk_nvme_ns *ns;
	// I/O queue pair allocated by the NVMF constructor.
	struct spdk_nvme_qpair *qpair;

	// Set to 0 by the NVMF constructor; presumably the number of in-flight I/Os — TODO confirm.
	uint64_t on_fly;
	// Sector size reported by spdk_nvme_ns_get_sector_size().
	uint64_t lba_size;
};

// Per-NVMF-instance state handed to the probe/attach callbacks as cb_ctx.
struct nvmf_ctx {
	// Controllers appended by attach_cb.
	std::vector<ctrlr_entry*> g_controllers;
	// Active namespaces appended by register_ns.
	std::vector<ns_entry*> g_namespaces;
};

// Connection to the controller(s) found at one transport address, together
// with the namespaces and I/O queue pairs discovered on them.
class NVMF {
 public:
  // Parses `addr` as an SPDK transport ID string, probes it and allocates one
  // I/O queue pair per discovered namespace. Failures are reported on
  // stderr/stdout; the object is still constructed.
  NVMF(const char* addr);
  ~NVMF(void);

  // _ctx is an owning raw pointer that the destructor tears down, so a copy
  // would detach the same controllers and release the same context twice.
  NVMF(const NVMF&) = delete;
  NVMF& operator=(const NVMF&) = delete;

  // Allocates `size` bytes with spdk_malloc(), aligned to 0x1000 and flagged
  // SPDK_MALLOC_DMA. Returns NULL (after printing an error) on failure.
  static void* Alloc(size_t size) {
    void* buf = spdk_malloc(size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
    if (buf == NULL) {
      printf("ERROR: write buffer allocation failed\n");
      return NULL;
    }
    return buf;
  }

  // Releases a buffer with spdk_free().
  static void Free(void* buf) {
    spdk_free(buf);
  }

 private:
  // Frees queue pairs and namespace entries, then detaches the controllers.
  void cleanup();
  // Controllers/namespaces owned by this instance; allocated in the constructor.
  nvmf_ctx *_ctx = nullptr;
};

/**
 * Appends a bookkeeping entry for `ns` to `g_namespaces` if the namespace is active.
 *
 * @param ctrlr        controller that exposes the namespace
 * @param ns           namespace to register; inactive namespaces are ignored
 * @param g_namespaces list that receives the heap-allocated entry
 *
 * Terminates the process if the entry cannot be allocated.
 */
void register_ns(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_ns *ns, std::vector<ns_entry*> &g_namespaces)
{
	if (!spdk_nvme_ns_is_active(ns)) {
		return;
	}

	// Zero-initialise the entry: qpair, on_fly and lba_size are only filled in
	// later by the NVMF constructor, and NVMF::cleanup() passes qpair to
	// spdk_nvme_ctrlr_free_io_qpair() even when the constructor returned
	// before reaching this entry. A NULL qpair is safe there; garbage is not.
	struct ns_entry *entry = static_cast<struct ns_entry *>(calloc(1, sizeof(struct ns_entry)));
	if (entry == NULL) {
		perror("ns_entry malloc");
		exit(1);
	}

	entry->ctrlr = ctrlr;
	entry->ns = ns;
	g_namespaces.push_back(entry);
}

/**
 * Probe callback passed to spdk_nvme_probe(): accepts every controller.
 * None of the arguments are inspected.
 */
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
	 struct spdk_nvme_ctrlr_opts *opts)
{
	(void)cb_ctx;
	(void)trid;
	(void)opts;

	return true;
}

static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
	  struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
	int nsid;
	struct ctrlr_entry *entry;
	struct spdk_nvme_ns *ns;
	const struct spdk_nvme_ctrlr_data *cdata;

	entry = (struct ctrlr_entry *)malloc(sizeof(struct ctrlr_entry));
	if (entry == NULL) {
		perror("ctrlr_entry malloc");
		exit(1);
	}

	cdata = spdk_nvme_ctrlr_get_data(ctrlr);

	snprintf(entry->name, sizeof(entry->name), "%-20.20s (%-20.20s)", cdata->mn, cdata->sn);

	entry->ctrlr = ctrlr;
	nvmf_ctx *ctx = (nvmf_ctx *)cb_ctx;
	ctx->g_controllers.push_back(entry);

	for (nsid = spdk_nvme_ctrlr_get_first_active_ns(ctrlr); nsid != 0;
	     nsid = spdk_nvme_ctrlr_get_next_active_ns(ctrlr, nsid)) {
		ns = spdk_nvme_ctrlr_get_ns(ctrlr, nsid);
		if (ns == NULL) {
			continue;
		}
		register_ns(ctrlr, ns, ctx->g_namespaces);
	}
}

/**
 * Releases everything recorded in _ctx: frees each namespace's I/O queue pair
 * and entry, queues an asynchronous detach for each controller and frees its
 * entry, then polls the detach context if any detach was queued.
 */
void NVMF::cleanup()
{
	struct spdk_nvme_detach_ctx *detach_ctx = NULL;

	for (auto *ns : _ctx->g_namespaces) {
		spdk_nvme_ctrlr_free_io_qpair(ns->qpair);
		free(ns);
	}
	_ctx->g_namespaces.clear();

	for (auto *ctrlr : _ctx->g_controllers) {
		// Report a failed detach request instead of dropping it silently;
		// the entry is freed either way.
		if (spdk_nvme_detach_async(ctrlr->ctrlr, &detach_ctx) != 0) {
			fprintf(stderr, "spdk_nvme_detach_async() failed for %s\n", ctrlr->name);
		}
		free(ctrlr);
	}
	_ctx->g_controllers.clear();

	// detach_ctx stays NULL when no detach was queued.
	if (detach_ctx) {
		spdk_nvme_detach_poll(detach_ctx);
	}
}

/**
 * Initialises the SPDK environment with application name "nvmf" and a core
 * mask listing cores 0-15. Prints an error to stderr and returns if the
 * options cannot be allocated or spdk_env_init() fails.
 */
void init() {
	// NOTE(review): the buffer is twice sizeof(spdk_env_opts); the reason is not
	// visible in this file, so the size is kept as-is — confirm before trimming.
	struct spdk_env_opts *opts = (spdk_env_opts *)calloc(1, sizeof(spdk_env_opts) * 2);
	if (opts == NULL) {
		fprintf(stderr, "Unable to allocate SPDK env options\n");
		return;
	}
	spdk_env_opts_init(opts);

	char core_mask[100];
	snprintf(core_mask, sizeof(core_mask), "[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]");
	printf("core_mask %s\n", core_mask);
	opts->core_mask = core_mask;

	opts->name = "nvmf";

	// Free the options on both the success and the failure path; previously
	// the early return on failure leaked them.
	const int rc = spdk_env_init(opts);
	free(opts);

	if (rc < 0) {
		fprintf(stderr, "Unable to initialize SPDK env\n");
	}
}

/**
 * Connects to the target described by `addr`.
 *
 * Steps: allocate the per-instance context, parse `addr` into a transport ID,
 * probe it (attach_cb fills the controller/namespace lists), then allocate an
 * I/O queue pair for every registered namespace. Each failure prints a
 * message and returns, leaving the object constructed with whatever state
 * was set up so far.
 */
NVMF::NVMF(const char* addr) {
	_ctx = new nvmf_ctx;

	struct spdk_nvme_transport_id trid = {};
	const int parse_rc = spdk_nvme_transport_id_parse(&trid, addr);
	if (parse_rc != 0) {
		fprintf(stderr, "Error parsing transport address\n");
		return;
	}

	const int probe_rc = spdk_nvme_probe(&trid, _ctx, probe_cb, attach_cb, NULL);
	if (probe_rc != 0) {
		fprintf(stderr, "spdk_nvme_probe() failed\n");
		return;
	}

	if (_ctx->g_controllers.empty()) {
		fprintf(stderr, "no NVMe controllers found\n");
		return;
	}

	for (auto *entry : _ctx->g_namespaces) {
		// NULL opts with size 0, as in the original call.
		entry->qpair = spdk_nvme_ctrlr_alloc_io_qpair(entry->ctrlr, NULL, 0);
		if (entry->qpair == NULL) {
			printf("ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed\n");
			return;
		}

		entry->on_fly = 0;
		entry->lba_size = spdk_nvme_ns_get_sector_size(entry->ns);
	}
}

/**
 * Tears down the connection: releases queue pairs, namespaces and controllers
 * via cleanup(), then destroys the context.
 */
NVMF::~NVMF(void) {
	cleanup();
	// _ctx was created with `new` in the constructor, so it must be released
	// with `delete`; free() is undefined behaviour here and would also skip
	// the destructors of the two std::vector members.
	delete _ctx;
}

// Set by main once every worker has checked in; workers spin on it before connecting.
std::atomic<bool> start_flag{false};
// Workers that have reached the start barrier / finished connecting.
std::atomic<int> start_thread_cnt{0};
std::atomic<int> end_thread_cnt{0};

// Transport ID strings built in main; every worker connects to each of them.
std::vector<std::string> targets;

/**
 * Restricts the calling thread's CPU affinity to the single core `core_id`.
 *
 * @param core_id index of the CPU to bind to
 *
 * On failure a warning is printed to stderr and the thread keeps its previous
 * affinity.
 */
void bind_core(int core_id){
	cpu_set_t cpuset;
	CPU_ZERO(&cpuset);
	CPU_SET(core_id, &cpuset);

	// pthread_setaffinity_np returns an error number (not -1/errno); report it
	// rather than silently running on an unintended core.
	const int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
	if (rc != 0) {
		fprintf(stderr, "pthread_setaffinity_np(core %d) failed, rc %d\n", core_id, rc);
	}
}

// One mutex per target index; only referenced by the commented-out
// lock()/unlock() calls in test_worker.
std::mutex lock[16];

void test_worker(int tid){
	bind_core(tid);
	std::vector<NVMF *> nvmfs;
	start_thread_cnt.fetch_add(1);
	while(start_flag.load() == 0);

	int offset = 0;
	int target_size = targets.size();

	printf("target_size %d\n", target_size);
	for(int i = 0; i < target_size; i++){
		int tag = (i + offset) % target_size;
		// lock[tag].lock();  // if I use this lock, I won't have the connection problem
		nvmfs.push_back(new NVMF(targets[tag].c_str()));
		// lock[tag].unlock();
	}
	end_thread_cnt.fetch_add(1);
	for(auto each : nvmfs){
		delete each;
	}
}

/**
 * Reproducer entry point: initialises SPDK, builds 8 transport IDs for each of
 * the two target addresses (service IDs 4411-4418, subsystems 1-8), starts 16
 * workers that all connect to every target at the same time, waits for them
 * and shuts the SPDK environment down.
 */
int main(int argc, char **argv) {
	init();

	const int subsys = 1;
	const int cid = 4411;
	const int ssd_cnt_per_ip = 8;

	// Appends one transport ID per SSD for the address baked into `fmt`.
	auto add_targets = [&](const char *fmt) {
		for (int k = 0; k < ssd_cnt_per_ip; ++k) {
			char trid[256];
			snprintf(trid, sizeof(trid), fmt, cid + k, k + subsys);
			targets.push_back(trid);
		}
	};
	add_targets("trtype:rdma adrfam:ipv4 traddr:192.168.3.160 trsvcid:%d subnqn:testsubsystem%d");
	add_targets("trtype:rdma adrfam:ipv4 traddr:192.168.3.162 trsvcid:%d subnqn:testsubsystem%d");

	const int thread_cnt = 16;
	std::vector<std::thread> threads;
	for (int t = 0; t < thread_cnt; ++t) {
		threads.emplace_back(test_worker, t);
	}

	// Release the workers only after all of them have checked in.
	while (start_thread_cnt.load() != thread_cnt) {
	}
	start_flag.store(1);

	// Wait until every worker has finished connecting.
	while (end_thread_cnt.load() != thread_cnt) {
	}

	for (std::thread &worker : threads) {
		worker.join();
	}

	spdk_env_fini();
	return 0;
}