From 2446c5c6f3f4c061b0c1bf9d52739efe2a8a1525 Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Wed, 30 Jan 2019 13:02:37 -0700 Subject: [PATCH] thread: Keep caches of message objects on the thread object. Change-Id: I2b34d0c44fb2c4d3ec5d9e4b3c22bfce53543ea1 Signed-off-by: Ben Walker Reviewed-on: https://review.gerrithub.io/c/442774 Tested-by: SPDK CI Jenkins Reviewed-by: Jim Harris Reviewed-by: Shuhei Matsumoto --- lib/thread/thread.c | 71 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 0b7e6de1a..3935e2bdd 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -77,8 +77,11 @@ static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_device struct spdk_msg { spdk_msg_fn fn; void *arg; + + SLIST_ENTRY(spdk_msg) link; }; +#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 static struct spdk_mempool *g_spdk_msg_mempool = NULL; enum spdk_poller_state { @@ -123,6 +126,9 @@ struct spdk_thread { TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; struct spdk_ring *messages; + + SLIST_HEAD(, spdk_msg) msg_cache; + size_t msg_cache_count; }; static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); @@ -160,7 +166,7 @@ spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn) g_spdk_msg_mempool = spdk_mempool_create(mempool_name, 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ sizeof(struct spdk_msg), - SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, + 0, /* No cache. We do our own. */ SPDK_ENV_SOCKET_ID_ANY); if (!g_spdk_msg_mempool) { @@ -188,6 +194,8 @@ struct spdk_thread * spdk_thread_create(const char *name) { struct spdk_thread *thread; + struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; + int rc, i; thread = calloc(1, sizeof(*thread)); if (!thread) { @@ -198,6 +206,8 @@ spdk_thread_create(const char *name) TAILQ_INIT(&thread->io_channels); TAILQ_INIT(&thread->active_pollers); TAILQ_INIT(&thread->timer_pollers); + SLIST_INIT(&thread->msg_cache); + thread->msg_cache_count = 0; thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); if (!thread->messages) { @@ -206,6 +216,17 @@ spdk_thread_create(const char *name) return NULL; } + /* Fill the local message pool cache. */ + rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); + if (rc == 0) { + /* If we can't populate the cache it's ok. The cache will get filled + * up organically as messages are passed to the thread. */ + for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { + SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); + thread->msg_cache_count++; + } + } + if (name) { _set_thread_name(name); thread->name = strdup(name); @@ -237,6 +258,7 @@ void spdk_thread_exit(struct spdk_thread *thread) { struct spdk_io_channel *ch; + struct spdk_msg *msg; SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name); @@ -257,6 +279,19 @@ spdk_thread_exit(struct spdk_thread *thread) free(thread->name); + msg = SLIST_FIRST(&thread->msg_cache); + while (msg != NULL) { + SLIST_REMOVE_HEAD(&thread->msg_cache, link); + + assert(thread->msg_cache_count > 0); + thread->msg_cache_count--; + spdk_mempool_put(g_spdk_msg_mempool, msg); + + msg = SLIST_FIRST(&thread->msg_cache); + } + + assert(thread->msg_cache_count == 0); + if (thread->messages) { spdk_ring_free(thread->messages); } @@ -295,9 +330,16 @@ _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) assert(msg != NULL); msg->fn(msg->arg); - } - spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count); + if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { + /* Insert the messages at the head. We want to re-use the hot + * ones. */ + SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); + thread->msg_cache_count++; + } else { + spdk_mempool_put(g_spdk_msg_mempool, msg); + } + } return count; } @@ -452,6 +494,7 @@ spdk_thread_get_name(const struct spdk_thread *thread) void spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) { + struct spdk_thread *local_thread; struct spdk_msg *msg; int rc; @@ -460,10 +503,24 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx return; } - msg = spdk_mempool_get(g_spdk_msg_mempool); - if (!msg) { - assert(false); - return; + local_thread = _get_thread(); + + msg = NULL; + if (local_thread != NULL) { + if (local_thread->msg_cache_count > 0) { + msg = SLIST_FIRST(&local_thread->msg_cache); + assert(msg != NULL); + SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); + local_thread->msg_cache_count--; + } + } + + if (msg == NULL) { + msg = spdk_mempool_get(g_spdk_msg_mempool); + if (!msg) { + assert(false); + return; + } } msg->fn = fn;