diff --git a/lib/blobfs/blobfs.c b/lib/blobfs/blobfs.c index 28a9d4349..6d114b09e 100644 --- a/lib/blobfs/blobfs.c +++ b/lib/blobfs/blobfs.c @@ -59,6 +59,9 @@ static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; static struct spdk_mempool *g_cache_pool; static TAILQ_HEAD(, spdk_file) g_caches; +static struct spdk_poller *g_cache_pool_mgmt_poller; +static struct spdk_thread *g_cache_pool_thread; +#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL static int g_fs_count = 0; static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_spinlock_t g_caches_lock; @@ -257,8 +260,26 @@ spdk_fs_opts_init(struct spdk_blobfs_opts *opts) opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; } +static int _blobfs_cache_pool_reclaim(void *arg); + +static bool +blobfs_cache_pool_need_reclaim(void) +{ + size_t count; + + count = spdk_mempool_count(g_cache_pool); + /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller + * when the number of available cache buffer is less than 1/5 of total buffers. + */ + if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { + return false; + } + + return true; +} + static void -__initialize_cache(void) +__start_cache_pool_mgmt(void *ctx) { assert(g_cache_pool == NULL); @@ -274,15 +295,37 @@ __initialize_cache(void) } TAILQ_INIT(&g_caches); pthread_spin_init(&g_caches_lock, 0); + + assert(g_cache_pool_mgmt_poller == NULL); + g_cache_pool_mgmt_poller = spdk_poller_register(_blobfs_cache_pool_reclaim, NULL, + BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); +} + +static void +__stop_cache_pool_mgmt(void *ctx) +{ + spdk_poller_unregister(&g_cache_pool_mgmt_poller); + + assert(g_cache_pool != NULL); + assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); + spdk_mempool_free(g_cache_pool); + g_cache_pool = NULL; + + spdk_thread_exit(g_cache_pool_thread); +} + +static void +__initialize_cache(void) +{ + g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); + assert(g_cache_pool_thread != NULL); + spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); } static void __free_cache(void) { - assert(g_cache_pool != NULL); - - spdk_mempool_free(g_cache_pool); - g_cache_pool = NULL; + spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); } static uint64_t @@ -2053,69 +2096,56 @@ reclaim_cache_buffers(struct spdk_file *file) return 0; } -static void * -alloc_cache_memory_buffer(struct spdk_file *context) +static int +_blobfs_cache_pool_reclaim(void *arg) { struct spdk_file *file, *tmp; - void *buf; int rc; - buf = spdk_mempool_get(g_cache_pool); - if (buf != NULL) { - return buf; + if (!blobfs_cache_pool_need_reclaim()) { + return 0; } pthread_spin_lock(&g_caches_lock); TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { if (!file->open_for_writing && - file->priority == SPDK_FILE_PRIORITY_LOW && - file != context) { + file->priority == SPDK_FILE_PRIORITY_LOW) { rc = reclaim_cache_buffers(file); if (rc < 0) { continue; } - buf = spdk_mempool_get(g_cache_pool); - if (buf != NULL) { + if (!blobfs_cache_pool_need_reclaim()) { pthread_spin_unlock(&g_caches_lock); - return buf; + return 1; } break; } } TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { - if (!file->open_for_writing && - file != context) { + if (!file->open_for_writing) { rc = reclaim_cache_buffers(file); if (rc < 0) { continue; } - buf = spdk_mempool_get(g_cache_pool); - if (buf != NULL) { + if (!blobfs_cache_pool_need_reclaim()) { pthread_spin_unlock(&g_caches_lock); - return buf; + return 1; } break; } } TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { - if (file != context) { - rc = reclaim_cache_buffers(file); - if (rc < 0) { - continue; - } - buf = spdk_mempool_get(g_cache_pool); - if (buf != NULL) { - pthread_spin_unlock(&g_caches_lock); - return buf; - } - break; + rc = reclaim_cache_buffers(file); + if (rc < 0) { + continue; } + break; } pthread_spin_unlock(&g_caches_lock); - return NULL; + return 1; } static struct cache_buffer * @@ -2130,23 +2160,19 @@ cache_insert_buffer(struct spdk_file *file, uint64_t offset) return NULL; } - buf->buf = alloc_cache_memory_buffer(file); - while (buf->buf == NULL) { - /* - * TODO: alloc_cache_memory_buffer() should eventually free - * some buffers. Need a more sophisticated check here, instead - * of just bailing if 100 tries does not result in getting a - * free buffer. This will involve using the sync channel's - * semaphore to block until a buffer becomes available. - */ + do { + buf->buf = spdk_mempool_get(g_cache_pool); + if (buf->buf) { + break; + } if (count++ == 100) { SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", file, offset); free(buf); return NULL; } - buf->buf = alloc_cache_memory_buffer(file); - } + usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); + } while (true); buf->buf_size = CACHE_BUFFER_SIZE; buf->offset = offset; diff --git a/test/unit/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c b/test/unit/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c index f9b075373..ef431c97b 100644 --- a/test/unit/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c +++ b/test/unit/lib/blobfs/blobfs_async_ut/blobfs_async_ut.c @@ -85,6 +85,13 @@ fs_op_with_handle_complete(void *ctx, struct spdk_filesystem *fs, int fserrno) g_fserrno = fserrno; } +static void +fs_poll_threads(void) +{ + poll_threads(); + while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {} +} + static void fs_init(void) { @@ -94,7 +101,7 @@ fs_init(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -102,7 +109,7 @@ fs_init(void) g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -138,7 +145,7 @@ fs_open(void) memset(name, 'a', sizeof(name) - 1); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -147,18 +154,18 @@ fs_open(void) g_fserrno = 0; /* Open should fail, because the file name is too long. */ spdk_fs_open_file_async(fs, name, SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == -ENAMETOOLONG); g_fserrno = 0; spdk_fs_open_file_async(fs, "file1", 0, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == -ENOENT); g_file = NULL; g_fserrno = 1; spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); CU_ASSERT(!strcmp("file1", g_file->name)); @@ -175,19 +182,19 @@ fs_open(void) g_fserrno = 0; /* Delete should successful, we will mark the file as deleted. */ spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(!TAILQ_EMPTY(&fs->files)); g_fserrno = 1; spdk_file_close_async(g_file, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(TAILQ_EMPTY(&fs->files)); g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -202,7 +209,7 @@ fs_create(void) memset(name, 'a', sizeof(name) - 1); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -211,28 +218,28 @@ fs_create(void) g_fserrno = 0; /* Create should fail, because the file name is too long. */ spdk_fs_create_file_async(fs, name, create_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == -ENAMETOOLONG); g_fserrno = 1; spdk_fs_create_file_async(fs, "file1", create_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); g_fserrno = 1; spdk_fs_create_file_async(fs, "file1", create_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == -EEXIST); g_fserrno = 1; spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(TAILQ_EMPTY(&fs->files)); g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -245,7 +252,7 @@ fs_truncate(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -254,43 +261,43 @@ fs_truncate(void) g_file = NULL; g_fserrno = 1; spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); g_fserrno = 1; spdk_file_truncate_async(g_file, 18 * 1024 * 1024 + 1, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->length == 18 * 1024 * 1024 + 1); g_fserrno = 1; spdk_file_truncate_async(g_file, 1, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->length == 1); g_fserrno = 1; spdk_file_truncate_async(g_file, 18 * 1024 * 1024 + 1, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->length == 18 * 1024 * 1024 + 1); g_fserrno = 1; spdk_file_close_async(g_file, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->ref_count == 0); g_fserrno = 1; spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(TAILQ_EMPTY(&fs->files)); g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -304,7 +311,7 @@ fs_rename(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -312,13 +319,13 @@ fs_rename(void) g_fserrno = 1; spdk_fs_create_file_async(fs, "file1", create_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); g_file = NULL; g_fserrno = 1; spdk_fs_open_file_async(fs, "file1", 0, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); CU_ASSERT(g_file->ref_count == 1); @@ -327,14 +334,14 @@ fs_rename(void) g_file = NULL; g_fserrno = 1; spdk_file_close_async(file, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(file->ref_count == 0); g_file = NULL; g_fserrno = 1; spdk_fs_open_file_async(fs, "file2", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); CU_ASSERT(g_file->ref_count == 1); @@ -343,7 +350,7 @@ fs_rename(void) g_file = NULL; g_fserrno = 1; spdk_file_close_async(file2, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(file2->ref_count == 0); @@ -353,7 +360,7 @@ fs_rename(void) */ g_fserrno = 1; spdk_fs_rename_file_async(fs, "file1", "file2", fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(file->ref_count == 0); CU_ASSERT(!strcmp(file->name, "file2")); @@ -362,7 +369,7 @@ fs_rename(void) g_fserrno = 0; spdk_fs_delete_file_async(fs, "file1", delete_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == -ENOENT); CU_ASSERT(!TAILQ_EMPTY(&fs->files)); TAILQ_FOREACH(file_iter, &fs->files, tailq) { @@ -373,13 +380,13 @@ fs_rename(void) g_fserrno = 1; spdk_fs_delete_file_async(fs, "file2", delete_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(TAILQ_EMPTY(&fs->files)); g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -394,7 +401,7 @@ fs_rw_async(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -403,7 +410,7 @@ fs_rw_async(void) g_file = NULL; g_fserrno = 1; spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); @@ -413,7 +420,7 @@ fs_rw_async(void) memset(w_buf, 0x5a, sizeof(w_buf)); spdk_file_write_async(g_file, fs->sync_target.sync_io_channel, w_buf, 0, 4096, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->length == 4096); @@ -422,18 +429,18 @@ fs_rw_async(void) memset(r_buf, 0x0, sizeof(r_buf)); spdk_file_read_async(g_file, fs->sync_target.sync_io_channel, r_buf, 0, 4096, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(memcmp(r_buf, w_buf, sizeof(r_buf)) == 0); g_fserrno = 1; spdk_file_close_async(g_file, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -450,7 +457,7 @@ fs_writev_readv_async(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -459,7 +466,7 @@ fs_writev_readv_async(void) g_file = NULL; g_fserrno = 1; spdk_fs_open_file_async(fs, "file1", SPDK_BLOBFS_OPEN_CREATE, open_cb, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); @@ -473,7 +480,7 @@ fs_writev_readv_async(void) w_iov[1].iov_len = 2048; spdk_file_writev_async(g_file, fs->sync_target.sync_io_channel, w_iov, 2, 0, 4096, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->length == 4096); @@ -486,7 +493,7 @@ fs_writev_readv_async(void) r_iov[1].iov_len = 2048; spdk_file_readv_async(g_file, fs->sync_target.sync_io_channel, r_iov, 2, 0, 4096, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(memcmp(r_buf, w_buf, sizeof(r_buf)) == 0); @@ -499,7 +506,7 @@ fs_writev_readv_async(void) w_iov[1].iov_len = 2048; spdk_file_writev_async(g_file, fs->sync_target.sync_io_channel, w_iov, 2, 0, 4096, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(g_file->length == 4096); @@ -512,18 +519,18 @@ fs_writev_readv_async(void) r_iov[1].iov_len = 2048; spdk_file_readv_async(g_file, fs->sync_target.sync_io_channel, r_iov, 2, 0, 4096, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); CU_ASSERT(memcmp(r_buf, w_buf, sizeof(r_buf)) == 0); g_fserrno = 1; spdk_file_close_async(g_file, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); } @@ -614,7 +621,7 @@ channel_ops(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -627,7 +634,7 @@ channel_ops(void) g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); g_fs = NULL; } @@ -642,7 +649,7 @@ channel_ops_sync(void) dev = init_dev(); spdk_fs_init(dev, NULL, NULL, fs_op_with_handle_complete, NULL); - poll_threads(); + fs_poll_threads(); SPDK_CU_ASSERT_FATAL(g_fs != NULL); CU_ASSERT(g_fserrno == 0); fs = g_fs; @@ -655,7 +662,7 @@ channel_ops_sync(void) g_fserrno = 1; spdk_fs_unload(fs, fs_op_complete, NULL); - poll_threads(); + fs_poll_threads(); CU_ASSERT(g_fserrno == 0); g_fs = NULL; } diff --git a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c index 21e6d2e2c..92299b78e 100644 --- a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c +++ b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c @@ -134,6 +134,7 @@ _fs_init(void *arg) spdk_fs_init(dev, NULL, send_request, fs_op_with_handle_complete, NULL); thread = spdk_get_thread(); while (spdk_thread_poll(thread, 0, 0) > 0) {} + while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {} SPDK_CU_ASSERT_FATAL(g_fs != NULL); SPDK_CU_ASSERT_FATAL(g_fs->bdev == dev); @@ -152,6 +153,7 @@ _fs_load(void *arg) spdk_fs_load(dev, send_request, fs_op_with_handle_complete, NULL); thread = spdk_get_thread(); while (spdk_thread_poll(thread, 0, 0) > 0) {} + while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {} SPDK_CU_ASSERT_FATAL(g_fs != NULL); SPDK_CU_ASSERT_FATAL(g_fs->bdev == dev); @@ -167,6 +169,7 @@ _fs_unload(void *arg) spdk_fs_unload(g_fs, fs_op_complete, NULL); thread = spdk_get_thread(); while (spdk_thread_poll(thread, 0, 0) > 0) {} + while (spdk_thread_poll(g_cache_pool_thread, 0, 0) > 0) {} CU_ASSERT(g_fserrno == 0); g_fs = NULL; }