diff --git a/CHANGELOG.md b/CHANGELOG.md index 5693ff49c..a0643f2ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,15 @@ and disable CPU core locks in runtime. Added --rpcs-allowed command line option. Users can specify a comma-separated list of RPC names with this option to restrict allowed RPCs to only that list. +### nvme + +New NVMe transport options were introduced. They are defined via +the `spdk_nvme_transport_opts` structure and configured via the `spdk_nvme_transport_get_opts` +and `spdk_nvme_transport_set_opts` functions. + +Support for a shared receive queue was added to the RDMA transport. It can be configured by +the new NVMe transport option `rdma_srq_size`. + ### rpc Added spdk_rpc_set_allowlist to restrict allowed RPCs to the specified list. diff --git a/include/spdk/nvme.h b/include/spdk/nvme.h index 3920fa727..dd0410307 100644 --- a/include/spdk/nvme.h +++ b/include/spdk/nvme.h @@ -4098,6 +4098,44 @@ static void __attribute__((constructor)) _spdk_nvme_transport_register_##name(vo spdk_nvme_transport_register(transport_ops); \ } +/** + * NVMe transport options. + */ +struct spdk_nvme_transport_opts { + /** + * Used by the RDMA transport. + * + * The queue depth of a shared RDMA receive queue. + */ + uint32_t rdma_srq_size; + + /** + * The size of spdk_nvme_transport_opts, according to the caller of this library, is used for ABI + * compatibility. The library uses this field to know how many fields in this + * structure are valid, and it will populate any remaining fields with default values. + */ + size_t opts_size; +} __attribute__((packed)); +SPDK_STATIC_ASSERT(sizeof(struct spdk_nvme_transport_opts) == 12, "Incorrect size"); + +/** + * Get the current NVMe transport options. + * + * \param[out] opts Will be filled with the current options for spdk_nvme_transport_set_opts(). + * \param opts_size Must be set to sizeof(struct spdk_nvme_transport_opts). + */ +void spdk_nvme_transport_get_opts(struct spdk_nvme_transport_opts *opts, size_t opts_size); + +/** + * Set the NVMe transport options. + * + * \param opts Pointer to the allocated spdk_nvme_transport_opts structure with new values. + * \param opts_size Must be set to sizeof(struct spdk_nvme_transport_opts). + * + * \return 0 on success, or negated errno on failure. + */ +int spdk_nvme_transport_set_opts(const struct spdk_nvme_transport_opts *opts, size_t opts_size); + #ifdef __cplusplus } #endif diff --git a/lib/nvme/nvme_internal.h b/lib/nvme/nvme_internal.h index 7cdf2ad78..a02f77528 100644 --- a/lib/nvme/nvme_internal.h +++ b/lib/nvme/nvme_internal.h @@ -34,6 +34,8 @@ extern pid_t g_spdk_nvme_pid; +extern struct spdk_nvme_transport_opts g_spdk_nvme_transport_opts; + /* * Some Intel devices support vendor-unique read latency log page even * though the log page directory says otherwise.
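The new public API above can be exercised with a short sketch like the following. It is illustrative only and not part of this patch; it assumes the options are changed before the RDMA transport creates any poll groups, and the queue depth of 4096 is just an example value.

#include <stdio.h>

#include "spdk/nvme.h"

static int
enable_rdma_srq(void)
{
	struct spdk_nvme_transport_opts opts;
	int rc;

	/* Read the current defaults so that any other fields keep their values. */
	spdk_nvme_transport_get_opts(&opts, sizeof(opts));

	/* A non-zero rdma_srq_size makes each RDMA poller create one shared
	 * receive queue of this depth instead of per-qpair receive queues.
	 * 4096 is only an illustrative depth. */
	opts.rdma_srq_size = 4096;

	rc = spdk_nvme_transport_set_opts(&opts, sizeof(opts));
	if (rc != 0) {
		fprintf(stderr, "spdk_nvme_transport_set_opts() failed: %d\n", rc);
	}

	return rc;
}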
diff --git a/lib/nvme/nvme_rdma.c b/lib/nvme/nvme_rdma.c index 88d8cd2de..8cd183809 100644 --- a/lib/nvme/nvme_rdma.c +++ b/lib/nvme/nvme_rdma.c @@ -133,10 +133,15 @@ struct nvme_rdma_poller_stats { }; struct nvme_rdma_poll_group; +struct nvme_rdma_rsps; struct nvme_rdma_poller { struct ibv_context *device; struct ibv_cq *cq; + struct spdk_rdma_srq *srq; + struct nvme_rdma_rsps *rsps; + struct ibv_pd *pd; + struct spdk_rdma_mem_map *mr_map; uint32_t refcnt; int required_num_wc; int current_num_wc; @@ -170,6 +175,7 @@ typedef int (*nvme_rdma_cm_event_cb)(struct nvme_rdma_qpair *rqpair, int ret); struct nvme_rdma_rsp_opts { uint16_t num_entries; struct nvme_rdma_qpair *rqpair; + struct spdk_rdma_srq *srq; struct spdk_rdma_mem_map *mr_map; }; @@ -193,6 +199,7 @@ struct nvme_rdma_qpair { struct spdk_rdma_qp *rdma_qp; struct rdma_cm_id *cm_id; struct ibv_cq *cq; + struct spdk_rdma_srq *srq; struct spdk_nvme_rdma_req *rdma_reqs; @@ -707,12 +714,16 @@ nvme_rdma_qpair_set_poller(struct spdk_nvme_qpair *qpair) return -EINVAL; } - if (nvme_rdma_resize_cq(rqpair, poller)) { - nvme_rdma_poll_group_put_poller(group, poller); - return -EPROTO; + if (!poller->srq) { + if (nvme_rdma_resize_cq(rqpair, poller)) { + nvme_rdma_poll_group_put_poller(group, poller); + return -EPROTO; + } } rqpair->cq = poller->cq; + rqpair->srq = poller->srq; + rqpair->rsps = poller->rsps; rqpair->poller = poller; return 0; } @@ -758,7 +769,11 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair) attr.send_cq = rqpair->cq; attr.recv_cq = rqpair->cq; attr.cap.max_send_wr = rqpair->num_entries; /* SEND operations */ - attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */ + if (rqpair->srq) { + attr.srq = rqpair->srq->srq; + } else { + attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */ + } attr.cap.max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge); attr.cap.max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge); @@ -839,6 +854,20 @@ nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair) return rc; } +static inline int +nvme_rdma_poller_submit_recvs(struct nvme_rdma_poller *poller) +{ + struct ibv_recv_wr *bad_recv_wr; + int rc; + + rc = spdk_rdma_srq_flush_recv_wrs(poller->srq, &bad_recv_wr); + if (spdk_unlikely(rc)) { + nvme_rdma_reset_failed_recvs(poller->rsps, bad_recv_wr, rc); + } + + return rc; +} + #define nvme_rdma_trace_ibv_sge(sg_list) \ if (sg_list) { \ SPDK_DEBUGLOG(nvme, "local addr %p length 0x%x lkey 0x%x\n", \ @@ -916,7 +945,11 @@ nvme_rdma_create_rsps(struct nvme_rdma_rsp_opts *opts) nvme_rdma_trace_ibv_sge(recv_wr->sg_list); - spdk_rdma_qp_queue_recv_wrs(opts->rqpair->rdma_qp, recv_wr); + if (opts->rqpair) { + spdk_rdma_qp_queue_recv_wrs(opts->rqpair->rdma_qp, recv_wr); + } else { + spdk_rdma_srq_queue_recv_wrs(opts->srq, recv_wr); + } } rsps->num_entries = opts->num_entries; @@ -1127,24 +1160,27 @@ nvme_rdma_connect_established(struct nvme_rdma_qpair *rqpair, int ret) } SPDK_DEBUGLOG(nvme, "RDMA requests created\n"); - opts.num_entries = rqpair->num_entries; - opts.rqpair = rqpair; - opts.mr_map = rqpair->mr_map; + if (!rqpair->srq) { + opts.num_entries = rqpair->num_entries; + opts.rqpair = rqpair; + opts.srq = NULL; + opts.mr_map = rqpair->mr_map; - rqpair->rsps = nvme_rdma_create_rsps(&opts); - if (!rqpair->rsps) { - SPDK_ERRLOG("Unable to create rqpair RDMA responses\n"); - return -1; - } - SPDK_DEBUGLOG(nvme, "RDMA responses created\n"); + rqpair->rsps = nvme_rdma_create_rsps(&opts); + if (!rqpair->rsps) { + SPDK_ERRLOG("Unable to create 
rqpair RDMA responses\n"); + return -1; + } + SPDK_DEBUGLOG(nvme, "RDMA responses created\n"); - ret = nvme_rdma_qpair_submit_recvs(rqpair); - SPDK_DEBUGLOG(nvme, "rc =%d\n", ret); - if (ret) { - SPDK_ERRLOG("Unable to submit rqpair RDMA responses\n"); - return -1; + ret = nvme_rdma_qpair_submit_recvs(rqpair); + SPDK_DEBUGLOG(nvme, "rc =%d\n", ret); + if (ret) { + SPDK_ERRLOG("Unable to submit rqpair RDMA responses\n"); + return -1; + } + SPDK_DEBUGLOG(nvme, "RDMA responses submitted\n"); } - SPDK_DEBUGLOG(nvme, "RDMA responses submitted\n"); rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND; @@ -1863,6 +1899,8 @@ nvme_rdma_qpair_destroy(struct nvme_rdma_qpair *rqpair) rqpair->poller = NULL; rqpair->cq = NULL; + rqpair->srq = NULL; + rqpair->rsps = NULL; } else if (rqpair->cq) { ibv_destroy_cq(rqpair->cq); rqpair->cq = NULL; @@ -1892,7 +1930,8 @@ nvme_rdma_qpair_disconnected(struct nvme_rdma_qpair *rqpair, int ret) goto quiet; } - if (rqpair->current_num_sends != 0 || rqpair->rsps->current_num_recvs != 0) { + if (rqpair->current_num_sends != 0 || + (!rqpair->srq && rqpair->rsps->current_num_recvs != 0)) { rqpair->state = NVME_RDMA_QPAIR_STATE_LINGERING; rqpair->evt_timeout_ticks = (NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC + spdk_get_ticks(); @@ -1913,7 +1952,8 @@ static int nvme_rdma_qpair_wait_until_quiet(struct nvme_rdma_qpair *rqpair) { if (spdk_get_ticks() < rqpair->evt_timeout_ticks && - (rqpair->current_num_sends != 0 || rqpair->rsps->current_num_recvs != 0)) { + (rqpair->current_num_sends != 0 || + (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) { return -EAGAIN; } @@ -2372,7 +2412,11 @@ nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_re recv_wr->next = NULL; nvme_rdma_trace_ibv_sge(recv_wr->sg_list); - spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr); + if (!rqpair->srq) { + spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr); + } else { + spdk_rdma_srq_queue_recv_wrs(rqpair->srq, recv_wr); + } } #define MAX_COMPLETIONS_PER_POLL 128 @@ -2431,29 +2475,45 @@ nvme_rdma_log_wc_status(struct nvme_rdma_qpair *rqpair, struct ibv_wc *wc) } static inline int -nvme_rdma_process_recv_completion(struct ibv_wc *wc, struct nvme_rdma_wr *rdma_wr) +nvme_rdma_process_recv_completion(struct nvme_rdma_poller *poller, struct ibv_wc *wc, + struct nvme_rdma_wr *rdma_wr) { struct nvme_rdma_qpair *rqpair; struct spdk_nvme_rdma_req *rdma_req; struct spdk_nvme_rdma_rsp *rdma_rsp; rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr); - rqpair = rdma_rsp->rqpair; + + if (poller && poller->srq) { + rqpair = get_rdma_qpair_from_wc(poller->group, wc); + if (spdk_unlikely(!rqpair)) { + /* Since we do not handle the LAST_WQE_REACHED event, we do not know when + * the receive queue of a QP that is associated with an SRQ is flushed. + * We may get a WC for an already destroyed QP. + * + * However, for the SRQ, this is not an error. Hence, just re-post the + * receive request to the SRQ so it can be reused by other QPs, and return 0.
+ */ + spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr); + return 0; + } + } else { + rqpair = rdma_rsp->rqpair; + } + assert(rqpair->rsps->current_num_recvs > 0); rqpair->rsps->current_num_recvs--; if (wc->status) { nvme_rdma_log_wc_status(rqpair, wc); - nvme_rdma_fail_qpair(&rqpair->qpair, 0); - return -ENXIO; + goto err_wc; } SPDK_DEBUGLOG(nvme, "CQ recv completion\n"); if (wc->byte_len < sizeof(struct spdk_nvme_cpl)) { SPDK_ERRLOG("recv length %u less than expected response size\n", wc->byte_len); - nvme_rdma_fail_qpair(&rqpair->qpair, 0); - return -ENXIO; + goto err_wc; } rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid]; rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED; @@ -2475,6 +2535,13 @@ nvme_rdma_process_recv_completion(struct ibv_wc *wc, struct nvme_rdma_wr *rdma_w rqpair->num_completions++; return 1; + +err_wc: + nvme_rdma_fail_qpair(&rqpair->qpair, 0); + if (poller && poller->srq) { + spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr); + } + return -ENXIO; } static inline int @@ -2505,6 +2572,9 @@ nvme_rdma_process_send_completion(struct nvme_rdma_poller *poller, rqpair->current_num_sends--; nvme_rdma_log_wc_status(rqpair, wc); nvme_rdma_fail_qpair(&rqpair->qpair, 0); + if (rdma_req->rdma_rsp && poller && poller->srq) { + spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_req->rdma_rsp->recv_wr); + } return -ENXIO; } @@ -2561,7 +2631,7 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size, rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id; switch (rdma_wr->type) { case RDMA_WR_TYPE_RECV: - _rc = nvme_rdma_process_recv_completion(&wc[i], rdma_wr); + _rc = nvme_rdma_process_recv_completion(poller, &wc[i], rdma_wr); break; case RDMA_WR_TYPE_SEND: @@ -2767,6 +2837,18 @@ nvme_rdma_poller_destroy(struct nvme_rdma_poller *poller) if (poller->cq) { ibv_destroy_cq(poller->cq); } + if (poller->rsps) { + nvme_rdma_free_rsps(poller->rsps); + } + if (poller->srq) { + spdk_rdma_srq_destroy(poller->srq); + } + if (poller->mr_map) { + spdk_rdma_free_mem_map(&poller->mr_map); + } + if (poller->pd) { + spdk_rdma_put_pd(poller->pd); + } free(poller); } @@ -2774,6 +2856,11 @@ static struct nvme_rdma_poller * nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx) { struct nvme_rdma_poller *poller; + struct ibv_device_attr dev_attr; + struct spdk_rdma_srq_init_attr srq_init_attr = {}; + struct nvme_rdma_rsp_opts opts; + int num_cqe; + int rc; poller = calloc(1, sizeof(*poller)); if (poller == NULL) { @@ -2783,7 +2870,68 @@ nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context * poller->group = group; poller->device = ctx; - poller->cq = ibv_create_cq(poller->device, DEFAULT_NVME_RDMA_CQ_SIZE, group, NULL, 0); + + if (g_spdk_nvme_transport_opts.rdma_srq_size != 0) { + rc = ibv_query_device(ctx, &dev_attr); + if (rc) { + SPDK_ERRLOG("Unable to query RDMA device.\n"); + goto fail; + } + + poller->pd = spdk_rdma_get_pd(ctx); + if (poller->pd == NULL) { + SPDK_ERRLOG("Unable to get PD.\n"); + goto fail; + } + + poller->mr_map = spdk_rdma_create_mem_map(poller->pd, &g_nvme_hooks, + SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR); + if (poller->mr_map == NULL) { + SPDK_ERRLOG("Unable to create memory map.\n"); + goto fail; + } + + srq_init_attr.stats = &poller->stats.rdma_stats.recv; + srq_init_attr.pd = poller->pd; + srq_init_attr.srq_init_attr.attr.max_wr = spdk_min((uint32_t)dev_attr.max_srq_wr, + g_spdk_nvme_transport_opts.rdma_srq_size); + srq_init_attr.srq_init_attr.attr.max_sge = spdk_min(dev_attr.max_sge, + 
NVME_RDMA_DEFAULT_RX_SGE); + + poller->srq = spdk_rdma_srq_create(&srq_init_attr); + if (poller->srq == NULL) { + SPDK_ERRLOG("Unable to create SRQ.\n"); + goto fail; + } + + opts.num_entries = g_spdk_nvme_transport_opts.rdma_srq_size; + opts.rqpair = NULL; + opts.srq = poller->srq; + opts.mr_map = poller->mr_map; + + poller->rsps = nvme_rdma_create_rsps(&opts); + if (poller->rsps == NULL) { + SPDK_ERRLOG("Unable to create poller RDMA responses.\n"); + goto fail; + } + + rc = nvme_rdma_poller_submit_recvs(poller); + if (rc) { + SPDK_ERRLOG("Unable to submit poller RDMA responses.\n"); + goto fail; + } + + /* + * When using an SRQ, fix the size of the completion queue at startup. + * The initiator posts only send and recv WRs. Hence, the multiplier is 2. + * (The target also posts data WRs. Hence, its multiplier is 3.) + */ + num_cqe = g_spdk_nvme_transport_opts.rdma_srq_size * 2; + } else { + num_cqe = DEFAULT_NVME_RDMA_CQ_SIZE; + } + + poller->cq = ibv_create_cq(poller->device, num_cqe, group, NULL, 0); if (poller->cq == NULL) { SPDK_ERRLOG("Unable to create CQ, errno %d.\n", errno); @@ -2792,7 +2940,7 @@ nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context * STAILQ_INSERT_HEAD(&group->pollers, poller, link); group->num_pollers++; - poller->current_num_wc = DEFAULT_NVME_RDMA_CQ_SIZE; + poller->current_num_wc = num_cqe; poller->required_num_wc = 0; return poller; @@ -2983,6 +3131,9 @@ nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group * } while (poller_completions < completions_per_poller); total_completions += poller_completions; poller->stats.completions += rdma_completions; + if (poller->srq) { + nvme_rdma_poller_submit_recvs(poller); + } } STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) { @@ -2997,7 +3148,9 @@ nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group * } nvme_rdma_qpair_submit_sends(rqpair); - nvme_rdma_qpair_submit_recvs(rqpair); + if (!rqpair->srq) { + nvme_rdma_qpair_submit_recvs(rqpair); + } if (rqpair->num_completions > 0) { nvme_qpair_resubmit_requests(qpair, rqpair->num_completions); } diff --git a/lib/nvme/nvme_transport.c b/lib/nvme/nvme_transport.c index 4603a99b9..85a48bb13 100644 --- a/lib/nvme/nvme_transport.c +++ b/lib/nvme/nvme_transport.c @@ -25,6 +25,10 @@ TAILQ_HEAD(nvme_transport_list, spdk_nvme_transport) g_spdk_nvme_transports = struct spdk_nvme_transport g_spdk_transports[SPDK_MAX_NUM_OF_TRANSPORTS] = {}; int g_current_transport_index = 0; +struct spdk_nvme_transport_opts g_spdk_nvme_transport_opts = { + .rdma_srq_size = 0, +}; + const struct spdk_nvme_transport * nvme_get_first_transport(void) { @@ -792,3 +796,59 @@ nvme_transport_get_trtype(const struct spdk_nvme_transport *transport) { return transport->ops.type; } + +void +spdk_nvme_transport_get_opts(struct spdk_nvme_transport_opts *opts, size_t opts_size) +{ + if (opts == NULL) { + SPDK_ERRLOG("opts should not be NULL.\n"); + return; + } + + if (opts_size == 0) { + SPDK_ERRLOG("opts_size should not be zero.\n"); + return; + } + + opts->opts_size = opts_size; + +#define SET_FIELD(field) \ + if (offsetof(struct spdk_nvme_transport_opts, field) + sizeof(opts->field) <= opts_size) { \ + opts->field = g_spdk_nvme_transport_opts.field; \ + } \ + + SET_FIELD(rdma_srq_size); + + /* Do not remove this statement. You should always update this statement when you add a new field, + * and do not forget to add the SET_FIELD statement for your added field.
*/ + SPDK_STATIC_ASSERT(sizeof(struct spdk_nvme_transport_opts) == 12, "Incorrect size"); + +#undef SET_FIELD +} + +int +spdk_nvme_transport_set_opts(const struct spdk_nvme_transport_opts *opts, size_t opts_size) +{ + if (opts == NULL) { + SPDK_ERRLOG("opts should not be NULL.\n"); + return -EINVAL; + } + + if (opts_size == 0) { + SPDK_ERRLOG("opts_size should not be zero.\n"); + return -EINVAL; + } + +#define SET_FIELD(field) \ + if (offsetof(struct spdk_nvme_transport_opts, field) + sizeof(opts->field) <= opts->opts_size) { \ + g_spdk_nvme_transport_opts.field = opts->field; \ + } \ + + SET_FIELD(rdma_srq_size); + + g_spdk_nvme_transport_opts.opts_size = opts->opts_size; + +#undef SET_FIELD + + return 0; +} diff --git a/lib/nvme/spdk_nvme.map b/lib/nvme/spdk_nvme.map index 8b56b06bc..b14de0537 100644 --- a/lib/nvme/spdk_nvme.map +++ b/lib/nvme/spdk_nvme.map @@ -5,6 +5,8 @@ spdk_nvme_transport_register; spdk_nvme_transport_available; spdk_nvme_transport_available_by_name; + spdk_nvme_transport_get_opts; + spdk_nvme_transport_set_opts; spdk_nvme_transport_id_parse; spdk_nvme_transport_id_populate_trstring; spdk_nvme_transport_id_parse_trtype; diff --git a/test/common/lib/test_rdma.c b/test/common/lib/test_rdma.c index e41df3c5e..18f158301 100644 --- a/test/common/lib/test_rdma.c +++ b/test/common/lib/test_rdma.c @@ -11,7 +11,9 @@ #define RDMA_UT_LKEY 123 #define RDMA_UT_RKEY 312 +struct spdk_nvme_transport_opts g_spdk_nvme_transport_opts = {}; struct spdk_rdma_qp g_spdk_rdma_qp = {}; +struct spdk_rdma_srq g_spdk_rdma_srq = {}; DEFINE_STUB(spdk_rdma_qp_create, struct spdk_rdma_qp *, (struct rdma_cm_id *cm_id, struct spdk_rdma_qp_init_attr *qp_attr), &g_spdk_rdma_qp); DEFINE_STUB(spdk_rdma_qp_accept, int, (struct spdk_rdma_qp *spdk_rdma_qp, @@ -24,7 +26,7 @@ DEFINE_STUB(spdk_rdma_qp_queue_send_wrs, bool, (struct spdk_rdma_qp *spdk_rdma_q DEFINE_STUB(spdk_rdma_qp_flush_send_wrs, int, (struct spdk_rdma_qp *spdk_rdma_qp, struct ibv_send_wr **bad_wr), 0); DEFINE_STUB(spdk_rdma_srq_create, struct spdk_rdma_srq *, - (struct spdk_rdma_srq_init_attr *init_attr), NULL); + (struct spdk_rdma_srq_init_attr *init_attr), &g_spdk_rdma_srq); DEFINE_STUB(spdk_rdma_srq_destroy, int, (struct spdk_rdma_srq *rdma_srq), 0); DEFINE_STUB(spdk_rdma_srq_queue_recv_wrs, bool, (struct spdk_rdma_srq *rdma_srq, struct ibv_recv_wr *first), true);
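The opts_size/SET_FIELD pattern used in nvme_transport.c above is what keeps the new API ABI compatible when fields are appended in later releases. The following standalone sketch of the pattern is not SPDK code, and the extra field is purely hypothetical; it only shows how a field is copied when, and only when, it fits inside the size the caller reports.

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* "Library" view of the options: one hypothetical field appended in a
 * later release. Not part of the real spdk_nvme_transport_opts. */
struct opts_v2 {
	uint32_t rdma_srq_size;
	size_t opts_size;
	uint32_t hypothetical_new_field;
};

static struct opts_v2 g_defaults = {
	.rdma_srq_size = 0,
	.hypothetical_new_field = 42,
};

static void
get_opts(struct opts_v2 *opts, size_t opts_size)
{
	opts->opts_size = opts_size;

#define SET_FIELD(field) \
	if (offsetof(struct opts_v2, field) + sizeof(opts->field) <= opts_size) { \
		opts->field = g_defaults.field; \
	}

	SET_FIELD(rdma_srq_size);
	SET_FIELD(hypothetical_new_field);
#undef SET_FIELD
}

int
main(void)
{
	/* "Application" compiled against the older, shorter layout. */
	struct {
		uint32_t rdma_srq_size;
		size_t opts_size;
	} v1_opts;

	get_opts((struct opts_v2 *)&v1_opts, sizeof(v1_opts));

	/* rdma_srq_size was filled in; hypothetical_new_field was skipped because
	 * it does not fit inside the bytes the caller owns. */
	printf("rdma_srq_size %u, opts_size %zu\n", v1_opts.rdma_srq_size, v1_opts.opts_size);

	return 0;
}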