Nvshmem ibgda_poll_cq

Hello
In the function ibgda_poll_cq, the action of polling CQEs (Completion Queue Entries) is performed. However, it always reads a CQE with an index of 0, instead of reading each CQE in the CQ (Completion Queue) sequentially. The variable cqe64 is never updated. I really can't figure out why this is the case. Could you please help me understand this? Thank you very much!

ibgda_poll_cq source code :
nvshmem_src\src\include\non_abi\device\pt-to-pt\ibgda_device.cuh:

/**
 * Poll the send CQ until the completion for WQE index `idx` is observed.
 *
 * @param cq    Device-side CQ state. NOTE: the CQ is created as a "collapsed"
 *              CQ (cc=0x1 in ibgda_create_cq), so the NIC rewrites a single
 *              CQE slot in place; reading cq->cqe at offset 0 every time is
 *              intentional, not a bug.
 * @param idx   SW consumer index (HW wqe_counter + 1 convention) to wait for.
 * @param error Out: CQE error syndrome when a MLX5_CQE_REQ_ERR is seen.
 * @return 0 on success, non-zero on CQE error or poll timeout.
 */
__device__ static inline int ibgda_poll_cq(nvshmemi_ibgda_device_cq_t *cq, uint64_t idx,
                                           int *error) {
    int status = 0;
    struct mlx5_cqe64 *cqe64 = (struct mlx5_cqe64 *)cq->cqe;

    const uint32_t ncqes = cq->ncqes;

    uint8_t opown;
    uint8_t opcode;
    // Initialized so the first "observe progress" comparison in the timeout
    // path below never reads an indeterminate value (undefined behavior).
    // A spurious mismatch on the first iteration merely refreshes the timer,
    // which is benign because `start` is sampled immediately before the loop.
    uint16_t wqe_counter = 0;
    uint16_t new_wqe_counter;

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
    uint64_t start = ibgda_query_globaltimer();
    uint64_t now;
#endif

    uint64_t cons_idx = ibgda_atomic_read(cq->cons_idx);
    uint64_t new_cons_idx;

    assert(likely(cq->qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_DCI ||
                  cq->qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC));

    // Fast path: another thread already consumed past idx; nothing to do.
    if (unlikely(cons_idx >= idx)) goto out;

#ifdef NVSHMEM_IBGDA_DEBUG
    // We can skip opcode == MLX5_CQE_INVALID check because we have already
    // initialized the CQ buffer to 0xff. With the QP depth range we enforce,
    // cons_idx cannot progress unless wqe_counter read from the CQ buffer is
    // a valid value.
    do {
        opown = ibgda_atomic_read(&cqe64->op_own);
        opcode = opown >> 4;

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
        // TODO: Integrate timeout handler with the core NVSHMEM
        now = ibgda_query_globaltimer();
        status = ibgda_check_poll_timeout(cq, now, start, idx, error);
        if (status != 0) goto check_opcode;
#endif /* NVSHMEM_TIMEOUT_DEVICE_POLLING */
    } while (unlikely(opcode == MLX5_CQE_INVALID));

    // Prevent reordering of the opcode wait above
    IBGDA_MFENCE();
#endif /* NVSHMEM_IBGDA_DEBUG */

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
    start = ibgda_query_globaltimer();
#endif

    // If idx is a lot greater than cons_idx, we might get incorrect result due
    // to wqe_counter wraparound. We need to check prod_idx to be sure that idx
    // has already been submitted. (prod_idx here aliases the SQ prod_idx that
    // software advances in ibgda_post_send.)
    while (unlikely(ibgda_atomic_read(cq->prod_idx) < idx))
        ;
    IBGDA_MFENCE();

    do {
        // The NIC keeps rewriting the single collapsed CQE slot; each read
        // may show a newer completion's wqe_counter (big-endian on the wire).
        new_wqe_counter = ibgda_atomic_read(&cqe64->wqe_counter);
        new_wqe_counter = BSWAP16(new_wqe_counter);
#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
        now = ibgda_query_globaltimer();
        status = ibgda_check_poll_timeout(cq, now, start, idx, error);
        if (status != 0) goto check_opcode;

        // Observe progress. Reset the timer.
        if (new_wqe_counter != wqe_counter) start = now;
#endif
        wqe_counter = new_wqe_counter;

        // Another thread may have updated cons_idx.
        cons_idx = ibgda_atomic_read(cq->cons_idx);
        if (likely(cons_idx >= idx)) goto out;
    }
    // NOTE: This while loop is part of do while above.
    // wqe_counter is the HW consumer index. However, we always maintain index
    // + 1 in SW. To be able to compare with idx, we need to use wqe_counter +
    // 1. Because wqe_counter is uint16_t, it may wraparound. Still we know for
    // sure that if idx - wqe_counter - 1 < ncqes, wqe_counter + 1 is less than
    // idx, and thus we need to wait. We don't need to wait when idx ==
    // wqe_counter + 1. That's why we use - (uint16_t)2 here to make this case
    // wraparound.
    while (unlikely(((uint16_t)((uint16_t)idx - wqe_counter - (uint16_t)2) < ncqes)));

    // new_cons_idx is uint64_t but wqe_counter is uint16_t. Thus, we get the
    // MSB from idx. We also need to take care of wraparound.
    ++wqe_counter;
    new_cons_idx =
        ((idx & ~(0xffffULL)) | wqe_counter) + (((uint16_t)idx > wqe_counter) ? 0x10000ULL : 0x0);
    // atomicMax because multiple threads may race to advance cons_idx; only
    // the largest observed index must win.
    atomicMax((unsigned long long int *)cq->cons_idx, (unsigned long long int)new_cons_idx);

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
check_opcode:
#endif

    // NVSHMEM always treats CQE errors as fatal.
    // Even if this error doesn't belong to the CQE in cons_idx,
    // we will just report and terminate the process.
    opown = ibgda_atomic_read(&cqe64->op_own);
    opcode = opown >> 4;

    if (unlikely(opcode == MLX5_CQE_REQ_ERR)) {
        ibgda_mlx5_err_cqe_t *cqe_err = (ibgda_mlx5_err_cqe_t *)cqe64;
        *error = cqe_err->syndrome;
#ifdef NVSHMEM_IBGDA_DEBUG
        __be16 wqe_counter = ibgda_atomic_read(&cqe_err->wqe_counter);
        __be32 s_wqe_opcode_qpn = ibgda_atomic_read(&cqe_err->s_wqe_opcode_qpn);
        printf(
            "[%d] got completion with err:\n"
            "   syndrome=%#x, vendor_err_synd=%#x, hw_err_synd=%#x, hw_synd_type=%#x,\n"
            "   wqe_counter=%#x, s_wqe_opcode_qpn=%#x,\n"
            "   cqn=%#x, cons_idx=%#lx, prod_idx=%#lx, idx=%#lx\n",
            nvshmemi_device_state_d.mype, cqe_err->syndrome, cqe_err->vendor_err_synd,
            cqe_err->hw_err_synd, cqe_err->hw_synd_type, BSWAP16(wqe_counter),
            BSWAP32(s_wqe_opcode_qpn), cq->cqn, cons_idx, ibgda_atomic_read(cq->prod_idx), idx);
#endif /* NVSHMEM_IBGDA_DEBUG */
        status = -1;
    }

out:
    // Prevent reordering of this function and subsequent instructions
    IBGDA_MFENCE();

    return status;
}

I found the answer: the CQ is created with the collapsed attribute flag when the ibgda_create_cq function is called in ibgda.cpp, which means all CQEs are written, collapsed, into the first CQ entry.
nvshmem_src\src\modules\transport\ibgda\ibgda.cpp:

int ibgda_create_cq(struct ibgda_cq **pgcq, struct ibgda_device *device)
{

     ......
    DEVX_SET(cqc, cq_context, cc, 0x1);  // Use collapsed CQ


}

Yes, you are correct. For some additional context:

On ConnectX NICs, the CQ is implemented as a ring buffer, and the NIC maintains a producer index as it writes CQEs. The consumer index is maintained by the software (in this case, on the GPU) and advanced when the CQE is consumed.

When IBGDA maps a CQ to the GPU, it doesn’t map the full ring buffer into GPU memory. Instead, it only maps a single CQE slot in GPU memory, and the NIC updates that slot each time the consumer index is advanced. This is by design. IBGDA optimizes for the common (happy) path where the GPU doesn’t actually inspect the CQE contents at all; it just needs to know whether its WQE has completed. That “done/not-done” status is determined by checking the producer and consumer indices and the wqe_counter.

So the sequence of events looks like this:

  1. The NIC writes CQEs into its internal ring buffer and advances the producer index.

  2. The GPU polls the single mapped slot in GPU memory, sees the expected CQE is complete, and atomically increments the consumer index.

  3. The NIC notices the updated consumer index and updates the same mapped slot in GPU memory to show the next CQE.

  4. Repeat.

@benjaming1 Where are all the CQ, QP etc created on GPU Memory or host memory?

@alok.prasad7 , I think this blog post would be your best starting point: Improving Network Performance of HPC Systems Using NVIDIA Magnum IO NVSHMEM and GPUDirect Async | NVIDIA Technical Blog

Quoting from that page:

The following steps describe the sequence of operations performed by the proxy thread when interacting with an NVIDIA InfiniBand host channel adapter (HCA), such as the ConnectX-6 HCA:

  1. The CPU creates a work descriptor and enqueues it on the work queue (WQ) buffer, which resides in the host memory.
  2. This descriptor indicates the requested operation such as an RDMA write, and contains the source address, destination address, size, and other necessary network information.
  3. The CPU updates the doorbell record (DBR) buffer in the host memory. This buffer is used in the recovery path in case the NIC drops the write to its doorbell (DB).
  4. The CPU notifies the NIC by writing to its DB, which is a register in the NIC hardware.
  5. The NIC reads the work descriptor from the WQ buffer.
  6. The NIC directly copies the data from the GPU memory using GPUDirect RDMA.
  7. The NIC transfers the data to the remote node.
  8. The NIC indicates that the network operation is completed by writing an event to the completion queue (CQ) buffer on the host memory.
  9. The CPU polls on the CQ buffer to detect completion of the network operation.
  10. The CPU notifies the GPU that the operation has completed. If GDRCopy is present, it writes a notification flag to the GPU memory directly. Otherwise, it writes that flag to the proxy buffer. The GPU polls on the corresponding memory for the status of the work request.


Hello, I want to ask about line 497: is cq->prod_idx updated by hardware?

The cq->prod_idx is updated by software running on the device, as @lewis021 pointed below.

1 Like

Hi benjaming1, I think the cq->prod_idx pointer actually points to the QP's SQ prod_idx, and the SQ's prod_idx is updated by software in ibgda_post_send.

cq->prod_idx ptr is assigned to with cq_h[cq_idx].prod_idx = (uint64_t *)(base_mvars_d_addr + prod_idx_offset).

src\modules\transport\ibgda\ibgda.cpp : 2345

static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
    nvshmemt_ibgda_state_t *ibgda_state;
    ibgda_state = (nvshmemt_ibgda_state_t *)t->state;
    .....
    if (num_rc_handles > 0) {
        for (int i = 0; i < num_rc_handles / n_devs_selected; ++i) {
            int arr_offset = i * n_devs_selected;
            /* No RC QP to self */
            if ((i / (num_rc_handles / n_devs_selected / n_pes)) == mype) {
                continue;
            }
            for (int j = 0; j < n_devs_selected; j++) {
                int arr_idx = arr_offset + j;
                int dev_idx = ibgda_state->selected_dev_ids[j];
                struct ibgda_device *device = (struct ibgda_device *)ibgda_state->devices + dev_idx;
                uintptr_t base_mvars_d_addr = (uintptr_t)(&rc_d[arr_idx]) + mvars_offset;

                ibgda_get_device_qp(&rc_h[arr_idx], device, device->rc.eps[i], j);

                rc_h[arr_idx].tx_wq.cq = &cq_d[cq_idx];

                ibgda_get_device_cq(&cq_h[cq_idx], device->rc.eps[i]->send_cq);
                cq_h[cq_idx].cons_idx = (uint64_t *)(base_mvars_d_addr + cons_t_offset);
                cq_h[cq_idx].resv_head = (uint64_t *)(base_mvars_d_addr + wqe_h_offset);
                cq_h[cq_idx].ready_head = (uint64_t *)(base_mvars_d_addr + wqe_t_offset);
                cq_h[cq_idx].qpn = rc_h[arr_idx].qpn;
                cq_h[cq_idx].qp_type = rc_h[arr_idx].qp_type;

                if (ibgda_nic_handler == IBGDA_NIC_HANDLER_GPU) {
                    rc_h[arr_idx].tx_wq.prod_idx =
                        (uint64_t *)(base_mvars_d_addr + prod_idx_offset);
                    // note: cq's prod_idx is assigned to sq prod_idx's address
                    cq_h[cq_idx].prod_idx = (uint64_t *)(base_mvars_d_addr + prod_idx_offset);
                } else {
                    rc_h[arr_idx].tx_wq.prod_idx =
                        &((uint64_t *)device->qp_shared_object.prod_idx_mobject->aligned
                              .gpu_ptr)[device->dci.num_eps + i];
                    cq_h[cq_idx].prod_idx = rc_h[arr_idx].tx_wq.prod_idx;
                }

                ++cq_idx;
            }
        }
}

src\include\non_abi\device\pt-to-pt\ibgda_device.cuh:1596
sq prod_idx is updated by atomicMax((unsigned long long int *)&mvars->tx_wq.prod_idx, (unsigned long long int)new_prod_idx).

/**
 * Publish new WQEs to the NIC: advance the SQ producer index to
 * `new_prod_idx`, then update the doorbell record and ring the doorbell.
 *
 * @tparam need_strong_flush  When true, use a device-wide atomicMax so the
 *                            prod_idx update is visible across the whole GPU;
 *                            otherwise atomicMax_block (block-scope) suffices.
 * @param qp            Device-side QP whose send queue is being posted to.
 * @param new_prod_idx  Candidate producer index (monotonically increasing).
 *
 * Note: this mvars->tx_wq.prod_idx is the same location that the CQ's
 * prod_idx pointer aliases, so polling code observes these software updates.
 */
template <bool need_strong_flush>
__device__ static inline void ibgda_post_send(nvshmemi_ibgda_device_qp_t *qp,
                                              uint64_t new_prod_idx) {
    nvshmemi_ibgda_device_qp_management_t *mvars = &qp->mvars;
    uint64_t old_prod_idx;

    // Update prod_idx before ringing the db so that we know which index is needed in quiet/fence.
    ibgda_lock_acquire<NVSHMEMI_THREADGROUP_THREAD>(&mvars->post_send_lock);

    if (need_strong_flush)
         // Note: sq prod_idx is updated here
        old_prod_idx = atomicMax((unsigned long long int *)&mvars->tx_wq.prod_idx,
                                 (unsigned long long int)new_prod_idx);
    else
        old_prod_idx = atomicMax_block((unsigned long long int *)&mvars->tx_wq.prod_idx,
                                       (unsigned long long int)new_prod_idx);

    // Only the thread that actually advanced the index rings the doorbell;
    // a stale or duplicate new_prod_idx (<= old value) does nothing.
    if (likely(new_prod_idx > old_prod_idx)) {
        IBGDA_MEMBAR();
        // Doorbell record first (recovery path if the NIC drops the DB write),
        // then the doorbell register itself, with a barrier between them.
        ibgda_update_dbr(qp, new_prod_idx);
        IBGDA_MEMBAR();
        ibgda_ring_db(qp, new_prod_idx);
    }

    ibgda_lock_release<NVSHMEMI_THREADGROUP_THREAD>(&mvars->post_send_lock);
}

Hi @lewis021 , thank you for adding these details.