diff --git a/lib/rocksdb/env_spdk.cc b/lib/rocksdb/env_spdk.cc index 8f00a2c1c..42b8235ef 100644 --- a/lib/rocksdb/env_spdk.cc +++ b/lib/rocksdb/env_spdk.cc @@ -86,6 +86,18 @@ private: thread_local SpdkThreadCtx g_sync_args; +static void +set_channel() +{ + struct spdk_thread *thread; + + if (g_fs != NULL && g_sync_args.channel == NULL) { + thread = spdk_thread_create("spdK_rocksdb", NULL); + spdk_set_thread(thread); + g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); + } +} + static void __call_fn(void *arg1, void *arg2) { @@ -151,6 +163,7 @@ public: SpdkSequentialFile::~SpdkSequentialFile(void) { + set_channel(); spdk_file_close(mFile, g_sync_args.channel); } @@ -159,6 +172,7 @@ SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) { int64_t ret; + set_channel(); ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); if (ret >= 0) { mOffset += ret; @@ -197,6 +211,7 @@ public: SpdkRandomAccessFile::~SpdkRandomAccessFile(void) { + set_channel(); spdk_file_close(mFile, g_sync_args.channel); } @@ -205,6 +220,7 @@ SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scrat { int64_t rc; + set_channel(); rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); if (rc >= 0) { *result = Slice(scratch, n); @@ -246,6 +262,8 @@ public: virtual Status Truncate(uint64_t size) override { int rc; + + set_channel(); rc = spdk_file_truncate(mFile, g_sync_args.channel, size); if (!rc) { mSize = size; @@ -257,6 +275,7 @@ public: } virtual Status Close() override { + set_channel(); spdk_file_close(mFile, g_sync_args.channel); mFile = NULL; return Status::OK(); @@ -270,6 +289,7 @@ public: { int rc; + set_channel(); rc = spdk_file_sync(mFile, g_sync_args.channel); if (!rc) { return Status::OK(); @@ -282,6 +302,7 @@ public: { int rc; + set_channel(); rc = spdk_file_sync(mFile, g_sync_args.channel); if (!rc) { return Status::OK(); @@ -307,6 +328,7 @@ public: { int rc; + set_channel(); rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); if (!rc) { return Status::OK(); @@ -324,6 +346,7 @@ public: * SPDK BlobFS does not have a range sync operation yet, so just sync * the whole file. */ + set_channel(); rc = spdk_file_sync(mFile, g_sync_args.channel); if (!rc) { return Status::OK(); @@ -350,6 +373,7 @@ SpdkWritableFile::Append(const Slice &data) { int64_t rc; + set_channel(); rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); if (rc >= 0) { mSize += data.size(); @@ -400,6 +424,7 @@ public: int rc; std::string name = sanitize_path(fname, mDirectory); + set_channel(); rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 0, &file); if (rc == 0) { @@ -427,6 +452,7 @@ public: struct spdk_file *file; int rc; + set_channel(); rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 0, &file); if (rc == 0) { @@ -450,6 +476,7 @@ public: struct spdk_file *file; int rc; + set_channel(); rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), SPDK_BLOBFS_OPEN_CREATE, &file); if (rc == 0) { @@ -484,6 +511,7 @@ public: int rc; std::string name = sanitize_path(fname, mDirectory); + set_channel(); rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); if (rc == 0) { return Status::OK(); @@ -496,6 +524,7 @@ public: std::string src_name = sanitize_path(src, mDirectory); std::string target_name = sanitize_path(t, mDirectory); + set_channel(); rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, src_name.c_str(), target_name.c_str()); if (rc == -ENOENT) { @@ -514,6 +543,7 @@ public: int rc; std::string name = sanitize_path(fname, mDirectory); + set_channel(); rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); if (rc == -ENOENT) { return EnvWrapper::GetFileSize(fname, size); @@ -526,6 +556,7 @@ public: int rc; std::string name = sanitize_path(fname, mDirectory); + set_channel(); rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); if (rc == -ENOENT) { return EnvWrapper::DeleteFile(fname); @@ -537,6 +568,7 @@ public: std::string name = sanitize_path(fname, mDirectory); int64_t rc; + set_channel(); rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); if (!rc) { @@ -548,6 +580,7 @@ public: } virtual Status UnlockFile(FileLock *lock) override { + set_channel(); spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); return Status::OK(); }