diff --git a/lib/nvme/nvme_rdma.c b/lib/nvme/nvme_rdma.c
index b19742bf7..0abb51e7b 100644
--- a/lib/nvme/nvme_rdma.c
+++ b/lib/nvme/nvme_rdma.c
@@ -143,13 +143,14 @@ struct nvme_rdma_qpair {
 	uint16_t num_entries;
 
+	bool delay_cmd_submit;
+
 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
 	struct ibv_sge *rsp_sgls;
 	struct spdk_nvme_cpl *rsps;
 
 	struct ibv_recv_wr *rsp_recv_wrs;
 
-	bool delay_cmd_submit;
 
 	struct spdk_nvme_send_wr_list sends_to_post;
 	struct spdk_nvme_recv_wr_list recvs_to_post;
@@ -174,8 +175,19 @@ struct nvme_rdma_qpair {
 	struct rdma_cm_event *evt;
 };
 
+enum NVME_RDMA_COMPLETION_FLAGS {
+	NVME_RDMA_SEND_COMPLETED = 1u << 0,
+	NVME_RDMA_RECV_COMPLETED = 1u << 1,
+};
+
 struct spdk_nvme_rdma_req {
-	int id;
+	uint16_t id;
+	uint16_t completion_flags: 2;
+	uint16_t reserved: 14;
+	/* If the RDMA_RECV completion arrives before the RDMA_SEND completion, the nvme
+	 * request is completed while processing RDMA_SEND. Completing the request requires
+	 * the index of the nvme_cpl received in RDMA_RECV, so store it in this field. */
+	uint16_t rsp_idx;
 
 	struct ibv_send_wr send_wr;
 
@@ -184,8 +196,6 @@ struct spdk_nvme_rdma_req {
 	struct ibv_sge send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
 
 	TAILQ_ENTRY(spdk_nvme_rdma_req) link;
-
-	bool request_ready_to_put;
 };
 
 static const char *rdma_cm_event_str[] = {
@@ -244,7 +254,7 @@ nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
 static void
 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
 {
-	rdma_req->request_ready_to_put = false;
+	rdma_req->completion_flags = 0;
 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
 }
@@ -668,7 +678,8 @@ fail:
 static int
 nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
 {
-	int i, rc;
+	uint16_t i;
+	int rc;
 
 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
 				       rqpair->num_entries * sizeof(*rqpair->rsps));
@@ -734,7 +745,7 @@ nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
 static int
 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
 {
-	int i;
+	uint16_t i;
 
 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
 	if (rqpair->rdma_reqs == NULL) {
@@ -804,35 +815,6 @@ fail:
 	return -ENOMEM;
 }
 
-static int
-nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx, int *reaped)
-{
-	struct spdk_nvme_rdma_req *rdma_req;
-	struct spdk_nvme_cpl *rsp;
-	struct nvme_request *req;
-
-	assert(rsp_idx < rqpair->num_entries);
-	rsp = &rqpair->rsps[rsp_idx];
-	rdma_req = &rqpair->rdma_reqs[rsp->cid];
-
-	req = rdma_req->req;
-	nvme_rdma_req_complete(req, rsp);
-
-	if (rdma_req->request_ready_to_put) {
-		(*reaped)++;
-		nvme_rdma_req_put(rqpair, rdma_req);
-	} else {
-		rdma_req->request_ready_to_put = true;
-	}
-
-	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
-		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
-		return -1;
-	}
-
-	return 0;
-}
-
 static int
 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
 		       struct sockaddr *src_addr,
@@ -1944,6 +1926,14 @@ nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
 	}
 }
 
+static inline int
+nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
+{
+	nvme_rdma_req_complete(rdma_req->req, &rqpair->rsps[rdma_req->rsp_idx]);
+	nvme_rdma_req_put(rqpair, rdma_req);
+	return nvme_rdma_post_recv(rqpair, rdma_req->rsp_idx);
+}
+
 #define MAX_COMPLETIONS_PER_POLL 128
 
 int
@@ -1953,10 +1943,12 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
 	struct ibv_wc wc[MAX_COMPLETIONS_PER_POLL];
 	int i, rc = 0, batch_size;
-	uint32_t reaped;
+	uint32_t reaped = 0;
+	uint16_t rsp_idx;
 	struct ibv_cq *cq;
 	struct spdk_nvme_rdma_req *rdma_req;
 	struct nvme_rdma_ctrlr *rctrlr;
+	struct spdk_nvme_cpl *rsp;
 
 	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
 			  nvme_rdma_qpair_submit_recvs(rqpair))) {
@@ -1981,7 +1973,6 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 
 	cq = rqpair->cq;
 
-	reaped = 0;
 	do {
 		batch_size = spdk_min((max_completions - reaped), MAX_COMPLETIONS_PER_POLL);
 
@@ -2011,20 +2002,32 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 				goto fail;
 			}
 
-			if (nvme_rdma_recv(rqpair, wc[i].wr_id, &reaped)) {
-				SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
-				goto fail;
+			assert(wc[i].wr_id < rqpair->num_entries);
+			rsp_idx = (uint16_t)wc[i].wr_id;
+			rsp = &rqpair->rsps[rsp_idx];
+			rdma_req = &rqpair->rdma_reqs[rsp->cid];
+			rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
+			rdma_req->rsp_idx = rsp_idx;
+
+			if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) {
+				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
+					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
+					goto fail;
+				}
+				reaped++;
 			}
 			break;
 
 		case IBV_WC_SEND:
 			rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
+			rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
 
-			if (rdma_req->request_ready_to_put) {
+			if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
+				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
+					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
+					goto fail;
+				}
 				reaped++;
-				nvme_rdma_req_put(rqpair, rdma_req);
-			} else {
-				rdma_req->request_ready_to_put = true;
 			}
 			break;
 
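
The core of the patch: a request is only finished once both of its work completions have been observed, the IBV_WC_SEND for the command capsule and the IBV_WC_RECV for the response, and the RDMA provider may deliver them in either order. The standalone sketch below illustrates the scheme outside SPDK; fake_req, on_send_completion and on_recv_completion are hypothetical stand-ins, and only the flag handling mirrors the patch. Each completion sets its bit, and whichever arrives second finishes the request.

#include <stdint.h>
#include <stdio.h>

enum {
	SEND_COMPLETED = 1u << 0,	/* cf. NVME_RDMA_SEND_COMPLETED */
	RECV_COMPLETED = 1u << 1,	/* cf. NVME_RDMA_RECV_COMPLETED */
};

struct fake_req {
	uint16_t id;
	uint16_t completion_flags : 2;
	uint16_t reserved : 14;
	uint16_t rsp_idx;	/* saved by the RECV path for the SEND path */
};

/* Runs in whichever completion handler fires second. */
static void
finish_req(struct fake_req *req)
{
	printf("req %u finished, rsp_idx %u\n",
	       (unsigned)req->id, (unsigned)req->rsp_idx);
	req->completion_flags = 0;	/* reset for reuse, as nvme_rdma_req_put() does */
}

static void
on_send_completion(struct fake_req *req)
{
	req->completion_flags |= SEND_COMPLETED;
	if (req->completion_flags & RECV_COMPLETED) {
		finish_req(req);
	}
}

static void
on_recv_completion(struct fake_req *req, uint16_t rsp_idx)
{
	req->completion_flags |= RECV_COMPLETED;
	req->rsp_idx = rsp_idx;	/* remember which response slot completed us */
	if (req->completion_flags & SEND_COMPLETED) {
		finish_req(req);
	}
}

int
main(void)
{
	struct fake_req req = { .id = 7 };

	/* Out-of-order case: the response shows up before the send completion.
	 * The RECV handler only records state; the SEND handler finishes. */
	on_recv_completion(&req, 3);
	on_send_completion(&req);
	return 0;
}

Compared with the old request_ready_to_put bool, the two named bits make it explicit which half has arrived, and rsp_idx lets the SEND path locate the completion entry the RECV path already consumed.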
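
A side point on the struct changes: narrowing id to uint16_t and packing the flags into a 2-bit field keeps struct spdk_nvme_rdma_req from growing, since the new 16-bit fields fit in what was alignment padding in front of send_wr, while the separate request_ready_to_put bool at the tail goes away. The check below is a hypothetical illustration, not part of the patch, assuming an LP64 ABI; send_wr_stub stands in for struct ibv_send_wr, whose first member (uint64_t wr_id) forces 8-byte alignment, so both offsets print 8.

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

struct send_wr_stub {
	uint64_t wr_id;	/* stands in for struct ibv_send_wr */
};

struct req_old {	/* before the patch */
	int id;
	struct send_wr_stub send_wr;
};

struct req_new {	/* after the patch */
	uint16_t id;
	uint16_t completion_flags : 2;
	uint16_t reserved : 14;
	uint16_t rsp_idx;
	struct send_wr_stub send_wr;
};

int
main(void)
{
	printf("send_wr offset: old %zu, new %zu\n",
	       offsetof(struct req_old, send_wr),
	       offsetof(struct req_new, send_wr));
	return 0;
}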