diff --git a/include/spdk/sock.h b/include/spdk/sock.h index 74ac19b28..b4a614c0a 100644 --- a/include/spdk/sock.h +++ b/include/spdk/sock.h @@ -45,6 +45,7 @@ extern "C" { #endif struct spdk_sock; +struct spdk_sock_group; int spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, char *caddr, int clen); struct spdk_sock *spdk_sock_connect(const char *ip, int port); @@ -61,6 +62,16 @@ int spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz); bool spdk_sock_is_ipv6(struct spdk_sock *sock); bool spdk_sock_is_ipv4(struct spdk_sock *sock); +typedef void (*spdk_sock_cb)(void *arg, struct spdk_sock_group *group, struct spdk_sock *sock); + +struct spdk_sock_group *spdk_sock_group_create(void); +int spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock, + spdk_sock_cb cb_fn, void *cb_arg); +int spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock); +int spdk_sock_group_poll(struct spdk_sock_group *group); +int spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events); +int spdk_sock_group_close(struct spdk_sock_group **group); + #ifdef __cplusplus } #endif diff --git a/lib/iscsi/conn.c b/lib/iscsi/conn.c index 79a63e55b..06ae37d6e 100644 --- a/lib/iscsi/conn.c +++ b/lib/iscsi/conn.c @@ -121,211 +121,6 @@ spdk_find_iscsi_connection_by_id(int cid) } } -/* - * Some of this code may be useful once we add back an epoll/kqueue descriptor - * for normal processing. So just #if 0 it out for now. 
- */ -#if 0 -#if defined(__FreeBSD__) - -static int -init_idle_conns(void) -{ - assert(g_poll_fd == 0); - g_poll_fd = kqueue(); - if (g_poll_fd < 0) { - SPDK_ERRLOG("kqueue() failed, errno %d: %s\n", errno, spdk_strerror(errno)); - return -1; - } - - return 0; -} - -static int -add_idle_conn(struct spdk_iscsi_conn *conn) -{ - struct kevent event; - struct timespec ts = {0}; - int rc; - - EV_SET(&event, conn->sock, EVFILT_READ, EV_ADD, 0, 0, conn); - - rc = kevent(g_poll_fd, &event, 1, NULL, 0, &ts); - if (rc == -1) { - SPDK_ERRLOG("kevent(EV_ADD) failed\n"); - return -1; - } - - return 0; -} - -static int -del_idle_conn(struct spdk_iscsi_conn *conn) -{ - struct kevent event; - struct timespec ts = {0}; - int rc; - - EV_SET(&event, conn->sock, EVFILT_READ, EV_DELETE, 0, 0, NULL); - - rc = kevent(g_poll_fd, &event, 1, NULL, 0, &ts); - if (rc == -1) { - SPDK_ERRLOG("kevent(EV_DELETE) failed\n"); - return -1; - } - if (event.flags & EV_ERROR) { - SPDK_ERRLOG("kevent(EV_DELETE) failed: %s\n", spdk_strerror(event.data)); - return -1; - } - - return 0; -} - -static void -check_idle_conns(void) -{ - struct kevent events[SPDK_MAX_POLLERS_PER_CORE]; - int i; - int nfds; - struct spdk_iscsi_conn *conn; - struct timespec ts = {0}; - - /* if nothing idle, can exit now */ - if (STAILQ_EMPTY(&g_idle_conn_list_head)) { - /* this kevent is needed to finish socket closing process */ - kevent(g_poll_fd, NULL, 0, events, SPDK_MAX_POLLERS_PER_CORE, &ts); - } - - /* Perform a non-blocking poll */ - nfds = kevent(g_poll_fd, NULL, 0, events, SPDK_MAX_POLLERS_PER_CORE, &ts); - if (nfds < 0) { - SPDK_ERRLOG("kevent failed! (ret: %d)\n", nfds); - return; - } - - if (nfds > SPDK_MAX_POLLERS_PER_CORE) { - SPDK_ERRLOG("kevent events exceeded limit! %d > %d\n", nfds, - SPDK_MAX_POLLERS_PER_CORE); - assert(0); - } - - /* - * In the case of any event cause (EPOLLIN or EPOLLERR) - * just make the connection active for normal process loop. 
- */ - for (i = 0; i < nfds; i++) { - - conn = (struct spdk_iscsi_conn *)events[i].udata; - - /* - * Flag the connection that an event was noticed - * such that during the list scan process it will - * be re-inserted into the active ring - */ - conn->pending_activate_event = true; - } -} - -#else - -static int -init_idle_conns(void) -{ - assert(g_poll_fd == 0); - g_poll_fd = epoll_create1(0); - if (g_poll_fd < 0) { - SPDK_ERRLOG("epoll_create1() failed, errno %d: %s\n", errno, spdk_strerror(errno)); - return -1; - } - - return 0; -} - -static int -add_idle_conn(struct spdk_iscsi_conn *conn) -{ - struct epoll_event event; - int rc; - - event.events = EPOLLIN; - event.data.u64 = 0LL; - event.data.ptr = conn; - - rc = epoll_ctl(g_poll_fd, EPOLL_CTL_ADD, conn->sock, &event); - if (rc == 0) { - return 0; - } else { - SPDK_ERRLOG("conn epoll_ctl failed\n"); - return -1; - } -} - -static int -del_idle_conn(struct spdk_iscsi_conn *conn) -{ - struct epoll_event event; - int rc; - - /* - * The event parameter is ignored but needs to be non-NULL to work around a bug in old - * kernel versions. - */ - rc = epoll_ctl(g_poll_fd, EPOLL_CTL_DEL, conn->sock, &event); - if (rc == 0) { - return 0; - } else { - SPDK_ERRLOG("epoll_ctl(EPOLL_CTL_DEL) failed\n"); - return -1; - } -} - -static void -check_idle_conns(void) -{ - struct epoll_event events[SPDK_MAX_POLLERS_PER_CORE]; - int i; - int nfds; - struct spdk_iscsi_conn *conn; - - /* if nothing idle, can exit now */ - if (STAILQ_EMPTY(&g_idle_conn_list_head)) { - /* this epoll_wait is needed to finish socket closing process */ - epoll_wait(g_poll_fd, events, SPDK_MAX_POLLERS_PER_CORE, 0); - } - - /* Perform a non-blocking epoll */ - nfds = epoll_wait(g_poll_fd, events, SPDK_MAX_POLLERS_PER_CORE, 0); - if (nfds < 0) { - SPDK_ERRLOG("epoll_wait failed! (ret: %d)\n", nfds); - return; - } - - if (nfds > SPDK_MAX_POLLERS_PER_CORE) { - SPDK_ERRLOG("epoll_wait events exceeded limit! 
%d > %d\n", nfds, - SPDK_MAX_POLLERS_PER_CORE); - assert(0); - } - - /* - * In the case of any event cause (EPOLLIN or EPOLLERR) - * just make the connection active for normal process loop. - */ - for (i = 0; i < nfds; i++) { - - conn = (struct spdk_iscsi_conn *)events[i].data.ptr; - - /* - * Flag the connection that an event was noticed - * such that during the list scan process it will - * be re-inserted into the active ring - */ - conn->pending_activate_event = true; - } -} - -#endif -#endif - int spdk_initialize_iscsi_conns(void) { size_t conns_size; diff --git a/lib/net/sock.c b/lib/net/sock.c index 8562bff68..3613c4895 100644 --- a/lib/net/sock.c +++ b/lib/net/sock.c @@ -33,14 +33,31 @@ #include "spdk/stdinc.h" +#if defined(__linux__) +#include <sys/epoll.h> +#elif defined(__FreeBSD__) +#include <sys/event.h> +#endif + #include "spdk/log.h" #include "spdk/sock.h" +#include "spdk/queue.h" #define MAX_TMPBUF 1024 #define PORTNUMLEN 32 +#define MAX_EVENTS_PER_POLL 32 + struct spdk_sock { - int fd; + int fd; + spdk_sock_cb cb_fn; + void *cb_arg; + TAILQ_ENTRY(spdk_sock) link; +}; + +struct spdk_sock_group { + int fd; + TAILQ_HEAD(, spdk_sock) socks; }; static int get_addr_str(struct sockaddr *sa, char *host, size_t hlen) @@ -307,6 +324,12 @@ spdk_sock_close(struct spdk_sock **sock) return -1; } + if ((*sock)->cb_fn != NULL) { + /* This sock is still part of a sock_group. 
*/ + errno = EBUSY; + return -1; + } + rc = close((*sock)->fd); if (rc == 0) { @@ -412,3 +435,184 @@ spdk_sock_is_ipv4(struct spdk_sock *sock) return (sa.ss_family == AF_INET); } + +struct spdk_sock_group * +spdk_sock_group_create(void) +{ + struct spdk_sock_group *sock_group; + int fd; + +#if defined(__linux__) + fd = epoll_create1(0); +#elif defined(__FreeBSD__) + fd = kqueue(); +#endif + if (fd == -1) { + return NULL; + } + + sock_group = calloc(1, sizeof(*sock_group)); + if (sock_group == NULL) { + SPDK_ERRLOG("sock_group allocation failed\n"); + close(fd); + return NULL; + } + + sock_group->fd = fd; + TAILQ_INIT(&sock_group->socks); + + return sock_group; +} + +int +spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock, + spdk_sock_cb cb_fn, void *cb_arg) +{ + int rc; + + if (cb_fn == NULL) { + errno = EINVAL; + return -1; + } + + if (sock->cb_fn != NULL) { + /* + * This sock is already part of a sock_group. Currently we don't + * support this. + */ + errno = EBUSY; + return -1; + } + +#if defined(__linux__) + struct epoll_event event; + + event.events = EPOLLIN; + event.data.ptr = sock; + + rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); +#elif defined(__FreeBSD__) + struct kevent event; + struct timespec ts = {0}; + + EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); + + rc = kevent(group->fd, &event, 1, NULL, 0, &ts); +#endif + if (rc == 0) { + TAILQ_INSERT_TAIL(&group->socks, sock, link); + sock->cb_fn = cb_fn; + sock->cb_arg = cb_arg; + } + + return rc; +} + +int +spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock) +{ + int rc; +#if defined(__linux__) + struct epoll_event event; + + /* Event parameter is ignored but some old kernel version still require it. 
*/ + rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); +#elif defined(__FreeBSD__) + struct kevent event; + struct timespec ts = {0}; + + EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + + rc = kevent(group->fd, &event, 1, NULL, 0, &ts); + if (rc == 0 && event.flags & EV_ERROR) { + rc = -1; + errno = event.data; + } +#endif + if (rc == 0) { + TAILQ_REMOVE(&group->socks, sock, link); + sock->cb_fn = NULL; + sock->cb_arg = NULL; + } + + return rc; +} + +int +spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events) +{ + struct spdk_sock *sock; + int num_events, i; + + if (max_events < 1) { + errno = EINVAL; + return -1; + } + + /* + * Only poll for up to 32 events at a time - if more events are pending, + * the next call to this function will reap them. + */ + if (max_events > MAX_EVENTS_PER_POLL) { + max_events = MAX_EVENTS_PER_POLL; + } + +#if defined(__linux__) + struct epoll_event events[MAX_EVENTS_PER_POLL]; + + num_events = epoll_wait(group->fd, events, max_events, 0); +#elif defined(__FreeBSD__) + struct kevent events[MAX_EVENTS_PER_POLL]; + struct timespec ts = {0}; + + num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); +#endif + + if (num_events == -1) { + return -1; + } + + for (i = 0; i < num_events; i++) { +#if defined(__linux__) + sock = events[i].data.ptr; +#elif defined(__FreeBSD__) + sock = events[i].udata; +#endif + + assert(sock->cb_fn != NULL); + sock->cb_fn(sock->cb_arg, group, sock); + } + + return 0; +} + +int +spdk_sock_group_poll(struct spdk_sock_group *group) +{ + return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL); +} + +int +spdk_sock_group_close(struct spdk_sock_group **group) +{ + int rc; + + if (*group == NULL) { + errno = EBADF; + return -1; + } + + if (!TAILQ_EMPTY(&(*group)->socks)) { + errno = EBUSY; + return -1; + } + + rc = close((*group)->fd); + + if (rc == 0) { + free(*group); + *group = NULL; + } + + return rc; +} diff --git a/test/unit/lib/net/sock.c/sock_ut.c 
b/test/unit/lib/net/sock.c/sock_ut.c index 2a9649edd..284c3830d 100644 --- a/test/unit/lib/net/sock.c/sock_ut.c +++ b/test/unit/lib/net/sock.c/sock_ut.c @@ -37,6 +37,11 @@ #include "sock.c" +bool g_read_data_called; +ssize_t g_bytes_read; +char g_buf[256]; +struct spdk_sock *g_server_sock_read; + static void sock(void) { @@ -98,6 +103,227 @@ sock(void) CU_ASSERT(rc == 0); } +static void +read_data(void *cb_arg, struct spdk_sock_group *group, struct spdk_sock *sock) +{ + struct spdk_sock *server_sock = cb_arg; + + CU_ASSERT(server_sock == sock); + + g_read_data_called = true; + g_bytes_read += spdk_sock_recv(server_sock, g_buf + g_bytes_read, sizeof(g_buf) - g_bytes_read); +} + +static void +sock_group(void) +{ + struct spdk_sock_group *group; + struct spdk_sock *listen_sock; + struct spdk_sock *server_sock; + struct spdk_sock *client_sock; + char *test_string = "abcdef"; + ssize_t bytes_written; + struct iovec iov; + int rc; + + listen_sock = spdk_sock_listen("127.0.0.1", 3260); + SPDK_CU_ASSERT_FATAL(listen_sock != NULL); + + server_sock = spdk_sock_accept(listen_sock); + CU_ASSERT(server_sock == NULL); + CU_ASSERT(errno == EAGAIN || errno == EWOULDBLOCK); + + client_sock = spdk_sock_connect("127.0.0.1", 3260); + SPDK_CU_ASSERT_FATAL(client_sock != NULL); + + usleep(1000); + + server_sock = spdk_sock_accept(listen_sock); + SPDK_CU_ASSERT_FATAL(server_sock != NULL); + + group = spdk_sock_group_create(); + SPDK_CU_ASSERT_FATAL(group != NULL); + + /* pass null cb_fn */ + rc = spdk_sock_group_add_sock(group, server_sock, NULL, NULL); + CU_ASSERT(rc == -1); + CU_ASSERT(errno == EINVAL); + + rc = spdk_sock_group_add_sock(group, server_sock, read_data, server_sock); + CU_ASSERT(rc == 0); + + /* try adding sock a second time */ + rc = spdk_sock_group_add_sock(group, server_sock, read_data, server_sock); + CU_ASSERT(rc == -1); + CU_ASSERT(errno == EBUSY); + + g_read_data_called = false; + g_bytes_read = 0; + rc = spdk_sock_group_poll(group); + + CU_ASSERT(rc == 0); + 
CU_ASSERT(g_read_data_called == false); + + iov.iov_base = test_string; + iov.iov_len = 7; + bytes_written = spdk_sock_writev(client_sock, &iov, 1); + CU_ASSERT(bytes_written == 7); + + usleep(1000); + + g_read_data_called = false; + g_bytes_read = 0; + rc = spdk_sock_group_poll(group); + + CU_ASSERT(rc == 0); + CU_ASSERT(g_read_data_called == true); + CU_ASSERT(g_bytes_read == 7); + + CU_ASSERT(strncmp(test_string, g_buf, 7) == 0); + + rc = spdk_sock_close(&client_sock); + CU_ASSERT(client_sock == NULL); + CU_ASSERT(rc == 0); + + /* Try to close sock_group while it still has sockets. */ + rc = spdk_sock_group_close(&group); + CU_ASSERT(rc == -1); + CU_ASSERT(errno == EBUSY); + + /* Try to close sock while it is still part of a sock_group. */ + rc = spdk_sock_close(&server_sock); + CU_ASSERT(rc == -1); + CU_ASSERT(errno == EBUSY); + + rc = spdk_sock_group_remove_sock(group, server_sock); + CU_ASSERT(rc == 0); + + rc = spdk_sock_group_close(&group); + CU_ASSERT(group == NULL); + CU_ASSERT(rc == 0); + + rc = spdk_sock_close(&server_sock); + CU_ASSERT(server_sock == NULL); + CU_ASSERT(rc == 0); + + rc = spdk_sock_close(&listen_sock); + CU_ASSERT(listen_sock == NULL); + CU_ASSERT(rc == 0); +} + +static void +read_data_fairness(void *cb_arg, struct spdk_sock_group *group, struct spdk_sock *sock) +{ + struct spdk_sock *server_sock = cb_arg; + ssize_t bytes_read; + char buf[1]; + + CU_ASSERT(g_server_sock_read == NULL); + CU_ASSERT(server_sock == sock); + + g_server_sock_read = server_sock; + bytes_read = spdk_sock_recv(server_sock, buf, 1); + CU_ASSERT(bytes_read == 1); +} + +static void +sock_group_fairness(void) +{ + struct spdk_sock_group *group; + struct spdk_sock *listen_sock; + struct spdk_sock *server_sock[3]; + struct spdk_sock *client_sock[3]; + char test_char = 'a'; + ssize_t bytes_written; + struct iovec iov; + int i, rc; + + listen_sock = spdk_sock_listen("127.0.0.1", 3260); + SPDK_CU_ASSERT_FATAL(listen_sock != NULL); + + group = spdk_sock_group_create(); + 
SPDK_CU_ASSERT_FATAL(group != NULL); + + for (i = 0; i < 3; i++) { + client_sock[i] = spdk_sock_connect("127.0.0.1", 3260); + SPDK_CU_ASSERT_FATAL(client_sock[i] != NULL); + + usleep(1000); + + server_sock[i] = spdk_sock_accept(listen_sock); + SPDK_CU_ASSERT_FATAL(server_sock[i] != NULL); + + rc = spdk_sock_group_add_sock(group, server_sock[i], + read_data_fairness, server_sock[i]); + CU_ASSERT(rc == 0); + } + + iov.iov_base = &test_char; + iov.iov_len = 1; + + for (i = 0; i < 3; i++) { + bytes_written = spdk_sock_writev(client_sock[i], &iov, 1); + CU_ASSERT(bytes_written == 1); + } + + usleep(1000); + + /* + * Poll for just one event - this should be server sock 0, since that + * is the peer of the first client sock that we wrote to. + */ + g_server_sock_read = NULL; + rc = spdk_sock_group_poll_count(group, 1); + CU_ASSERT(rc == 0); + CU_ASSERT(g_server_sock_read == server_sock[0]); + + /* + * Now write another byte to client sock 0. We want to ensure that + * the sock group does not unfairly process the event for this sock + * before the socks that were written to earlier. 
+ */ + bytes_written = spdk_sock_writev(client_sock[0], &iov, 1); + CU_ASSERT(bytes_written == 1); + + usleep(1000); + + g_server_sock_read = NULL; + rc = spdk_sock_group_poll_count(group, 1); + CU_ASSERT(rc == 0); + CU_ASSERT(g_server_sock_read == server_sock[1]); + + g_server_sock_read = NULL; + rc = spdk_sock_group_poll_count(group, 1); + CU_ASSERT(rc == 0); + CU_ASSERT(g_server_sock_read == server_sock[2]); + + g_server_sock_read = NULL; + rc = spdk_sock_group_poll_count(group, 1); + CU_ASSERT(rc == 0); + CU_ASSERT(g_server_sock_read == server_sock[0]); + + for (i = 0; i < 3; i++) { + rc = spdk_sock_group_remove_sock(group, server_sock[i]); + CU_ASSERT(rc == 0); + + rc = spdk_sock_close(&client_sock[i]); + CU_ASSERT(client_sock[i] == NULL); + CU_ASSERT(rc == 0); + + rc = spdk_sock_close(&server_sock[i]); + CU_ASSERT(server_sock[i] == NULL); + CU_ASSERT(rc == 0); + } + + rc = spdk_sock_group_close(&group); + CU_ASSERT(group == NULL); + CU_ASSERT(rc == 0); + + rc = spdk_sock_close(&listen_sock); + CU_ASSERT(listen_sock == NULL); + CU_ASSERT(rc == 0); +} + int main(int argc, char **argv) { @@ -115,7 +341,9 @@ main(int argc, char **argv) } if ( - CU_add_test(suite, "sock", sock) == NULL) { + CU_add_test(suite, "sock", sock) == NULL || + CU_add_test(suite, "sock_group", sock_group) == NULL || + CU_add_test(suite, "sock_group_fairness", sock_group_fairness) == NULL) { CU_cleanup_registry(); return CU_get_error(); }