lib/nvme: rdma poll group with shared cq.

Signed-off-by: Seth Howell <seth.howell@intel.com>
Change-Id: Ifde29f633f09cccbebfdcde5ab2f96d9590449f1
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1167
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Broadcom CI
Community-CI: Mellanox Build Bot
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Aleksey Marchuk <alexeymar@mellanox.com>
This commit is contained in:
Seth Howell 2020-03-05 14:47:49 -07:00 committed by Tomasz Zawadzki
parent 922d90c806
commit 8bef6f0bdf
3 changed files with 325 additions and 30 deletions

View File

@ -69,6 +69,9 @@
/* CM event processing timeout */ /* CM event processing timeout */
#define NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US 1000000 #define NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US 1000000
/* The default size for a shared rdma completion queue. */
#define DEFAULT_NVME_RDMA_CQ_SIZE 4096
/* /*
* In the special case of a stale connection we don't expose a mechanism * In the special case of a stale connection we don't expose a mechanism
* for the user to retry the connection so we need to handle it internally. * for the user to retry the connection so we need to handle it internally.
@ -86,6 +89,12 @@
*/ */
#define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT 31 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT 31
/*
* Number of poller cycles to keep a pointer to destroyed qpairs
* in the poll group.
*/
#define NVME_RDMA_DESTROYED_QPAIR_EXPIRATION_CYCLES 10
enum nvme_rdma_wr_type { enum nvme_rdma_wr_type {
RDMA_WR_TYPE_RECV, RDMA_WR_TYPE_RECV,
RDMA_WR_TYPE_SEND, RDMA_WR_TYPE_SEND,
@ -134,8 +143,23 @@ struct nvme_rdma_ctrlr {
struct nvme_rdma_cm_event_entry *cm_events; struct nvme_rdma_cm_event_entry *cm_events;
}; };
struct nvme_rdma_destroyed_qpair {
struct nvme_rdma_qpair *destroyed_qpair_tracker;
uint32_t completed_cycles;
STAILQ_ENTRY(nvme_rdma_destroyed_qpair) link;
};
struct nvme_rdma_poller {
struct ibv_context *device;
struct ibv_cq *cq;
STAILQ_ENTRY(nvme_rdma_poller) link;
};
struct nvme_rdma_poll_group { struct nvme_rdma_poll_group {
struct spdk_nvme_transport_poll_group group; struct spdk_nvme_transport_poll_group group;
STAILQ_HEAD(, nvme_rdma_poller) pollers;
int num_pollers;
STAILQ_HEAD(, nvme_rdma_destroyed_qpair) destroyed_qpairs;
}; };
struct spdk_nvme_send_wr_list { struct spdk_nvme_send_wr_list {
@ -172,6 +196,8 @@ struct nvme_rdma_qpair {
bool delay_cmd_submit; bool delay_cmd_submit;
bool poll_group_disconnect_in_progress;
uint32_t num_completions; uint32_t num_completions;
/* Parallel arrays of response buffers + response SGLs of size num_entries */ /* Parallel arrays of response buffers + response SGLs of size num_entries */
@ -202,6 +228,9 @@ struct nvme_rdma_qpair {
/* Placed at the end of the struct since it is not used frequently */ /* Placed at the end of the struct since it is not used frequently */
struct rdma_cm_event *evt; struct rdma_cm_event *evt;
/* Used by poll group to keep the qpair around until it is ready to remove it. */
bool defer_deletion_to_pg;
}; };
enum NVME_RDMA_COMPLETION_FLAGS { enum NVME_RDMA_COMPLETION_FLAGS {
@ -293,6 +322,12 @@ nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair); return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
} }
static inline struct nvme_rdma_poll_group *
nvme_rdma_poll_group(struct spdk_nvme_transport_poll_group *group)
{
return (SPDK_CONTAINEROF(group, struct nvme_rdma_poll_group, group));
}
static inline struct nvme_rdma_ctrlr * static inline struct nvme_rdma_ctrlr *
nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr) nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
{ {
@ -544,10 +579,20 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
return -1; return -1;
} }
rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0); if (rqpair->qpair.poll_group) {
if (!rqpair->cq) { assert(!rqpair->cq);
SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno)); rc = nvme_poll_group_connect_qpair(&rqpair->qpair);
return -1; if (rc) {
SPDK_ERRLOG("Unable to activate the rdmaqpair.\n");
return -1;
}
assert(rqpair->cq);
} else {
rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
if (!rqpair->cq) {
SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
return -1;
}
} }
rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr); rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
@ -1773,15 +1818,15 @@ nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_
{ {
struct nvme_rdma_qpair *rqpair; struct nvme_rdma_qpair *rqpair;
if (!qpair) { rqpair = nvme_rdma_qpair(qpair);
return -1;
}
nvme_transport_ctrlr_disconnect_qpair(ctrlr, qpair); nvme_transport_ctrlr_disconnect_qpair(ctrlr, qpair);
if (rqpair->defer_deletion_to_pg) {
return 0;
}
nvme_rdma_qpair_abort_reqs(qpair, 1); nvme_rdma_qpair_abort_reqs(qpair, 1);
nvme_qpair_deinit(qpair); nvme_qpair_deinit(qpair);
rqpair = nvme_rdma_qpair(qpair);
nvme_rdma_free_reqs(rqpair); nvme_rdma_free_reqs(rqpair);
nvme_rdma_free_rsps(rqpair); nvme_rdma_free_rsps(rqpair);
nvme_rdma_free(rqpair); nvme_rdma_free(rqpair);
@ -2091,8 +2136,28 @@ nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
nvme_ctrlr_disconnect_qpair(qpair); nvme_ctrlr_disconnect_qpair(qpair);
} }
static void
nvme_rdma_conditional_fail_qpair(struct nvme_rdma_qpair *rqpair, struct nvme_rdma_poll_group *group)
{
struct nvme_rdma_destroyed_qpair *qpair_tracker;
if (!rqpair) {
return;
}
if (group) {
STAILQ_FOREACH(qpair_tracker, &group->destroyed_qpairs, link) {
if (qpair_tracker->destroyed_qpair_tracker == rqpair) {
return;
}
}
}
nvme_rdma_fail_qpair(&rqpair->qpair, 0);
}
static int static int
nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size) nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
struct nvme_rdma_poll_group *group)
{ {
struct ibv_wc wc[MAX_COMPLETIONS_PER_POLL]; struct ibv_wc wc[MAX_COMPLETIONS_PER_POLL];
struct nvme_rdma_qpair *rqpair; struct nvme_rdma_qpair *rqpair;
@ -2122,7 +2187,7 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
if (wc[i].status) { if (wc[i].status) {
SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n", SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status)); rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
nvme_rdma_fail_qpair(&rqpair->qpair, 0); nvme_rdma_conditional_fail_qpair(rqpair, group);
completion_rc = -ENXIO; completion_rc = -ENXIO;
continue; continue;
} }
@ -2131,7 +2196,7 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) { if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len); SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
nvme_rdma_fail_qpair(&rqpair->qpair, 0); nvme_rdma_conditional_fail_qpair(rqpair, group);
completion_rc = -ENXIO; completion_rc = -ENXIO;
continue; continue;
} }
@ -2142,7 +2207,7 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) { if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) {
if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) { if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
SPDK_ERRLOG("Unable to re-post rx descriptor\n"); SPDK_ERRLOG("Unable to re-post rx descriptor\n");
nvme_rdma_fail_qpair(&rqpair->qpair, 0); nvme_rdma_conditional_fail_qpair(rqpair, group);
completion_rc = -ENXIO; completion_rc = -ENXIO;
continue; continue;
} }
@ -2157,9 +2222,8 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
/* If we are flushing I/O */ /* If we are flushing I/O */
if (wc[i].status) { if (wc[i].status) {
rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL; rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
if (rqpair) { nvme_rdma_conditional_fail_qpair(rqpair, group);
nvme_rdma_fail_qpair(&rqpair->qpair, 0);
}
SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n", SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status)); rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
completion_rc = -ENXIO; completion_rc = -ENXIO;
@ -2169,11 +2233,10 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
rqpair = nvme_rdma_qpair(rdma_req->req->qpair); rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED; rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) { if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) { if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
SPDK_ERRLOG("Unable to re-post rx descriptor\n"); SPDK_ERRLOG("Unable to re-post rx descriptor\n");
nvme_rdma_fail_qpair(&rqpair->qpair, 0); nvme_rdma_conditional_fail_qpair(rqpair, group);
completion_rc = -ENXIO; completion_rc = -ENXIO;
continue; continue;
} }
@ -2195,6 +2258,12 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
return reaped; return reaped;
} }
static void
dummy_disconnected_qpair_cb(struct spdk_nvme_qpair *qpair, void *poll_group_ctx)
{
}
static int static int
nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair, nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
uint32_t max_completions) uint32_t max_completions)
@ -2204,6 +2273,16 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
struct ibv_cq *cq; struct ibv_cq *cq;
struct nvme_rdma_ctrlr *rctrlr; struct nvme_rdma_ctrlr *rctrlr;
/*
* This is used during the connection phase. It's possible that we are still reaping error completions
* from other qpairs so we need to call the poll group function. Also, it's more correct since the cq
* is shared.
*/
if (qpair->poll_group != NULL) {
return spdk_nvme_poll_group_process_completions(qpair->poll_group->group, max_completions,
dummy_disconnected_qpair_cb);
}
if (max_completions == 0) { if (max_completions == 0) {
max_completions = rqpair->num_entries; max_completions = rqpair->num_entries;
} else { } else {
@ -2226,7 +2305,7 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
rqpair->num_completions = 0; rqpair->num_completions = 0;
do { do {
batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL); batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
rc = nvme_rdma_cq_process_completions(cq, batch_size); rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL);
if (rc == 0) { if (rc == 0) {
break; break;
@ -2296,28 +2375,146 @@ nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
} }
} }
static int
nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx)
{
struct nvme_rdma_poller *poller;
poller = calloc(1, sizeof(*poller));
if (poller == NULL) {
SPDK_ERRLOG("Unable to allocate poller.\n");
return -ENOMEM;
}
poller->device = ctx;
poller->cq = ibv_create_cq(poller->device, DEFAULT_NVME_RDMA_CQ_SIZE, group, NULL, 0);
if (poller->cq == NULL) {
free(poller);
return -EINVAL;
}
STAILQ_INSERT_HEAD(&group->pollers, poller, link);
group->num_pollers++;
return 0;
}
static void
nvme_rdma_poll_group_free_pollers(struct nvme_rdma_poll_group *group)
{
struct nvme_rdma_poller *poller, *tmp_poller;
STAILQ_FOREACH_SAFE(poller, &group->pollers, link, tmp_poller) {
if (poller->cq) {
ibv_destroy_cq(poller->cq);
}
STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
free(poller);
}
}
static struct spdk_nvme_transport_poll_group * static struct spdk_nvme_transport_poll_group *
nvme_rdma_poll_group_create(void) nvme_rdma_poll_group_create(void)
{ {
struct nvme_rdma_poll_group *group = calloc(1, sizeof(*group)); struct nvme_rdma_poll_group *group;
struct ibv_context **contexts;
int i = 0;
group = calloc(1, sizeof(*group));
if (group == NULL) { if (group == NULL) {
SPDK_ERRLOG("Unable to allocate poll group.\n"); SPDK_ERRLOG("Unable to allocate poll group.\n");
return NULL; return NULL;
} }
STAILQ_INIT(&group->pollers);
contexts = rdma_get_devices(NULL);
if (contexts == NULL) {
SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
free(group);
return NULL;
}
while (contexts[i] != NULL) {
if (nvme_rdma_poller_create(group, contexts[i])) {
nvme_rdma_poll_group_free_pollers(group);
free(group);
rdma_free_devices(contexts);
return NULL;
}
i++;
}
rdma_free_devices(contexts);
STAILQ_INIT(&group->destroyed_qpairs);
return &group->group; return &group->group;
} }
static int static int
nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair) nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
{ {
struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
struct nvme_rdma_poller *poller;
assert(rqpair->cq == NULL);
STAILQ_FOREACH(poller, &group->pollers, link) {
if (poller->device == rqpair->cm_id->verbs) {
rqpair->cq = poller->cq;
break;
}
}
if (rqpair->cq == NULL) {
SPDK_ERRLOG("Unable to find a cq for qpair %p on poll group %p\n", qpair, qpair->poll_group);
return -EINVAL;
}
return 0; return 0;
} }
static int static int
nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair) nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
{ {
struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
struct nvme_rdma_poll_group *group;
struct nvme_rdma_destroyed_qpair *destroyed_qpair;
enum nvme_qpair_state state;
if (rqpair->poll_group_disconnect_in_progress) {
return -EINPROGRESS;
}
rqpair->poll_group_disconnect_in_progress = true;
state = nvme_qpair_get_state(qpair);
group = nvme_rdma_poll_group(qpair->poll_group);
rqpair->cq = NULL;
/*
* We want to guard against an endless recursive loop while making
 * sure the qpair is disconnected before we disconnect it from the poll group.
*/
if (state > NVME_QPAIR_DISCONNECTING && state != NVME_QPAIR_DESTROYING) {
nvme_ctrlr_disconnect_qpair(qpair);
}
/*
* If this fails, the system is in serious trouble,
* just let the qpair get cleaned up immediately.
*/
destroyed_qpair = calloc(1, sizeof(*destroyed_qpair));
if (destroyed_qpair == NULL) {
return 0;
}
destroyed_qpair->destroyed_qpair_tracker = rqpair;
destroyed_qpair->completed_cycles = 0;
STAILQ_INSERT_TAIL(&group->destroyed_qpairs, destroyed_qpair, link);
rqpair->defer_deletion_to_pg = true;
rqpair->poll_group_disconnect_in_progress = false;
return 0; return 0;
} }
@ -2332,6 +2529,10 @@ static int
nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup, nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
struct spdk_nvme_qpair *qpair) struct spdk_nvme_qpair *qpair)
{ {
if (qpair->poll_group_tailq_head == &tgroup->connected_qpairs) {
return nvme_poll_group_disconnect_qpair(qpair);
}
return 0; return 0;
} }
@ -2339,21 +2540,87 @@ static int64_t
nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup, nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb) uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
{ {
struct spdk_nvme_qpair *qpair, *tmp_qpair; struct spdk_nvme_qpair *qpair, *tmp_qpair;
int32_t local_completions = 0; struct nvme_rdma_destroyed_qpair *qpair_tracker, *tmp_qpair_tracker;
int64_t total_completions = 0; struct nvme_rdma_qpair *rqpair;
struct nvme_rdma_poll_group *group;
struct nvme_rdma_poller *poller;
int num_qpairs = 0, batch_size, rc;
int64_t total_completions = 0;
uint64_t completions_allowed = 0;
uint64_t completions_per_poller = 0;
uint64_t poller_completions = 0;
if (completions_per_qpair == 0) {
completions_per_qpair = MAX_COMPLETIONS_PER_POLL;
}
group = nvme_rdma_poll_group(tgroup);
STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) { STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) {
disconnected_qpair_cb(qpair, tgroup->group->ctx); disconnected_qpair_cb(qpair, tgroup->group->ctx);
} }
STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) { STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
local_completions = spdk_nvme_qpair_process_completions(qpair, completions_per_qpair); rqpair = nvme_rdma_qpair(qpair);
if (local_completions < 0) { rqpair->num_completions = 0;
nvme_rdma_qpair_process_cm_event(rqpair);
if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
nvme_rdma_fail_qpair(qpair, 0);
disconnected_qpair_cb(qpair, tgroup->group->ctx); disconnected_qpair_cb(qpair, tgroup->group->ctx);
local_completions = 0; continue;
}
num_qpairs++;
}
completions_allowed = completions_per_qpair * num_qpairs;
completions_per_poller = spdk_max(completions_allowed / group->num_pollers, 1);
STAILQ_FOREACH(poller, &group->pollers, link) {
poller_completions = 0;
do {
batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, group);
if (rc <= 0) {
if (rc == -ECANCELED) {
return -EIO;
}
break;
}
poller_completions += rc;
} while (poller_completions < completions_per_poller);
total_completions += poller_completions;
}
STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
rqpair = nvme_rdma_qpair(qpair);
if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
nvme_rdma_qpair_check_timeout(qpair);
}
nvme_rdma_qpair_submit_sends(rqpair);
nvme_rdma_qpair_submit_recvs(rqpair);
nvme_qpair_resubmit_requests(&rqpair->qpair, rqpair->num_completions);
}
/*
* Once a qpair is disconnected, we can still get flushed completions for those disconnected qpairs.
* For most pieces of hardware, those requests will complete immediately. However, there are certain
* cases where flushed requests will linger.
*/
STAILQ_FOREACH_SAFE(qpair_tracker, &group->destroyed_qpairs, link, tmp_qpair_tracker) {
qpair_tracker->completed_cycles++;
rqpair = qpair_tracker->destroyed_qpair_tracker;
if (qpair_tracker->completed_cycles > NVME_RDMA_DESTROYED_QPAIR_EXPIRATION_CYCLES) {
rqpair->defer_deletion_to_pg = false;
if (nvme_qpair_get_state(&rqpair->qpair) == NVME_QPAIR_DESTROYING) {
nvme_rdma_ctrlr_delete_io_qpair(rqpair->qpair.ctrlr, &rqpair->qpair);
}
STAILQ_REMOVE(&group->destroyed_qpairs, qpair_tracker, nvme_rdma_destroyed_qpair, link);
free(qpair_tracker);
} }
total_completions += local_completions;
} }
return total_completions; return total_completions;
@ -2362,11 +2629,27 @@ nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *
static int static int
nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup) nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup)
{ {
struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(tgroup);
struct nvme_rdma_destroyed_qpair *qpair_tracker, *tmp_qpair_tracker;
struct nvme_rdma_qpair *rqpair;
if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) { if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) {
return -EBUSY; return -EBUSY;
} }
free(tgroup); STAILQ_FOREACH_SAFE(qpair_tracker, &group->destroyed_qpairs, link, tmp_qpair_tracker) {
rqpair = qpair_tracker->destroyed_qpair_tracker;
if (nvme_qpair_get_state(&rqpair->qpair) == NVME_QPAIR_DESTROYING) {
rqpair->defer_deletion_to_pg = false;
nvme_rdma_ctrlr_delete_io_qpair(rqpair->qpair.ctrlr, &rqpair->qpair);
}
STAILQ_REMOVE(&group->destroyed_qpairs, qpair_tracker, nvme_rdma_destroyed_qpair, link);
free(qpair_tracker);
}
nvme_rdma_poll_group_free_pollers(group);
free(group);
return 0; return 0;
} }

View File

@ -526,6 +526,11 @@ nvme_transport_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
qpair->poll_group_tailq_head = &tgroup->disconnected_qpairs; qpair->poll_group_tailq_head = &tgroup->disconnected_qpairs;
STAILQ_REMOVE(&tgroup->connected_qpairs, qpair, spdk_nvme_qpair, poll_group_stailq); STAILQ_REMOVE(&tgroup->connected_qpairs, qpair, spdk_nvme_qpair, poll_group_stailq);
STAILQ_INSERT_TAIL(&tgroup->disconnected_qpairs, qpair, poll_group_stailq); STAILQ_INSERT_TAIL(&tgroup->disconnected_qpairs, qpair, poll_group_stailq);
/* EINPROGRESS indicates that a call has already been made to this function.
* It just keeps us from segfaulting on a double removal/insert.
*/
} else if (rc == -EINPROGRESS) {
rc = 0;
} }
return rc; return rc;
} }
@ -552,7 +557,8 @@ nvme_transport_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
STAILQ_REMOVE(&tgroup->disconnected_qpairs, qpair, spdk_nvme_qpair, poll_group_stailq); STAILQ_REMOVE(&tgroup->disconnected_qpairs, qpair, spdk_nvme_qpair, poll_group_stailq);
STAILQ_INSERT_TAIL(&tgroup->connected_qpairs, qpair, poll_group_stailq); STAILQ_INSERT_TAIL(&tgroup->connected_qpairs, qpair, poll_group_stailq);
} }
return rc;
return rc == -EINPROGRESS ? 0 : rc;
} }

View File

@ -48,6 +48,12 @@ DEFINE_STUB(spdk_mem_map_alloc, struct spdk_mem_map *, (uint64_t default_transla
const struct spdk_mem_map_ops *ops, void *cb_ctx), NULL); const struct spdk_mem_map_ops *ops, void *cb_ctx), NULL);
DEFINE_STUB_V(spdk_mem_map_free, (struct spdk_mem_map **pmap)); DEFINE_STUB_V(spdk_mem_map_free, (struct spdk_mem_map **pmap));
DEFINE_STUB(nvme_poll_group_connect_qpair, int, (struct spdk_nvme_qpair *qpair), 0);
DEFINE_STUB_V(nvme_qpair_resubmit_requests, (struct spdk_nvme_qpair *qpair, uint32_t num_requests));
DEFINE_STUB(spdk_nvme_poll_group_process_completions, int64_t, (struct spdk_nvme_poll_group *group,
		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb), 0);
/* used to mock out having to split an SGL over a memory region */ /* used to mock out having to split an SGL over a memory region */
uint64_t g_mr_size; uint64_t g_mr_size;
struct ibv_mr g_nvme_rdma_mr; struct ibv_mr g_nvme_rdma_mr;