nvmf/rdma: Add shared receive queue support

This is a new feature for NVMEoF RDMA target, that is intended to save
resource allocation (by sharing them) and utilize the
locality (completions and memory) to get the best performance with
Shared Receive Queues (SRQs). We'll create a SRQ per core (poll
group), per device and associate each created QP/CQ with an
appropriate SRQ.

Our testing environment has 2 hosts.
Host 1:
  CPU: Intel(R) Xeon(R) CPU E5-2609 0 @ 2.40GHz dual socket (8 cores total)
  Network: ConnectX-5, ConnectX-5 VPI , 100GbE, single-port QSFP28, PCIe3.0 x16
  Disk: Intel Optane SSD 900P Series
  OS: Fedora 27 x86_64
Host 2:
  CPU: Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz dual-socket (24 cores total)
  Network: ConnectX-4 VPI , 100GbE, dual-port QSFP28
  Disk: Intel Optane SSD 900P Series
  OS : CentOS 7.5.1804 x86_64
Hosts are connected via Spectrum switch.
Host 1 is running SPDK NVMeoF target.
Host 2 is used as initiator running fio with SPDK plugin.

Configuration:
- SPDK NVMeoF target: cpu mask 0x0F (4 cores), max queue depth 128,
  max SRQ depth 1024, max QPs per controller 1024
- Single NVMf subsystem with single namespace backed by physical SSD disk
- fio with SPDK plugin: randread pattern, 1-256 jobs, block size 4k,
  IO depth 16, cpu_mask 0xFFF0, IO rate 10k, rate process “poisson”

Here is a full fio command line:
fio  --name=Job --stats=1 --group_reporting=1 --idle-prof=percpu \
--loops=1 --numjobs=1 --thread=1 --time_based=1 --runtime=30s \
--ramp_time=5s --bs=4k --size=4G --iodepth=16 --readwrite=randread \
--rwmixread=75 --randrepeat=1 --ioengine=spdk --direct=1 \
--gtod_reduce=0 --cpumask=0xFFF0 --rate_iops=10k \
--rate_process=poisson \
--filename='trtype=RDMA adrfam=IPv4 traddr=1.1.79.1 trsvcid=4420 ns=1'

SPDK allocates the following entities for every work request in
receive queue (shared or not): reqs (1024 bytes), recvs (96 bytes),
cmds (64 bytes), cpls (16 bytes), in_capsule_buffer. All except the
last one are fixed size. In capsule data size is configured to 4096.
Memory consumption calculation (target):
- Multiple SRQ: core_num * ib_devs_num * SRQ_depth * (1200 +
  in_capsule_data_size)
- Multiple RQ: queue_num * RQ_depth * (1200 + in_capsule_data_size)
We ignore admin queues in calculations for simplicity.

Cases:
1. Multiple SRQ with 1024 entries:
   - Mem = 4 * 1 * 1024 * (1200 + 4096) = 20.7 MiB
     (Constant number – does not depend on initiators number)
2. RQ with 128 entries for 64 initiators:
   - Mem = 64 * 128 * (1200 + 4096) = 41.4 MiB

Results:
FIO_JOBS   kIOPS     Bandwidth,MiB/s  AvgLatency,us  MaxResidentSize,kiB
       RQ       SRQ     RQ      SRQ    RQ       SRQ      RQ       SRQ
1      8.623    8.623   33.7    33.7   13.89    14.03    144376   155624
2      17.3     17.3    67.4    67.4   14.03    14.1     145776   155700
4      34.5     34.5    135     135    14.15    14.23    146540   156184
8      69.1     69.1    270     270    14.64    14.49    148116   156960
16     138      138     540     540    14.84    15.38    151216   158668
32     276      276     1079    1079   16.5     16.61    157560   161936
64     513      502     2005    1960   1673     1612     170408   168440
128    535      526     2092    2054   3329     3344     195796   181524
256    571      571     2232    2233   6854     6873     246484   207856

We can see the benefit in memory consumption.

Change-Id: I40c70f6ccbad7754918bcc6cb397e955b09d1033
Signed-off-by: Evgeniy Kochetov <evgeniik@mellanox.com>
Signed-off-by: Sasha Kotchubievsky <sashakot@mellanox.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/428458
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
This commit is contained in:
Evgeniy Kochetov 2018-10-04 14:59:08 +00:00 committed by Jim Harris
parent f186434ec2
commit ed0b611fc5
9 changed files with 386 additions and 40 deletions

View File

@ -35,6 +35,13 @@ to be performed on the thread at given time.
An new API `spdk_bdev_get_data_block_size` has been added to get size of data
block except for metadata.
### NVMe-oF Target
Support for per-device shared receive queues in the RDMA transport has been added.
The size of a shared receive queue is defined by transport configuration file parameter
`MaxSRQDepth` and `nvmf_create_transport` RPC method parameter `max_srq_depth`.
Default size is 4096.
## v19.01:
### ocf bdev

View File

@ -99,6 +99,9 @@
# Set the number of shared buffers to be cached per poll group
#BufCacheSize 32
# Set the maximum number outstanding I/O per shared receive queue. Relevant only for RDMA transport
#MaxSRQDepth 4096
[Transport]
# Set TCP transport type.
Type TCP

View File

@ -1,8 +1,8 @@
/*-
* BSD LICENSE
*
* Copyright (c) Intel Corporation.
* All rights reserved.
* Copyright (c) Intel Corporation. All rights reserved.
* Copyright (c) 2018 Mellanox Technologies LTD. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -72,6 +72,7 @@ struct spdk_nvmf_transport_opts {
uint32_t max_aq_depth;
uint32_t num_shared_buffers;
uint32_t buf_cache_size;
uint32_t max_srq_depth;
};
/**

View File

@ -1,8 +1,8 @@
/*-
* BSD LICENSE
*
* Copyright (c) Intel Corporation.
* All rights reserved.
* Copyright (c) Intel Corporation. All rights reserved.
* Copyright (c) 2018 Mellanox Technologies LTD. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -529,6 +529,17 @@ spdk_nvmf_parse_transport(struct spdk_nvmf_parse_transport_ctx *ctx)
opts.buf_cache_size = val;
}
val = spdk_conf_section_get_intval(ctx->sp, "MaxSRQDepth");
if (val >= 0) {
if (trtype == SPDK_NVME_TRANSPORT_RDMA) {
opts.max_srq_depth = val;
} else {
SPDK_ERRLOG("MaxSRQDepth is relevant only for RDMA transport '%s'\n", type);
ctx->cb_fn(-1);
free(ctx);
return;
}
}
transport = spdk_nvmf_transport_create(trtype, &opts);
if (transport) {

View File

@ -1,8 +1,8 @@
/*-
* BSD LICENSE
*
* Copyright (c) Intel Corporation.
* All rights reserved.
* Copyright (c) Intel Corporation. All rights reserved.
* Copyright (c) 2018 Mellanox Technologies LTD. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -1435,6 +1435,10 @@ static const struct spdk_json_object_decoder nvmf_rpc_create_transport_decoder[]
"buf_cache_size", offsetof(struct nvmf_rpc_create_transport_ctx, opts.buf_cache_size),
spdk_json_decode_uint32, true
},
{
"max_srq_depth", offsetof(struct nvmf_rpc_create_transport_ctx, opts.max_srq_depth),
spdk_json_decode_uint32, true
},
};
static void

View File

@ -1,8 +1,8 @@
/*-
* BSD LICENSE
*
* Copyright (c) Intel Corporation.
* All rights reserved.
* Copyright (c) Intel Corporation. All rights reserved.
* Copyright (c) 2018 Mellanox Technologies LTD. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -466,6 +466,9 @@ spdk_nvmf_tgt_write_config_json(struct spdk_json_write_ctx *w, struct spdk_nvmf_
spdk_json_write_named_uint32(w, "max_io_size", transport->opts.max_io_size);
spdk_json_write_named_uint32(w, "io_unit_size", transport->opts.io_unit_size);
spdk_json_write_named_uint32(w, "max_aq_depth", transport->opts.max_aq_depth);
if (transport->ops->type == SPDK_NVME_TRANSPORT_RDMA) {
spdk_json_write_named_uint32(w, "max_srq_depth", transport->opts.max_srq_depth);
}
spdk_json_write_object_end(w);
spdk_json_write_object_end(w);

View File

@ -296,11 +296,13 @@ struct spdk_nvmf_rdma_qpair {
/* The maximum number of SGEs per WR on the recv queue */
uint32_t max_recv_sge;
#ifndef SPDK_CONFIG_RDMA_SRQ
/* Receives that are waiting for a request object */
STAILQ_HEAD(, spdk_nvmf_rdma_recv) incoming_queue;
/* Queues to track requests in critical states */
STAILQ_HEAD(, spdk_nvmf_rdma_request) free_queue;
#endif
STAILQ_HEAD(, spdk_nvmf_rdma_request) pending_rdma_read_queue;
@ -309,6 +311,7 @@ struct spdk_nvmf_rdma_qpair {
/* Number of requests not in the free state */
uint32_t qd;
#ifndef SPDK_CONFIG_RDMA_SRQ
/* Array of size "max_queue_depth" containing RDMA requests. */
struct spdk_nvmf_rdma_request *reqs;
@ -332,6 +335,7 @@ struct spdk_nvmf_rdma_qpair {
*/
void *bufs;
struct ibv_mr *bufs_mr;
#endif
TAILQ_ENTRY(spdk_nvmf_rdma_qpair) link;
@ -363,6 +367,44 @@ struct spdk_nvmf_rdma_poller {
int required_num_wr;
struct ibv_cq *cq;
#ifdef SPDK_CONFIG_RDMA_SRQ
/* The maximum number of I/O outstanding on the shared receive queue at one time */
uint16_t max_srq_depth;
/* Shared receive queue */
struct ibv_srq *srq;
/* Receives that are waiting for a request object */
STAILQ_HEAD(, spdk_nvmf_rdma_recv) incoming_queue;
/* Queue to track free requests */
STAILQ_HEAD(, spdk_nvmf_rdma_request) free_queue;
/* Array of size "max_srq_depth" containing RDMA requests. */
struct spdk_nvmf_rdma_request *reqs;
/* Array of size "max_srq_depth" containing RDMA recvs. */
struct spdk_nvmf_rdma_recv *recvs;
/* Array of size "max_srq_depth" containing 64 byte capsules
* used for receive.
*/
union nvmf_h2c_msg *cmds;
struct ibv_mr *cmds_mr;
/* Array of size "max_srq_depth" containing 16 byte completions
* to be sent back to the user.
*/
union nvmf_c2h_msg *cpls;
struct ibv_mr *cpls_mr;
/* Array of size "max_srq_depth * InCapsuleDataSize" containing
* buffers to be used for in capsule data.
*/
void *bufs;
struct ibv_mr *bufs_mr;
#endif
TAILQ_HEAD(, spdk_nvmf_rdma_qpair) qpairs;
TAILQ_ENTRY(spdk_nvmf_rdma_poller) link;
@ -559,6 +601,8 @@ spdk_nvmf_rdma_set_ibv_state(struct spdk_nvmf_rdma_qpair *rqpair,
return 0;
}
#ifndef SPDK_CONFIG_RDMA_SRQ
static void
nvmf_rdma_dump_request(struct spdk_nvmf_rdma_request *req)
{
@ -584,15 +628,25 @@ nvmf_rdma_dump_qpair_contents(struct spdk_nvmf_rdma_qpair *rqpair)
}
}
#endif
static void
spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
{
#ifdef SPDK_CONFIG_RDMA_SRQ
struct spdk_nvmf_rdma_recv *rdma_recv, *recv_tmp;
struct ibv_recv_wr *bad_recv_wr = NULL;
int rc;
#endif
spdk_trace_record(TRACE_RDMA_QP_DESTROY, 0, 0, (uintptr_t)rqpair->cm_id, 0);
spdk_poller_unregister(&rqpair->destruct_poller);
if (rqpair->qd != 0) {
#ifndef SPDK_CONFIG_RDMA_SRQ
nvmf_rdma_dump_qpair_contents(rqpair);
#endif
SPDK_WARNLOG("Destroying qpair when queue depth is %d\n", rqpair->qd);
}
@ -600,6 +654,7 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
TAILQ_REMOVE(&rqpair->poller->qpairs, rqpair, link);
}
#ifndef SPDK_CONFIG_RDMA_SRQ
if (rqpair->cmds_mr) {
ibv_dereg_mr(rqpair->cmds_mr);
}
@ -611,6 +666,18 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
if (rqpair->bufs_mr) {
ibv_dereg_mr(rqpair->bufs_mr);
}
#else
/* Drop all received but unprocessed commands for this queue and return them to SRQ */
STAILQ_FOREACH_SAFE(rdma_recv, &rqpair->poller->incoming_queue, link, recv_tmp) {
if (rqpair == rdma_recv->qpair) {
STAILQ_REMOVE_HEAD(&rqpair->poller->incoming_queue, link);
rc = ibv_post_srq_recv(rqpair->poller->srq, &rdma_recv->wr, &bad_recv_wr);
if (rc) {
SPDK_ERRLOG("Unable to re-post rx descriptor\n");
}
}
}
#endif
if (rqpair->cm_id) {
rdma_destroy_qp(rqpair->cm_id);
@ -622,30 +689,33 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
}
/* Free all memory */
#ifndef SPDK_CONFIG_RDMA_SRQ
spdk_dma_free(rqpair->cmds);
spdk_dma_free(rqpair->cpls);
spdk_dma_free(rqpair->bufs);
free(rqpair->reqs);
free(rqpair->recvs);
#endif
free(rqpair);
}
static int
spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
{
struct spdk_nvmf_rdma_transport *rtransport;
struct spdk_nvmf_rdma_qpair *rqpair;
struct spdk_nvmf_rdma_poller *rpoller;
int rc, i, num_cqe, required_num_wr;;
int rc, num_cqe, required_num_wr;
#ifndef SPDK_CONFIG_RDMA_SRQ
int i;
struct spdk_nvmf_rdma_recv *rdma_recv;
struct spdk_nvmf_rdma_request *rdma_req;
struct spdk_nvmf_rdma_transport *rtransport;
struct spdk_nvmf_transport *transport;
#endif
struct spdk_nvmf_rdma_device *device;
struct ibv_qp_init_attr ibv_init_attr;
rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
transport = &rtransport->transport;
device = rqpair->port->device;
memset(&ibv_init_attr, 0, sizeof(struct ibv_qp_init_attr));
@ -653,10 +723,15 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
ibv_init_attr.qp_type = IBV_QPT_RC;
ibv_init_attr.send_cq = rqpair->poller->cq;
ibv_init_attr.recv_cq = rqpair->poller->cq;
#ifdef SPDK_CONFIG_RDMA_SRQ
ibv_init_attr.srq = rqpair->poller->srq;
#endif
ibv_init_attr.cap.max_send_wr = rqpair->max_queue_depth *
2 + 1; /* SEND, READ, and WRITE operations + dummy drain WR */
#ifndef SPDK_CONFIG_RDMA_SRQ
ibv_init_attr.cap.max_recv_wr = rqpair->max_queue_depth +
1; /* RECV operations + dummy drain WR */
#endif
ibv_init_attr.cap.max_send_sge = spdk_min(device->attr.max_sge, NVMF_DEFAULT_TX_SGE);
ibv_init_attr.cap.max_recv_sge = spdk_min(device->attr.max_sge, NVMF_DEFAULT_RX_SGE);
@ -710,6 +785,10 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
spdk_trace_record(TRACE_RDMA_QP_CREATE, 0, 0, (uintptr_t)rqpair->cm_id, 0);
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "New RDMA Connection: %p\n", qpair);
#ifndef SPDK_CONFIG_RDMA_SRQ
rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
transport = &rtransport->transport;
rqpair->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->reqs));
rqpair->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->recvs));
rqpair->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cmds),
@ -760,12 +839,15 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
rqpair->bufs, rqpair->max_queue_depth *
transport->opts.in_capsule_data_size, rqpair->bufs_mr->lkey);
}
#endif
STAILQ_INIT(&rqpair->free_queue);
rqpair->current_recv_depth = rqpair->max_queue_depth;
STAILQ_INIT(&rqpair->pending_rdma_read_queue);
STAILQ_INIT(&rqpair->pending_rdma_write_queue);
rqpair->current_recv_depth = rqpair->max_queue_depth;
#ifndef SPDK_CONFIG_RDMA_SRQ
STAILQ_INIT(&rqpair->free_queue);
for (i = 0; i < rqpair->max_queue_depth; i++) {
struct ibv_recv_wr *bad_wr = NULL;
@ -839,6 +921,7 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
rdma_req->state = RDMA_REQUEST_STATE_FREE;
STAILQ_INSERT_HEAD(&rqpair->free_queue, rdma_req, state_link);
}
#endif
return 0;
}
@ -900,7 +983,11 @@ request_transfer_out(struct spdk_nvmf_request *req, int *data_posted)
assert(rdma_req->recv != NULL);
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
rqpair);
#ifndef SPDK_CONFIG_RDMA_SRQ
rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
#else
rc = ibv_post_srq_recv(rqpair->poller->srq, &rdma_req->recv->wr, &bad_recv_wr);
#endif
if (rc) {
SPDK_ERRLOG("Unable to re-post rx descriptor\n");
return rc;
@ -1066,7 +1153,9 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
rqpair->cm_id = event->id;
rqpair->listen_id = event->listen_id;
rqpair->qpair.transport = transport;
#ifndef SPDK_CONFIG_RDMA_SRQ
STAILQ_INIT(&rqpair->incoming_queue);
#endif
event->id->context = &rqpair->qpair;
cb_fn(&rqpair->qpair);
@ -1399,7 +1488,11 @@ nvmf_rdma_request_free(struct spdk_nvmf_rdma_request *rdma_req,
rdma_req->req.iovcnt = 0;
rdma_req->req.data = NULL;
rqpair->qd--;
#ifndef SPDK_CONFIG_RDMA_SRQ
STAILQ_INSERT_HEAD(&rqpair->free_queue, rdma_req, state_link);
#else
STAILQ_INSERT_HEAD(&rqpair->poller->free_queue, rdma_req, state_link);
#endif
rdma_req->state = RDMA_REQUEST_STATE_FREE;
}
@ -1631,6 +1724,7 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
#define SPDK_NVMF_RDMA_DEFAULT_MAX_QUEUE_DEPTH 128
#define SPDK_NVMF_RDMA_DEFAULT_AQ_DEPTH 128
#define SPDK_NVMF_RDMA_DEFAULT_SRQ_DEPTH 4096
#define SPDK_NVMF_RDMA_DEFAULT_MAX_QPAIRS_PER_CTRLR 64
#define SPDK_NVMF_RDMA_DEFAULT_IN_CAPSULE_DATA_SIZE 4096
#define SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE 131072
@ -1649,6 +1743,7 @@ spdk_nvmf_rdma_opts_init(struct spdk_nvmf_transport_opts *opts)
opts->max_aq_depth = SPDK_NVMF_RDMA_DEFAULT_AQ_DEPTH;
opts->num_shared_buffers = SPDK_NVMF_RDMA_DEFAULT_NUM_SHARED_BUFFERS;
opts->buf_cache_size = SPDK_NVMF_RDMA_DEFAULT_BUFFER_CACHE_SIZE;
opts->max_srq_depth = SPDK_NVMF_RDMA_DEFAULT_SRQ_DEPTH;
}
const struct spdk_mem_map_ops g_nvmf_rdma_map_ops = {
@ -1691,15 +1786,16 @@ spdk_nvmf_rdma_create(struct spdk_nvmf_transport_opts *opts)
SPDK_INFOLOG(SPDK_LOG_RDMA, "*** RDMA Transport Init ***\n"
" Transport opts: max_ioq_depth=%d, max_io_size=%d,\n"
" max_qpairs_per_ctrlr=%d, io_unit_size=%d,\n"
" in_capsule_data_size=%d, max_aq_depth=%d\n"
" num_shared_buffers=%d\n",
" in_capsule_data_size=%d, max_aq_depth=%d,\n"
" num_shared_buffers=%d, max_srq_depth=%d\n",
opts->max_queue_depth,
opts->max_io_size,
opts->max_qpairs_per_ctrlr,
opts->io_unit_size,
opts->in_capsule_data_size,
opts->max_aq_depth,
opts->num_shared_buffers);
opts->num_shared_buffers,
opts->max_srq_depth);
/* I/O unit size cannot be larger than max I/O size */
if (opts->io_unit_size > opts->max_io_size) {
@ -2126,6 +2222,9 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport
struct spdk_nvmf_rdma_qpair *rqpair, bool drain)
{
struct spdk_nvmf_rdma_request *rdma_req, *req_tmp;
#ifdef SPDK_CONFIG_RDMA_SRQ
struct spdk_nvmf_rdma_poller *rpoller = rqpair->poller;
#endif
/* We process I/O in the data transfer pending queue at the highest priority. RDMA reads first */
STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_read_queue, state_link, req_tmp) {
@ -2149,13 +2248,20 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport
}
}
#ifndef SPDK_CONFIG_RDMA_SRQ
while (!STAILQ_EMPTY(&rqpair->free_queue) && !STAILQ_EMPTY(&rqpair->incoming_queue)) {
rdma_req = STAILQ_FIRST(&rqpair->free_queue);
STAILQ_REMOVE_HEAD(&rqpair->free_queue, state_link);
rdma_req->recv = STAILQ_FIRST(&rqpair->incoming_queue);
STAILQ_REMOVE_HEAD(&rqpair->incoming_queue, link);
#else
while (!STAILQ_EMPTY(&rpoller->free_queue) && !STAILQ_EMPTY(&rpoller->incoming_queue)) {
rdma_req = STAILQ_FIRST(&rpoller->free_queue);
STAILQ_REMOVE_HEAD(&rpoller->free_queue, state_link);
rdma_req->recv = STAILQ_FIRST(&rpoller->incoming_queue);
STAILQ_REMOVE_HEAD(&rpoller->incoming_queue, link);
rdma_req->req.qpair = &rdma_req->recv->qpair->qpair;
#endif
rqpair->qd++;
rdma_req->state = RDMA_REQUEST_STATE_NEW;
if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
@ -2467,13 +2573,22 @@ spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
}
static void
spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group);
static struct spdk_nvmf_transport_poll_group *
spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
{
struct spdk_nvmf_rdma_transport *rtransport;
struct spdk_nvmf_rdma_poll_group *rgroup;
struct spdk_nvmf_rdma_poller *poller, *tpoller;
struct spdk_nvmf_rdma_poller *poller;
struct spdk_nvmf_rdma_device *device;
#ifdef SPDK_CONFIG_RDMA_SRQ
int i, rc;
struct ibv_srq_init_attr srq_init_attr;
struct spdk_nvmf_rdma_recv *rdma_recv;
struct spdk_nvmf_rdma_request *rdma_req;
#endif
rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
@ -2490,7 +2605,9 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
poller = calloc(1, sizeof(*poller));
if (!poller) {
SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n");
goto err_exit;
spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
poller->device = device;
@ -2501,29 +2618,166 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
poller->cq = ibv_create_cq(device->context, DEFAULT_NVMF_RDMA_CQ_SIZE, poller, NULL, 0);
if (!poller->cq) {
SPDK_ERRLOG("Unable to create completion queue\n");
free(poller);
goto err_exit;
spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
poller->num_cqe = DEFAULT_NVMF_RDMA_CQ_SIZE;
TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link);
#ifdef SPDK_CONFIG_RDMA_SRQ
poller->max_srq_depth = transport->opts.max_srq_depth;
memset(&srq_init_attr, 0, sizeof(struct ibv_srq_init_attr));
srq_init_attr.attr.max_wr = poller->max_srq_depth;
srq_init_attr.attr.max_sge = spdk_min(device->attr.max_sge, NVMF_DEFAULT_RX_SGE);
poller->srq = ibv_create_srq(device->pd, &srq_init_attr);
if (!poller->srq) {
SPDK_ERRLOG("Unable to create shared receive queue, errno %d\n", errno);
spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Created RDMA SRQ %p: max_wr %u, max_sge %u, srq_limit %u\n",
poller->srq,
srq_init_attr.attr.max_wr,
srq_init_attr.attr.max_sge,
srq_init_attr.attr.srq_limit);
poller->reqs = calloc(poller->max_srq_depth, sizeof(*poller->reqs));
poller->recvs = calloc(poller->max_srq_depth, sizeof(*poller->recvs));
poller->cmds = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->cmds),
0x1000, NULL);
poller->cpls = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->cpls),
0x1000, NULL);
if (transport->opts.in_capsule_data_size > 0) {
poller->bufs = spdk_dma_zmalloc(poller->max_srq_depth *
transport->opts.in_capsule_data_size,
0x1000, NULL);
}
if (!poller->reqs || !poller->recvs || !poller->cmds ||
!poller->cpls || (transport->opts.in_capsule_data_size && !poller->bufs)) {
SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA shared receive queue.\n");
spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
poller->cmds_mr = ibv_reg_mr(device->pd, poller->cmds,
poller->max_srq_depth * sizeof(*poller->cmds),
IBV_ACCESS_LOCAL_WRITE);
poller->cpls_mr = ibv_reg_mr(device->pd, poller->cpls,
poller->max_srq_depth * sizeof(*poller->cpls),
0);
if (transport->opts.in_capsule_data_size) {
poller->bufs_mr = ibv_reg_mr(device->pd, poller->bufs,
poller->max_srq_depth *
transport->opts.in_capsule_data_size,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
}
if (!poller->cmds_mr || !poller->cpls_mr || (transport->opts.in_capsule_data_size &&
!poller->bufs_mr)) {
SPDK_ERRLOG("Unable to register required memory for RDMA shared receive queue.\n");
spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
poller->cmds, poller->max_srq_depth * sizeof(*poller->cmds), poller->cmds_mr->lkey);
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
poller->cpls, poller->max_srq_depth * sizeof(*poller->cpls), poller->cpls_mr->lkey);
if (poller->bufs && poller->bufs_mr) {
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
poller->bufs, poller->max_srq_depth *
transport->opts.in_capsule_data_size, poller->bufs_mr->lkey);
}
/* Initialize queues */
STAILQ_INIT(&poller->incoming_queue);
STAILQ_INIT(&poller->free_queue);
for (i = 0; i < poller->max_srq_depth; i++) {
struct ibv_recv_wr *bad_wr = NULL;
rdma_recv = &poller->recvs[i];
rdma_recv->qpair = NULL;
/* Set up memory to receive commands */
if (poller->bufs) {
rdma_recv->buf = (void *)((uintptr_t)poller->bufs + (i *
transport->opts.in_capsule_data_size));
}
rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
rdma_recv->sgl[0].addr = (uintptr_t)&poller->cmds[i];
rdma_recv->sgl[0].length = sizeof(poller->cmds[i]);
rdma_recv->sgl[0].lkey = poller->cmds_mr->lkey;
rdma_recv->wr.num_sge = 1;
if (rdma_recv->buf && poller->bufs_mr) {
rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
rdma_recv->sgl[1].length = transport->opts.in_capsule_data_size;
rdma_recv->sgl[1].lkey = poller->bufs_mr->lkey;
rdma_recv->wr.num_sge++;
}
rdma_recv->wr.wr_id = (uintptr_t)&rdma_recv->rdma_wr;
rdma_recv->wr.sg_list = rdma_recv->sgl;
rc = ibv_post_srq_recv(poller->srq, &rdma_recv->wr, &bad_wr);
if (rc) {
SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
}
for (i = 0; i < poller->max_srq_depth; i++) {
rdma_req = &poller->reqs[i];
rdma_req->req.qpair = NULL;
rdma_req->req.cmd = NULL;
/* Set up memory to send responses */
rdma_req->req.rsp = &poller->cpls[i];
rdma_req->rsp.sgl[0].addr = (uintptr_t)&poller->cpls[i];
rdma_req->rsp.sgl[0].length = sizeof(poller->cpls[i]);
rdma_req->rsp.sgl[0].lkey = poller->cpls_mr->lkey;
rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
rdma_req->rsp.wr.next = NULL;
rdma_req->rsp.wr.opcode = IBV_WR_SEND;
rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
/* Set up memory for data buffers */
rdma_req->data.rdma_wr.type = RDMA_WR_TYPE_DATA;
rdma_req->data.wr.wr_id = (uintptr_t)&rdma_req->data.rdma_wr;
rdma_req->data.wr.next = NULL;
rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
rdma_req->data.wr.sg_list = rdma_req->data.sgl;
rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
/* Initialize request state to FREE */
rdma_req->state = RDMA_REQUEST_STATE_FREE;
STAILQ_INSERT_TAIL(&poller->free_queue, rdma_req, state_link);
}
#endif
}
pthread_mutex_unlock(&rtransport->lock);
return &rgroup->group;
err_exit:
TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tpoller) {
TAILQ_REMOVE(&rgroup->pollers, poller, link);
if (poller->cq) {
ibv_destroy_cq(poller->cq);
}
free(poller);
}
free(rgroup);
pthread_mutex_unlock(&rtransport->lock);
return NULL;
}
static void
@ -2542,6 +2796,32 @@ spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) {
TAILQ_REMOVE(&rgroup->pollers, poller, link);
#ifdef SPDK_CONFIG_RDMA_SRQ
if (poller->cmds_mr) {
ibv_dereg_mr(poller->cmds_mr);
}
if (poller->cpls_mr) {
ibv_dereg_mr(poller->cpls_mr);
}
if (poller->bufs_mr) {
ibv_dereg_mr(poller->bufs_mr);
}
if (poller->srq) {
ibv_destroy_srq(poller->srq);
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Destroyed RDMA shared queue %p\n", poller->srq);
}
/* Free all memory */
spdk_dma_free(poller->cmds);
spdk_dma_free(poller->cpls);
spdk_dma_free(poller->bufs);
free(poller->reqs);
free(poller->recvs);
#endif
if (poller->cq) {
ibv_destroy_cq(poller->cq);
}
@ -2689,6 +2969,22 @@ spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
NVMF_RDMA_QPAIR_DESTROY_TIMEOUT_US);
}
#ifdef SPDK_CONFIG_RDMA_SRQ
static struct spdk_nvmf_rdma_qpair *
get_rdma_qpair_from_wc(struct spdk_nvmf_rdma_poller *rpoller, struct ibv_wc *wc)
{
struct spdk_nvmf_rdma_qpair *rqpair;
/* @todo: improve QP search */
TAILQ_FOREACH(rqpair, &rpoller->qpairs, link) {
if (wc->qp_num == rqpair->cm_id->qp->qp_num) {
return rqpair;
}
}
SPDK_ERRLOG("Didn't find QP with qp_num %u\n", wc->qp_num);
return NULL;
}
#endif
#ifdef DEBUG
static int
spdk_nvmf_rdma_req_is_completing(struct spdk_nvmf_rdma_request *rdma_req)
@ -2746,14 +3042,20 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
break;
case RDMA_WR_TYPE_RECV:
rdma_recv = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr);
#ifdef SPDK_CONFIG_RDMA_SRQ
rdma_recv->qpair = get_rdma_qpair_from_wc(rpoller, &wc[i]);
assert(rdma_recv->qpair != NULL);
#endif
rqpair = rdma_recv->qpair;
/* Dump this into the incoming queue. This gets cleaned up when
* the queue pair disconnects or recovers. */
#ifndef SPDK_CONFIG_RDMA_SRQ
STAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
#else
STAILQ_INSERT_TAIL(&rpoller->incoming_queue, rdma_recv, link);
#endif
rqpair->current_recv_depth++;
/* Don't worry about responding to recv overflow, we are disconnecting anyways */
break;
case RDMA_WR_TYPE_DATA:
/* If the data transfer fails still force the queue into the error state,
@ -2841,6 +3143,10 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
case IBV_WC_RECV:
assert(rdma_wr->type == RDMA_WR_TYPE_RECV);
rdma_recv = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr);
#ifdef SPDK_CONFIG_RDMA_SRQ
rdma_recv->qpair = get_rdma_qpair_from_wc(rpoller, &wc[i]);
assert(rdma_recv->qpair != NULL);
#endif
rqpair = rdma_recv->qpair;
/* The qpair should not send more requests than are allowed per qpair. */
if (rqpair->current_recv_depth >= rqpair->max_queue_depth) {
@ -2848,7 +3154,12 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
} else {
rqpair->current_recv_depth++;
}
#ifndef SPDK_CONFIG_RDMA_SRQ
STAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
#else
STAILQ_INSERT_TAIL(&rpoller->incoming_queue, rdma_recv, link);
#endif
/* Try to process other queued requests */
spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
break;

View File

@ -1369,7 +1369,8 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse
io_unit_size=args.io_unit_size,
max_aq_depth=args.max_aq_depth,
num_shared_buffers=args.num_shared_buffers,
buf_cache_size=args.buf_cache_size)
buf_cache_size=args.buf_cache_size,
max_srq_depth=args.max_srq_depth)
p = subparsers.add_parser('nvmf_create_transport', help='Create NVMf transport')
p.add_argument('-t', '--trtype', help='Transport type (ex. RDMA)', type=str, required=True)
@ -1381,6 +1382,7 @@ Format: 'user:u1 secret:s1 muser:mu1 msecret:ms1,user:u2 secret:s2 muser:mu2 mse
p.add_argument('-a', '--max-aq-depth', help='Max number of admin cmds per AQ', type=int)
p.add_argument('-n', '--num-shared-buffers', help='The number of pooled data buffers available to the transport', type=int)
p.add_argument('-b', '--buf-cache-size', help='The number of shared buffers to reserve for each poll group', type=int)
p.add_argument('-s', '--max-srq-depth', help='Max number of outstanding I/O per SRQ. Relevant only for RDMA transport', type=int)
p.set_defaults(func=nvmf_create_transport)
def get_nvmf_transports(args):

View File

@ -44,7 +44,8 @@ def nvmf_create_transport(client,
io_unit_size=None,
max_aq_depth=None,
num_shared_buffers=None,
buf_cache_size=None):
buf_cache_size=None,
max_srq_depth=None):
"""NVMf Transport Create options.
Args:
@ -57,6 +58,7 @@ def nvmf_create_transport(client,
max_aq_depth: Max size admin quque per controller (optional)
num_shared_buffers: The number of pooled data buffers available to the transport (optional)
buf_cache_size: The number of shared buffers to reserve for each poll group(optional)
max_srq_depth: Max number of outstanding I/O per shared receive queue (optional)
Returns:
True or False
@ -80,6 +82,8 @@ def nvmf_create_transport(client,
params['num_shared_buffers'] = num_shared_buffers
if buf_cache_size:
params['buf_cache_size'] = buf_cache_size
if max_srq_depth:
params['max_srq_depth'] = max_srq_depth
return client.call('nvmf_create_transport', params)