diff --git a/lib/nvmf/rdma.c b/lib/nvmf/rdma.c index 6836b0ae3..2ded0568b 100644 --- a/lib/nvmf/rdma.c +++ b/lib/nvmf/rdma.c @@ -289,6 +289,15 @@ struct spdk_nvmf_rdma_qpair { struct ibv_qp_attr ibv_attr; bool qpair_disconnected; + + /* Reference counter for how many unprocessed messages + * from other threads are currently outstanding. The + * qpair cannot be destroyed until this is 0. This is + * atomically incremented from any thread, but only + * decremented and read from the thread that owns this + * qpair. + */ + uint32_t refcnt; }; struct spdk_nvmf_rdma_poller { @@ -349,6 +358,26 @@ struct spdk_nvmf_rdma_mgmt_channel { TAILQ_HEAD(, spdk_nvmf_rdma_request) pending_data_buf_queue; }; +static inline void +spdk_nvmf_rdma_qpair_inc_refcnt(struct spdk_nvmf_rdma_qpair *rqpair) +{ + __sync_fetch_and_add(&rqpair->refcnt, 1); +} + +static inline uint32_t +spdk_nvmf_rdma_qpair_dec_refcnt(struct spdk_nvmf_rdma_qpair *rqpair) +{ + uint32_t old_refcnt, new_refcnt; + + do { + old_refcnt = rqpair->refcnt; + assert(old_refcnt > 0); + new_refcnt = old_refcnt - 1; + } while (__sync_bool_compare_and_swap(&rqpair->refcnt, old_refcnt, new_refcnt) == false); + + return new_refcnt; +} + /* API to IBV QueuePair */ static const char *str_ibv_qp_state[] = { "IBV_QPS_RESET", @@ -534,6 +563,10 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair) return; } + if (rqpair->refcnt > 0) { + return; + } + if (rqpair->poller) { TAILQ_REMOVE(&rqpair->poller->qpairs, rqpair, link); } @@ -967,6 +1000,11 @@ static void _nvmf_rdma_disconnect(void *ctx) { struct spdk_nvmf_qpair *qpair = ctx; + struct spdk_nvmf_rdma_qpair *rqpair; + + rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); + + spdk_nvmf_rdma_qpair_dec_refcnt(rqpair); spdk_nvmf_qpair_disconnect(qpair, NULL, NULL); } @@ -992,6 +1030,8 @@ nvmf_rdma_disconnect(struct rdma_cm_event *evt) spdk_trace_record(TRACE_RDMA_QP_DISCONNECT, 0, 0, (uintptr_t)rqpair->cm_id, 0); spdk_nvmf_rdma_update_ibv_state(rqpair); + spdk_nvmf_rdma_qpair_inc_refcnt(rqpair); + spdk_thread_send_msg(qpair->group->thread, _nvmf_rdma_disconnect, qpair); return 0; @@ -2145,6 +2185,8 @@ _spdk_nvmf_rdma_qp_error(void *arg) struct spdk_nvmf_rdma_qpair *rqpair = arg; enum ibv_qp_state state; + spdk_nvmf_rdma_qpair_dec_refcnt(rqpair); + state = rqpair->ibv_attr.qp_state; if (state != IBV_QPS_ERR) { /* Error was already recovered */ @@ -2168,6 +2210,8 @@ _spdk_nvmf_rdma_qp_last_wqe(void *arg) struct spdk_nvmf_rdma_qpair *rqpair = arg; enum ibv_qp_state state; + spdk_nvmf_rdma_qpair_dec_refcnt(rqpair); + state = rqpair->ibv_attr.qp_state; if (state != IBV_QPS_ERR) { /* Error was already recovered */ @@ -2211,12 +2255,14 @@ spdk_nvmf_process_ib_event(struct spdk_nvmf_rdma_device *device) spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0, (uintptr_t)rqpair->cm_id, event.event_type); spdk_nvmf_rdma_update_ibv_state(rqpair); + spdk_nvmf_rdma_qpair_inc_refcnt(rqpair); spdk_thread_send_msg(rqpair->qpair.group->thread, _spdk_nvmf_rdma_qp_error, rqpair); break; case IBV_EVENT_QP_LAST_WQE_REACHED: spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0, (uintptr_t)rqpair->cm_id, event.event_type); spdk_nvmf_rdma_update_ibv_state(rqpair); + spdk_nvmf_rdma_qpair_inc_refcnt(rqpair); spdk_thread_send_msg(rqpair->qpair.group->thread, _spdk_nvmf_rdma_qp_last_wqe, rqpair); break; case IBV_EVENT_SQ_DRAINED: @@ -2229,6 +2275,7 @@ spdk_nvmf_process_ib_event(struct spdk_nvmf_rdma_device *device) (uintptr_t)rqpair->cm_id, event.event_type); state = spdk_nvmf_rdma_update_ibv_state(rqpair); if (state == IBV_QPS_ERR) { + spdk_nvmf_rdma_qpair_inc_refcnt(rqpair); spdk_thread_send_msg(rqpair->qpair.group->thread, _spdk_nvmf_rdma_qp_error, rqpair); } break;