Hello
In the function ibgda_poll_cq, CQEs (Completion Queue Entries) are polled. However, it always reads the CQE at index 0 instead of reading each CQE in the CQ (Completion Queue) sequentially. The variable cqe64 is never updated. I really can't figure out why this is the case. Could you please help me understand this? Thank you very much!
Source code of ibgda_poll_cq:
nvshmem_src\src\include\non_abi\device\pt-to-pt\ibgda_device.cuh:
// Polls completion queue `cq` until its software consumer index (cq->cons_idx)
// has reached `idx`, or until a timeout / error CQE is detected.
//
// Parameters:
//   cq    - CQ descriptor. This function reads cq->cqe, cq->ncqes, cq->qp_type,
//           cq->cqn, and the counters behind cq->cons_idx / cq->prod_idx, and
//           advances the counter behind cq->cons_idx with atomicMax.
//   idx   - consumer index the caller wants to see completed.
//   error - out-parameter; receives the CQE syndrome when an error CQE
//           (MLX5_CQE_REQ_ERR) is observed. It is also handed to
//           ibgda_check_poll_timeout when timeout polling is compiled in.
//
// Returns 0 when cons_idx >= idx was observed without error, -1 when the CQE
// opcode is MLX5_CQE_REQ_ERR, or the non-zero status produced by
// ibgda_check_poll_timeout (only with NVSHMEM_TIMEOUT_DEVICE_POLLING).
//
// Note: cqe64 is loaded once from cq->cqe and is never advanced, so every
// op_own / wqe_counter read below targets that same CQE.
// NOTE(review): presumably the CQ is created so that HW keeps overwriting a
// single CQE (e.g. an mlx5 "collapsed" CQ), which would make wqe_counter of
// that entry act as the HW consumer index -- confirm against the host-side CQ
// creation code.
__device__ static inline int ibgda_poll_cq(nvshmemi_ibgda_device_cq_t *cq, uint64_t idx,
                                           int *error) {
    int status = 0;
    struct mlx5_cqe64 *cqe64 = (struct mlx5_cqe64 *)cq->cqe;

    const uint32_t ncqes = cq->ncqes;

    uint8_t opown;
    uint8_t opcode;
    // Initialized so that the progress check in the timeout path below never
    // reads an indeterminate value on the first loop iteration.
    uint16_t wqe_counter = 0;
    uint16_t new_wqe_counter;

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
    uint64_t start = ibgda_query_globaltimer();
    uint64_t now;
#endif

    uint64_t cons_idx = ibgda_atomic_read(cq->cons_idx);
    uint64_t new_cons_idx;

    assert(likely(cq->qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_DCI ||
                  cq->qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC));

    // Fast path: the requested index has already been consumed.
    if (unlikely(cons_idx >= idx)) goto out;

#ifdef NVSHMEM_IBGDA_DEBUG
    // We can skip opcode == MLX5_CQE_INVALID check because we have already
    // initialized the CQ buffer to 0xff. With the QP depth range we enforce,
    // cons_idx cannot progress unless wqe_counter read from the CQ buffer is
    // a valid value.
    do {
        opown = ibgda_atomic_read(&cqe64->op_own);
        // The opcode lives in the upper 4 bits of op_own.
        opcode = opown >> 4;

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
        // TODO: Integrate timeout handler with the core NVSHMEM
        now = ibgda_query_globaltimer();
        status = ibgda_check_poll_timeout(cq, now, start, idx, error);
        if (status != 0) goto check_opcode;
#endif /* NVSHMEM_TIMEOUT_DEVICE_POLLING */
    } while (unlikely(opcode == MLX5_CQE_INVALID));

    // Prevent reordering of the opcode wait above
    IBGDA_MFENCE();
#endif /* NVSHMEM_IBGDA_DEBUG */

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
    // Restart the timeout window for the wqe_counter wait below.
    start = ibgda_query_globaltimer();
#endif

    // If idx is a lot greater than cons_idx, we might get incorrect result due
    // to wqe_counter wraparound. We need to check prod_idx to be sure that idx
    // has already been submitted.
    while (unlikely(ibgda_atomic_read(cq->prod_idx) < idx))
        ;
    IBGDA_MFENCE();

    do {
        new_wqe_counter = ibgda_atomic_read(&cqe64->wqe_counter);
        // The value read from the CQE is byte-swapped before use.
        new_wqe_counter = BSWAP16(new_wqe_counter);

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
        now = ibgda_query_globaltimer();
        status = ibgda_check_poll_timeout(cq, now, start, idx, error);
        if (status != 0) goto check_opcode;

        // Observe progress. Reset the timer.
        if (new_wqe_counter != wqe_counter) start = now;
#endif
        wqe_counter = new_wqe_counter;

        // Another thread may have updated cons_idx.
        cons_idx = ibgda_atomic_read(cq->cons_idx);
        if (likely(cons_idx >= idx)) goto out;
    }
    // NOTE: This while loop is part of do while above.
    // wqe_counter is the HW consumer index. However, we always maintain index
    // + 1 in SW. To be able to compare with idx, we need to use wqe_counter +
    // 1. Because wqe_counter is uint16_t, it may wraparound. Still we know for
    // sure that if idx - wqe_counter - 1 < ncqes, wqe_counter + 1 is less than
    // idx, and thus we need to wait. We don't need to wait when idx ==
    // wqe_counter + 1. That's why we use - (uint16_t)2 here to make this case
    // wraparound.
    while (unlikely(((uint16_t)((uint16_t)idx - wqe_counter - (uint16_t)2) < ncqes)));

    // new_cons_idx is uint64_t but wqe_counter is uint16_t. Thus, we get the
    // MSB from idx. We also need to take care of wraparound.
    ++wqe_counter;
    // Take the upper 48 bits from idx and the lower 16 bits from wqe_counter;
    // if the low 16 bits of idx are larger than wqe_counter, the counter has
    // wrapped past 0xffff, so carry one into bit 16.
    new_cons_idx = ((idx & ~(0xffffULL)) | wqe_counter) +
                   (((uint16_t)idx > wqe_counter) ? 0x10000ULL : 0x0);
    // atomicMax keeps the larger value, so a concurrent poller that has
    // already published a higher cons_idx is not overwritten.
    atomicMax((unsigned long long int *)cq->cons_idx, (unsigned long long int)new_cons_idx);

#ifdef NVSHMEM_TIMEOUT_DEVICE_POLLING
check_opcode:
#endif

    // NVSHMEM always treats CQE errors as fatal.
    // Even if this error doesn't belong to the CQE in cons_idx,
    // we will just report and terminate the process.
    opown = ibgda_atomic_read(&cqe64->op_own);
    opcode = opown >> 4;

    if (unlikely(opcode == MLX5_CQE_REQ_ERR)) {
        // Reinterpret the same CQE through the error-CQE layout.
        ibgda_mlx5_err_cqe_t *cqe_err = (ibgda_mlx5_err_cqe_t *)cqe64;

        *error = cqe_err->syndrome;
#ifdef NVSHMEM_IBGDA_DEBUG
        // Distinct name: avoids shadowing the function-scope wqe_counter.
        __be16 err_wqe_counter = ibgda_atomic_read(&cqe_err->wqe_counter);
        __be32 s_wqe_opcode_qpn = ibgda_atomic_read(&cqe_err->s_wqe_opcode_qpn);
        printf(
            "[%d] got completion with err:\n"
            "   syndrome=%#x, vendor_err_synd=%#x, hw_err_synd=%#x, hw_synd_type=%#x,\n"
            "   wqe_counter=%#x, s_wqe_opcode_qpn=%#x,\n"
            "   cqn=%#x, cons_idx=%#lx, prod_idx=%#lx, idx=%#lx\n",
            nvshmemi_device_state_d.mype, cqe_err->syndrome, cqe_err->vendor_err_synd,
            cqe_err->hw_err_synd, cqe_err->hw_synd_type, BSWAP16(err_wqe_counter),
            BSWAP32(s_wqe_opcode_qpn), cq->cqn, cons_idx, ibgda_atomic_read(cq->prod_idx), idx);
#endif /* NVSHMEM_IBGDA_DEBUG */
        status = -1;
    }

out:
    // Prevent reordering of this function and subsequent instructions
    IBGDA_MFENCE();

    return status;
}
