diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17df8abbf..e9de94414 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,10 @@
 has been added. The public API is available at `include/spdk/pipe.h`.
 `delayed_pcie_doorbell` parameter in `spdk_nvme_io_qpair_opts` was renamed to `delay_cmd_submit` to allow reuse in other transports.
 
+Added RDMA WR batching to NVMf RDMA initiator. Send and receive WRs are chained together
+and posted with a single call to ibv_post_send(receive) in the next call to qpair completion
+processing function. Batching is controlled by 'delay_cmd_submit' qpair option.
+
 ### rpc
 
 Added optional 'delay_cmd_submit' parameter to 'bdev_nvme_set_options' RPC method.
diff --git a/lib/nvme/nvme_rdma.c b/lib/nvme/nvme_rdma.c
index 953477f7a..1b0773e08 100644
--- a/lib/nvme/nvme_rdma.c
+++ b/lib/nvme/nvme_rdma.c
@@ -117,6 +117,16 @@ struct nvme_rdma_ctrlr {
 	struct nvme_rdma_cm_event_entry	*cm_events;
 };
 
+struct spdk_nvme_send_wr_list {
+	struct ibv_send_wr	*first;
+	struct ibv_send_wr	*last;
+};
+
+struct spdk_nvme_recv_wr_list {
+	struct ibv_recv_wr	*first;
+	struct ibv_recv_wr	*last;
+};
+
 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
 struct nvme_rdma_qpair {
 	struct spdk_nvme_qpair			qpair;
@@ -140,6 +150,9 @@ struct nvme_rdma_qpair {
 	struct ibv_recv_wr		*rsp_recv_wrs;
 
 	bool				delay_cmd_submit;
 
+	struct spdk_nvme_send_wr_list	sends_to_post;
+	struct spdk_nvme_recv_wr_list	recvs_to_post;
+
 	/* Memory region describing all rsps for this qpair */
 	struct ibv_mr			*rsp_mr;
@@ -491,6 +504,98 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
 	return 0;
 }
 
+static inline int
+nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
+{
+	struct ibv_send_wr *bad_send_wr;
+	int rc;
+
+	if (rqpair->sends_to_post.first) {
+		rc = ibv_post_send(rqpair->cm_id->qp, rqpair->sends_to_post.first, &bad_send_wr);
+		if (spdk_unlikely(rc)) {
+			SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
+				    rc, spdk_strerror(rc), bad_send_wr);
+			/* Restart queue from bad wr. If it failed during
+			 * completion processing, controller will be moved to
+			 * failed state. Otherwise it will likely fail again
+			 * in next submit attempt from completion processing.
+			 */
+			rqpair->sends_to_post.first = bad_send_wr;
+			return -1;
+		}
+		rqpair->sends_to_post.first = NULL;
+	}
+	return 0;
+}
+
+static inline int
+nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
+{
+	struct ibv_recv_wr *bad_recv_wr;
+	int rc;
+
+	if (rqpair->recvs_to_post.first) {
+		rc = ibv_post_recv(rqpair->cm_id->qp, rqpair->recvs_to_post.first, &bad_recv_wr);
+		if (spdk_unlikely(rc)) {
+			SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
+				    rc, spdk_strerror(rc), bad_recv_wr);
+			/* Restart queue from bad wr. If it failed during
+			 * completion processing, controller will be moved to
+			 * failed state. Otherwise it will likely fail again
+			 * in next submit attempt from completion processing.
+			 */
+			rqpair->recvs_to_post.first = bad_recv_wr;
+			return -1;
+		}
+		rqpair->recvs_to_post.first = NULL;
+	}
+	return 0;
+}
+
+/* Append the given send wr structure to the qpair's outstanding sends list. */
+/* This function accepts only a single wr. */
+static inline int
+nvme_rdma_qpair_queue_send_wr(struct nvme_rdma_qpair *rqpair, struct ibv_send_wr *wr)
+{
+	assert(wr->next == NULL);
+
+	if (rqpair->sends_to_post.first == NULL) {
+		rqpair->sends_to_post.first = wr;
+	} else {
+		rqpair->sends_to_post.last->next = wr;
+	}
+
+	rqpair->sends_to_post.last = wr;
+
+	if (!rqpair->delay_cmd_submit) {
+		return nvme_rdma_qpair_submit_sends(rqpair);
+	}
+
+	return 0;
+}
+
+/* Append the given recv wr structure to the qpair's outstanding recvs list. */
+/* This function accepts only a single wr. */
+static inline int
+nvme_rdma_qpair_queue_recv_wr(struct nvme_rdma_qpair *rqpair, struct ibv_recv_wr *wr)
+{
+	assert(wr->next == NULL);
+
+	if (rqpair->recvs_to_post.first == NULL) {
+		rqpair->recvs_to_post.first = wr;
+	} else {
+		rqpair->recvs_to_post.last->next = wr;
+	}
+
+	rqpair->recvs_to_post.last = wr;
+
+	if (!rqpair->delay_cmd_submit) {
+		return nvme_rdma_qpair_submit_recvs(rqpair);
+	}
+
+	return 0;
+}
+
 #define nvme_rdma_trace_ibv_sge(sg_list) \
 	if (sg_list) { \
 		SPDK_DEBUGLOG(SPDK_LOG_NVME, "local addr %p length 0x%x lkey 0x%x\n", \
@@ -500,18 +605,12 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
 static int
 nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
 {
-	struct ibv_recv_wr *wr, *bad_wr = NULL;
-	int rc;
+	struct ibv_recv_wr *wr;
 
 	wr = &rqpair->rsp_recv_wrs[rsp_idx];
+	wr->next = NULL;
 	nvme_rdma_trace_ibv_sge(wr->sg_list);
-
-	rc = ibv_post_recv(rqpair->cm_id->qp, wr, &bad_wr);
-	if (rc) {
-		SPDK_ERRLOG("Failure posting rdma recv, rc = 0x%x\n", rc);
-	}
-
-	return rc;
+	return nvme_rdma_qpair_queue_recv_wr(rqpair, wr);
 }
 
 static void
@@ -590,11 +689,14 @@ nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
 		rqpair->rsp_recv_wrs[i].num_sge = 1;
 
 		if (nvme_rdma_post_recv(rqpair, i)) {
-			SPDK_ERRLOG("Unable to post connection rx desc\n");
 			goto fail;
 		}
 	}
 
+	if (nvme_rdma_qpair_submit_recvs(rqpair)) {
+		goto fail;
+	}
+
 	return 0;
 
 fail:
@@ -1763,8 +1865,7 @@ nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
 {
 	struct nvme_rdma_qpair *rqpair;
 	struct spdk_nvme_rdma_req *rdma_req;
-	struct ibv_send_wr *wr, *bad_wr = NULL;
-	int rc;
+	struct ibv_send_wr *wr;
 
 	rqpair = nvme_rdma_qpair(qpair);
 	assert(rqpair != NULL);
@@ -1783,16 +1884,9 @@
 	}
 
 	wr = &rdma_req->send_wr;
-
+	wr->next = NULL;
 	nvme_rdma_trace_ibv_sge(wr->sg_list);
-
-	rc = ibv_post_send(rqpair->cm_id->qp, wr, &bad_wr);
-	if (rc) {
-		SPDK_ERRLOG("Failure posting rdma send for NVMf completion: %d (%s)\n", rc, spdk_strerror(rc));
-		nvme_rdma_req_put(rqpair, rdma_req);
-	}
-
-	return rc;
+	return nvme_rdma_qpair_queue_send_wr(rqpair, wr);
 }
 
 int
@@ -1894,6 +1988,11 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 	struct spdk_nvme_rdma_req *rdma_req;
 	struct nvme_rdma_ctrlr *rctrlr;
 
+	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
+			  nvme_rdma_qpair_submit_recvs(rqpair))) {
+		return -1;
+	}
+
 	if (max_completions == 0) {
 		max_completions = rqpair->num_entries;
 	} else {