diff --git a/lib/jsonrpc/jsonrpc_internal.h b/lib/jsonrpc/jsonrpc_internal.h index 48d4e24ca..b0ea21e29 100644 --- a/lib/jsonrpc/jsonrpc_internal.h +++ b/lib/jsonrpc/jsonrpc_internal.h @@ -36,7 +36,6 @@ #include "spdk/stdinc.h" -#include "spdk/env.h" #include "spdk/jsonrpc.h" #include "spdk_internal/log.h" @@ -64,6 +63,8 @@ struct spdk_jsonrpc_request { size_t send_offset; uint8_t *send_buf; + + STAILQ_ENTRY(spdk_jsonrpc_request) link; }; struct spdk_jsonrpc_server_conn { @@ -74,7 +75,10 @@ struct spdk_jsonrpc_server_conn { size_t recv_len; uint8_t recv_buf[SPDK_JSONRPC_RECV_BUF_SIZE]; uint32_t outstanding_requests; - struct spdk_ring *send_queue; + + pthread_spinlock_t queue_lock; + STAILQ_HEAD(, spdk_jsonrpc_request) send_queue; + struct spdk_jsonrpc_request *send_request; }; diff --git a/lib/jsonrpc/jsonrpc_server_tcp.c b/lib/jsonrpc/jsonrpc_server_tcp.c index 18a3ae3e3..64f92f6b6 100644 --- a/lib/jsonrpc/jsonrpc_server_tcp.c +++ b/lib/jsonrpc/jsonrpc_server_tcp.c @@ -123,7 +123,8 @@ spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) spdk_jsonrpc_server_conn_close(conn); - spdk_ring_free(conn->send_queue); + pthread_spin_destroy(&conn->queue_lock); + assert(STAILQ_EMPTY(&conn->send_queue)); /* Swap conn with the last entry in conns */ server->conns[conn_idx] = server->conns[server->num_conns - 1]; @@ -146,13 +147,9 @@ spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server) conn->closed = false; conn->recv_len = 0; conn->outstanding_requests = 0; + pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE); + STAILQ_INIT(&conn->send_queue); conn->send_request = NULL; - conn->send_queue = spdk_ring_create(SPDK_RING_TYPE_SP_SC, 128, SPDK_ENV_SOCKET_ID_ANY); - if (conn->send_queue == NULL) { - SPDK_ERRLOG("send_queue allocation failed\n"); - close(conn->sockfd); - return -1; - } flag = fcntl(conn->sockfd, F_GETFL); if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { @@ -261,9 +258,26 @@ spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_server_conn *conn, struct spdk_jsonrpc_request *request) { /* Queue the response to be sent */ - spdk_ring_enqueue(conn->send_queue, (void **)&request, 1); + pthread_spin_lock(&conn->queue_lock); + STAILQ_INSERT_TAIL(&conn->send_queue, request, link); + pthread_spin_unlock(&conn->queue_lock); } +static struct spdk_jsonrpc_request * +spdk_jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) +{ + struct spdk_jsonrpc_request *request = NULL; + + pthread_spin_lock(&conn->queue_lock); + request = STAILQ_FIRST(&conn->send_queue); + if (request) { + STAILQ_REMOVE_HEAD(&conn->send_queue, link); + } + pthread_spin_unlock(&conn->queue_lock); + return request; +} + + static int spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) { @@ -276,7 +290,8 @@ more: } if (conn->send_request == NULL) { - if (spdk_ring_dequeue(conn->send_queue, (void **)&conn->send_request, 1) != 1) { + conn->send_request = spdk_jsonrpc_server_dequeue_request(conn); + if (conn->send_request == NULL) { return 0; } } @@ -338,7 +353,7 @@ spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) conn->send_request = NULL; } - while (spdk_ring_dequeue(conn->send_queue, (void **)&request, 1) == 1) { + while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) { spdk_jsonrpc_free_request(request); }