diff --git a/lib/nvmf/rdma.c b/lib/nvmf/rdma.c
index 6f87f93c8..41deae3da 100644
--- a/lib/nvmf/rdma.c
+++ b/lib/nvmf/rdma.c
@@ -259,6 +259,38 @@ enum spdk_nvmf_rdma_qpair_disconnect_flags {
 	RDMA_QP_SEND_DRAINED		= 1 << 2
 };
 
+struct spdk_nvmf_rdma_resources {
+	/* Array of size "max_queue_depth" containing RDMA requests. */
+	struct spdk_nvmf_rdma_request		*reqs;
+
+	/* Array of size "max_queue_depth" containing RDMA recvs. */
+	struct spdk_nvmf_rdma_recv		*recvs;
+
+	/* Array of size "max_queue_depth" containing 64 byte capsules
+	 * used for receive.
+	 */
+	union nvmf_h2c_msg			*cmds;
+	struct ibv_mr				*cmds_mr;
+
+	/* Array of size "max_queue_depth" containing 16 byte completions
+	 * to be sent back to the user.
+	 */
+	union nvmf_c2h_msg			*cpls;
+	struct ibv_mr				*cpls_mr;
+
+	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
+	 * buffers to be used for in capsule data.
+	 */
+	void					*bufs;
+	struct ibv_mr				*bufs_mr;
+
+	/* Receives that are waiting for a request object */
+	STAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
+
+	/* Queue to track free requests */
+	STAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
+};
+
 struct spdk_nvmf_rdma_qpair {
 	struct spdk_nvmf_qpair			qpair;
 
@@ -296,13 +328,7 @@ struct spdk_nvmf_rdma_qpair {
 	/* The maximum number of SGEs per WR on the recv queue */
 	uint32_t				max_recv_sge;
 
-#ifndef SPDK_CONFIG_RDMA_SRQ
-	/* Receives that are waiting for a request object */
-	STAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
-
-	/* Queues to track requests in critical states */
-	STAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
-#endif
+	struct spdk_nvmf_rdma_resources		*resources;
 
 	STAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_read_queue;
 
@@ -311,32 +337,6 @@ struct spdk_nvmf_rdma_qpair {
 	/* Number of requests not in the free state */
 	uint32_t				qd;
 
-#ifndef SPDK_CONFIG_RDMA_SRQ
-	/* Array of size "max_queue_depth" containing RDMA requests. */
-	struct spdk_nvmf_rdma_request		*reqs;
-
-	/* Array of size "max_queue_depth" containing RDMA recvs. */
-	struct spdk_nvmf_rdma_recv		*recvs;
-
-	/* Array of size "max_queue_depth" containing 64 byte capsules
-	 * used for receive.
-	 */
-	union nvmf_h2c_msg			*cmds;
-	struct ibv_mr				*cmds_mr;
-
-	/* Array of size "max_queue_depth" containing 16 byte completions
-	 * to be sent back to the user.
-	 */
-	union nvmf_c2h_msg			*cpls;
-	struct ibv_mr				*cpls_mr;
-
-	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
-	 * buffers to be used for in capsule data.
-	 */
-	void					*bufs;
-	struct ibv_mr				*bufs_mr;
-
 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
 
 	/* IBV queue pair attributes: they are used to manage
@@ -367,43 +367,13 @@ struct spdk_nvmf_rdma_poller {
 	int					required_num_wr;
 	struct ibv_cq				*cq;
 
-#ifdef SPDK_CONFIG_RDMA_SRQ
 	/* The maximum number of I/O outstanding on the shared receive queue at one time */
 	uint16_t				max_srq_depth;
 
 	/* Shared receive queue */
 	struct ibv_srq				*srq;
 
-	/* Receives that are waiting for a request object */
-	STAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
-
-	/* Queue to track free requests */
-	STAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
-
-	/* Array of size "max_srq_depth" containing RDMA requests. */
-	struct spdk_nvmf_rdma_request		*reqs;
-
-	/* Array of size "max_srq_depth" containing RDMA recvs. */
-	struct spdk_nvmf_rdma_recv		*recvs;
-
-	/* Array of size "max_srq_depth" containing 64 byte capsules
-	 * used for receive.
-	 */
-	union nvmf_h2c_msg			*cmds;
-	struct ibv_mr				*cmds_mr;
-
-	/* Array of size "max_srq_depth" containing 16 byte completions
-	 * to be sent back to the user.
-	 */
-	union nvmf_c2h_msg			*cpls;
-	struct ibv_mr				*cpls_mr;
-
-	/* Array of size "max_srq_depth * InCapsuleDataSize" containing
-	 * buffers to be used for in capsule data.
-	 */
-	void					*bufs;
-	struct ibv_mr				*bufs_mr;
-#endif
+	struct spdk_nvmf_rdma_resources		*resources;
 
 	TAILQ_HEAD(, spdk_nvmf_rdma_qpair)	qpairs;
 
@@ -622,8 +592,8 @@ nvmf_rdma_dump_qpair_contents(struct spdk_nvmf_rdma_qpair *rqpair)
 	SPDK_ERRLOG("Dumping contents of queue pair (QID %d)\n", rqpair->qpair.qid);
 	for (i = 0; i < rqpair->max_queue_depth; i++) {
-		if (rqpair->reqs[i].state != RDMA_REQUEST_STATE_FREE) {
-			nvmf_rdma_dump_request(&rqpair->reqs[i]);
+		if (rqpair->resources->reqs[i].state != RDMA_REQUEST_STATE_FREE) {
+			nvmf_rdma_dump_request(&rqpair->resources->reqs[i]);
 		}
 	}
 }
@@ -655,22 +625,22 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
 	}
 
 #ifndef SPDK_CONFIG_RDMA_SRQ
-	if (rqpair->cmds_mr) {
-		ibv_dereg_mr(rqpair->cmds_mr);
+	if (rqpair->resources->cmds_mr) {
+		ibv_dereg_mr(rqpair->resources->cmds_mr);
 	}
 
-	if (rqpair->cpls_mr) {
-		ibv_dereg_mr(rqpair->cpls_mr);
+	if (rqpair->resources->cpls_mr) {
+		ibv_dereg_mr(rqpair->resources->cpls_mr);
 	}
 
-	if (rqpair->bufs_mr) {
-		ibv_dereg_mr(rqpair->bufs_mr);
+	if (rqpair->resources->bufs_mr) {
+		ibv_dereg_mr(rqpair->resources->bufs_mr);
 	}
 #else
 	/* Drop all received but unprocessed commands for this queue and return them to SRQ */
-	STAILQ_FOREACH_SAFE(rdma_recv, &rqpair->poller->incoming_queue, link, recv_tmp) {
+	STAILQ_FOREACH_SAFE(rdma_recv, &rqpair->resources->incoming_queue, link, recv_tmp) {
 		if (rqpair == rdma_recv->qpair) {
-			STAILQ_REMOVE_HEAD(&rqpair->poller->incoming_queue, link);
+			STAILQ_REMOVE_HEAD(&rqpair->resources->incoming_queue, link);
 			rc = ibv_post_srq_recv(rqpair->poller->srq, &rdma_recv->wr, &bad_recv_wr);
 			if (rc) {
 				SPDK_ERRLOG("Unable to re-post rx descriptor\n");
@@ -690,11 +660,12 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
 
 	/* Free all memory */
 #ifndef SPDK_CONFIG_RDMA_SRQ
-	spdk_dma_free(rqpair->cmds);
-	spdk_dma_free(rqpair->cpls);
-	spdk_dma_free(rqpair->bufs);
-	free(rqpair->reqs);
-	free(rqpair->recvs);
+	spdk_dma_free(rqpair->resources->cmds);
+	spdk_dma_free(rqpair->resources->cpls);
+	spdk_dma_free(rqpair->resources->bufs);
+	free(rqpair->resources->reqs);
+	free(rqpair->resources->recvs);
+	free(rqpair->resources);
 #endif
 	free(rqpair);
 }
@@ -789,55 +760,60 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
 	transport = &rtransport->transport;
 
-	rqpair->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->reqs));
-	rqpair->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->recvs));
-	rqpair->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cmds),
-					0x1000, NULL);
-	rqpair->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cpls),
-					0x1000, NULL);
+	rqpair->resources->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->resources->reqs));
+	rqpair->resources->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->resources->recvs));
+	rqpair->resources->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(
+					  *rqpair->resources->cmds),
+				  0x1000, NULL);
+	rqpair->resources->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(
+					  *rqpair->resources->cpls),
+				  0x1000, NULL);
 
 	if (transport->opts.in_capsule_data_size > 0) {
-		rqpair->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth *
-						transport->opts.in_capsule_data_size,
-						0x1000, NULL);
+		rqpair->resources->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth *
+					  transport->opts.in_capsule_data_size,
+					  0x1000, NULL);
 	}
 
-	if (!rqpair->reqs || !rqpair->recvs || !rqpair->cmds ||
-	    !rqpair->cpls || (transport->opts.in_capsule_data_size && !rqpair->bufs)) {
+	if (!rqpair->resources->reqs || !rqpair->resources->recvs || !rqpair->resources->cmds ||
+	    !rqpair->resources->cpls || (transport->opts.in_capsule_data_size && !rqpair->resources->bufs)) {
 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
 		spdk_nvmf_rdma_qpair_destroy(rqpair);
 		return -1;
 	}
 
-	rqpair->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cmds,
-				     rqpair->max_queue_depth * sizeof(*rqpair->cmds),
-				     IBV_ACCESS_LOCAL_WRITE);
-	rqpair->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cpls,
-				     rqpair->max_queue_depth * sizeof(*rqpair->cpls),
-				     0);
+	rqpair->resources->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->resources->cmds,
+						rqpair->max_queue_depth * sizeof(*rqpair->resources->cmds),
+						IBV_ACCESS_LOCAL_WRITE);
+	rqpair->resources->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->resources->cpls,
+						rqpair->max_queue_depth * sizeof(*rqpair->resources->cpls),
+						0);
 
 	if (transport->opts.in_capsule_data_size) {
-		rqpair->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->bufs,
-					     rqpair->max_queue_depth *
-					     transport->opts.in_capsule_data_size,
-					     IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+		rqpair->resources->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->resources->bufs,
+							rqpair->max_queue_depth *
+							transport->opts.in_capsule_data_size,
+							IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
 	}
 
-	if (!rqpair->cmds_mr || !rqpair->cpls_mr || (transport->opts.in_capsule_data_size &&
-			!rqpair->bufs_mr)) {
+	if (!rqpair->resources->cmds_mr || !rqpair->resources->cpls_mr ||
+	    (transport->opts.in_capsule_data_size &&
+	     !rqpair->resources->bufs_mr)) {
 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
 		spdk_nvmf_rdma_qpair_destroy(rqpair);
 		return -1;
 	}
 
 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
-		      rqpair->cmds, rqpair->max_queue_depth * sizeof(*rqpair->cmds), rqpair->cmds_mr->lkey);
+		      rqpair->resources->cmds, rqpair->max_queue_depth * sizeof(*rqpair->resources->cmds),
+		      rqpair->resources->cmds_mr->lkey);
 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
-		      rqpair->cpls, rqpair->max_queue_depth * sizeof(*rqpair->cpls), rqpair->cpls_mr->lkey);
-	if (rqpair->bufs && rqpair->bufs_mr) {
+		      rqpair->resources->cpls, rqpair->max_queue_depth * sizeof(*rqpair->resources->cpls),
+		      rqpair->resources->cpls_mr->lkey);
+	if (rqpair->resources->bufs && rqpair->resources->bufs_mr) {
 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
-			      rqpair->bufs, rqpair->max_queue_depth *
-			      transport->opts.in_capsule_data_size, rqpair->bufs_mr->lkey);
+			      rqpair->resources->bufs, rqpair->max_queue_depth *
+			      transport->opts.in_capsule_data_size, rqpair->resources->bufs_mr->lkey);
 	}
 #endif
@@ -846,31 +822,31 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
 	STAILQ_INIT(&rqpair->pending_rdma_write_queue);
 
 #ifndef SPDK_CONFIG_RDMA_SRQ
-	STAILQ_INIT(&rqpair->free_queue);
+	STAILQ_INIT(&rqpair->resources->free_queue);
 
 	for (i = 0; i < rqpair->max_queue_depth; i++) {
 		struct ibv_recv_wr *bad_wr = NULL;
 
-		rdma_recv = &rqpair->recvs[i];
+		rdma_recv = &rqpair->resources->recvs[i];
 		rdma_recv->qpair = rqpair;
 
 		/* Set up memory to receive commands */
-		if (rqpair->bufs) {
-			rdma_recv->buf = (void *)((uintptr_t)rqpair->bufs + (i *
+		if (rqpair->resources->bufs) {
+			rdma_recv->buf = (void *)((uintptr_t)rqpair->resources->bufs + (i *
 						  transport->opts.in_capsule_data_size));
 		}
 
 		rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
 
-		rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->cmds[i];
-		rdma_recv->sgl[0].length = sizeof(rqpair->cmds[i]);
-		rdma_recv->sgl[0].lkey = rqpair->cmds_mr->lkey;
+		rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->resources->cmds[i];
+		rdma_recv->sgl[0].length = sizeof(rqpair->resources->cmds[i]);
+		rdma_recv->sgl[0].lkey = rqpair->resources->cmds_mr->lkey;
 
 		rdma_recv->wr.num_sge = 1;
 
-		if (rdma_recv->buf && rqpair->bufs_mr) {
+		if (rdma_recv->buf && rqpair->resources->bufs_mr) {
 			rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
 			rdma_recv->sgl[1].length = transport->opts.in_capsule_data_size;
-			rdma_recv->sgl[1].lkey = rqpair->bufs_mr->lkey;
+			rdma_recv->sgl[1].lkey = rqpair->resources->bufs_mr->lkey;
 			rdma_recv->wr.num_sge++;
 		}
@@ -889,17 +865,17 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
 	assert(rqpair->current_recv_depth == 0);
 
 	for (i = 0; i < rqpair->max_queue_depth; i++) {
-		rdma_req = &rqpair->reqs[i];
+		rdma_req = &rqpair->resources->reqs[i];
 
 		rdma_req->req.qpair = &rqpair->qpair;
 		rdma_req->req.cmd = NULL;
 
 		/* Set up memory to send responses */
-		rdma_req->req.rsp = &rqpair->cpls[i];
+		rdma_req->req.rsp = &rqpair->resources->cpls[i];
 
-		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->cpls[i];
-		rdma_req->rsp.sgl[0].length = sizeof(rqpair->cpls[i]);
-		rdma_req->rsp.sgl[0].lkey = rqpair->cpls_mr->lkey;
+		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->resources->cpls[i];
+		rdma_req->rsp.sgl[0].length = sizeof(rqpair->resources->cpls[i]);
+		rdma_req->rsp.sgl[0].lkey = rqpair->resources->cpls_mr->lkey;
 
 		rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
 		rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
@@ -919,7 +895,7 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
 
 		/* Initialize request state to FREE */
 		rdma_req->state = RDMA_REQUEST_STATE_FREE;
-		STAILQ_INSERT_HEAD(&rqpair->free_queue, rdma_req, state_link);
+		STAILQ_INSERT_HEAD(&rqpair->resources->free_queue, rdma_req, state_link);
 	}
 #endif
@@ -1147,15 +1123,25 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
 		return -1;
 	}
 
+#ifndef SPDK_CONFIG_RDMA_SRQ
+	rqpair->resources = calloc(1, sizeof(struct spdk_nvmf_rdma_resources));
+	if (rqpair->resources == NULL) {
+		SPDK_ERRLOG("Unable to allocate resources for rdma qpair\n");
+		free(rqpair);
+		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
+		return -1;
+	}
+	STAILQ_INIT(&rqpair->resources->incoming_queue);
+	STAILQ_INIT(&rqpair->resources->free_queue);
+#endif
+
 	rqpair->port = port;
 	rqpair->max_queue_depth = max_queue_depth;
 	rqpair->max_read_depth = max_read_depth;
 	rqpair->cm_id = event->id;
 	rqpair->listen_id = event->listen_id;
 	rqpair->qpair.transport = transport;
-#ifndef SPDK_CONFIG_RDMA_SRQ
-	STAILQ_INIT(&rqpair->incoming_queue);
-#endif
+
 	event->id->context = &rqpair->qpair;
 
 	cb_fn(&rqpair->qpair);
@@ -1488,11 +1474,8 @@ nvmf_rdma_request_free(struct spdk_nvmf_rdma_request *rdma_req,
 	rdma_req->req.iovcnt = 0;
 	rdma_req->req.data = NULL;
 	rqpair->qd--;
-#ifndef SPDK_CONFIG_RDMA_SRQ
-	STAILQ_INSERT_HEAD(&rqpair->free_queue, rdma_req, state_link);
-#else
-	STAILQ_INSERT_HEAD(&rqpair->poller->free_queue, rdma_req, state_link);
-#endif
+
+	STAILQ_INSERT_HEAD(&rqpair->resources->free_queue, rdma_req, state_link);
 	rdma_req->state = RDMA_REQUEST_STATE_FREE;
 }
@@ -2209,9 +2192,7 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport
 		struct spdk_nvmf_rdma_qpair *rqpair, bool drain)
 {
 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
-#ifdef SPDK_CONFIG_RDMA_SRQ
-	struct spdk_nvmf_rdma_poller	*rpoller = rqpair->poller;
-#endif
+	struct spdk_nvmf_rdma_resources	*resources;
 
 	/* We process I/O in the data transfer pending queue at the highest priority. RDMA reads first */
 	STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_read_queue, state_link, req_tmp) {
@@ -2235,21 +2216,18 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport
 		}
 	}
 
-#ifndef SPDK_CONFIG_RDMA_SRQ
-	while (!STAILQ_EMPTY(&rqpair->free_queue) && !STAILQ_EMPTY(&rqpair->incoming_queue)) {
-		rdma_req = STAILQ_FIRST(&rqpair->free_queue);
-		STAILQ_REMOVE_HEAD(&rqpair->free_queue, state_link);
-		rdma_req->recv = STAILQ_FIRST(&rqpair->incoming_queue);
-		STAILQ_REMOVE_HEAD(&rqpair->incoming_queue, link);
-#else
-	while (!STAILQ_EMPTY(&rpoller->free_queue) && !STAILQ_EMPTY(&rpoller->incoming_queue)) {
-		rdma_req = STAILQ_FIRST(&rpoller->free_queue);
-		STAILQ_REMOVE_HEAD(&rpoller->free_queue, state_link);
-		rdma_req->recv = STAILQ_FIRST(&rpoller->incoming_queue);
-		STAILQ_REMOVE_HEAD(&rpoller->incoming_queue, link);
+	resources = rqpair->resources;
+	while (!STAILQ_EMPTY(&resources->free_queue) && !STAILQ_EMPTY(&resources->incoming_queue)) {
+		rdma_req = STAILQ_FIRST(&resources->free_queue);
+		STAILQ_REMOVE_HEAD(&resources->free_queue, state_link);
+		rdma_req->recv = STAILQ_FIRST(&resources->incoming_queue);
+		STAILQ_REMOVE_HEAD(&resources->incoming_queue, link);
+#ifdef SPDK_CONFIG_RDMA_SRQ
 		rdma_req->req.qpair = &rdma_req->recv->qpair->qpair;
-#endif
+		rdma_req->recv->qpair->qd++;
+#else
 		rqpair->qd++;
+#endif
 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
 			break;
@@ -2626,92 +2604,104 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
 			pthread_mutex_unlock(&rtransport->lock);
 			return NULL;
 		}
+
+		poller->resources = calloc(1, sizeof(struct spdk_nvmf_rdma_resources));
+		if (!poller->resources) {
+			SPDK_ERRLOG("Unable to allocate resources for shared receive queue.\n");
+			spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
+			pthread_mutex_unlock(&rtransport->lock);
+			return NULL;
+		}
+
 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Created RDMA SRQ %p: max_wr %u, max_sge %u, srq_limit %u\n",
 			      poller->srq, srq_init_attr.attr.max_wr, srq_init_attr.attr.max_sge,
 			      srq_init_attr.attr.srq_limit);
 
-		poller->reqs = calloc(poller->max_srq_depth, sizeof(*poller->reqs));
-		poller->recvs = calloc(poller->max_srq_depth, sizeof(*poller->recvs));
-		poller->cmds = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->cmds),
-						0x1000, NULL);
-		poller->cpls = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->cpls),
-						0x1000, NULL);
+		poller->resources->reqs = calloc(poller->max_srq_depth, sizeof(*poller->resources->reqs));
+		poller->resources->recvs = calloc(poller->max_srq_depth, sizeof(*poller->resources->recvs));
+		poller->resources->cmds = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->resources->cmds),
+					  0x1000, NULL);
+		poller->resources->cpls = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->resources->cpls),
+					  0x1000, NULL);
 
 		if (transport->opts.in_capsule_data_size > 0) {
-			poller->bufs = spdk_dma_zmalloc(poller->max_srq_depth *
-							transport->opts.in_capsule_data_size,
-							0x1000, NULL);
+			poller->resources->bufs = spdk_dma_zmalloc(poller->max_srq_depth *
+						  transport->opts.in_capsule_data_size,
+						  0x1000, NULL);
 		}
 
-		if (!poller->reqs || !poller->recvs || !poller->cmds ||
-		    !poller->cpls || (transport->opts.in_capsule_data_size && !poller->bufs)) {
+		if (!poller->resources->reqs || !poller->resources->recvs || !poller->resources->cmds ||
+		    !poller->resources->cpls || (transport->opts.in_capsule_data_size && !poller->resources->bufs)) {
 			SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA shared receive queue.\n");
 			spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
 			pthread_mutex_unlock(&rtransport->lock);
 			return NULL;
 		}
 
-		poller->cmds_mr = ibv_reg_mr(device->pd, poller->cmds,
-					     poller->max_srq_depth * sizeof(*poller->cmds),
-					     IBV_ACCESS_LOCAL_WRITE);
-		poller->cpls_mr = ibv_reg_mr(device->pd, poller->cpls,
-					     poller->max_srq_depth * sizeof(*poller->cpls),
-					     0);
+		poller->resources->cmds_mr = ibv_reg_mr(device->pd, poller->resources->cmds,
+							poller->max_srq_depth * sizeof(*poller->resources->cmds),
+							IBV_ACCESS_LOCAL_WRITE);
+		poller->resources->cpls_mr = ibv_reg_mr(device->pd, poller->resources->cpls,
+							poller->max_srq_depth * sizeof(*poller->resources->cpls),
+							0);
 
 		if (transport->opts.in_capsule_data_size) {
-			poller->bufs_mr = ibv_reg_mr(device->pd, poller->bufs,
-						     poller->max_srq_depth *
-						     transport->opts.in_capsule_data_size,
-						     IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+			poller->resources->bufs_mr = ibv_reg_mr(device->pd, poller->resources->bufs,
+								poller->max_srq_depth *
+								transport->opts.in_capsule_data_size,
+								IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
 		}
 
-		if (!poller->cmds_mr || !poller->cpls_mr || (transport->opts.in_capsule_data_size &&
-				!poller->bufs_mr)) {
+		if (!poller->resources->cmds_mr || !poller->resources->cpls_mr ||
+		    (transport->opts.in_capsule_data_size &&
+		     !poller->resources->bufs_mr)) {
 			SPDK_ERRLOG("Unable to register required memory for RDMA shared receive queue.\n");
 			spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
 			pthread_mutex_unlock(&rtransport->lock);
 			return NULL;
 		}
 
 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
-			      poller->cmds, poller->max_srq_depth * sizeof(*poller->cmds), poller->cmds_mr->lkey);
+			      poller->resources->cmds, poller->max_srq_depth * sizeof(*poller->resources->cmds),
+			      poller->resources->cmds_mr->lkey);
 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
-			      poller->cpls, poller->max_srq_depth * sizeof(*poller->cpls), poller->cpls_mr->lkey);
-		if (poller->bufs && poller->bufs_mr) {
+			      poller->resources->cpls, poller->max_srq_depth * sizeof(*poller->resources->cpls),
+			      poller->resources->cpls_mr->lkey);
+		if (poller->resources->bufs && poller->resources->bufs_mr) {
 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
-				      poller->bufs, poller->max_srq_depth *
-				      transport->opts.in_capsule_data_size, poller->bufs_mr->lkey);
+				      poller->resources->bufs, poller->max_srq_depth *
+				      transport->opts.in_capsule_data_size, poller->resources->bufs_mr->lkey);
 		}
 
 		/* Initialize queues */
-		STAILQ_INIT(&poller->incoming_queue);
-		STAILQ_INIT(&poller->free_queue);
+		STAILQ_INIT(&poller->resources->incoming_queue);
+		STAILQ_INIT(&poller->resources->free_queue);
 
 		for (i = 0; i < poller->max_srq_depth; i++) {
 			struct ibv_recv_wr *bad_wr = NULL;
 
-			rdma_recv = &poller->recvs[i];
+			rdma_recv = &poller->resources->recvs[i];
 			rdma_recv->qpair = NULL;
 
 			/* Set up memory to receive commands */
-			if (poller->bufs) {
-				rdma_recv->buf = (void *)((uintptr_t)poller->bufs + (i *
+			if (poller->resources->bufs) {
+				rdma_recv->buf = (void *)((uintptr_t)poller->resources->bufs + (i *
 							  transport->opts.in_capsule_data_size));
 			}
 
 			rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
 
-			rdma_recv->sgl[0].addr = (uintptr_t)&poller->cmds[i];
-			rdma_recv->sgl[0].length = sizeof(poller->cmds[i]);
-			rdma_recv->sgl[0].lkey = poller->cmds_mr->lkey;
+			rdma_recv->sgl[0].addr = (uintptr_t)&poller->resources->cmds[i];
+			rdma_recv->sgl[0].length = sizeof(poller->resources->cmds[i]);
+			rdma_recv->sgl[0].lkey = poller->resources->cmds_mr->lkey;
 
 			rdma_recv->wr.num_sge = 1;
 
-			if (rdma_recv->buf && poller->bufs_mr) {
+			if (rdma_recv->buf && poller->resources->bufs_mr) {
 				rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
 				rdma_recv->sgl[1].length = transport->opts.in_capsule_data_size;
-				rdma_recv->sgl[1].lkey = poller->bufs_mr->lkey;
+				rdma_recv->sgl[1].lkey = poller->resources->bufs_mr->lkey;
 				rdma_recv->wr.num_sge++;
 			}
@@ -2728,17 +2717,17 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
 		}
 
 		for (i = 0; i < poller->max_srq_depth; i++) {
-			rdma_req = &poller->reqs[i];
+			rdma_req = &poller->resources->reqs[i];
 
 			rdma_req->req.qpair = NULL;
 			rdma_req->req.cmd = NULL;
 
 			/* Set up memory to send responses */
-			rdma_req->req.rsp = &poller->cpls[i];
+			rdma_req->req.rsp = &poller->resources->cpls[i];
 
-			rdma_req->rsp.sgl[0].addr = (uintptr_t)&poller->cpls[i];
-			rdma_req->rsp.sgl[0].length = sizeof(poller->cpls[i]);
-			rdma_req->rsp.sgl[0].lkey = poller->cpls_mr->lkey;
+			rdma_req->rsp.sgl[0].addr = (uintptr_t)&poller->resources->cpls[i];
+			rdma_req->rsp.sgl[0].length = sizeof(poller->resources->cpls[i]);
+			rdma_req->rsp.sgl[0].lkey = poller->resources->cpls_mr->lkey;
 
 			rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
 			rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
@@ -2758,7 +2747,7 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
 
 			/* Initialize request state to FREE */
 			rdma_req->state = RDMA_REQUEST_STATE_FREE;
-			STAILQ_INSERT_TAIL(&poller->free_queue, rdma_req, state_link);
+			STAILQ_INSERT_TAIL(&poller->resources->free_queue, rdma_req, state_link);
 		}
 #endif
 	}
@@ -2784,16 +2773,16 @@ spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
 
 #ifdef SPDK_CONFIG_RDMA_SRQ
-		if (poller->cmds_mr) {
-			ibv_dereg_mr(poller->cmds_mr);
+		if (poller->resources->cmds_mr) {
+			ibv_dereg_mr(poller->resources->cmds_mr);
 		}
 
-		if (poller->cpls_mr) {
-			ibv_dereg_mr(poller->cpls_mr);
+		if (poller->resources->cpls_mr) {
+			ibv_dereg_mr(poller->resources->cpls_mr);
 		}
 
-		if (poller->bufs_mr) {
-			ibv_dereg_mr(poller->bufs_mr);
+		if (poller->resources->bufs_mr) {
+			ibv_dereg_mr(poller->resources->bufs_mr);
 		}
 
 		if (poller->srq) {
@@ -2802,11 +2791,12 @@ spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
 		}
 
 		/* Free all memory */
-		spdk_dma_free(poller->cmds);
-		spdk_dma_free(poller->cpls);
-		spdk_dma_free(poller->bufs);
-		free(poller->reqs);
-		free(poller->recvs);
+		spdk_dma_free(poller->resources->cmds);
+		spdk_dma_free(poller->resources->cpls);
+		spdk_dma_free(poller->resources->bufs);
+		free(poller->resources->reqs);
+		free(poller->resources->recvs);
+		free(poller->resources);
 #endif
 
 		if (poller->cq) {
@@ -2862,6 +2852,10 @@ spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
 	TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
 	rqpair->poller = poller;
 
+#ifdef SPDK_CONFIG_RDMA_SRQ
+	rqpair->resources = rqpair->poller->resources;
+#endif
+
 	rc = spdk_nvmf_rdma_qpair_initialize(qpair);
 	if (rc < 0) {
 		SPDK_ERRLOG("Failed to initialize nvmf_rdma_qpair with qpair=%p\n", qpair);
@@ -3037,11 +3031,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
 
 			/* Dump this into the incoming queue. This gets cleaned up when
 			 * the queue pair disconnects or recovers. */
-#ifndef SPDK_CONFIG_RDMA_SRQ
-			STAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
-#else
-			STAILQ_INSERT_TAIL(&rpoller->incoming_queue, rdma_recv, link);
-#endif
+			STAILQ_INSERT_TAIL(&rqpair->resources->incoming_queue, rdma_recv, link);
 			rqpair->current_recv_depth++;
 			break;
 		case RDMA_WR_TYPE_DATA:
@@ -3142,11 +3132,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
 				rqpair->current_recv_depth++;
 			}
 
-#ifndef SPDK_CONFIG_RDMA_SRQ
-			STAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
-#else
-			STAILQ_INSERT_TAIL(&rpoller->incoming_queue, rdma_recv, link);
-#endif
+			STAILQ_INSERT_TAIL(&rqpair->resources->incoming_queue, rdma_recv, link);
 			/* Try to process other queued requests */
 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
 			break;
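
Note for reviewers: after this change, both the SRQ and the non-SRQ builds drain a single pair of queues owned by struct spdk_nvmf_rdma_resources. The stand-alone sketch below (hypothetical sketch_* names, not code from this patch) illustrates the pairing loop that spdk_nvmf_rdma_qpair_process_pending() now runs against either a per-qpair or a per-poller resources object:

/* Minimal sketch of the recv/request pairing enabled by the unified
 * resources struct: one incoming_queue of completed receives and one
 * free_queue of request objects, drained in pairs. */
#include <stdio.h>
#include <sys/queue.h>

struct sketch_recv {
	int				id;	/* stands in for the received capsule */
	STAILQ_ENTRY(sketch_recv)	link;
};

struct sketch_request {
	struct sketch_recv		*recv;	/* recv this request will service */
	STAILQ_ENTRY(sketch_request)	state_link;
};

struct sketch_resources {
	/* Receives that are waiting for a request object */
	STAILQ_HEAD(, sketch_recv)	incoming_queue;
	/* Queue to track free requests */
	STAILQ_HEAD(, sketch_request)	free_queue;
};

/* Mirrors the drain loop in spdk_nvmf_rdma_qpair_process_pending():
 * pop one free request and one incoming receive until either runs dry. */
static void
sketch_process_incoming(struct sketch_resources *res)
{
	struct sketch_request *req;

	while (!STAILQ_EMPTY(&res->free_queue) && !STAILQ_EMPTY(&res->incoming_queue)) {
		req = STAILQ_FIRST(&res->free_queue);
		STAILQ_REMOVE_HEAD(&res->free_queue, state_link);
		req->recv = STAILQ_FIRST(&res->incoming_queue);
		STAILQ_REMOVE_HEAD(&res->incoming_queue, link);
		printf("request %p now owns recv %d\n", (void *)req, req->recv->id);
	}
}

int
main(void)
{
	struct sketch_resources res;
	struct sketch_request reqs[2];
	struct sketch_recv recvs[3];
	int i;

	STAILQ_INIT(&res.incoming_queue);
	STAILQ_INIT(&res.free_queue);
	for (i = 0; i < 2; i++) {
		STAILQ_INSERT_TAIL(&res.free_queue, &reqs[i], state_link);
	}
	for (i = 0; i < 3; i++) {
		recvs[i].id = i;
		STAILQ_INSERT_TAIL(&res.incoming_queue, &recvs[i], link);
	}

	/* Only two pairings happen; the third recv waits until a request is freed. */
	sketch_process_incoming(&res);
	return 0;
}

Because the loop touches only the resources object, the remaining #ifdef SPDK_CONFIG_RDMA_SRQ in spdk_nvmf_rdma_qpair_process_pending() shrinks to the one genuine difference between the modes: which qpair's queue depth to increment.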