From 36df38c059e5fe0b8875b456941db5edd2a7490b Mon Sep 17 00:00:00 2001 From: Konrad Sztyber Date: Wed, 23 Nov 2022 17:19:44 +0100 Subject: [PATCH] thread: cache a number of iobuf buffers on each channel Users can now specify a number of small/large buffers to be cached on each iobuf channel. Previously, we relied on the cache of the underlying spdk_mempool, which has per-core caches. However, since iobuf channels are tied to a module and an SPDK thread, each module and each thread is now guaranteed to have a number of buffers available, so it won't be starved by other modules/threads. Signed-off-by: Konrad Sztyber Change-Id: I1e29fe29f78a13de371ab21d3e40bf55fbc9c639 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15634 Reviewed-by: Aleksey Marchuk Reviewed-by: Shuhei Matsumoto Reviewed-by: Jim Harris Reviewed-by: Ben Walker Tested-by: SPDK CI Jenkins Community-CI: Mellanox Build Bot --- include/spdk/thread.h | 51 +++++-- lib/thread/thread.c | 69 ++++++++-- test/unit/lib/thread/thread.c/thread_ut.c | 159 ++++++++++++++++++++++ 3 files changed, 257 insertions(+), 22 deletions(-) diff --git a/include/spdk/thread.h b/include/spdk/thread.h index 5ad731d66..17c0f73cf 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -16,8 +16,10 @@ #endif #include "spdk/stdinc.h" +#include "spdk/assert.h" #include "spdk/cpuset.h" #include "spdk/env.h" +#include "spdk/util.h" #ifdef __cplusplus extern "C" { @@ -973,11 +975,27 @@ struct spdk_iobuf_entry { STAILQ_ENTRY(spdk_iobuf_entry) stailq; }; +#define SPDK_IOBUF_DATA_OFFSET SPDK_CACHE_LINE_SIZE + +struct spdk_iobuf_buffer { + STAILQ_ENTRY(spdk_iobuf_buffer) stailq; +}; + +SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= SPDK_IOBUF_DATA_OFFSET, + "Invalid data offset"); + typedef STAILQ_HEAD(, spdk_iobuf_entry) spdk_iobuf_entry_stailq_t; +typedef STAILQ_HEAD(, spdk_iobuf_buffer) spdk_iobuf_buffer_stailq_t; struct spdk_iobuf_pool { /** Buffer pool */ struct spdk_mempool *pool; + /** Buffer cache */ + spdk_iobuf_buffer_stailq_t cache; + /** Number of elements in the cache */ + uint32_t cache_count; + /** Size of the cache */ + uint32_t cache_size; /** Buffer wait queue */ spdk_iobuf_entry_stailq_t *queue; /** Buffer size */ @@ -1105,7 +1123,7 @@ spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) { struct spdk_iobuf_pool *pool; - void *buf; + struct spdk_iobuf_buffer *buf; assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); if (len <= ch->small.bufsize) { @@ -1115,14 +1133,23 @@ spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, pool = &ch->large; } - buf = spdk_mempool_get(pool->pool); - if (!buf) { - STAILQ_INSERT_TAIL(pool->queue, entry, stailq); - entry->module = ch->module; - entry->cb_fn = cb_fn; + buf = STAILQ_FIRST(&pool->cache); + if (buf) { + STAILQ_REMOVE_HEAD(&pool->cache, stailq); + assert(pool->cache_count > 0); + pool->cache_count--; + } else { + buf = (struct spdk_iobuf_buffer *)spdk_mempool_get(pool->pool); + if (!buf) { + STAILQ_INSERT_TAIL(pool->queue, entry, stailq); + entry->module = ch->module; + entry->cb_fn = cb_fn; + + return NULL; + } } - return buf; + return (char *)buf + SPDK_IOBUF_DATA_OFFSET; } /** @@ -1137,6 +1164,7 @@ static inline void spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) { struct spdk_iobuf_entry *entry; + struct spdk_iobuf_buffer *iobuf_buf; struct spdk_iobuf_pool *pool; assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); @@ -1147,7 +1175,14 @@ spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) } if (STAILQ_EMPTY(pool->queue)) { - spdk_mempool_put(pool->pool, buf); + iobuf_buf = (struct spdk_iobuf_buffer *)((char *)buf - SPDK_IOBUF_DATA_OFFSET); + + if (pool->cache_count < pool->cache_size) { + STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); + pool->cache_count++; + } else { + spdk_mempool_put(pool->pool, iobuf_buf); + } } else { entry = STAILQ_FIRST(pool->queue); STAILQ_REMOVE_HEAD(pool->queue, stailq); diff --git a/lib/thread/thread.c b/lib/thread/thread.c index e5f76a821..99bffcc77 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -2940,15 +2940,10 @@ int spdk_iobuf_initialize(void) { struct spdk_iobuf_opts *opts = &g_iobuf.opts; - int cache_size, rc = 0; + int rc = 0; - /** - * Ensure no more than half of the total buffers end up local caches, by using - * spdk_env_get_core_count() to determine how many local caches we need to account for. - */ - cache_size = opts->small_pool_count / (2 * spdk_env_get_core_count()); g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count, - opts->small_bufsize, cache_size, + opts->small_bufsize + SPDK_IOBUF_DATA_OFFSET, 0, SPDK_ENV_SOCKET_ID_ANY); if (!g_iobuf.small_pool) { SPDK_ERRLOG("Failed to create small iobuf pool\n"); @@ -2956,9 +2951,8 @@ spdk_iobuf_initialize(void) goto error; } - cache_size = opts->large_pool_count / (2 * spdk_env_get_core_count()); g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count, - opts->large_bufsize, cache_size, + opts->large_bufsize + SPDK_IOBUF_DATA_OFFSET, 0, SPDK_ENV_SOCKET_ID_ANY); if (!g_iobuf.large_pool) { SPDK_ERRLOG("Failed to create large iobuf pool\n"); @@ -3056,11 +3050,8 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, struct spdk_io_channel *ioch; struct iobuf_channel *iobuf_ch; struct iobuf_module *module; - - if (small_cache_size != 0 || large_cache_size != 0) { - SPDK_ERRLOG("iobuf cache is currently unsupported\n"); - return -EINVAL; - } + struct spdk_iobuf_buffer *buf; + uint32_t i; TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { if (strcmp(name, module->name) == 0) { @@ -3089,14 +3080,47 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, ch->large.bufsize = g_iobuf.opts.large_bufsize; ch->parent = ioch; ch->module = module; + ch->small.cache_size = small_cache_size; + ch->large.cache_size = large_cache_size; + ch->small.cache_count = 0; + ch->large.cache_count = 0; + + STAILQ_INIT(&ch->small.cache); + STAILQ_INIT(&ch->large.cache); + + for (i = 0; i < small_cache_size; ++i) { + buf = spdk_mempool_get(g_iobuf.small_pool); + if (buf == NULL) { + SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " + "You may need to increase spdk_iobuf_opts.small_pool_count\n"); + goto error; + } + STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); + ch->small.cache_count++; + } + for (i = 0; i < large_cache_size; ++i) { + buf = spdk_mempool_get(g_iobuf.large_pool); + if (buf == NULL) { + SPDK_ERRLOG("Failed to populate iobuf large buffer cache. " + "You may need to increase spdk_iobuf_opts.large_pool_count\n"); + goto error; + } + STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); + ch->large.cache_count++; + } return 0; +error: + spdk_iobuf_channel_fini(ch); + + return -ENOMEM; } void spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) { struct spdk_iobuf_entry *entry __attribute__((unused)); + struct spdk_iobuf_buffer *buf; /* Make sure none of the wait queue entries are coming from this module */ STAILQ_FOREACH(entry, ch->small.queue, stailq) { @@ -3106,6 +3130,23 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) assert(entry->module != ch->module); } + /* Release cached buffers back to the pool */ + while (!STAILQ_EMPTY(&ch->small.cache)) { + buf = STAILQ_FIRST(&ch->small.cache); + STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); + spdk_mempool_put(ch->small.pool, buf); + ch->small.cache_count--; + } + while (!STAILQ_EMPTY(&ch->large.cache)) { + buf = STAILQ_FIRST(&ch->large.cache); + STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); + spdk_mempool_put(ch->large.pool, buf); + ch->large.cache_count--; + } + + assert(ch->small.cache_count == 0); + assert(ch->large.cache_count == 0); + spdk_put_io_channel(ch->parent); ch->parent = NULL; } diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index f33446f24..c2e58aaab 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -2366,6 +2366,164 @@ iobuf(void) free_cores(); } +static void +iobuf_cache(void) +{ + struct spdk_iobuf_opts opts = { + .small_pool_count = 4, + .large_pool_count = 4, + .small_bufsize = 2, + .large_bufsize = 4, + }; + struct spdk_iobuf_channel iobuf_ch[2]; + struct ut_iobuf_entry *entry; + struct ut_iobuf_entry mod0_entries[] = { + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 0, .module = "ut_module0", }, + }; + struct ut_iobuf_entry mod1_entries[] = { + { .thread_id = 0, .module = "ut_module1", }, + { .thread_id = 0, .module = "ut_module1", }, + }; + int rc, finish = 0; + uint32_t i, j, bufsize; + + allocate_cores(1); + allocate_threads(1); + + set_thread(0); + + /* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */ + g_iobuf.opts = opts; + rc = spdk_iobuf_initialize(); + CU_ASSERT_EQUAL(rc, 0); + + rc = spdk_iobuf_register_module("ut_module0"); + CU_ASSERT_EQUAL(rc, 0); + + rc = spdk_iobuf_register_module("ut_module1"); + CU_ASSERT_EQUAL(rc, 0); + + /* First check that channel initialization fails when it's not possible to fill in the cache + * from the pool. + */ + rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1); + CU_ASSERT_EQUAL(rc, -ENOMEM); + rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5); + CU_ASSERT_EQUAL(rc, -ENOMEM); + + rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4); + CU_ASSERT_EQUAL(rc, -ENOMEM); + + spdk_iobuf_channel_fini(&iobuf_ch[0]); + poll_threads(); + + /* Initialize one channel with cache, acquire buffers, and check that a second one can be + * created once the buffers acquired from the first one are returned to the pool + */ + rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2); + CU_ASSERT_EQUAL(rc, 0); + + for (i = 0; i < 3; ++i) { + mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], 4, &mod0_entries[i].iobuf, + ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf); + } + + /* It should be able to create a channel with a single entry in the cache */ + rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 1); + CU_ASSERT_EQUAL(rc, 0); + spdk_iobuf_channel_fini(&iobuf_ch[1]); + poll_threads(); + + /* But not with two entries */ + rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2); + CU_ASSERT_EQUAL(rc, -ENOMEM); + + for (i = 0; i < 2; ++i) { + spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, 4); + rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2); + CU_ASSERT_EQUAL(rc, -ENOMEM); + } + + spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, 4); + + /* The last buffer should be released back to the pool, so we should be able to create a new + * channel + */ + rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2); + CU_ASSERT_EQUAL(rc, 0); + + spdk_iobuf_channel_fini(&iobuf_ch[0]); + spdk_iobuf_channel_fini(&iobuf_ch[1]); + poll_threads(); + + /* Check that the pool is only used when the cache is empty and that the cache guarantees a + * certain set of buffers + */ + rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1); + CU_ASSERT_EQUAL(rc, 0); + + uint32_t buffer_sizes[] = { 2, 4 }; + for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) { + bufsize = buffer_sizes[i]; + + for (j = 0; j < 3; ++j) { + entry = &mod0_entries[j]; + entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf, + ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + } + + mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf, + ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf); + + /* The whole pool is exhausted now */ + mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf, + ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(mod1_entries[1].buf); + mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf, + ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(mod0_entries[3].buf); + + /* If there are outstanding requests waiting for a buffer, they should have priority + * over filling in the cache, even if they're from different modules. + */ + spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize); + /* Also make sure the queue is FIFO and doesn't care about which module requested + * and which module released the buffer. + */ + CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf); + CU_ASSERT_PTR_NULL(mod0_entries[3].buf); + + /* Return the buffers back */ + spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize); + for (j = 0; j < 2; ++j) { + spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize); + spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize); + } + } + + spdk_iobuf_channel_fini(&iobuf_ch[0]); + spdk_iobuf_channel_fini(&iobuf_ch[1]); + poll_threads(); + + spdk_iobuf_finish(ut_iobuf_finish_cb, &finish); + poll_threads(); + + CU_ASSERT_EQUAL(finish, 1); + + free_threads(); + free_cores(); +} + int main(int argc, char **argv) { @@ -2396,6 +2554,7 @@ main(int argc, char **argv) CU_ADD_TEST(suite, io_device_lookup); CU_ADD_TEST(suite, spdk_spin); CU_ADD_TEST(suite, iobuf); + CU_ADD_TEST(suite, iobuf_cache); CU_basic_set_mode(CU_BRM_VERBOSE); CU_basic_run_tests();