sock/vpp: Add spdk_vpp_sock_writev_async support

Purpose: To support SPDK iSCSI target applications when they
use spdk_sock_writev_async.

PS: There is some duplicated code between the posix and vpp modules;
we should consider consolidating it later. If we try too early and
the code cannot be abstracted into a common header, it will only
create additional work.
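
For context, the caller side looks roughly like the sketch below: the
application fills a struct spdk_sock_request (the iovecs follow the
structure in memory and are accessed via SPDK_SOCK_REQUEST_IOV) and
hands it to spdk_sock_writev_async(), which completes through the
request's callback. The helper names send_pdu() and write_done() are
illustrative only and not part of this change.

    #include <errno.h>
    #include <stdlib.h>
    #include "spdk/sock.h"

    /* Illustrative completion callback: err is 0 on success or a negative errno. */
    static void
    write_done(void *cb_arg, int err)
    {
            struct spdk_sock_request *req = cb_arg;

            if (err != 0) {
                    /* The write failed; the connection is typically torn down here. */
            }
            free(req);
    }

    /* Illustrative helper: queue one buffer for asynchronous transmission. */
    static int
    send_pdu(struct spdk_sock *sock, void *buf, size_t len)
    {
            struct spdk_sock_request *req;

            /* The iovec array lives immediately after the request structure. */
            req = calloc(1, sizeof(*req) + sizeof(struct iovec));
            if (req == NULL) {
                    return -ENOMEM;
            }

            req->iovcnt = 1;
            SPDK_SOCK_REQUEST_IOV(req, 0)->iov_base = buf;
            SPDK_SOCK_REQUEST_IOV(req, 0)->iov_len = len;
            req->cb_fn = write_done;
            req->cb_arg = req;

            spdk_sock_writev_async(sock, req);
            return 0;
    }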

Signed-off-by: Ziye Yang <ziye.yang@intel.com>
Change-Id: I30d8ee81f80ea5e74c53ff726ee44b0612867c71
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/481749
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Community-CI: SPDK CI Jenkins <sys_sgci@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Ziye Yang 2020-01-16 21:34:39 +08:00 committed by Jim Harris
parent 9857ff6015
commit be6aa42708


@@ -75,6 +75,7 @@
#define SPDK_VPP_LISTEN_QUEUE_SIZE SPDK_VPP_SESSIONS_MAX
#define SPDK_VPP_SEGMENT_BASEVA 0x200000000ULL
#define SPDK_VPP_SEGMENT_TIMEOUT 20
#define IOV_BATCH_SIZE 64
/* VPP connection state */
enum spdk_vpp_state {
@@ -917,7 +918,7 @@ spdk_vpp_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
}
static ssize_t
-spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
struct spdk_vpp_session *session = __vpp_session(_sock);
ssize_t total = 0;
@@ -962,15 +963,151 @@ spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
return total;
}
static int
_sock_flush(struct spdk_sock *sock)
{
struct iovec iovs[IOV_BATCH_SIZE];
int iovcnt;
int retval;
struct spdk_sock_request *req;
int i;
ssize_t rc;
unsigned int offset;
size_t len;
/* Can't flush from within a callback or we end up with recursive calls */
if (sock->cb_cnt > 0) {
return 0;
}
/* Gather an iov */
iovcnt = 0;
req = TAILQ_FIRST(&sock->queued_reqs);
while (req) {
offset = req->internal.offset;
for (i = 0; i < req->iovcnt; i++) {
/* Consume any offset first */
if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
continue;
}
iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
iovcnt++;
offset = 0;
if (iovcnt >= IOV_BATCH_SIZE) {
break;
}
}
if (iovcnt >= IOV_BATCH_SIZE) {
break;
}
req = TAILQ_NEXT(req, internal.link);
}
if (iovcnt == 0) {
return 0;
}
/* Perform the vectored write */
rc = _vpp_sock_writev(sock, iovs, iovcnt);
if (rc <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
}
return rc;
}
/* Consume the requests that were actually written */
req = TAILQ_FIRST(&sock->queued_reqs);
while (req) {
offset = req->internal.offset;
for (i = 0; i < req->iovcnt; i++) {
/* Advance by the offset first */
if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
continue;
}
/* Calculate the remaining length of this element */
len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
if (len > (size_t)rc) {
/* This element was partially sent. */
req->internal.offset += rc;
return 0;
}
offset = 0;
req->internal.offset += len;
rc -= len;
}
/* Handled a full request. */
req->internal.offset = 0;
spdk_sock_request_pend(sock, req);
/* The _vpp_sock_writev above isn't currently asynchronous,
* so it's already done. */
retval = spdk_sock_request_put(sock, req, 0);
if (rc == 0 || retval) {
break;
}
req = TAILQ_FIRST(&sock->queued_reqs);
}
return 0;
}
static ssize_t
spdk_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
int rc;
/* In order to process a writev, we need to flush any asynchronous writes
* first. */
rc = _sock_flush(_sock);
if (rc < 0) {
return rc;
}
if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
/* We weren't able to flush all requests */
errno = EAGAIN;
return -1;
}
return _vpp_sock_writev(_sock, iov, iovcnt);
}
static void
spdk_vpp_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
-/* TODO. We don't test VPP with NVMe-oF TCP right now. This will
- * need to be implemented later. */
-assert(false);
-spdk_sock_request_put(sock, req, -ENOTSUP);
+int rc;
+spdk_sock_request_queue(sock, req);
+if (sock->group_impl == NULL) {
+spdk_sock_request_put(sock, req, -ENOTSUP);
+return;
+}
/* If there are a sufficient number queued, just flush them out immediately. */
if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
rc = _sock_flush(sock);
if (rc) {
spdk_sock_abort_requests(sock);
}
}
}
static int
@@ -1092,8 +1229,8 @@ static int
spdk_vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
struct spdk_sock **socks)
{
-int num_events;
-struct spdk_sock *sock;
+int num_events, rc;
+struct spdk_sock *sock, *tmp;
struct spdk_vpp_session *session;
struct spdk_vpp_sock_group_impl *group;
@@ -1104,6 +1241,16 @@ spdk_vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
group = __vpp_group_impl(_group);
num_events = 0;
/* This must be a TAILQ_FOREACH_SAFE because while flushing,
* a completion callback could remove the sock from the
* group. */
TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
rc = _sock_flush(sock);
if (rc) {
spdk_sock_abort_requests(sock);
}
}
sock = group->last_sock;
if (sock == NULL) {
sock = TAILQ_FIRST(&group->base.socks);