Environment
- OS: Ubuntu 22.04 LTS
- NIC: Mellanox Technologies MT28908 Family [ConnectX-6]
I have three machines, which I'll refer to as 190, 192, and 68. I used the NVMe-oF offload function on 190 and 192 to expose 8 NVMe-oF targets each to 68. When I use 16 threads on 68 to connect to these targets concurrently, I run into connection problems, but if I use locks to control the concurrency, the problem does not occur. Does this mean that the NVMe-oF offload function of ConnectX-6 does not support highly concurrent connection establishment?
Also, if I use fewer threads, e.g. 4, I don't hit the problem either.
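To make the workaround concrete, here is a minimal sketch of the lock-based variant (it reuses the NVMF class and the global targets vector from the full code further down; the connect_lock array and the connect_all_serialized helper are names I'm using just for illustration):

// One mutex per target: at most one thread runs the probe/connect path for a given target at a time.
static std::mutex connect_lock[16];              // 16 targets in this test

static void connect_all_serialized(std::vector<NVMF *> &nvmfs) {
    for (size_t i = 0; i < targets.size(); i++) {
        std::lock_guard<std::mutex> g(connect_lock[i]);   // serialize setup of target i
        nvmfs.push_back(new NVMF(targets[i].c_str()));    // probe + qpair allocation happen inside
    }
}

With this in place (or with only 4 threads) every connect succeeds; without it, up to 16 threads can be issuing fabric CONNECTs against the same subsystem at once, and that is when the failures below appear.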
Error message:
[2024-05-22 16:17:59.806912] nvme_fabric.c: 598:nvme_fabric_qpair_connect_poll: *ERROR*: Connect command failed, rc -125, trtype:RDMA adrfam:IPv4 traddr:192.168.3.160 trsvcid:4414 subnqn:testsubsystem4
[2024-05-22 16:17:59.806999] nvme_rdma.c:1412:nvme_rdma_ctrlr_connect_qpair_poll: *ERROR*: Failed to poll NVMe-oF Fabric CONNECT command
[2024-05-22 16:17:59.807012] nvme_rdma.c:2737:nvme_rdma_qpair_process_completions: *ERROR*: Failed to connect rqpair=0x200000c00540
[2024-05-22 16:17:59.809016] nvme_qpair.c: 797:spdk_nvme_qpair_process_completions: *ERROR*: CQ transport error -6 (No such device or address) on qpair id 0
[2024-05-22 16:17:59.809034] nvme_ctrlr.c:1028:nvme_ctrlr_fail: *ERROR*: [testsubsystem4] in failed state.
[2024-05-22 16:17:59.809044] nvme_ctrlr.c:4003:nvme_ctrlr_process_init: *ERROR*: [testsubsystem4] Ctrlr is in error state
[2024-05-22 16:17:59.809054] nvme.c: 702:nvme_ctrlr_poll_internal: *ERROR*: Failed to initialize SSD: 192.168.3.160
no NVMe controllers found
Sometimes I also get this error message:
[2024-05-22 16:37:47.590280] nvme_fabric.c: 609:nvme_fabric_qpair_connect_poll: *ERROR*: Connect command completed with error: sct 0, sc 6
[2024-05-22 16:37:47.590291] nvme_rdma.c:1412:nvme_rdma_ctrlr_connect_qpair_poll: *ERROR*: Failed to poll NVMe-oF Fabric CONNECT command
[2024-05-22 16:37:47.590301] nvme_rdma.c:2737:nvme_rdma_qpair_process_completions: *ERROR*: Failed to connect rqpair=0x200001733540
[2024-05-22 16:37:47.592269] nvme_qpair.c: 797:spdk_nvme_qpair_process_completions: *ERROR*: CQ transport error -6 (No such device or address) on qpair id 1
[2024-05-22 16:37:47.592287] nvme_ctrlr.c: 497:spdk_nvme_ctrlr_alloc_io_qpair: *ERROR*: [testsubsystem6] nvme_transport_ctrlr_connect_io_qpair() failed
ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed
[2024-05-22 16:37:47.719190] nvme_fabric.c: 598:nvme_fabric_qpair_connect_poll: *ERROR*: Connect command failed, rc -5, trtype:RDMA adrfam:IPv4 traddr:192.168.3.162 trsvcid:4416 subnqn:testsubsystem6
Here are my code snippets:
#include "spdk/stdinc.h"
#include "spdk/nvme.h"
#include "spdk/vmd.h"
#include "spdk/nvme_zns.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/log.h"
#include <pthread.h>
#include <sched.h>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
struct ctrlr_entry {
struct spdk_nvme_ctrlr *ctrlr;
char name[1024];
};
struct ns_entry {
struct spdk_nvme_ctrlr *ctrlr;
struct spdk_nvme_ns *ns;
struct spdk_nvme_qpair *qpair;
uint64_t on_fly;
uint64_t lba_size;
};
struct nvmf_ctx {
std::vector<ctrlr_entry*> g_controllers;
std::vector<ns_entry*> g_namespaces;
};
class NVMF {
public:
NVMF(const char* addr);
~NVMF(void);
static void* Alloc(size_t size) {
void* buf = spdk_malloc(size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
if (buf == NULL) {
printf("ERROR: write buffer allocation failed\n");
return NULL;
}
return buf;
}
static void Free(void* buf) {
spdk_free(buf);
}
private:
void cleanup();
nvmf_ctx *_ctx;
};
void register_ns(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_ns *ns, std::vector<ns_entry*> &g_namespaces)
{
struct ns_entry *entry;
if (!spdk_nvme_ns_is_active(ns)) { return; }
entry = (struct ns_entry *)calloc(1, sizeof(struct ns_entry)); // zero-init so qpair starts as NULL
if (entry == NULL) {
perror("ns_entry calloc");
exit(1);
}
entry->ctrlr = ctrlr;
entry->ns = ns;
g_namespaces.push_back(entry);
}
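// probe_cb: accept every controller that spdk_nvme_probe() finds; attach_cb: record the attached controller and register each of its active namespaces.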
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
struct spdk_nvme_ctrlr_opts *opts){ return true; }
static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
uint32_t nsid;
struct ctrlr_entry *entry;
struct spdk_nvme_ns *ns;
const struct spdk_nvme_ctrlr_data *cdata;
entry = (struct ctrlr_entry *)malloc(sizeof(struct ctrlr_entry));
if (entry == NULL) {
perror("ctrlr_entry malloc");
exit(1);
}
cdata = spdk_nvme_ctrlr_get_data(ctrlr);
snprintf(entry->name, sizeof(entry->name), "%-20.20s (%-20.20s)", cdata->mn, cdata->sn);
entry->ctrlr = ctrlr;
nvmf_ctx *ctx = (nvmf_ctx *)cb_ctx;
ctx->g_controllers.push_back(entry);
for (nsid = spdk_nvme_ctrlr_get_first_active_ns(ctrlr); nsid != 0;
nsid = spdk_nvme_ctrlr_get_next_active_ns(ctrlr, nsid)) {
ns = spdk_nvme_ctrlr_get_ns(ctrlr, nsid);
if (ns == NULL) {
continue;
}
register_ns(ctrlr, ns, ctx->g_namespaces);
}
}
void NVMF::cleanup()
{
struct spdk_nvme_detach_ctx *detach_ctx = NULL;
for (auto ns_entry: _ctx->g_namespaces) {
if (ns_entry->qpair) { spdk_nvme_ctrlr_free_io_qpair(ns_entry->qpair); }
free(ns_entry);
}
_ctx->g_namespaces.clear();
for (auto ctrlr_entry: _ctx->g_controllers) {
spdk_nvme_detach_async(ctrlr_entry->ctrlr, &detach_ctx);
free(ctrlr_entry);
}
_ctx->g_controllers.clear();
if (detach_ctx) {
spdk_nvme_detach_poll(detach_ctx);
}
}
void init() {
struct spdk_env_opts opts;
spdk_env_opts_init(&opts);
opts.core_mask = "[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]"; // cores 0-15, one per worker thread
opts.name = "nvmf";
printf("core_mask %s\n", opts.core_mask);
if (spdk_env_init(&opts) < 0) {
fprintf(stderr, "Unable to initialize SPDK env\n");
return;
}
}
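// Parse the transport ID string, probe/attach the controller behind it, then allocate (and connect) one I/O qpair per registered namespace.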
NVMF::NVMF(const char* addr) {
_ctx = new nvmf_ctx;
struct spdk_nvme_transport_id g_trid = {};
if (spdk_nvme_transport_id_parse(&g_trid, addr) != 0) {
fprintf(stderr, "Error parsing transport address\n");
return;
}
int rc = spdk_nvme_probe(&g_trid, _ctx, probe_cb, attach_cb, NULL);
if (rc != 0) {
fprintf(stderr, "spdk_nvme_probe() failed\n");
return;
}
if (_ctx->g_controllers.empty()) {
fprintf(stderr, "no NVMe controllers found\n");
return;
}
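// One I/O qpair per active namespace; with NULL opts, spdk_nvme_ctrlr_alloc_io_qpair() also connects the qpair, i.e. another fabric CONNECT on a new RDMA queue pair.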
for (auto ns_entry: _ctx->g_namespaces) {
ns_entry->qpair = spdk_nvme_ctrlr_alloc_io_qpair(ns_entry->ctrlr, NULL, 0);
if (ns_entry->qpair == NULL) {
printf("ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed\n");
return;
}
ns_entry->on_fly = 0;
ns_entry->lba_size = spdk_nvme_ns_get_sector_size(ns_entry->ns);
}
}
NVMF::~NVMF(void) {
cleanup();
delete _ctx; // allocated with new, so delete rather than free
}
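// Test harness: all worker threads are released at once by start_flag and each connects to every target in the same order.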
std::atomic_bool start_flag{false};
std::atomic_int start_thread_cnt{0}, end_thread_cnt{0};
std::vector<std::string> targets;
void bind_core(int core_id){
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_id, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}
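// One mutex per target (16 targets in total). Taking lock[tag] around the NVMF constructor below is the workaround that makes the connection errors go away.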
std::mutex lock[16];
void test_worker(int tid){
bind_core(tid);
std::vector<NVMF *> nvmfs;
start_thread_cnt.fetch_add(1);
while(start_flag.load() == 0);
int offset = 0;
int target_size = targets.size();
printf("target_size %d\n", target_size);
for(int i = 0; i < target_size; i++){
int tag = (i + offset) % target_size;
// lock[tag].lock(); // if I use this lock, I won't have the connection problem
nvmfs.push_back(new NVMF(targets[tag].c_str()));
// lock[tag].unlock();
}
end_thread_cnt.fetch_add(1);
for(auto each : nvmfs){
delete each;
}
}
int main(int argc, char **argv) {
init();
int subsys = 1;
int cid = 4411;
int ssd_cnt_per_ip = 8;
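// Build 16 target TRIDs: trsvcid 4411..4418 / subnqn testsubsystem1..8 on each of the two target IPs.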
for(int i = 0; i < ssd_cnt_per_ip; i++){
char buf[256];
sprintf(buf, "trtype:rdma adrfam:ipv4 traddr:192.168.3.160 trsvcid:%d subnqn:testsubsystem%d", cid + i, i + subsys);
targets.push_back(buf);
}
for(int i = 0; i < ssd_cnt_per_ip; i++){
char buf[256];
sprintf(buf, "trtype:rdma adrfam:ipv4 traddr:192.168.3.162 trsvcid:%d subnqn:testsubsystem%d", cid + i, i + subsys);
targets.push_back(buf);
}
std::vector<std::thread> threads;
const int thread_cnt = 16;
for(int i = 0; i < thread_cnt; i++){
threads.push_back(std::thread(test_worker, i));
}
while(start_thread_cnt != thread_cnt);
start_flag.store(1);
while(end_thread_cnt != thread_cnt);
for(auto &each : threads){
each.join();
}
spdk_env_fini();
return 0;
}