diff --git a/lib/blobfs/blobfs.c b/lib/blobfs/blobfs.c index 6d0c6c28b..ecf165bb6 100644 --- a/lib/blobfs/blobfs.c +++ b/lib/blobfs/blobfs.c @@ -2053,11 +2053,15 @@ __file_flush(void *ctx) pthread_spin_lock(&file->lock); next = spdk_tree_find_buffer(file->tree, file->length_flushed); - if (next == NULL || next->in_progress) { + if (next == NULL || next->in_progress || + ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) { /* - * There is either no data to flush, or a flush I/O is already in - * progress. So return immediately - if a flush I/O is in - * progress we will flush more data after that is completed. + * There is either no data to flush, a flush I/O is already in + * progress, or the next buffer is partially filled but there's no + * outstanding request to sync it. + * So return immediately - if a flush I/O is in progress we will flush + * more data after that is completed, or a partial buffer will get flushed + * when it is either filled or the file is synced. */ free_fs_request(req); if (next == NULL) { diff --git a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c index bedd79665..fcd0c2a24 100644 --- a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c +++ b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c @@ -163,6 +163,11 @@ _fs_unload(void *arg) g_fs = NULL; } +static void +_nop(void *arg) +{ +} + static void cache_write(void) { @@ -220,13 +225,10 @@ file_length(void) CU_ASSERT(rc == 0); SPDK_CU_ASSERT_FATAL(g_file != NULL); - /* Write slightly more than one CACHE_BUFFER. Filling at least one cache - * buffer triggers a flush to disk. Currently when that cache buffer is written, - * it will proceed to write the final byte, even though there's been no explicit - * sync for it yet. We will fix that eventually, but for now test with this - * behavior since it matches a subtle RocksDB failure scenario. + /* Write one CACHE_BUFFER. Filling at least one cache buffer triggers + * a flush to disk. */ - buf_length = CACHE_BUFFER_SIZE + 1; + buf_length = CACHE_BUFFER_SIZE; buf = calloc(1, buf_length); spdk_file_write(g_file, channel, buf, 0, buf_length); free(buf); @@ -277,6 +279,67 @@ file_length(void) ut_send_request(_fs_unload, NULL); } +static void +partial_buffer(void) +{ + int rc; + char *buf; + uint64_t buf_length; + struct spdk_fs_thread_ctx *channel; + struct spdk_file_stat stat = {0}; + + ut_send_request(_fs_init, NULL); + + channel = spdk_fs_alloc_thread_ctx(g_fs); + + g_file = NULL; + rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file); + CU_ASSERT(rc == 0); + SPDK_CU_ASSERT_FATAL(g_file != NULL); + + /* Write one CACHE_BUFFER plus one byte. Filling at least one cache buffer triggers + * a flush to disk. We want to make sure the extra byte is not implicitly flushed. + * It should only get flushed once we sync or close the file. + */ + buf_length = CACHE_BUFFER_SIZE + 1; + buf = calloc(1, buf_length); + spdk_file_write(g_file, channel, buf, 0, buf_length); + free(buf); + + /* Send some nop messages to the dispatch thread. This will ensure any of the + * pending write operations are completed. A well-functioning blobfs should only + * issue one write for the filled CACHE_BUFFER - a buggy one might try to write + * the extra byte. So do a bunch of _nops to make sure all of them (even the buggy + * ones) get a chance to run. Note that we can't just send a message to the + * dispatch thread to call spdk_thread_poll() because the messages are themselves + * run in the context of spdk_thread_poll(). + */ + ut_send_request(_nop, NULL); + ut_send_request(_nop, NULL); + ut_send_request(_nop, NULL); + ut_send_request(_nop, NULL); + ut_send_request(_nop, NULL); + ut_send_request(_nop, NULL); + + CU_ASSERT(g_file->length_flushed == CACHE_BUFFER_SIZE); + + /* Close the file. This causes an implicit sync which should write the + * length_flushed value as the "length" xattr on the file. + */ + spdk_file_close(g_file, channel); + + rc = spdk_fs_file_stat(g_fs, channel, "testfile", &stat); + CU_ASSERT(rc == 0); + CU_ASSERT(buf_length == stat.size); + + rc = spdk_fs_delete_file(g_fs, channel, "testfile"); + CU_ASSERT(rc == 0); + + spdk_fs_free_thread_ctx(channel); + + ut_send_request(_fs_unload, NULL); +} + static void cache_write_null_buffer(void) { @@ -463,6 +526,7 @@ int main(int argc, char **argv) if ( CU_add_test(suite, "write", cache_write) == NULL || CU_add_test(suite, "file length", file_length) == NULL || + CU_add_test(suite, "partial buffer", partial_buffer) == NULL || CU_add_test(suite, "write_null_buffer", cache_write_null_buffer) == NULL || CU_add_test(suite, "create_sync", fs_create_sync) == NULL || CU_add_test(suite, "append_no_cache", cache_append_no_cache) == NULL ||