diff --git a/include/spdk/thread.h b/include/spdk/thread.h index 5c630f74c..5ad731d66 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -17,6 +17,7 @@ #include "spdk/stdinc.h" #include "spdk/cpuset.h" +#include "spdk/env.h" #ifdef __cplusplus extern "C" { @@ -950,6 +951,210 @@ void spdk_spin_unlock(struct spdk_spinlock *sspin); */ bool spdk_spin_held(struct spdk_spinlock *sspin); +struct spdk_iobuf_opts { + /** Maximum number of small buffers */ + uint64_t small_pool_count; + /** Maximum number of large buffers */ + uint64_t large_pool_count; + /** Size of a single small buffer */ + uint32_t small_bufsize; + /** Size of a single large buffer */ + uint32_t large_bufsize; +}; + +struct spdk_iobuf_entry; + +typedef void (*spdk_iobuf_get_cb)(struct spdk_iobuf_entry *entry, void *buf); + +/** iobuf queue entry */ +struct spdk_iobuf_entry { + spdk_iobuf_get_cb cb_fn; + const void *module; + STAILQ_ENTRY(spdk_iobuf_entry) stailq; +}; + +typedef STAILQ_HEAD(, spdk_iobuf_entry) spdk_iobuf_entry_stailq_t; + +struct spdk_iobuf_pool { + /** Buffer pool */ + struct spdk_mempool *pool; + /** Buffer wait queue */ + spdk_iobuf_entry_stailq_t *queue; + /** Buffer size */ + uint32_t bufsize; +}; + +/** iobuf channel */ +struct spdk_iobuf_channel { + /** Small buffer memory pool */ + struct spdk_iobuf_pool small; + /** Large buffer memory pool */ + struct spdk_iobuf_pool large; + /** Module pointer */ + const void *module; + /** Parent IO channel */ + struct spdk_io_channel *parent; +}; + +/** + * Initialize and allocate iobuf pools. + * + * \return 0 on success, negative errno otherwise. + */ +int spdk_iobuf_initialize(void); + +typedef void (*spdk_iobuf_finish_cb)(void *cb_arg); + +/** + * Clean up and free iobuf pools. + * + * \param cb_fn Callback to be executed once the clean up is completed. + * \param cb_arg Callback argument. + */ +void spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg); + +/** + * Set iobuf options. These options will be used during `spdk_iobuf_initialize()`. + * + * \param opts Options describing the size of the pools to reserve. + * + * \return 0 on success, negative errno otherwise. + */ +int spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts); + +/** + * Get iobuf options. + * + * \param opts Options to fill in. + */ +void spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts); + +/** + * Register a module as an iobuf pool user. Only registered users can request buffers from the + * iobuf pool. + * + * \name Name of the module. + * + * \return 0 on success, negative errno otherwise. + */ +int spdk_iobuf_register_module(const char *name); + +/** + * Initialize an iobuf channel. + * + * \param ch iobuf channel to initialize. + * \param name Name of the module registered via `spdk_iobuf_register_module()`. + * \param small_cache_size Number of small buffers to be cached by this channel. + * \param large_cache_size Number of large buffers to be cached by this channel. + * + * \return 0 on success, negative errno otherwise. + */ +int spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, + uint32_t small_cache_size, uint32_t large_cache_size); + +/** + * Release resources tied to an iobuf channel. + * + * \param ch iobuf channel. + */ +void spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch); + +typedef int (*spdk_iobuf_for_each_entry_fn)(struct spdk_iobuf_channel *ch, + struct spdk_iobuf_entry *entry, void *ctx); + +/** + * Iterate over all entries on a given queue and execute a callback on those that were requested + * using `ch`. The iteration is stopped if the callback returns non-zero status. + * + * \param ch iobuf channel to iterate over. + * \param pool Pool to iterate over (`small` or `large`). + * \param cb_fn Callback to execute on each entry on the queue that was requested using `ch`. + * \param cb_ctx Argument passed to `cb_fn`. + * + * \return status of the last callback. + */ +int spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, + spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx); + +/** + * Abort an outstanding request waiting for a buffer. + * + * \param ch iobuf channel on which the entry is waiting. + * \param entry Entry to remove from the wait queue. + * \param len Length of the requested buffer (must be the exact same value as specified in + * `spdk_iobuf_get()`. + */ +void spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, + uint64_t len); + +/** + * Get a buffer from the iobuf pool. If no buffers are available, the request is queued until a + * buffer is released. + * + * \param ch iobuf channel. + * \param len Length of the buffer to retrieve. The user is responsible for making sure the length + * doesn't exceed large_bufsize. + * \param entry Wait queue entry. + * \param cb_fn Callback to be executed once a buffer becomes available. If a buffer is available + * immediately, it is NOT be executed. + * + * \return pointer to a buffer or NULL if no buffers are currently available. + */ +static inline void * +spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, + struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) +{ + struct spdk_iobuf_pool *pool; + void *buf; + + assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); + if (len <= ch->small.bufsize) { + pool = &ch->small; + } else { + assert(len <= ch->large.bufsize); + pool = &ch->large; + } + + buf = spdk_mempool_get(pool->pool); + if (!buf) { + STAILQ_INSERT_TAIL(pool->queue, entry, stailq); + entry->module = ch->module; + entry->cb_fn = cb_fn; + } + + return buf; +} + +/** + * Release a buffer back to the iobuf pool. If there are outstanding requests waiting for a buffer, + * this buffer will be passed to one of them. + * + * \param ch iobuf channel. + * \param buf Buffer to release + * \param len Length of the buffer (must be the exact same value as specified in `spdk_iobuf_get()`). + */ +static inline void +spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) +{ + struct spdk_iobuf_entry *entry; + struct spdk_iobuf_pool *pool; + + assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); + if (len <= ch->small.bufsize) { + pool = &ch->small; + } else { + pool = &ch->large; + } + + if (STAILQ_EMPTY(pool->queue)) { + spdk_mempool_put(pool->pool, buf); + } else { + entry = STAILQ_FIRST(pool->queue); + STAILQ_REMOVE_HEAD(pool->queue, stailq); + entry->cb_fn(entry, buf); + } +} + #ifdef __cplusplus } #endif diff --git a/lib/thread/spdk_thread.map b/lib/thread/spdk_thread.map index a8b25bd21..040dbd7ec 100644 --- a/lib/thread/spdk_thread.map +++ b/lib/thread/spdk_thread.map @@ -62,6 +62,15 @@ spdk_spin_lock; spdk_spin_unlock; spdk_spin_held; + spdk_iobuf_initialize; + spdk_iobuf_finish; + spdk_iobuf_set_opts; + spdk_iobuf_get_opts; + spdk_iobuf_channel_init; + spdk_iobuf_channel_fini; + spdk_iobuf_register_module; + spdk_iobuf_for_each_entry; + spdk_iobuf_entry_abort; # internal functions in spdk_internal/thread.h spdk_poller_get_name; diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 1e2708dae..e5f76a821 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -7,6 +7,7 @@ #include "spdk/stdinc.h" #include "spdk/env.h" +#include "spdk/bdev.h" #include "spdk/likely.h" #include "spdk/queue.h" #include "spdk/string.h" @@ -32,6 +33,13 @@ #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 #define SPDK_MAX_POLLER_NAME_LEN 256 #define SPDK_MAX_THREAD_NAME_LEN 256 +#define IOBUF_MIN_SMALL_POOL_SIZE 8191 +#define IOBUF_MIN_LARGE_POOL_SIZE 1023 +#define IOBUF_ALIGNMENT 512 +#define IOBUF_MIN_SMALL_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \ + IOBUF_ALIGNMENT) +#define IOBUF_MIN_LARGE_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \ + IOBUF_ALIGNMENT) static struct spdk_thread *g_app_thread; @@ -230,6 +238,35 @@ struct io_device { bool unregistered; }; +struct iobuf_channel { + spdk_iobuf_entry_stailq_t small_queue; + spdk_iobuf_entry_stailq_t large_queue; +}; + +struct iobuf_module { + char *name; + TAILQ_ENTRY(iobuf_module) tailq; +}; + +struct iobuf { + struct spdk_mempool *small_pool; + struct spdk_mempool *large_pool; + struct spdk_iobuf_opts opts; + TAILQ_HEAD(, iobuf_module) modules; + spdk_iobuf_finish_cb finish_cb; + void *finish_arg; +}; + +static struct iobuf g_iobuf = { + .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), + .opts = { + .small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE, + .large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE, + .small_bufsize = IOBUF_MIN_SMALL_BUFSIZE, + .large_bufsize = IOBUF_MIN_LARGE_BUFSIZE, + }, +}; + static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); static int @@ -2879,4 +2916,263 @@ spdk_spin_held(struct spdk_spinlock *sspin) return sspin->thread == thread; } +static int +iobuf_channel_create_cb(void *io_device, void *ctx) +{ + struct iobuf_channel *ch = ctx; + + STAILQ_INIT(&ch->small_queue); + STAILQ_INIT(&ch->large_queue); + + return 0; +} + +static void +iobuf_channel_destroy_cb(void *io_device, void *ctx) +{ + struct iobuf_channel *ch __attribute__((unused)) = ctx; + + assert(STAILQ_EMPTY(&ch->small_queue)); + assert(STAILQ_EMPTY(&ch->large_queue)); +} + +int +spdk_iobuf_initialize(void) +{ + struct spdk_iobuf_opts *opts = &g_iobuf.opts; + int cache_size, rc = 0; + + /** + * Ensure no more than half of the total buffers end up local caches, by using + * spdk_env_get_core_count() to determine how many local caches we need to account for. + */ + cache_size = opts->small_pool_count / (2 * spdk_env_get_core_count()); + g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count, + opts->small_bufsize, cache_size, + SPDK_ENV_SOCKET_ID_ANY); + if (!g_iobuf.small_pool) { + SPDK_ERRLOG("Failed to create small iobuf pool\n"); + rc = -ENOMEM; + goto error; + } + + cache_size = opts->large_pool_count / (2 * spdk_env_get_core_count()); + g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count, + opts->large_bufsize, cache_size, + SPDK_ENV_SOCKET_ID_ANY); + if (!g_iobuf.large_pool) { + SPDK_ERRLOG("Failed to create large iobuf pool\n"); + rc = -ENOMEM; + goto error; + } + + spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, + sizeof(struct iobuf_channel), "iobuf"); + + return 0; +error: + spdk_mempool_free(g_iobuf.small_pool); + return rc; +} + +static void +iobuf_unregister_cb(void *io_device) +{ + struct iobuf_module *module; + + while (!TAILQ_EMPTY(&g_iobuf.modules)) { + module = TAILQ_FIRST(&g_iobuf.modules); + TAILQ_REMOVE(&g_iobuf.modules, module, tailq); + free(module->name); + free(module); + } + + if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { + SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", + spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); + } + + if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { + SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", + spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); + } + + spdk_mempool_free(g_iobuf.small_pool); + spdk_mempool_free(g_iobuf.large_pool); + + if (g_iobuf.finish_cb != NULL) { + g_iobuf.finish_cb(g_iobuf.finish_arg); + } +} + +void +spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) +{ + g_iobuf.finish_cb = cb_fn; + g_iobuf.finish_arg = cb_arg; + + spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); +} + +int +spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) +{ + if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { + SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", + IOBUF_MIN_SMALL_POOL_SIZE); + return -EINVAL; + } + if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { + SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", + IOBUF_MIN_LARGE_POOL_SIZE); + return -EINVAL; + } + if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { + SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n", + IOBUF_MIN_SMALL_BUFSIZE); + return -EINVAL; + } + if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { + SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n", + IOBUF_MIN_LARGE_BUFSIZE); + return -EINVAL; + } + + g_iobuf.opts = *opts; + + return 0; +} + +void +spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) +{ + *opts = g_iobuf.opts; +} + +int +spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, + uint32_t small_cache_size, uint32_t large_cache_size) +{ + struct spdk_io_channel *ioch; + struct iobuf_channel *iobuf_ch; + struct iobuf_module *module; + + if (small_cache_size != 0 || large_cache_size != 0) { + SPDK_ERRLOG("iobuf cache is currently unsupported\n"); + return -EINVAL; + } + + TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { + if (strcmp(name, module->name) == 0) { + break; + } + } + + if (module == NULL) { + SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); + return -ENODEV; + } + + ioch = spdk_get_io_channel(&g_iobuf); + if (ioch == NULL) { + SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); + return -ENOMEM; + } + + iobuf_ch = spdk_io_channel_get_ctx(ioch); + + ch->small.queue = &iobuf_ch->small_queue; + ch->large.queue = &iobuf_ch->large_queue; + ch->small.pool = g_iobuf.small_pool; + ch->large.pool = g_iobuf.large_pool; + ch->small.bufsize = g_iobuf.opts.small_bufsize; + ch->large.bufsize = g_iobuf.opts.large_bufsize; + ch->parent = ioch; + ch->module = module; + + return 0; +} + +void +spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) +{ + struct spdk_iobuf_entry *entry __attribute__((unused)); + + /* Make sure none of the wait queue entries are coming from this module */ + STAILQ_FOREACH(entry, ch->small.queue, stailq) { + assert(entry->module != ch->module); + } + STAILQ_FOREACH(entry, ch->large.queue, stailq) { + assert(entry->module != ch->module); + } + + spdk_put_io_channel(ch->parent); + ch->parent = NULL; +} + +int +spdk_iobuf_register_module(const char *name) +{ + struct iobuf_module *module; + + TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { + if (strcmp(name, module->name) == 0) { + return -EEXIST; + } + } + + module = calloc(1, sizeof(*module)); + if (module == NULL) { + return -ENOMEM; + } + + module->name = strdup(name); + if (module->name == NULL) { + free(module); + return -ENOMEM; + } + + TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); + + return 0; +} + +int +spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, + spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) +{ + struct spdk_iobuf_entry *entry, *tmp; + int rc; + + STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { + /* We only want to iterate over the entries requested by the module which owns ch */ + if (entry->module != ch->module) { + continue; + } + + rc = cb_fn(ch, entry, cb_ctx); + if (rc != 0) { + return rc; + } + } + + return 0; +} + +void +spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, + uint64_t len) +{ + struct spdk_iobuf_pool *pool; + + if (len <= ch->small.bufsize) { + pool = &ch->small; + } else { + assert(len <= ch->large.bufsize); + pool = &ch->large; + } + + STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); +} + SPDK_LOG_REGISTER_COMPONENT(thread) diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index e3ed4fa0f..f33446f24 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -1913,6 +1913,459 @@ spdk_spin(void) g_spin_abort_fn = __posix_abort; } +struct ut_iobuf_entry { + struct spdk_iobuf_channel *ioch; + struct spdk_iobuf_entry iobuf; + void *buf; + uint32_t thread_id; + const char *module; +}; + +static void +ut_iobuf_finish_cb(void *ctx) +{ + *(int *)ctx = 1; +} + +static void +ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf) +{ + struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf); + + ut_entry->buf = buf; +} + +static int +ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg) +{ + struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf); + + ut_entry->buf = cb_arg; + + return 0; +} + +static void +iobuf(void) +{ + struct spdk_iobuf_opts opts = { + .small_pool_count = 2, + .large_pool_count = 2, + .small_bufsize = 2, + .large_bufsize = 4, + }; + struct ut_iobuf_entry *entry; + struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2]; + struct ut_iobuf_entry mod0_entries[] = { + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 0, .module = "ut_module0", }, + { .thread_id = 1, .module = "ut_module0", }, + { .thread_id = 1, .module = "ut_module0", }, + { .thread_id = 1, .module = "ut_module0", }, + { .thread_id = 1, .module = "ut_module0", }, + }; + struct ut_iobuf_entry mod1_entries[] = { + { .thread_id = 0, .module = "ut_module1", }, + { .thread_id = 0, .module = "ut_module1", }, + { .thread_id = 0, .module = "ut_module1", }, + { .thread_id = 0, .module = "ut_module1", }, + { .thread_id = 1, .module = "ut_module1", }, + { .thread_id = 1, .module = "ut_module1", }, + { .thread_id = 1, .module = "ut_module1", }, + { .thread_id = 1, .module = "ut_module1", }, + }; + int rc, finish = 0; + uint32_t i; + + allocate_cores(2); + allocate_threads(2); + + set_thread(0); + + /* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */ + g_iobuf.opts = opts; + rc = spdk_iobuf_initialize(); + CU_ASSERT_EQUAL(rc, 0); + + rc = spdk_iobuf_register_module("ut_module0"); + CU_ASSERT_EQUAL(rc, 0); + + rc = spdk_iobuf_register_module("ut_module1"); + CU_ASSERT_EQUAL(rc, 0); + + set_thread(0); + rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0); + CU_ASSERT_EQUAL(rc, 0); + set_thread(1); + rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0); + CU_ASSERT_EQUAL(rc, 0); + for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) { + mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id]; + } + set_thread(0); + rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0); + CU_ASSERT_EQUAL(rc, 0); + set_thread(1); + rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0); + CU_ASSERT_EQUAL(rc, 0); + for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) { + mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id]; + } + + /* First check that it's possible to retrieve the whole pools from a single module */ + set_thread(0); + entry = &mod0_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[1]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + /* The next two should be put onto the large buf wait queue */ + entry = &mod0_entries[2]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[3]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + /* Pick the two next buffers from the small pool */ + set_thread(1); + entry = &mod0_entries[4]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[5]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + /* The next two should be put onto the small buf wait queue */ + entry = &mod0_entries[6]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[7]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + + /* Now return one of the large buffers to the pool and verify that the first request's + * (entry 2) callback was executed and it was removed from the wait queue. + */ + set_thread(0); + entry = &mod0_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod0_entries[2]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[3]; + CU_ASSERT_PTR_NULL(entry->buf); + + /* Return the second buffer and check that the other request is satisfied */ + entry = &mod0_entries[1]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod0_entries[3]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Return the remaining two buffers */ + entry = &mod0_entries[2]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod0_entries[3]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + + /* Check that it didn't change the requests waiting for the small buffers */ + entry = &mod0_entries[6]; + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[7]; + CU_ASSERT_PTR_NULL(entry->buf); + + /* Do the same test as above, this time using the small pool */ + set_thread(1); + entry = &mod0_entries[4]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod0_entries[6]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[7]; + CU_ASSERT_PTR_NULL(entry->buf); + + /* Return the second buffer and check that the other request is satisfied */ + entry = &mod0_entries[5]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod0_entries[7]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Return the remaining two buffers */ + entry = &mod0_entries[6]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod0_entries[7]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + + /* Now check requesting buffers from different modules - first request all of them from one + * module, starting from the large pool + */ + set_thread(0); + entry = &mod0_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[1]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + /* Request all of them from the small one */ + set_thread(1); + entry = &mod0_entries[4]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[5]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Request one buffer per module from each pool */ + set_thread(0); + entry = &mod1_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[3]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + /* Change the order from the small pool and request a buffer from mod0 first */ + set_thread(1); + entry = &mod0_entries[6]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[4]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + + /* Now return one buffer to the large pool */ + set_thread(0); + entry = &mod0_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + + /* Make sure the request from mod1 got the buffer, as it was the first to request it */ + entry = &mod1_entries[0]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[3]; + CU_ASSERT_PTR_NULL(entry->buf); + + /* Return second buffer to the large pool and check the outstanding mod0 request */ + entry = &mod0_entries[1]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod0_entries[3]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Return the remaining two buffers */ + entry = &mod1_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod0_entries[3]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + + /* Check the same for the small pool, but this time the order of the request is reversed + * (mod0 before mod1) + */ + set_thread(1); + entry = &mod0_entries[4]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod0_entries[6]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + /* mod1 request was second in this case, so it still needs to wait */ + entry = &mod1_entries[4]; + CU_ASSERT_PTR_NULL(entry->buf); + + /* Return the second requested buffer */ + entry = &mod0_entries[5]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod1_entries[4]; + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Return the remaining two buffers */ + entry = &mod0_entries[6]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod1_entries[4]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + + /* Request buffers to make the pools empty */ + set_thread(0); + entry = &mod0_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod1_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[1]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod1_entries[1]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Queue more requests from both modules */ + entry = &mod0_entries[2]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[2]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[3]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[3]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + + /* Check that abort correctly remove an entry from the queue */ + entry = &mod0_entries[2]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 4); + entry = &mod1_entries[3]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 2); + + entry = &mod0_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf); + entry = &mod0_entries[1]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf); + + /* Clean up */ + entry = &mod1_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod1_entries[2]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod1_entries[1]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod0_entries[3]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + + /* Request buffers to make the pools empty */ + set_thread(0); + entry = &mod0_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod1_entries[0]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod0_entries[1]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + entry = &mod1_entries[1]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NOT_NULL(entry->buf); + + /* Request a buffer from each queue and each module on thread 0 */ + set_thread(0); + entry = &mod0_entries[2]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[2]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[3]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[3]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + + /* Do the same on thread 1 */ + set_thread(1); + entry = &mod0_entries[6]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[6]; + entry->buf = spdk_iobuf_get(entry->ioch, 4, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod0_entries[7]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + entry = &mod1_entries[7]; + entry->buf = spdk_iobuf_get(entry->ioch, 2, &entry->iobuf, ut_iobuf_get_buf_cb); + CU_ASSERT_PTR_NULL(entry->buf); + + /* Now do the foreach and check that correct entries are iterated over by assigning their + * ->buf pointers to different values. + */ + set_thread(0); + rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].large, + ut_iobuf_foreach_cb, (void *)0xdeadbeef); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].small, + ut_iobuf_foreach_cb, (void *)0xbeefdead); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].large, + ut_iobuf_foreach_cb, (void *)0xfeedbeef); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].small, + ut_iobuf_foreach_cb, (void *)0xbeeffeed); + CU_ASSERT_EQUAL(rc, 0); + set_thread(1); + rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].large, + ut_iobuf_foreach_cb, (void *)0xcafebabe); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].small, + ut_iobuf_foreach_cb, (void *)0xbabecafe); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].large, + ut_iobuf_foreach_cb, (void *)0xbeefcafe); + CU_ASSERT_EQUAL(rc, 0); + rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].small, + ut_iobuf_foreach_cb, (void *)0xcafebeef); + CU_ASSERT_EQUAL(rc, 0); + + /* thread 0 */ + CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef); + CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xbeefdead); + CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef); + CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xbeeffeed); + /* thread 1 */ + CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe); + CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xbabecafe); + CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe); + CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xcafebeef); + + /* Clean everything up */ + set_thread(0); + entry = &mod0_entries[2]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 4); + entry = &mod0_entries[3]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 2); + entry = &mod1_entries[2]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 4); + entry = &mod1_entries[3]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 2); + + entry = &mod0_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod1_entries[0]; + spdk_iobuf_put(entry->ioch, entry->buf, 4); + entry = &mod0_entries[1]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + entry = &mod1_entries[1]; + spdk_iobuf_put(entry->ioch, entry->buf, 2); + + set_thread(1); + entry = &mod0_entries[6]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 4); + entry = &mod0_entries[7]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 2); + entry = &mod1_entries[6]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 4); + entry = &mod1_entries[7]; + spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, 2); + + set_thread(0); + spdk_iobuf_channel_fini(&mod0_ch[0]); + poll_threads(); + spdk_iobuf_channel_fini(&mod1_ch[0]); + poll_threads(); + set_thread(1); + spdk_iobuf_channel_fini(&mod0_ch[1]); + poll_threads(); + spdk_iobuf_channel_fini(&mod1_ch[1]); + poll_threads(); + + spdk_iobuf_finish(ut_iobuf_finish_cb, &finish); + poll_threads(); + + CU_ASSERT_EQUAL(finish, 1); + + free_threads(); + free_cores(); +} + int main(int argc, char **argv) { @@ -1942,6 +2395,7 @@ main(int argc, char **argv) CU_ADD_TEST(suite, multi_timed_pollers_have_same_expiration); CU_ADD_TEST(suite, io_device_lookup); CU_ADD_TEST(suite, spdk_spin); + CU_ADD_TEST(suite, iobuf); CU_basic_set_mode(CU_BRM_VERBOSE); CU_basic_run_tests();