nvmf/tcp: Use writev_async for sending data on sockets
This eliminates the flushing logic, simplifying the tcp transport. This also happens to greatly improve performance, especially on random read tests. The batching done in spdk_sock_writev_async seems to be more effectively than the previous batching logic in the tcp transport. Change-Id: Id980ac6073e380dc75f95df3f69cb224f50fb01b Signed-off-by: Ben Walker <benjamin.walker@intel.com> Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/470532 Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Community-CI: Broadcom SPDK FC-NVMe CI <spdk-ci.pdl@broadcom.com> Community-CI: SPDK CI Jenkins <sys_sgci@intel.com> Reviewed-by: Ziye Yang <ziye.yang@intel.com> Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com> Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
parent
7db1ed8b67
commit
5d497f6cf5
@ -111,6 +111,13 @@ struct nvme_tcp_pdu {
|
|||||||
|
|
||||||
nvme_tcp_qpair_xfer_complete_cb cb_fn;
|
nvme_tcp_qpair_xfer_complete_cb cb_fn;
|
||||||
void *cb_arg;
|
void *cb_arg;
|
||||||
|
|
||||||
|
/* The sock request ends with a 0 length iovec. Place the actual iovec immediately
|
||||||
|
* after it. There is a static assert below to check if the compiler inserted
|
||||||
|
* any unwanted padding */
|
||||||
|
struct spdk_sock_request sock_req;
|
||||||
|
struct iovec iov[NVME_TCP_MAX_SGL_DESCRIPTORS * 2];
|
||||||
|
|
||||||
struct iovec data_iov[NVME_TCP_MAX_SGL_DESCRIPTORS];
|
struct iovec data_iov[NVME_TCP_MAX_SGL_DESCRIPTORS];
|
||||||
uint32_t data_iovcnt;
|
uint32_t data_iovcnt;
|
||||||
uint32_t data_len;
|
uint32_t data_len;
|
||||||
@ -127,6 +134,9 @@ struct nvme_tcp_pdu {
|
|||||||
void *req; /* data tied to a tcp request */
|
void *req; /* data tied to a tcp request */
|
||||||
void *qpair;
|
void *qpair;
|
||||||
};
|
};
|
||||||
|
SPDK_STATIC_ASSERT(offsetof(struct nvme_tcp_pdu,
|
||||||
|
sock_req) + sizeof(struct spdk_sock_request) == offsetof(struct nvme_tcp_pdu, iov),
|
||||||
|
"Compiler inserted padding between iov and sock_req");
|
||||||
|
|
||||||
enum nvme_tcp_pdu_recv_state {
|
enum nvme_tcp_pdu_recv_state {
|
||||||
/* Ready to wait for PDU */
|
/* Ready to wait for PDU */
|
||||||
|
186
lib/nvmf/tcp.c
186
lib/nvmf/tcp.c
@ -204,7 +204,6 @@ struct spdk_nvmf_tcp_qpair {
|
|||||||
struct spdk_nvmf_tcp_poll_group *group;
|
struct spdk_nvmf_tcp_poll_group *group;
|
||||||
struct spdk_nvmf_tcp_port *port;
|
struct spdk_nvmf_tcp_port *port;
|
||||||
struct spdk_sock *sock;
|
struct spdk_sock *sock;
|
||||||
struct spdk_poller *flush_poller;
|
|
||||||
|
|
||||||
enum nvme_tcp_pdu_recv_state recv_state;
|
enum nvme_tcp_pdu_recv_state recv_state;
|
||||||
enum nvme_tcp_qpair_state state;
|
enum nvme_tcp_qpair_state state;
|
||||||
@ -413,18 +412,8 @@ static void
|
|||||||
spdk_nvmf_tcp_cleanup_all_states(struct spdk_nvmf_tcp_qpair *tqpair)
|
spdk_nvmf_tcp_cleanup_all_states(struct spdk_nvmf_tcp_qpair *tqpair)
|
||||||
{
|
{
|
||||||
struct spdk_nvmf_tcp_req *tcp_req, *req_tmp;
|
struct spdk_nvmf_tcp_req *tcp_req, *req_tmp;
|
||||||
struct nvme_tcp_pdu *pdu, *tmp_pdu;
|
|
||||||
|
|
||||||
/* Free the pdus in the send_queue */
|
assert(TAILQ_EMPTY(&tqpair->send_queue));
|
||||||
TAILQ_FOREACH_SAFE(pdu, &tqpair->send_queue, tailq, tmp_pdu) {
|
|
||||||
TAILQ_REMOVE(&tqpair->send_queue, pdu, tailq);
|
|
||||||
/* Also check the pdu type, we need to calculte the c2h_data_pdu_cnt later */
|
|
||||||
if (pdu->hdr->common.pdu_type == SPDK_NVME_TCP_PDU_TYPE_C2H_DATA) {
|
|
||||||
assert(tqpair->c2h_data_pdu_cnt > 0);
|
|
||||||
tqpair->c2h_data_pdu_cnt--;
|
|
||||||
}
|
|
||||||
spdk_nvmf_tcp_pdu_put(tqpair, pdu);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!STAILQ_EMPTY(&tqpair->queued_c2h_data_tcp_req)) {
|
while (!STAILQ_EMPTY(&tqpair->queued_c2h_data_tcp_req)) {
|
||||||
STAILQ_REMOVE_HEAD(&tqpair->queued_c2h_data_tcp_req, link);
|
STAILQ_REMOVE_HEAD(&tqpair->queued_c2h_data_tcp_req, link);
|
||||||
@ -468,8 +457,8 @@ spdk_nvmf_tcp_qpair_destroy(struct spdk_nvmf_tcp_qpair *tqpair)
|
|||||||
|
|
||||||
SPDK_DEBUGLOG(SPDK_LOG_NVMF_TCP, "enter\n");
|
SPDK_DEBUGLOG(SPDK_LOG_NVMF_TCP, "enter\n");
|
||||||
|
|
||||||
spdk_poller_unregister(&tqpair->flush_poller);
|
err = spdk_sock_close(&tqpair->sock);
|
||||||
spdk_sock_close(&tqpair->sock);
|
assert(err == 0);
|
||||||
spdk_nvmf_tcp_cleanup_all_states(tqpair);
|
spdk_nvmf_tcp_cleanup_all_states(tqpair);
|
||||||
|
|
||||||
if (tqpair->free_pdu_num != (tqpair->max_queue_depth + NVMF_TCP_QPAIR_MAX_C2H_PDU_NUM)) {
|
if (tqpair->free_pdu_num != (tqpair->max_queue_depth + NVMF_TCP_QPAIR_MAX_C2H_PDU_NUM)) {
|
||||||
@ -771,137 +760,41 @@ spdk_nvmf_tcp_stop_listen(struct spdk_nvmf_transport *transport,
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
|
||||||
spdk_nvmf_tcp_qpair_flush_pdus_internal(struct spdk_nvmf_tcp_qpair *tqpair)
|
|
||||||
{
|
|
||||||
const int array_size = 32;
|
|
||||||
struct iovec iovs[array_size];
|
|
||||||
int iovcnt = 0;
|
|
||||||
int bytes = 0;
|
|
||||||
int total_length = 0;
|
|
||||||
uint32_t mapped_length = 0;
|
|
||||||
struct nvme_tcp_pdu *pdu;
|
|
||||||
int pdu_length;
|
|
||||||
TAILQ_HEAD(, nvme_tcp_pdu) completed_pdus_list;
|
|
||||||
|
|
||||||
pdu = TAILQ_FIRST(&tqpair->send_queue);
|
|
||||||
|
|
||||||
if (pdu == NULL) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Build up a list of iovecs for the first few PDUs in the
|
|
||||||
* tqpair 's send_queue.
|
|
||||||
*/
|
|
||||||
while (pdu != NULL && ((array_size - iovcnt) >= (2 + (int)pdu->data_iovcnt))) {
|
|
||||||
iovcnt += nvme_tcp_build_iovs(&iovs[iovcnt],
|
|
||||||
array_size - iovcnt,
|
|
||||||
pdu,
|
|
||||||
tqpair->host_hdgst_enable,
|
|
||||||
tqpair->host_ddgst_enable,
|
|
||||||
&mapped_length);
|
|
||||||
total_length += mapped_length;
|
|
||||||
pdu = TAILQ_NEXT(pdu, tailq);
|
|
||||||
}
|
|
||||||
|
|
||||||
spdk_trace_record(TRACE_TCP_FLUSH_WRITEBUF_START, 0, total_length, 0, iovcnt);
|
|
||||||
|
|
||||||
bytes = spdk_sock_writev(tqpair->sock, iovs, iovcnt);
|
|
||||||
if (bytes == -1) {
|
|
||||||
if (errno == EWOULDBLOCK || errno == EAGAIN) {
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
SPDK_ERRLOG("spdk_sock_writev() failed, errno %d: %s\n",
|
|
||||||
errno, spdk_strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
spdk_trace_record(TRACE_TCP_FLUSH_WRITEBUF_DONE, 0, bytes, 0, 0);
|
|
||||||
|
|
||||||
pdu = TAILQ_FIRST(&tqpair->send_queue);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Free any PDUs that were fully written. If a PDU was only
|
|
||||||
* partially written, update its writev_offset so that next
|
|
||||||
* time only the unwritten portion will be sent to writev().
|
|
||||||
*/
|
|
||||||
TAILQ_INIT(&completed_pdus_list);
|
|
||||||
while (bytes > 0) {
|
|
||||||
pdu_length = pdu->hdr->common.plen - pdu->writev_offset;
|
|
||||||
if (bytes >= pdu_length) {
|
|
||||||
bytes -= pdu_length;
|
|
||||||
TAILQ_REMOVE(&tqpair->send_queue, pdu, tailq);
|
|
||||||
TAILQ_INSERT_TAIL(&completed_pdus_list, pdu, tailq);
|
|
||||||
pdu = TAILQ_FIRST(&tqpair->send_queue);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
pdu->writev_offset += bytes;
|
|
||||||
bytes = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!TAILQ_EMPTY(&completed_pdus_list)) {
|
|
||||||
pdu = TAILQ_FIRST(&completed_pdus_list);
|
|
||||||
TAILQ_REMOVE(&completed_pdus_list, pdu, tailq);
|
|
||||||
assert(pdu->cb_fn != NULL);
|
|
||||||
pdu->cb_fn(pdu->cb_arg);
|
|
||||||
spdk_nvmf_tcp_pdu_put(tqpair, pdu);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TAILQ_EMPTY(&tqpair->send_queue) ? 0 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
spdk_nvmf_tcp_qpair_flush_pdus(void *_tqpair)
|
|
||||||
{
|
|
||||||
struct spdk_nvmf_tcp_qpair *tqpair = _tqpair;
|
|
||||||
int rc;
|
|
||||||
|
|
||||||
if (tqpair->state == NVME_TCP_QPAIR_STATE_RUNNING) {
|
|
||||||
rc = spdk_nvmf_tcp_qpair_flush_pdus_internal(tqpair);
|
|
||||||
if (rc == 0 && tqpair->flush_poller != NULL) {
|
|
||||||
spdk_poller_unregister(&tqpair->flush_poller);
|
|
||||||
} else if (rc == 1 && tqpair->flush_poller == NULL) {
|
|
||||||
tqpair->flush_poller = spdk_poller_register(spdk_nvmf_tcp_qpair_flush_pdus,
|
|
||||||
tqpair, 50);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/*
|
|
||||||
* If the tqpair state is not RUNNING, then
|
|
||||||
* keep trying to flush PDUs until our list is
|
|
||||||
* empty - to make sure all data is sent before
|
|
||||||
* closing the connection.
|
|
||||||
*/
|
|
||||||
do {
|
|
||||||
rc = spdk_nvmf_tcp_qpair_flush_pdus_internal(tqpair);
|
|
||||||
} while (rc == 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rc < 0 && tqpair->state < NVME_TCP_QPAIR_STATE_EXITING) {
|
|
||||||
/*
|
|
||||||
* If the poller has already started destruction of the tqpair,
|
|
||||||
* i.e. the socket read failed, then the connection state may already
|
|
||||||
* be EXITED. We don't want to set it back to EXITING in that case.
|
|
||||||
*/
|
|
||||||
tqpair->state = NVME_TCP_QPAIR_STATE_EXITING;
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
spdk_nvmf_tcp_qpair_disconnect(struct spdk_nvmf_tcp_qpair *tqpair)
|
spdk_nvmf_tcp_qpair_disconnect(struct spdk_nvmf_tcp_qpair *tqpair)
|
||||||
{
|
{
|
||||||
SPDK_DEBUGLOG(SPDK_LOG_NVMF_TCP, "Disconnecting qpair %p\n", tqpair);
|
SPDK_DEBUGLOG(SPDK_LOG_NVMF_TCP, "Disconnecting qpair %p\n", tqpair);
|
||||||
|
|
||||||
tqpair->state = NVME_TCP_QPAIR_STATE_EXITED;
|
tqpair->state = NVME_TCP_QPAIR_STATE_EXITED;
|
||||||
spdk_nvmf_tcp_qpair_flush_pdus(tqpair);
|
|
||||||
spdk_poller_unregister(&tqpair->timeout_poller);
|
spdk_poller_unregister(&tqpair->timeout_poller);
|
||||||
spdk_nvmf_qpair_disconnect(&tqpair->qpair, NULL, NULL);
|
spdk_nvmf_qpair_disconnect(&tqpair->qpair, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
_pdu_write_done(void *cb_arg, int err)
|
||||||
|
{
|
||||||
|
struct nvme_tcp_pdu *pdu = cb_arg;
|
||||||
|
struct spdk_nvmf_tcp_qpair *tqpair = pdu->qpair;
|
||||||
|
|
||||||
|
TAILQ_REMOVE(&tqpair->send_queue, pdu, tailq);
|
||||||
|
|
||||||
|
if (err != 0) {
|
||||||
|
if (pdu->hdr->common.pdu_type == SPDK_NVME_TCP_PDU_TYPE_C2H_DATA) {
|
||||||
|
assert(tqpair->c2h_data_pdu_cnt > 0);
|
||||||
|
tqpair->c2h_data_pdu_cnt--;
|
||||||
|
}
|
||||||
|
|
||||||
|
spdk_nvmf_tcp_pdu_put(tqpair, pdu);
|
||||||
|
spdk_nvmf_tcp_qpair_disconnect(tqpair);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(pdu->cb_fn != NULL);
|
||||||
|
pdu->cb_fn(pdu->cb_arg);
|
||||||
|
|
||||||
|
spdk_nvmf_tcp_pdu_put(tqpair, pdu);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
spdk_nvmf_tcp_qpair_write_pdu(struct spdk_nvmf_tcp_qpair *tqpair,
|
spdk_nvmf_tcp_qpair_write_pdu(struct spdk_nvmf_tcp_qpair *tqpair,
|
||||||
struct nvme_tcp_pdu *pdu,
|
struct nvme_tcp_pdu *pdu,
|
||||||
@ -911,6 +804,8 @@ spdk_nvmf_tcp_qpair_write_pdu(struct spdk_nvmf_tcp_qpair *tqpair,
|
|||||||
int enable_digest;
|
int enable_digest;
|
||||||
int hlen;
|
int hlen;
|
||||||
uint32_t crc32c;
|
uint32_t crc32c;
|
||||||
|
uint32_t mapped_length = 0;
|
||||||
|
ssize_t rc;
|
||||||
|
|
||||||
hlen = pdu->hdr->common.hlen;
|
hlen = pdu->hdr->common.hlen;
|
||||||
enable_digest = 1;
|
enable_digest = 1;
|
||||||
@ -934,8 +829,25 @@ spdk_nvmf_tcp_qpair_write_pdu(struct spdk_nvmf_tcp_qpair *tqpair,
|
|||||||
|
|
||||||
pdu->cb_fn = cb_fn;
|
pdu->cb_fn = cb_fn;
|
||||||
pdu->cb_arg = cb_arg;
|
pdu->cb_arg = cb_arg;
|
||||||
|
|
||||||
|
pdu->sock_req.iovcnt = nvme_tcp_build_iovs(pdu->iov, SPDK_COUNTOF(pdu->iov), pdu,
|
||||||
|
tqpair->host_hdgst_enable, tqpair->host_ddgst_enable,
|
||||||
|
&mapped_length);
|
||||||
|
pdu->sock_req.cb_fn = _pdu_write_done;
|
||||||
|
pdu->sock_req.cb_arg = pdu;
|
||||||
TAILQ_INSERT_TAIL(&tqpair->send_queue, pdu, tailq);
|
TAILQ_INSERT_TAIL(&tqpair->send_queue, pdu, tailq);
|
||||||
spdk_nvmf_tcp_qpair_flush_pdus(tqpair);
|
if (pdu->hdr->common.pdu_type == SPDK_NVME_TCP_PDU_TYPE_IC_RESP ||
|
||||||
|
pdu->hdr->common.pdu_type == SPDK_NVME_TCP_PDU_TYPE_C2H_TERM_REQ) {
|
||||||
|
rc = spdk_sock_writev(tqpair->sock, pdu->iov, pdu->sock_req.iovcnt);
|
||||||
|
if (rc == mapped_length) {
|
||||||
|
_pdu_write_done(pdu, 0);
|
||||||
|
} else {
|
||||||
|
SPDK_ERRLOG("IC_RESP or TERM_REQ could not write to socket.\n");
|
||||||
|
_pdu_write_done(pdu, -1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
spdk_sock_writev_async(tqpair->sock, &pdu->sock_req);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
@ -2711,7 +2623,7 @@ spdk_nvmf_tcp_req_complete(struct spdk_nvmf_request *req)
|
|||||||
static void
|
static void
|
||||||
spdk_nvmf_tcp_close_qpair(struct spdk_nvmf_qpair *qpair)
|
spdk_nvmf_tcp_close_qpair(struct spdk_nvmf_qpair *qpair)
|
||||||
{
|
{
|
||||||
SPDK_DEBUGLOG(SPDK_LOG_NVMF_TCP, "enter\n");
|
SPDK_DEBUGLOG(SPDK_LOG_NVMF_TCP, "Qpair: %p\n", qpair);
|
||||||
|
|
||||||
spdk_nvmf_tcp_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_tcp_qpair, qpair));
|
spdk_nvmf_tcp_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_tcp_qpair, qpair));
|
||||||
}
|
}
|
||||||
|
@ -530,8 +530,6 @@ test_nvmf_tcp_send_c2h_data(void)
|
|||||||
CU_ASSERT(tqpair.c2h_data_pdu_cnt == 3);
|
CU_ASSERT(tqpair.c2h_data_pdu_cnt == 3);
|
||||||
CU_ASSERT(STAILQ_EMPTY(&tqpair.queued_c2h_data_tcp_req));
|
CU_ASSERT(STAILQ_EMPTY(&tqpair.queued_c2h_data_tcp_req));
|
||||||
|
|
||||||
spdk_poller_unregister(&tqpair.flush_poller);
|
|
||||||
|
|
||||||
spdk_thread_exit(thread);
|
spdk_thread_exit(thread);
|
||||||
spdk_thread_destroy(thread);
|
spdk_thread_destroy(thread);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user