thread: Move iobuf code to a separate compilation unit.

This makes it much easier to mock this code in unit tests without having
to mock up the entire thread library.

Change-Id: Ic3d9cb826ae71af780a06f88669c37cef2c9a4ae
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16173
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: Aleksey Marchuk <alexeymar@nvidia.com>
This commit is contained in:
Ben Walker 2023-01-17 07:50:36 +00:00 committed by Jim Harris
parent 297182a083
commit a9bcb7f261
8 changed files with 1009 additions and 953 deletions

View File

@ -9,7 +9,7 @@ include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
SO_VER := 8 SO_VER := 8
SO_MINOR := 0 SO_MINOR := 0
C_SRCS = thread.c C_SRCS = thread.c iobuf.c
LIBNAME = thread LIBNAME = thread
SPDK_MAP_FILE = $(abspath $(CURDIR)/spdk_thread.map) SPDK_MAP_FILE = $(abspath $(CURDIR)/spdk_thread.map)

349
lib/thread/iobuf.c Normal file
View File

@ -0,0 +1,349 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (C) 2023 Intel Corporation.
* All rights reserved.
*/
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/bdev.h"
#define IOBUF_MIN_SMALL_POOL_SIZE 8191
#define IOBUF_MIN_LARGE_POOL_SIZE 1023
#define IOBUF_ALIGNMENT 512
#define IOBUF_MIN_SMALL_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
IOBUF_ALIGNMENT)
#define IOBUF_MIN_LARGE_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
IOBUF_ALIGNMENT)
SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
"Invalid data offset");
struct iobuf_channel {
spdk_iobuf_entry_stailq_t small_queue;
spdk_iobuf_entry_stailq_t large_queue;
};
struct iobuf_module {
char *name;
TAILQ_ENTRY(iobuf_module) tailq;
};
struct iobuf {
struct spdk_mempool *small_pool;
struct spdk_mempool *large_pool;
struct spdk_iobuf_opts opts;
TAILQ_HEAD(, iobuf_module) modules;
spdk_iobuf_finish_cb finish_cb;
void *finish_arg;
};
static struct iobuf g_iobuf = {
.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
.opts = {
.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE,
.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE,
},
};
static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
struct iobuf_channel *ch = ctx;
STAILQ_INIT(&ch->small_queue);
STAILQ_INIT(&ch->large_queue);
return 0;
}
static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
struct iobuf_channel *ch __attribute__((unused)) = ctx;
assert(STAILQ_EMPTY(&ch->small_queue));
assert(STAILQ_EMPTY(&ch->large_queue));
}
int
spdk_iobuf_initialize(void)
{
struct spdk_iobuf_opts *opts = &g_iobuf.opts;
int rc = 0;
g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
opts->small_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
if (!g_iobuf.small_pool) {
SPDK_ERRLOG("Failed to create small iobuf pool\n");
rc = -ENOMEM;
goto error;
}
g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
opts->large_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
if (!g_iobuf.large_pool) {
SPDK_ERRLOG("Failed to create large iobuf pool\n");
rc = -ENOMEM;
goto error;
}
spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
sizeof(struct iobuf_channel), "iobuf");
return 0;
error:
spdk_mempool_free(g_iobuf.small_pool);
return rc;
}
static void
iobuf_unregister_cb(void *io_device)
{
struct iobuf_module *module;
while (!TAILQ_EMPTY(&g_iobuf.modules)) {
module = TAILQ_FIRST(&g_iobuf.modules);
TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
free(module->name);
free(module);
}
if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
}
if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
}
spdk_mempool_free(g_iobuf.small_pool);
spdk_mempool_free(g_iobuf.large_pool);
if (g_iobuf.finish_cb != NULL) {
g_iobuf.finish_cb(g_iobuf.finish_arg);
}
}
void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
g_iobuf.finish_cb = cb_fn;
g_iobuf.finish_arg = cb_arg;
spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}
int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
IOBUF_MIN_SMALL_POOL_SIZE);
return -EINVAL;
}
if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
IOBUF_MIN_LARGE_POOL_SIZE);
return -EINVAL;
}
if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
IOBUF_MIN_SMALL_BUFSIZE);
return -EINVAL;
}
if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
IOBUF_MIN_LARGE_BUFSIZE);
return -EINVAL;
}
g_iobuf.opts = *opts;
return 0;
}
void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
*opts = g_iobuf.opts;
}
int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
uint32_t small_cache_size, uint32_t large_cache_size)
{
struct spdk_io_channel *ioch;
struct iobuf_channel *iobuf_ch;
struct iobuf_module *module;
struct spdk_iobuf_buffer *buf;
uint32_t i;
TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
if (strcmp(name, module->name) == 0) {
break;
}
}
if (module == NULL) {
SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
return -ENODEV;
}
ioch = spdk_get_io_channel(&g_iobuf);
if (ioch == NULL) {
SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
return -ENOMEM;
}
iobuf_ch = spdk_io_channel_get_ctx(ioch);
ch->small.queue = &iobuf_ch->small_queue;
ch->large.queue = &iobuf_ch->large_queue;
ch->small.pool = g_iobuf.small_pool;
ch->large.pool = g_iobuf.large_pool;
ch->small.bufsize = g_iobuf.opts.small_bufsize;
ch->large.bufsize = g_iobuf.opts.large_bufsize;
ch->parent = ioch;
ch->module = module;
ch->small.cache_size = small_cache_size;
ch->large.cache_size = large_cache_size;
ch->small.cache_count = 0;
ch->large.cache_count = 0;
STAILQ_INIT(&ch->small.cache);
STAILQ_INIT(&ch->large.cache);
for (i = 0; i < small_cache_size; ++i) {
buf = spdk_mempool_get(g_iobuf.small_pool);
if (buf == NULL) {
SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
"You may need to increase spdk_iobuf_opts.small_pool_count\n");
goto error;
}
STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
ch->small.cache_count++;
}
for (i = 0; i < large_cache_size; ++i) {
buf = spdk_mempool_get(g_iobuf.large_pool);
if (buf == NULL) {
SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
"You may need to increase spdk_iobuf_opts.large_pool_count\n");
goto error;
}
STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
ch->large.cache_count++;
}
return 0;
error:
spdk_iobuf_channel_fini(ch);
return -ENOMEM;
}
void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
struct spdk_iobuf_entry *entry __attribute__((unused));
struct spdk_iobuf_buffer *buf;
/* Make sure none of the wait queue entries are coming from this module */
STAILQ_FOREACH(entry, ch->small.queue, stailq) {
assert(entry->module != ch->module);
}
STAILQ_FOREACH(entry, ch->large.queue, stailq) {
assert(entry->module != ch->module);
}
/* Release cached buffers back to the pool */
while (!STAILQ_EMPTY(&ch->small.cache)) {
buf = STAILQ_FIRST(&ch->small.cache);
STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
spdk_mempool_put(ch->small.pool, buf);
ch->small.cache_count--;
}
while (!STAILQ_EMPTY(&ch->large.cache)) {
buf = STAILQ_FIRST(&ch->large.cache);
STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
spdk_mempool_put(ch->large.pool, buf);
ch->large.cache_count--;
}
assert(ch->small.cache_count == 0);
assert(ch->large.cache_count == 0);
spdk_put_io_channel(ch->parent);
ch->parent = NULL;
}
int
spdk_iobuf_register_module(const char *name)
{
struct iobuf_module *module;
TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
if (strcmp(name, module->name) == 0) {
return -EEXIST;
}
}
module = calloc(1, sizeof(*module));
if (module == NULL) {
return -ENOMEM;
}
module->name = strdup(name);
if (module->name == NULL) {
free(module);
return -ENOMEM;
}
TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);
return 0;
}
int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
struct spdk_iobuf_entry *entry, *tmp;
int rc;
STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
/* We only want to iterate over the entries requested by the module which owns ch */
if (entry->module != ch->module) {
continue;
}
rc = cb_fn(ch, entry, cb_ctx);
if (rc != 0) {
return rc;
}
}
return 0;
}
void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
uint64_t len)
{
struct spdk_iobuf_pool *pool;
if (len <= ch->small.bufsize) {
pool = &ch->small;
} else {
assert(len <= ch->large.bufsize);
pool = &ch->large;
}
STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

View File

@ -7,7 +7,6 @@
#include "spdk/stdinc.h" #include "spdk/stdinc.h"
#include "spdk/env.h" #include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/likely.h" #include "spdk/likely.h"
#include "spdk/queue.h" #include "spdk/queue.h"
#include "spdk/string.h" #include "spdk/string.h"
@ -33,13 +32,6 @@
#define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5
#define SPDK_MAX_POLLER_NAME_LEN 256 #define SPDK_MAX_POLLER_NAME_LEN 256
#define SPDK_MAX_THREAD_NAME_LEN 256 #define SPDK_MAX_THREAD_NAME_LEN 256
#define IOBUF_MIN_SMALL_POOL_SIZE 8191
#define IOBUF_MIN_LARGE_POOL_SIZE 1023
#define IOBUF_ALIGNMENT 512
#define IOBUF_MIN_SMALL_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
IOBUF_ALIGNMENT)
#define IOBUF_MIN_LARGE_BUFSIZE (SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
IOBUF_ALIGNMENT)
static struct spdk_thread *g_app_thread; static struct spdk_thread *g_app_thread;
@ -245,35 +237,6 @@ struct io_device {
bool unregistered; bool unregistered;
}; };
struct iobuf_channel {
spdk_iobuf_entry_stailq_t small_queue;
spdk_iobuf_entry_stailq_t large_queue;
};
struct iobuf_module {
char *name;
TAILQ_ENTRY(iobuf_module) tailq;
};
struct iobuf {
struct spdk_mempool *small_pool;
struct spdk_mempool *large_pool;
struct spdk_iobuf_opts opts;
TAILQ_HEAD(, iobuf_module) modules;
spdk_iobuf_finish_cb finish_cb;
void *finish_arg;
};
static struct iobuf g_iobuf = {
.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
.opts = {
.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE,
.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE,
},
};
static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices); static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);
static int static int
@ -2977,302 +2940,4 @@ spdk_spin_held(struct spdk_spinlock *sspin)
return sspin->thread == thread; return sspin->thread == thread;
} }
static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
struct iobuf_channel *ch = ctx;
STAILQ_INIT(&ch->small_queue);
STAILQ_INIT(&ch->large_queue);
return 0;
}
static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
struct iobuf_channel *ch __attribute__((unused)) = ctx;
assert(STAILQ_EMPTY(&ch->small_queue));
assert(STAILQ_EMPTY(&ch->large_queue));
}
int
spdk_iobuf_initialize(void)
{
struct spdk_iobuf_opts *opts = &g_iobuf.opts;
int rc = 0;
g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
opts->small_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
if (!g_iobuf.small_pool) {
SPDK_ERRLOG("Failed to create small iobuf pool\n");
rc = -ENOMEM;
goto error;
}
g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
opts->large_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
if (!g_iobuf.large_pool) {
SPDK_ERRLOG("Failed to create large iobuf pool\n");
rc = -ENOMEM;
goto error;
}
spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
sizeof(struct iobuf_channel), "iobuf");
return 0;
error:
spdk_mempool_free(g_iobuf.small_pool);
return rc;
}
static void
iobuf_unregister_cb(void *io_device)
{
struct iobuf_module *module;
while (!TAILQ_EMPTY(&g_iobuf.modules)) {
module = TAILQ_FIRST(&g_iobuf.modules);
TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
free(module->name);
free(module);
}
if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
}
if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
}
spdk_mempool_free(g_iobuf.small_pool);
spdk_mempool_free(g_iobuf.large_pool);
if (g_iobuf.finish_cb != NULL) {
g_iobuf.finish_cb(g_iobuf.finish_arg);
}
}
void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
g_iobuf.finish_cb = cb_fn;
g_iobuf.finish_arg = cb_arg;
spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}
int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
IOBUF_MIN_SMALL_POOL_SIZE);
return -EINVAL;
}
if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
IOBUF_MIN_LARGE_POOL_SIZE);
return -EINVAL;
}
if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
IOBUF_MIN_SMALL_BUFSIZE);
return -EINVAL;
}
if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
IOBUF_MIN_LARGE_BUFSIZE);
return -EINVAL;
}
g_iobuf.opts = *opts;
return 0;
}
void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
*opts = g_iobuf.opts;
}
int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
uint32_t small_cache_size, uint32_t large_cache_size)
{
struct spdk_io_channel *ioch;
struct iobuf_channel *iobuf_ch;
struct iobuf_module *module;
struct spdk_iobuf_buffer *buf;
uint32_t i;
TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
if (strcmp(name, module->name) == 0) {
break;
}
}
if (module == NULL) {
SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
return -ENODEV;
}
ioch = spdk_get_io_channel(&g_iobuf);
if (ioch == NULL) {
SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
return -ENOMEM;
}
iobuf_ch = spdk_io_channel_get_ctx(ioch);
ch->small.queue = &iobuf_ch->small_queue;
ch->large.queue = &iobuf_ch->large_queue;
ch->small.pool = g_iobuf.small_pool;
ch->large.pool = g_iobuf.large_pool;
ch->small.bufsize = g_iobuf.opts.small_bufsize;
ch->large.bufsize = g_iobuf.opts.large_bufsize;
ch->parent = ioch;
ch->module = module;
ch->small.cache_size = small_cache_size;
ch->large.cache_size = large_cache_size;
ch->small.cache_count = 0;
ch->large.cache_count = 0;
STAILQ_INIT(&ch->small.cache);
STAILQ_INIT(&ch->large.cache);
for (i = 0; i < small_cache_size; ++i) {
buf = spdk_mempool_get(g_iobuf.small_pool);
if (buf == NULL) {
SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
"You may need to increase spdk_iobuf_opts.small_pool_count\n");
goto error;
}
STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
ch->small.cache_count++;
}
for (i = 0; i < large_cache_size; ++i) {
buf = spdk_mempool_get(g_iobuf.large_pool);
if (buf == NULL) {
SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
"You may need to increase spdk_iobuf_opts.large_pool_count\n");
goto error;
}
STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
ch->large.cache_count++;
}
return 0;
error:
spdk_iobuf_channel_fini(ch);
return -ENOMEM;
}
void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
struct spdk_iobuf_entry *entry __attribute__((unused));
struct spdk_iobuf_buffer *buf;
/* Make sure none of the wait queue entries are coming from this module */
STAILQ_FOREACH(entry, ch->small.queue, stailq) {
assert(entry->module != ch->module);
}
STAILQ_FOREACH(entry, ch->large.queue, stailq) {
assert(entry->module != ch->module);
}
/* Release cached buffers back to the pool */
while (!STAILQ_EMPTY(&ch->small.cache)) {
buf = STAILQ_FIRST(&ch->small.cache);
STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
spdk_mempool_put(ch->small.pool, buf);
ch->small.cache_count--;
}
while (!STAILQ_EMPTY(&ch->large.cache)) {
buf = STAILQ_FIRST(&ch->large.cache);
STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
spdk_mempool_put(ch->large.pool, buf);
ch->large.cache_count--;
}
assert(ch->small.cache_count == 0);
assert(ch->large.cache_count == 0);
spdk_put_io_channel(ch->parent);
ch->parent = NULL;
}
int
spdk_iobuf_register_module(const char *name)
{
struct iobuf_module *module;
TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
if (strcmp(name, module->name) == 0) {
return -EEXIST;
}
}
module = calloc(1, sizeof(*module));
if (module == NULL) {
return -ENOMEM;
}
module->name = strdup(name);
if (module->name == NULL) {
free(module);
return -ENOMEM;
}
TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);
return 0;
}
int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
struct spdk_iobuf_entry *entry, *tmp;
int rc;
STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
/* We only want to iterate over the entries requested by the module which owns ch */
if (entry->module != ch->module) {
continue;
}
rc = cb_fn(ch, entry, cb_ctx);
if (rc != 0) {
return rc;
}
}
return 0;
}
void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
uint64_t len)
{
struct spdk_iobuf_pool *pool;
if (len <= ch->small.bufsize) {
pool = &ch->small;
} else {
assert(len <= ch->large.bufsize);
pool = &ch->large;
}
STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}
SPDK_LOG_REGISTER_COMPONENT(thread) SPDK_LOG_REGISTER_COMPONENT(thread)

View File

@ -6,7 +6,7 @@
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../../..) SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../../..)
include $(SPDK_ROOT_DIR)/mk/spdk.common.mk include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
DIRS-y = thread.c DIRS-y = thread.c iobuf.c
.PHONY: all clean $(DIRS-y) .PHONY: all clean $(DIRS-y)

View File

@ -0,0 +1,10 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2016 Intel Corporation.
# All rights reserved.
#
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../../../..)
TEST_FILE = iobuf_ut.c
include $(SPDK_ROOT_DIR)/mk/spdk.unittest.mk

View File

@ -0,0 +1,647 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (C) 2023 Intel Corporation. All rights reserved.
*/
#include "spdk_cunit.h"
#include "common/lib/ut_multithread.c"
#include "unit/lib/json_mock.c"
#include "spdk/config.h"
#include "spdk/thread.h"
#include "thread/iobuf.c"
struct ut_iobuf_entry {
struct spdk_iobuf_channel *ioch;
struct spdk_iobuf_entry iobuf;
void *buf;
uint32_t thread_id;
const char *module;
};
static void
ut_iobuf_finish_cb(void *ctx)
{
*(int *)ctx = 1;
}
static void
ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf)
{
struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
ut_entry->buf = buf;
}
static int
ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg)
{
struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
ut_entry->buf = cb_arg;
return 0;
}
#define SMALL_BUFSIZE 128
#define LARGE_BUFSIZE 512
static void
iobuf(void)
{
struct spdk_iobuf_opts opts = {
.small_pool_count = 2,
.large_pool_count = 2,
.small_bufsize = SMALL_BUFSIZE,
.large_bufsize = LARGE_BUFSIZE,
};
struct ut_iobuf_entry *entry;
struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2];
struct ut_iobuf_entry mod0_entries[] = {
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
};
struct ut_iobuf_entry mod1_entries[] = {
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
};
int rc, finish = 0;
uint32_t i;
allocate_cores(2);
allocate_threads(2);
set_thread(0);
/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
g_iobuf.opts = opts;
rc = spdk_iobuf_initialize();
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module0");
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module1");
CU_ASSERT_EQUAL(rc, 0);
set_thread(0);
rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
set_thread(1);
rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) {
mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id];
}
set_thread(0);
rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
set_thread(1);
rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) {
mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id];
}
/* First check that it's possible to retrieve the whole pools from a single module */
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* The next two should be put onto the large buf wait queue */
entry = &mod0_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Pick the two next buffers from the small pool */
set_thread(1);
entry = &mod0_entries[4];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[5];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* The next two should be put onto the small buf wait queue */
entry = &mod0_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[7];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Now return one of the large buffers to the pool and verify that the first request's
* (entry 2) callback was executed and it was removed from the wait queue.
*/
set_thread(0);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[2];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return the second buffer and check that the other request is satisfied */
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod0_entries[2];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
/* Check that it didn't change the requests waiting for the small buffers */
entry = &mod0_entries[6];
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[7];
CU_ASSERT_PTR_NULL(entry->buf);
/* Do the same test as above, this time using the small pool */
set_thread(1);
entry = &mod0_entries[4];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[6];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[7];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return the second buffer and check that the other request is satisfied */
entry = &mod0_entries[5];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[7];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod0_entries[6];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[7];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
/* Now check requesting buffers from different modules - first request all of them from one
* module, starting from the large pool
*/
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Request all of them from the small one */
set_thread(1);
entry = &mod0_entries[4];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[5];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Request one buffer per module from each pool */
set_thread(0);
entry = &mod1_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Change the order from the small pool and request a buffer from mod0 first */
set_thread(1);
entry = &mod0_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[4];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Now return one buffer to the large pool */
set_thread(0);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
/* Make sure the request from mod1 got the buffer, as it was the first to request it */
entry = &mod1_entries[0];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return second buffer to the large pool and check the outstanding mod0 request */
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod1_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
/* Check the same for the small pool, but this time the order of the request is reversed
* (mod0 before mod1)
*/
set_thread(1);
entry = &mod0_entries[4];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[6];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* mod1 request was second in this case, so it still needs to wait */
entry = &mod1_entries[4];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return the second requested buffer */
entry = &mod0_entries[5];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod1_entries[4];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod0_entries[6];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod1_entries[4];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
/* Request buffers to make the pools empty */
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Queue more requests from both modules */
entry = &mod0_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Check that abort correctly remove an entry from the queue */
entry = &mod0_entries[2];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod1_entries[3];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf);
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf);
/* Clean up */
entry = &mod1_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod1_entries[2];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod1_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
/* Request buffers to make the pools empty */
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Request a buffer from each queue and each module on thread 0 */
set_thread(0);
entry = &mod0_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Do the same on thread 1 */
set_thread(1);
entry = &mod0_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[7];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[7];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Now do the foreach and check that correct entries are iterated over by assigning their
* ->buf pointers to different values.
*/
set_thread(0);
rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].large,
ut_iobuf_foreach_cb, (void *)0xdeadbeef);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].small,
ut_iobuf_foreach_cb, (void *)0xbeefdead);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].large,
ut_iobuf_foreach_cb, (void *)0xfeedbeef);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].small,
ut_iobuf_foreach_cb, (void *)0xbeeffeed);
CU_ASSERT_EQUAL(rc, 0);
set_thread(1);
rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].large,
ut_iobuf_foreach_cb, (void *)0xcafebabe);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].small,
ut_iobuf_foreach_cb, (void *)0xbabecafe);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].large,
ut_iobuf_foreach_cb, (void *)0xbeefcafe);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].small,
ut_iobuf_foreach_cb, (void *)0xcafebeef);
CU_ASSERT_EQUAL(rc, 0);
/* thread 0 */
CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef);
CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xbeefdead);
CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef);
CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xbeeffeed);
/* thread 1 */
CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe);
CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xbabecafe);
CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe);
CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xcafebeef);
/* Clean everything up */
set_thread(0);
entry = &mod0_entries[2];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod1_entries[2];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod1_entries[3];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod1_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod1_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
set_thread(1);
entry = &mod0_entries[6];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod0_entries[7];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod1_entries[6];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod1_entries[7];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
set_thread(0);
spdk_iobuf_channel_fini(&mod0_ch[0]);
poll_threads();
spdk_iobuf_channel_fini(&mod1_ch[0]);
poll_threads();
set_thread(1);
spdk_iobuf_channel_fini(&mod0_ch[1]);
poll_threads();
spdk_iobuf_channel_fini(&mod1_ch[1]);
poll_threads();
spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
poll_threads();
CU_ASSERT_EQUAL(finish, 1);
free_threads();
free_cores();
}
static void
iobuf_cache(void)
{
struct spdk_iobuf_opts opts = {
.small_pool_count = 4,
.large_pool_count = 4,
.small_bufsize = SMALL_BUFSIZE,
.large_bufsize = LARGE_BUFSIZE,
};
struct spdk_iobuf_channel iobuf_ch[2];
struct ut_iobuf_entry *entry;
struct ut_iobuf_entry mod0_entries[] = {
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
};
struct ut_iobuf_entry mod1_entries[] = {
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
};
int rc, finish = 0;
uint32_t i, j, bufsize;
allocate_cores(1);
allocate_threads(1);
set_thread(0);
/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
g_iobuf.opts = opts;
rc = spdk_iobuf_initialize();
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module0");
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module1");
CU_ASSERT_EQUAL(rc, 0);
/* First check that channel initialization fails when it's not possible to fill in the cache
* from the pool.
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
CU_ASSERT_EQUAL(rc, -ENOMEM);
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
CU_ASSERT_EQUAL(rc, -ENOMEM);
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
CU_ASSERT_EQUAL(rc, -ENOMEM);
spdk_iobuf_channel_fini(&iobuf_ch[0]);
poll_threads();
/* Initialize one channel with cache, acquire buffers, and check that a second one can be
* created once the buffers acquired from the first one are returned to the pool
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
CU_ASSERT_EQUAL(rc, 0);
for (i = 0; i < 3; ++i) {
mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], LARGE_BUFSIZE, &mod0_entries[i].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
}
/* It should be able to create a channel with a single entry in the cache */
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 1);
CU_ASSERT_EQUAL(rc, 0);
spdk_iobuf_channel_fini(&iobuf_ch[1]);
poll_threads();
/* But not with two entries */
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
CU_ASSERT_EQUAL(rc, -ENOMEM);
for (i = 0; i < 2; ++i) {
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
CU_ASSERT_EQUAL(rc, -ENOMEM);
}
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, LARGE_BUFSIZE);
/* The last buffer should be released back to the pool, so we should be able to create a new
* channel
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
CU_ASSERT_EQUAL(rc, 0);
spdk_iobuf_channel_fini(&iobuf_ch[0]);
spdk_iobuf_channel_fini(&iobuf_ch[1]);
poll_threads();
/* Check that the pool is only used when the cache is empty and that the cache guarantees a
* certain set of buffers
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
CU_ASSERT_EQUAL(rc, 0);
uint32_t buffer_sizes[] = { SMALL_BUFSIZE, LARGE_BUFSIZE };
for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
bufsize = buffer_sizes[i];
for (j = 0; j < 3; ++j) {
entry = &mod0_entries[j];
entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
}
mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);
/* The whole pool is exhausted now */
mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
/* If there are outstanding requests waiting for a buffer, they should have priority
* over filling in the cache, even if they're from different modules.
*/
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
/* Also make sure the queue is FIFO and doesn't care about which module requested
* and which module released the buffer.
*/
CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
/* Return the buffers back */
spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
for (j = 0; j < 2; ++j) {
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
}
}
spdk_iobuf_channel_fini(&iobuf_ch[0]);
spdk_iobuf_channel_fini(&iobuf_ch[1]);
poll_threads();
spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
poll_threads();
CU_ASSERT_EQUAL(finish, 1);
free_threads();
free_cores();
}
int
main(int argc, char **argv)
{
CU_pSuite suite = NULL;
unsigned int num_failures;
CU_set_error_action(CUEA_ABORT);
CU_initialize_registry();
suite = CU_add_suite("io_channel", NULL, NULL);
CU_ADD_TEST(suite, iobuf);
CU_ADD_TEST(suite, iobuf_cache);
CU_basic_set_mode(CU_BRM_VERBOSE);
CU_basic_run_tests();
num_failures = CU_get_number_of_failures();
CU_cleanup_registry();
return num_failures;
}

View File

@ -13,9 +13,6 @@
#include "thread/thread.c" #include "thread/thread.c"
#include "common/lib/ut_multithread.c" #include "common/lib/ut_multithread.c"
#define SMALL_BUFSIZE 128
#define LARGE_BUFSIZE 512
static int g_sched_rc = 0; static int g_sched_rc = 0;
static int static int
@ -1916,617 +1913,6 @@ spdk_spin(void)
g_spin_abort_fn = __posix_abort; g_spin_abort_fn = __posix_abort;
} }
struct ut_iobuf_entry {
struct spdk_iobuf_channel *ioch;
struct spdk_iobuf_entry iobuf;
void *buf;
uint32_t thread_id;
const char *module;
};
static void
ut_iobuf_finish_cb(void *ctx)
{
*(int *)ctx = 1;
}
static void
ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf)
{
struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
ut_entry->buf = buf;
}
static int
ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg)
{
struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
ut_entry->buf = cb_arg;
return 0;
}
static void
iobuf(void)
{
struct spdk_iobuf_opts opts = {
.small_pool_count = 2,
.large_pool_count = 2,
.small_bufsize = SMALL_BUFSIZE,
.large_bufsize = LARGE_BUFSIZE,
};
struct ut_iobuf_entry *entry;
struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2];
struct ut_iobuf_entry mod0_entries[] = {
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
{ .thread_id = 1, .module = "ut_module0", },
};
struct ut_iobuf_entry mod1_entries[] = {
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
{ .thread_id = 1, .module = "ut_module1", },
};
int rc, finish = 0;
uint32_t i;
allocate_cores(2);
allocate_threads(2);
set_thread(0);
/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
g_iobuf.opts = opts;
rc = spdk_iobuf_initialize();
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module0");
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module1");
CU_ASSERT_EQUAL(rc, 0);
set_thread(0);
rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
set_thread(1);
rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) {
mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id];
}
set_thread(0);
rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
set_thread(1);
rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0);
CU_ASSERT_EQUAL(rc, 0);
for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) {
mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id];
}
/* First check that it's possible to retrieve the whole pools from a single module */
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* The next two should be put onto the large buf wait queue */
entry = &mod0_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Pick the two next buffers from the small pool */
set_thread(1);
entry = &mod0_entries[4];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[5];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* The next two should be put onto the small buf wait queue */
entry = &mod0_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[7];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Now return one of the large buffers to the pool and verify that the first request's
* (entry 2) callback was executed and it was removed from the wait queue.
*/
set_thread(0);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[2];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return the second buffer and check that the other request is satisfied */
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod0_entries[2];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
/* Check that it didn't change the requests waiting for the small buffers */
entry = &mod0_entries[6];
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[7];
CU_ASSERT_PTR_NULL(entry->buf);
/* Do the same test as above, this time using the small pool */
set_thread(1);
entry = &mod0_entries[4];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[6];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[7];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return the second buffer and check that the other request is satisfied */
entry = &mod0_entries[5];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[7];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod0_entries[6];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[7];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
/* Now check requesting buffers from different modules - first request all of them from one
* module, starting from the large pool
*/
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Request all of them from the small one */
set_thread(1);
entry = &mod0_entries[4];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[5];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Request one buffer per module from each pool */
set_thread(0);
entry = &mod1_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Change the order from the small pool and request a buffer from mod0 first */
set_thread(1);
entry = &mod0_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[4];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Now return one buffer to the large pool */
set_thread(0);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
/* Make sure the request from mod1 got the buffer, as it was the first to request it */
entry = &mod1_entries[0];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return second buffer to the large pool and check the outstanding mod0 request */
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod1_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
/* Check the same for the small pool, but this time the order of the request is reversed
* (mod0 before mod1)
*/
set_thread(1);
entry = &mod0_entries[4];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[6];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* mod1 request was second in this case, so it still needs to wait */
entry = &mod1_entries[4];
CU_ASSERT_PTR_NULL(entry->buf);
/* Return the second requested buffer */
entry = &mod0_entries[5];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod1_entries[4];
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Return the remaining two buffers */
entry = &mod0_entries[6];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod1_entries[4];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
/* Request buffers to make the pools empty */
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Queue more requests from both modules */
entry = &mod0_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Check that abort correctly remove an entry from the queue */
entry = &mod0_entries[2];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod1_entries[3];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf);
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf);
/* Clean up */
entry = &mod1_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod1_entries[2];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod1_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
/* Request buffers to make the pools empty */
set_thread(0);
entry = &mod0_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[0];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod0_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
entry = &mod1_entries[1];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
/* Request a buffer from each queue and each module on thread 0 */
set_thread(0);
entry = &mod0_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[2];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[3];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Do the same on thread 1 */
set_thread(1);
entry = &mod0_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[6];
entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod0_entries[7];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
entry = &mod1_entries[7];
entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(entry->buf);
/* Now do the foreach and check that correct entries are iterated over by assigning their
* ->buf pointers to different values.
*/
set_thread(0);
rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].large,
ut_iobuf_foreach_cb, (void *)0xdeadbeef);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].small,
ut_iobuf_foreach_cb, (void *)0xbeefdead);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].large,
ut_iobuf_foreach_cb, (void *)0xfeedbeef);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].small,
ut_iobuf_foreach_cb, (void *)0xbeeffeed);
CU_ASSERT_EQUAL(rc, 0);
set_thread(1);
rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].large,
ut_iobuf_foreach_cb, (void *)0xcafebabe);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].small,
ut_iobuf_foreach_cb, (void *)0xbabecafe);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].large,
ut_iobuf_foreach_cb, (void *)0xbeefcafe);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].small,
ut_iobuf_foreach_cb, (void *)0xcafebeef);
CU_ASSERT_EQUAL(rc, 0);
/* thread 0 */
CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef);
CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xbeefdead);
CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef);
CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xbeeffeed);
/* thread 1 */
CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe);
CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xbabecafe);
CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe);
CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xcafebeef);
/* Clean everything up */
set_thread(0);
entry = &mod0_entries[2];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod0_entries[3];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod1_entries[2];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod1_entries[3];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod0_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod1_entries[0];
spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
entry = &mod0_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
entry = &mod1_entries[1];
spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
set_thread(1);
entry = &mod0_entries[6];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod0_entries[7];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
entry = &mod1_entries[6];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
entry = &mod1_entries[7];
spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
set_thread(0);
spdk_iobuf_channel_fini(&mod0_ch[0]);
poll_threads();
spdk_iobuf_channel_fini(&mod1_ch[0]);
poll_threads();
set_thread(1);
spdk_iobuf_channel_fini(&mod0_ch[1]);
poll_threads();
spdk_iobuf_channel_fini(&mod1_ch[1]);
poll_threads();
spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
poll_threads();
CU_ASSERT_EQUAL(finish, 1);
free_threads();
free_cores();
}
static void
iobuf_cache(void)
{
struct spdk_iobuf_opts opts = {
.small_pool_count = 4,
.large_pool_count = 4,
.small_bufsize = SMALL_BUFSIZE,
.large_bufsize = LARGE_BUFSIZE,
};
struct spdk_iobuf_channel iobuf_ch[2];
struct ut_iobuf_entry *entry;
struct ut_iobuf_entry mod0_entries[] = {
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
{ .thread_id = 0, .module = "ut_module0", },
};
struct ut_iobuf_entry mod1_entries[] = {
{ .thread_id = 0, .module = "ut_module1", },
{ .thread_id = 0, .module = "ut_module1", },
};
int rc, finish = 0;
uint32_t i, j, bufsize;
allocate_cores(1);
allocate_threads(1);
set_thread(0);
/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
g_iobuf.opts = opts;
rc = spdk_iobuf_initialize();
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module0");
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_register_module("ut_module1");
CU_ASSERT_EQUAL(rc, 0);
/* First check that channel initialization fails when it's not possible to fill in the cache
* from the pool.
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
CU_ASSERT_EQUAL(rc, -ENOMEM);
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
CU_ASSERT_EQUAL(rc, -ENOMEM);
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
CU_ASSERT_EQUAL(rc, -ENOMEM);
spdk_iobuf_channel_fini(&iobuf_ch[0]);
poll_threads();
/* Initialize one channel with cache, acquire buffers, and check that a second one can be
* created once the buffers acquired from the first one are returned to the pool
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
CU_ASSERT_EQUAL(rc, 0);
for (i = 0; i < 3; ++i) {
mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], LARGE_BUFSIZE, &mod0_entries[i].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
}
/* It should be able to create a channel with a single entry in the cache */
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 1);
CU_ASSERT_EQUAL(rc, 0);
spdk_iobuf_channel_fini(&iobuf_ch[1]);
poll_threads();
/* But not with two entries */
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
CU_ASSERT_EQUAL(rc, -ENOMEM);
for (i = 0; i < 2; ++i) {
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
CU_ASSERT_EQUAL(rc, -ENOMEM);
}
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, LARGE_BUFSIZE);
/* The last buffer should be released back to the pool, so we should be able to create a new
* channel
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
CU_ASSERT_EQUAL(rc, 0);
spdk_iobuf_channel_fini(&iobuf_ch[0]);
spdk_iobuf_channel_fini(&iobuf_ch[1]);
poll_threads();
/* Check that the pool is only used when the cache is empty and that the cache guarantees a
* certain set of buffers
*/
rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
CU_ASSERT_EQUAL(rc, 0);
rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
CU_ASSERT_EQUAL(rc, 0);
uint32_t buffer_sizes[] = { SMALL_BUFSIZE, LARGE_BUFSIZE };
for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
bufsize = buffer_sizes[i];
for (j = 0; j < 3; ++j) {
entry = &mod0_entries[j];
entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(entry->buf);
}
mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);
/* The whole pool is exhausted now */
mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
ut_iobuf_get_buf_cb);
CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
/* If there are outstanding requests waiting for a buffer, they should have priority
* over filling in the cache, even if they're from different modules.
*/
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
/* Also make sure the queue is FIFO and doesn't care about which module requested
* and which module released the buffer.
*/
CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
/* Return the buffers back */
spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
for (j = 0; j < 2; ++j) {
spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
}
}
spdk_iobuf_channel_fini(&iobuf_ch[0]);
spdk_iobuf_channel_fini(&iobuf_ch[1]);
poll_threads();
spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
poll_threads();
CU_ASSERT_EQUAL(finish, 1);
free_threads();
free_cores();
}
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
@ -2556,8 +1942,6 @@ main(int argc, char **argv)
CU_ADD_TEST(suite, multi_timed_pollers_have_same_expiration); CU_ADD_TEST(suite, multi_timed_pollers_have_same_expiration);
CU_ADD_TEST(suite, io_device_lookup); CU_ADD_TEST(suite, io_device_lookup);
CU_ADD_TEST(suite, spdk_spin); CU_ADD_TEST(suite, spdk_spin);
CU_ADD_TEST(suite, iobuf);
CU_ADD_TEST(suite, iobuf_cache);
CU_basic_set_mode(CU_BRM_VERBOSE); CU_basic_set_mode(CU_BRM_VERBOSE);
CU_basic_run_tests(); CU_basic_run_tests();

View File

@ -271,6 +271,7 @@ fi
run_test "unittest_scsi" unittest_scsi run_test "unittest_scsi" unittest_scsi
run_test "unittest_sock" unittest_sock run_test "unittest_sock" unittest_sock
run_test "unittest_thread" $valgrind $testdir/lib/thread/thread.c/thread_ut run_test "unittest_thread" $valgrind $testdir/lib/thread/thread.c/thread_ut
run_test "unittest_iobuf" $valgrind $testdir/lib/thread/iobuf.c/iobuf_ut
run_test "unittest_util" unittest_util run_test "unittest_util" unittest_util
if grep -q '#define SPDK_CONFIG_VHOST 1' $rootdir/include/spdk/config.h; then if grep -q '#define SPDK_CONFIG_VHOST 1' $rootdir/include/spdk/config.h; then
run_test "unittest_vhost" $valgrind $testdir/lib/vhost/vhost.c/vhost_ut run_test "unittest_vhost" $valgrind $testdir/lib/vhost/vhost.c/vhost_ut