sock/uring: mark non-listen sockets as blocking
Instead, we pass MSG_DONTWAIT flag to all recvmsg()/sendmsg() calls. That way, the IOs remain non-blocking, but the socket is marked as blocking allowing us to pass flags like MSG_WAITALL, which only make sense if a socket is in blocking mode. Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com> Change-Id: Ic52162e84aa14efaf6ad0fb3343d289822758e81 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/12592 Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com> Community-CI: Mellanox Build Bot Reviewed-by: Ben Walker <benjamin.walker@intel.com> Reviewed-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
This commit is contained in:
parent
466f9e8b20
commit
c61554bb68
@ -499,6 +499,15 @@ retry:
|
|||||||
fd = -1;
|
fd = -1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
flag = fcntl(fd, F_GETFL);
|
||||||
|
if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
|
||||||
|
SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
|
||||||
|
close(fd);
|
||||||
|
fd = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_server;
|
enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_server;
|
||||||
} else if (type == SPDK_SOCK_CREATE_CONNECT) {
|
} else if (type == SPDK_SOCK_CREATE_CONNECT) {
|
||||||
rc = connect(fd, res->ai_addr, res->ai_addrlen);
|
rc = connect(fd, res->ai_addr, res->ai_addrlen);
|
||||||
@ -510,15 +519,15 @@ retry:
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_client;
|
flag = fcntl(fd, F_GETFL);
|
||||||
}
|
if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
|
||||||
|
SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
|
||||||
|
close(fd);
|
||||||
|
fd = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
flag = fcntl(fd, F_GETFL);
|
enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_client;
|
||||||
if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
|
|
||||||
SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
|
|
||||||
close(fd);
|
|
||||||
fd = -1;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -575,8 +584,8 @@ uring_sock_accept(struct spdk_sock *_sock)
|
|||||||
fd = rc;
|
fd = rc;
|
||||||
|
|
||||||
flag = fcntl(fd, F_GETFL);
|
flag = fcntl(fd, F_GETFL);
|
||||||
if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
|
if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
|
||||||
SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
|
SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
|
||||||
close(fd);
|
close(fd);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -658,6 +667,17 @@ uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int
|
|||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline ssize_t
|
||||||
|
sock_readv(int fd, struct iovec *iov, int iovcnt)
|
||||||
|
{
|
||||||
|
struct msghdr msg = {
|
||||||
|
.msg_iov = iov,
|
||||||
|
.msg_iovlen = iovcnt,
|
||||||
|
};
|
||||||
|
|
||||||
|
return recvmsg(fd, &msg, MSG_DONTWAIT);
|
||||||
|
}
|
||||||
|
|
||||||
static inline ssize_t
|
static inline ssize_t
|
||||||
uring_sock_read(struct spdk_uring_sock *sock)
|
uring_sock_read(struct spdk_uring_sock *sock)
|
||||||
{
|
{
|
||||||
@ -668,7 +688,7 @@ uring_sock_read(struct spdk_uring_sock *sock)
|
|||||||
bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
|
bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
|
||||||
|
|
||||||
if (bytes > 0) {
|
if (bytes > 0) {
|
||||||
bytes = readv(sock->fd, iov, 2);
|
bytes = sock_readv(sock->fd, iov, 2);
|
||||||
if (bytes > 0) {
|
if (bytes > 0) {
|
||||||
spdk_pipe_writer_advance(sock->recv_pipe, bytes);
|
spdk_pipe_writer_advance(sock->recv_pipe, bytes);
|
||||||
if (sock->base.group_impl) {
|
if (sock->base.group_impl) {
|
||||||
@ -690,7 +710,7 @@ uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
|
|||||||
size_t len;
|
size_t len;
|
||||||
|
|
||||||
if (sock->recv_pipe == NULL) {
|
if (sock->recv_pipe == NULL) {
|
||||||
return readv(sock->fd, iov, iovcnt);
|
return sock_readv(sock->fd, iov, iovcnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
len = 0;
|
len = 0;
|
||||||
@ -702,7 +722,7 @@ uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
|
|||||||
/* If the user is receiving a sufficiently large amount of data,
|
/* If the user is receiving a sufficiently large amount of data,
|
||||||
* receive directly to their buffers. */
|
* receive directly to their buffers. */
|
||||||
if (len >= MIN_SOCK_PIPE_SIZE) {
|
if (len >= MIN_SOCK_PIPE_SIZE) {
|
||||||
return readv(sock->fd, iov, iovcnt);
|
return sock_readv(sock->fd, iov, iovcnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Otherwise, do a big read into our pipe */
|
/* Otherwise, do a big read into our pipe */
|
||||||
@ -730,13 +750,17 @@ static ssize_t
|
|||||||
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
|
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
|
||||||
{
|
{
|
||||||
struct spdk_uring_sock *sock = __uring_sock(_sock);
|
struct spdk_uring_sock *sock = __uring_sock(_sock);
|
||||||
|
struct msghdr msg = {
|
||||||
|
.msg_iov = iov,
|
||||||
|
.msg_iovlen = iovcnt,
|
||||||
|
};
|
||||||
|
|
||||||
if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
|
if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
|
||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return writev(sock->fd, iov, iovcnt);
|
return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
@ -1178,7 +1202,7 @@ _sock_flush_client(struct spdk_sock *_sock)
|
|||||||
/* Perform the vectored write */
|
/* Perform the vectored write */
|
||||||
msg.msg_iov = iovs;
|
msg.msg_iov = iovs;
|
||||||
msg.msg_iovlen = iovcnt;
|
msg.msg_iovlen = iovcnt;
|
||||||
rc = sendmsg(sock->fd, &msg, flags);
|
rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
|
||||||
if (rc <= 0) {
|
if (rc <= 0) {
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
return 0;
|
return 0;
|
||||||
@ -1299,7 +1323,7 @@ uring_sock_is_connected(struct spdk_sock *_sock)
|
|||||||
uint8_t byte;
|
uint8_t byte;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
rc = recv(sock->fd, &byte, 1, MSG_PEEK);
|
rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user