diff --git a/lib/blobfs/blobfs.c b/lib/blobfs/blobfs.c index f66f01319..6d0c6c28b 100644 --- a/lib/blobfs/blobfs.c +++ b/lib/blobfs/blobfs.c @@ -77,6 +77,7 @@ struct spdk_file { bool is_deleted; bool open_for_writing; uint64_t length_flushed; + uint64_t length_xattr; uint64_t append_pos; uint64_t seq_byte_count; uint64_t next_seq_offset; @@ -168,9 +169,16 @@ struct spdk_fs_cb_args { uint64_t offset; } readahead; struct { + /* offset of the file when the sync request was made */ uint64_t offset; TAILQ_ENTRY(spdk_fs_request) tailq; bool xattr_in_progress; + /* length written to the xattr for this file - this should + * always be the same as the offset if only one thread is + * writing to the file, but could differ if multiple threads + * are appending + */ + uint64_t length; } sync; struct { uint32_t num_clusters; @@ -667,6 +675,7 @@ iter_cb(void *ctx, struct spdk_blob *blob, int rc) f->blobid = spdk_blob_get_id(blob); f->length = *length; f->length_flushed = *length; + f->length_xattr = *length; f->append_pos = *length; SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); } else { @@ -1955,6 +1964,7 @@ __file_cache_finish_sync(void *ctx, int bserrno) pthread_spin_lock(&file->lock); sync_req = TAILQ_FIRST(&file->sync_requests); sync_args = &sync_req->args; + file->length_xattr = sync_args->op.sync.length; assert(sync_args->op.sync.offset <= file->length_flushed); BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); @@ -1984,6 +1994,7 @@ __check_sync_reqs(struct spdk_file *file) if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); sync_req->args.op.sync.xattr_in_progress = true; + sync_req->args.op.sync.length = file->length_flushed; spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, sizeof(file->length_flushed)); @@ -2074,6 +2085,11 @@ __file_flush(void *ctx) if (length == 0) { free_fs_request(req); pthread_spin_unlock(&file->lock); + /* + * There is no data to flush, but we still need to check for any + * outstanding sync requests to make sure metadata gets updated. + */ + __check_sync_reqs(file); return; } args->op.flush.length = length; @@ -2469,8 +2485,8 @@ _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); pthread_spin_lock(&file->lock); - if (file->append_pos <= file->length_flushed) { - BLOBFS_TRACE(file, "done - no data to flush\n"); + if (file->append_pos <= file->length_xattr) { + BLOBFS_TRACE(file, "done - file already synced\n"); pthread_spin_unlock(&file->lock); cb_fn(cb_arg, 0); return; diff --git a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c index 35f602353..bedd79665 100644 --- a/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c +++ b/test/unit/lib/blobfs/blobfs_sync_ut/blobfs_sync_ut.c @@ -132,6 +132,24 @@ _fs_init(void *arg) CU_ASSERT(g_fserrno == 0); } +static void +_fs_load(void *arg) +{ + struct spdk_thread *thread; + struct spdk_bs_dev *dev; + + g_fs = NULL; + g_fserrno = -1; + dev = init_dev(); + spdk_fs_load(dev, send_request, fs_op_with_handle_complete, NULL); + thread = spdk_get_thread(); + while (spdk_thread_poll(thread, 0, 0) > 0) {} + + SPDK_CU_ASSERT_FATAL(g_fs != NULL); + SPDK_CU_ASSERT_FATAL(g_fs->bdev == dev); + CU_ASSERT(g_fserrno == 0); +} + static void _fs_unload(void *arg) { @@ -184,6 +202,81 @@ cache_write(void) ut_send_request(_fs_unload, NULL); } +static void +file_length(void) +{ + int rc; + char *buf; + uint64_t buf_length; + struct spdk_fs_thread_ctx *channel; + struct spdk_file_stat stat = {0}; + + ut_send_request(_fs_init, NULL); + + channel = spdk_fs_alloc_thread_ctx(g_fs); + + g_file = NULL; + rc = spdk_fs_open_file(g_fs, channel, "testfile", SPDK_BLOBFS_OPEN_CREATE, &g_file); + CU_ASSERT(rc == 0); + SPDK_CU_ASSERT_FATAL(g_file != NULL); + + /* Write slightly more than one CACHE_BUFFER. Filling at least one cache + * buffer triggers a flush to disk. Currently when that cache buffer is written, + * it will proceed to write the final byte, even though there's been no explicit + * sync for it yet. We will fix that eventually, but for now test with this + * behavior since it matches a subtle RocksDB failure scenario. + */ + buf_length = CACHE_BUFFER_SIZE + 1; + buf = calloc(1, buf_length); + spdk_file_write(g_file, channel, buf, 0, buf_length); + free(buf); + + /* Spin until all of the data has been flushed to the SSD. There's been no + * sync operation yet, so the xattr on the file is still 0. + */ + while (g_file->length_flushed != buf_length) {} + + /* Close the file. This causes an implicit sync which should write the + * length_flushed value as the "length" xattr on the file. + */ + spdk_file_close(g_file, channel); + + rc = spdk_fs_file_stat(g_fs, channel, "testfile", &stat); + CU_ASSERT(rc == 0); + CU_ASSERT(buf_length == stat.size); + + spdk_fs_free_thread_ctx(channel); + + /* Unload and reload the filesystem. The file length will be + * read during load from the length xattr. We want to make sure + * it matches what was written when the file was originally + * written and closed. + */ + ut_send_request(_fs_unload, NULL); + + ut_send_request(_fs_load, NULL); + + channel = spdk_fs_alloc_thread_ctx(g_fs); + + rc = spdk_fs_file_stat(g_fs, channel, "testfile", &stat); + CU_ASSERT(rc == 0); + CU_ASSERT(buf_length == stat.size); + + g_file = NULL; + rc = spdk_fs_open_file(g_fs, channel, "testfile", 0, &g_file); + CU_ASSERT(rc == 0); + SPDK_CU_ASSERT_FATAL(g_file != NULL); + + spdk_file_close(g_file, channel); + + rc = spdk_fs_delete_file(g_fs, channel, "testfile"); + CU_ASSERT(rc == 0); + + spdk_fs_free_thread_ctx(channel); + + ut_send_request(_fs_unload, NULL); +} + static void cache_write_null_buffer(void) { @@ -369,6 +462,7 @@ int main(int argc, char **argv) if ( CU_add_test(suite, "write", cache_write) == NULL || + CU_add_test(suite, "file length", file_length) == NULL || CU_add_test(suite, "write_null_buffer", cache_write_null_buffer) == NULL || CU_add_test(suite, "create_sync", fs_create_sync) == NULL || CU_add_test(suite, "append_no_cache", cache_append_no_cache) == NULL ||