blobfs: start a poller to do cache buffer reclaim regularly

Currently we will free the cache buffers in the file R/W
data path, however we can start a thread to do the cache
buffer reclaim regulary, so that we don't need to free
the cache buffer in the Rocksdb thread context.  By
doing this, we can also remove the global cache buffer
lock in following patches.

Change-Id: Icb0fbc49baecd1e9d86a5fcfe400758dfc3c53a2
Signed-off-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/941
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
Changpeng Liu 2020-02-18 02:34:11 -05:00 committed by Tomasz Zawadzki
parent 14db3c7e4b
commit 1914de0920
3 changed files with 131 additions and 95 deletions

View File

@ -59,6 +59,9 @@
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_spinlock_t g_caches_lock;
@ -257,8 +260,26 @@ spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
static int _blobfs_cache_pool_reclaim(void *arg);
static bool
blobfs_cache_pool_need_reclaim(void)
{
size_t count;
count = spdk_mempool_count(g_cache_pool);
/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
* when the number of available cache buffer is less than 1/5 of total buffers.
*/
if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
return false;
}
return true;
}
static void
__initialize_cache(void)
__start_cache_pool_mgmt(void *ctx)
{
assert(g_cache_pool == NULL);
@ -274,15 +295,37 @@ __initialize_cache(void)
}
TAILQ_INIT(&g_caches);
pthread_spin_init(&g_caches_lock, 0);
assert(g_cache_pool_mgmt_poller == NULL);
g_cache_pool_mgmt_poller = spdk_poller_register(_blobfs_cache_pool_reclaim, NULL,
BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
static void
__stop_cache_pool_mgmt(void *ctx)
{
spdk_poller_unregister(&g_cache_pool_mgmt_poller);
assert(g_cache_pool != NULL);
assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
spdk_mempool_free(g_cache_pool);
g_cache_pool = NULL;
spdk_thread_exit(g_cache_pool_thread);
}
static void
__initialize_cache(void)
{
g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
assert(g_cache_pool_thread != NULL);
spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
}
static void
__free_cache(void)
{
assert(g_cache_pool != NULL);
spdk_mempool_free(g_cache_pool);
g_cache_pool = NULL;
spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
}
static uint64_t
@ -2053,69 +2096,56 @@ reclaim_cache_buffers(struct spdk_file *file)
return 0;
}
static void *
alloc_cache_memory_buffer(struct spdk_file *context)
static int
_blobfs_cache_pool_reclaim(void *arg)
{
struct spdk_file *file, *tmp;
void *buf;
int rc;
buf = spdk_mempool_get(g_cache_pool);
if (buf != NULL) {
return buf;
if (!blobfs_cache_pool_need_reclaim()) {
return 0;
}
pthread_spin_lock(&g_caches_lock);
TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
if (!file->open_for_writing &&
file->priority == SPDK_FILE_PRIORITY_LOW &&
file != context) {
file->priority == SPDK_FILE_PRIORITY_LOW) {
rc = reclaim_cache_buffers(file);
if (rc < 0) {
continue;
}
buf = spdk_mempool_get(g_cache_pool);
if (buf != NULL) {
if (!blobfs_cache_pool_need_reclaim()) {
pthread_spin_unlock(&g_caches_lock);
return buf;
return 1;
}
break;
}
}
TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
if (!file->open_for_writing &&
file != context) {
if (!file->open_for_writing) {
rc = reclaim_cache_buffers(file);
if (rc < 0) {
continue;
}
buf = spdk_mempool_get(g_cache_pool);
if (buf != NULL) {
if (!blobfs_cache_pool_need_reclaim()) {
pthread_spin_unlock(&g_caches_lock);
return buf;
return 1;
}
break;
}
}
TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
if (file != context) {
rc = reclaim_cache_buffers(file);
if (rc < 0) {
continue;
}
buf = spdk_mempool_get(g_cache_pool);
if (buf != NULL) {
pthread_spin_unlock(&g_caches_lock);
return buf;
}
break;
}
}
pthread_spin_unlock(&g_caches_lock);
return NULL;
return 1;
}
static struct cache_buffer *
@ -2130,23 +2160,19 @@ cache_insert_buffer(struct spdk_file *file, uint64_t offset)
return NULL;
}
buf->buf = alloc_cache_memory_buffer(file);
while (buf->buf == NULL) {
/*
* TODO: alloc_cache_memory_buffer() should eventually free
* some buffers. Need a more sophisticated check here, instead
* of just bailing if 100 tries does not result in getting a
* free buffer. This will involve using the sync channel's
* semaphore to block until a buffer becomes available.
*/
do {
buf->buf = spdk_mempool_get(g_cache_pool);
if (buf->buf) {
break;
}
if (count++ == 100) {
SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
file, offset);
free(buf);
return NULL;
}
buf->buf = alloc_cache_memory_buffer(file);
}
usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
} while (true);
buf->buf_size = CACHE_BUFFER_SIZE;
buf->offset = offset;

View File

@ -85,6 +85,13 @@ fs_op_with_handle_complete(void *ctx, struct spdk_filesystem *fs, int fserrno)
g_fserrno = fserrno;
}
static void
fs_poll_threads(void)
{
poll_threads();
while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {}
}
static void
fs_init(void)
{
@ -94,7 +101,7 @@ fs_init(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -102,7 +109,7 @@ fs_init(void)
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -138,7 +145,7 @@ fs_open(void)
memset(name, 'a', sizeof(name) - 1);
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -147,18 +154,18 @@ fs_open(void)
g_fserrno = 0;
/* Open should fail, because the file name is too long. */
spdk_fs_open_file_async(fs, name, SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == -ENAMETOOLONG);
g_fserrno = 0;
spdk_fs_open_file_async(fs, "file1", 0, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == -ENOENT);
g_file = NULL;
g_fserrno = 1;
spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(g_file != NULL);
CU_ASSERT(!strcmp("file1", g_file->name));
@ -175,19 +182,19 @@ fs_open(void)
g_fserrno = 0;
/* Delete should successful, we will mark the file as deleted. */
spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(!TAILQ_EMPTY(&fs->files));
g_fserrno = 1;
spdk_file_close_async(g_file, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(TAILQ_EMPTY(&fs->files));
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -202,7 +209,7 @@ fs_create(void)
memset(name, 'a', sizeof(name) - 1);
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -211,28 +218,28 @@ fs_create(void)
g_fserrno = 0;
/* Create should fail, because the file name is too long. */
spdk_fs_create_file_async(fs, name, create_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == -ENAMETOOLONG);
g_fserrno = 1;
spdk_fs_create_file_async(fs, "file1", create_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
g_fserrno = 1;
spdk_fs_create_file_async(fs, "file1", create_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == -EEXIST);
g_fserrno = 1;
spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(TAILQ_EMPTY(&fs->files));
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -245,7 +252,7 @@ fs_truncate(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -254,43 +261,43 @@ fs_truncate(void)
g_file = NULL;
g_fserrno = 1;
spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(g_file != NULL);
g_fserrno = 1;
spdk_file_truncate_async(g_file, 18 * 1024 * 1024 + 1, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->length == 18 * 1024 * 1024 + 1);
g_fserrno = 1;
spdk_file_truncate_async(g_file, 1, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->length == 1);
g_fserrno = 1;
spdk_file_truncate_async(g_file, 18 * 1024 * 1024 + 1, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->length == 18 * 1024 * 1024 + 1);
g_fserrno = 1;
spdk_file_close_async(g_file, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->ref_count == 0);
g_fserrno = 1;
spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(TAILQ_EMPTY(&fs->files));
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -304,7 +311,7 @@ fs_rename(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -312,13 +319,13 @@ fs_rename(void)
g_fserrno = 1;
spdk_fs_create_file_async(fs, "file1", create_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
g_file = NULL;
g_fserrno = 1;
spdk_fs_open_file_async(fs, "file1", 0, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(g_file != NULL);
CU_ASSERT(g_file->ref_count == 1);
@ -327,14 +334,14 @@ fs_rename(void)
g_file = NULL;
g_fserrno = 1;
spdk_file_close_async(file, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(file->ref_count == 0);
g_file = NULL;
g_fserrno = 1;
spdk_fs_open_file_async(fs, "file2", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(g_file != NULL);
CU_ASSERT(g_file->ref_count == 1);
@ -343,7 +350,7 @@ fs_rename(void)
g_file = NULL;
g_fserrno = 1;
spdk_file_close_async(file2, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(file2->ref_count == 0);
@ -353,7 +360,7 @@ fs_rename(void)
*/
g_fserrno = 1;
spdk_fs_rename_file_async(fs, "file1", "file2", fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(file->ref_count == 0);
CU_ASSERT(!strcmp(file->name, "file2"));
@ -362,7 +369,7 @@ fs_rename(void)
g_fserrno = 0;
spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == -ENOENT);
CU_ASSERT(!TAILQ_EMPTY(&fs->files));
TAILQ_FOREACH(file_iter, &fs->files, tailq) {
@ -373,13 +380,13 @@ fs_rename(void)
g_fserrno = 1;
spdk_fs_delete_file_async(fs, "file2", delete_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(TAILQ_EMPTY(&fs->files));
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -394,7 +401,7 @@ fs_rw_async(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -403,7 +410,7 @@ fs_rw_async(void)
g_file = NULL;
g_fserrno = 1;
spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(g_file != NULL);
@ -413,7 +420,7 @@ fs_rw_async(void)
memset(w_buf, 0x5a, sizeof(w_buf));
spdk_file_write_async(g_file, fs->sync_target.sync_io_channel, w_buf, 0, 4096,
fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->length == 4096);
@ -422,18 +429,18 @@ fs_rw_async(void)
memset(r_buf, 0x0, sizeof(r_buf));
spdk_file_read_async(g_file, fs->sync_target.sync_io_channel, r_buf, 0, 4096,
fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(memcmp(r_buf, w_buf, sizeof(r_buf)) == 0);
g_fserrno = 1;
spdk_file_close_async(g_file, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -450,7 +457,7 @@ fs_writev_readv_async(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -459,7 +466,7 @@ fs_writev_readv_async(void)
g_file = NULL;
g_fserrno = 1;
spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
SPDK_CU_ASSERT_FATAL(g_file != NULL);
@ -473,7 +480,7 @@ fs_writev_readv_async(void)
w_iov[1].iov_len = 2048;
spdk_file_writev_async(g_file, fs->sync_target.sync_io_channel,
w_iov, 2, 0, 4096, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->length == 4096);
@ -486,7 +493,7 @@ fs_writev_readv_async(void)
r_iov[1].iov_len = 2048;
spdk_file_readv_async(g_file, fs->sync_target.sync_io_channel,
r_iov, 2, 0, 4096, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(memcmp(r_buf, w_buf, sizeof(r_buf)) == 0);
@ -499,7 +506,7 @@ fs_writev_readv_async(void)
w_iov[1].iov_len = 2048;
spdk_file_writev_async(g_file, fs->sync_target.sync_io_channel,
w_iov, 2, 0, 4096, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(g_file->length == 4096);
@ -512,18 +519,18 @@ fs_writev_readv_async(void)
r_iov[1].iov_len = 2048;
spdk_file_readv_async(g_file, fs->sync_target.sync_io_channel,
r_iov, 2, 0, 4096, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
CU_ASSERT(memcmp(r_buf, w_buf, sizeof(r_buf)) == 0);
g_fserrno = 1;
spdk_file_close_async(g_file, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
}
@ -614,7 +621,7 @@ channel_ops(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -627,7 +634,7 @@ channel_ops(void)
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
g_fs = NULL;
}
@ -642,7 +649,7 @@ channel_ops_sync(void)
dev = init_dev();
spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL);
poll_threads();
fs_poll_threads();
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
CU_ASSERT(g_fserrno == 0);
fs = g_fs;
@ -655,7 +662,7 @@ channel_ops_sync(void)
g_fserrno = 1;
spdk_fs_unload(fs, fs_op_complete, NULL);
poll_threads();
fs_poll_threads();
CU_ASSERT(g_fserrno == 0);
g_fs = NULL;
}

View File

@ -134,6 +134,7 @@ _fs_init(void *arg)
spdk_fs_init(dev, NULL, send_request, fs_op_with_handle_complete, NULL);
thread = spdk_get_thread();
while (spdk_thread_poll(thread, 0, 0) > 0) {}
while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {}
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
SPDK_CU_ASSERT_FATAL(g_fs->bdev == dev);
@ -152,6 +153,7 @@ _fs_load(void *arg)
spdk_fs_load(dev, send_request, fs_op_with_handle_complete, NULL);
thread = spdk_get_thread();
while (spdk_thread_poll(thread, 0, 0) > 0) {}
while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {}
SPDK_CU_ASSERT_FATAL(g_fs != NULL);
SPDK_CU_ASSERT_FATAL(g_fs->bdev == dev);
@ -167,6 +169,7 @@ _fs_unload(void *arg)
spdk_fs_unload(g_fs, fs_op_complete, NULL);
thread = spdk_get_thread();
while (spdk_thread_poll(thread, 0, 0) > 0) {}
while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {}
CU_ASSERT(g_fserrno == 0);
g_fs = NULL;
}