nvme/rdma: Consolidate send_cq and recv_cq

The send completions must be processed prior to the
recv completions. However, if the completion queues
are separate this leaves a small window where
a send+recv completion arrive between polling
the send_cq and the recv_cq, resulting in the code
seeing the recv completion prior to the send
completion.

By combining the completion queues, this eliminates
any potential gap. The send completion will always
be processed before the recv completion.

Change-Id: I06bfef6af48559d0b9e00524ebc10f1a102e7387
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
This commit is contained in:
Ben Walker 2017-03-08 10:30:49 -07:00 committed by Daniel Verkamp
parent d706fa417f
commit ac9b92c853

View File

@ -85,6 +85,8 @@ struct nvme_rdma_qpair {
struct rdma_cm_id *cm_id;
struct ibv_cq *cq;
struct spdk_nvme_rdma_req *rdma_reqs;
uint16_t num_entries;
@ -204,8 +206,17 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
int rc;
struct ibv_qp_init_attr attr;
rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
if (!rqpair->cq) {
SPDK_ERRLOG("Unable to create completion queue\n");
SPDK_ERRLOG("Errno %d: %s\n", errno, strerror(errno));
return -1;
}
memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
attr.qp_type = IBV_QPT_RC;
attr.send_cq = rqpair->cq;
attr.recv_cq = rqpair->cq;
attr.cap.max_send_wr = rqpair->num_entries; /* SEND operations */
attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */
attr.cap.max_send_sge = NVME_RDMA_DEFAULT_TX_SGE;
@ -217,18 +228,6 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
return -1;
}
rc = fcntl(rqpair->cm_id->send_cq_channel->fd, F_SETFL, O_NONBLOCK);
if (rc < 0) {
SPDK_ERRLOG("fcntl to set comp channel to non-blocking failed\n");
return -1;
}
rc = fcntl(rqpair->cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK);
if (rc < 0) {
SPDK_ERRLOG("fcntl to set comp channel to non-blocking failed\n");
return -1;
}
rqpair->cm_id->context = &rqpair->qpair;
return 0;
@ -1032,6 +1031,10 @@ nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
rdma_destroy_id(rqpair->cm_id);
}
if (rqpair->cq) {
ibv_destroy_cq(rqpair->cq);
}
if (rqpair->cm_channel) {
rdma_destroy_event_channel(rqpair->cm_channel);
}
@ -1380,6 +1383,7 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
struct ibv_wc wc[MAX_COMPLETIONS_PER_POLL];
int i, rc, batch_size;
uint32_t reaped;
struct ibv_cq *cq;
if (max_completions == 0) {
max_completions = rqpair->num_entries;
@ -1387,29 +1391,13 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
max_completions = spdk_min(max_completions, rqpair->num_entries);
}
/* Consume all send completions */
reaped = 0;
do {
batch_size = spdk_min((max_completions - reaped),
MAX_COMPLETIONS_PER_POLL);
rc = ibv_poll_cq(rqpair->cm_id->send_cq, batch_size, wc);
if (rc < 0) {
SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
errno, strerror(errno));
return -1;
} else if (rc == 0) {
/* Ran out of completions */
break;
}
reaped += rc;
} while (reaped < max_completions);
cq = rqpair->cq;
/* Poll for recv completions */
reaped = 0;
do {
batch_size = spdk_min((max_completions - reaped),
MAX_COMPLETIONS_PER_POLL);
rc = ibv_poll_cq(rqpair->cm_id->recv_cq, batch_size, wc);
rc = ibv_poll_cq(cq, batch_size, wc);
if (rc < 0) {
SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
errno, strerror(errno));
@ -1419,7 +1407,6 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
break;
}
reaped += rc;
for (i = 0; i < rc; i++) {
if (wc[i].status) {
SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
@ -1430,6 +1417,9 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
switch (wc[i].opcode) {
case IBV_WC_RECV:
SPDK_TRACELOG(SPDK_TRACE_DEBUG, "CQ recv completion\n");
reaped++;
if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
return -1;
@ -1441,6 +1431,9 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
}
break;
case IBV_WC_SEND:
break;
default:
SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", wc[i].opcode);
return -1;