diff --git a/examples/nvmf/nvmf/nvmf.c b/examples/nvmf/nvmf/nvmf.c
index adc194705..91ee7eec0 100644
--- a/examples/nvmf/nvmf/nvmf.c
+++ b/examples/nvmf/nvmf/nvmf.c
@@ -35,9 +35,28 @@
 #include "spdk/env.h"
 #include "spdk/event.h"
 #include "spdk/string.h"
+#include "spdk/thread.h"
 
 static const char *g_rpc_addr = SPDK_DEFAULT_RPC_ADDR;
 
+struct nvmf_lw_thread {
+	TAILQ_ENTRY(nvmf_lw_thread) link;
+};
+
+struct nvmf_reactor {
+	uint32_t core;
+	pthread_mutex_t mutex;
+
+	TAILQ_HEAD(, nvmf_lw_thread) threads;
+	TAILQ_ENTRY(nvmf_reactor) link;
+};
+static TAILQ_HEAD(, nvmf_reactor) g_reactors = TAILQ_HEAD_INITIALIZER(g_reactors);
+
+static struct nvmf_reactor *g_master_reactor = NULL;
+static struct nvmf_reactor *g_next_reactor = NULL;
+static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
+static bool g_reactors_exit = false;
+
 static void
 usage(char *program_name)
 {
@@ -94,6 +113,178 @@ parse_args(int argc, char **argv, struct spdk_env_opts *opts)
 	return 0;
 }
 
+static int
+nvmf_reactor_run(void *arg)
+{
+	struct nvmf_reactor *nvmf_reactor = arg;
+	struct nvmf_lw_thread *lw_thread, *tmp;
+	struct spdk_thread *thread;
+
+	/* Poll each lightweight thread on this nvmf_reactor until asked to exit. */
+	do {
+		pthread_mutex_lock(&nvmf_reactor->mutex);
+		TAILQ_FOREACH_SAFE(lw_thread, &nvmf_reactor->threads, link, tmp) {
+			thread = spdk_thread_get_from_ctx(lw_thread);
+
+			spdk_thread_poll(thread, 0, 0);
+		}
+		pthread_mutex_unlock(&nvmf_reactor->mutex);
+	} while (!g_reactors_exit);
+
+	/* free all the lightweight threads */
+	pthread_mutex_lock(&nvmf_reactor->mutex);
+	TAILQ_FOREACH_SAFE(lw_thread, &nvmf_reactor->threads, link, tmp) {
+		thread = spdk_thread_get_from_ctx(lw_thread);
+		TAILQ_REMOVE(&nvmf_reactor->threads, lw_thread, link);
+		spdk_set_thread(thread);
+		spdk_thread_exit(thread);
+		spdk_thread_destroy(thread);
+	}
+	pthread_mutex_unlock(&nvmf_reactor->mutex);
+
+	return 0;
+}
+
+static int
+nvmf_schedule_spdk_thread(struct spdk_thread *thread)
+{
+	struct nvmf_reactor *nvmf_reactor;
+	struct nvmf_lw_thread *lw_thread;
+	struct spdk_cpuset *cpumask;
+	uint32_t i;
+
+	/* Lightweight threads may have a requested cpumask.
+	 * This is a request only - the scheduler does not have to honor it.
+	 * For this scheduler implementation, each reactor is pinned to
+	 * a particular core so honoring the request is reasonably easy.
+	 */
+	cpumask = spdk_thread_get_cpumask(thread);
+
+	lw_thread = spdk_thread_get_ctx(thread);
+	assert(lw_thread != NULL);
+	memset(lw_thread, 0, sizeof(*lw_thread));
+
+	/* Assign the lightweight thread to an nvmf reactor (core) round-robin.
+	 * A mutex guards the shared cursor here; the actual SPDK event framework
+	 * solves this by using internal rings for messages between reactors.
+	 */
+	for (i = 0; i < spdk_env_get_core_count(); i++) {
+		pthread_mutex_lock(&g_mutex);
+		if (g_next_reactor == NULL) {
+			g_next_reactor = TAILQ_FIRST(&g_reactors);
+		}
+		nvmf_reactor = g_next_reactor;
+		g_next_reactor = TAILQ_NEXT(g_next_reactor, link);
+		pthread_mutex_unlock(&g_mutex);
+
+		/* Take this reactor only if its core is set in the thread's cpumask. */
+		if (spdk_cpuset_get_cpu(cpumask, nvmf_reactor->core)) {
+			pthread_mutex_lock(&nvmf_reactor->mutex);
+			TAILQ_INSERT_TAIL(&nvmf_reactor->threads, lw_thread, link);
+			pthread_mutex_unlock(&nvmf_reactor->mutex);
+			break;
+		}
+	}
+
+	if (i == spdk_env_get_core_count()) {
+		fprintf(stderr, "failed to schedule spdk thread\n");
+		return -1;
+	}
+	return 0;
+}
+
+static int
+nvmf_init_threads(void)
+{
+	int rc;
+	uint32_t i;
+	char thread_name[32];
+	struct nvmf_reactor *nvmf_reactor;
+	struct spdk_thread *thread;
+	struct spdk_cpuset cpumask;
+	uint32_t master_core = spdk_env_get_current_core();
+
+	/* Whenever SPDK creates a new lightweight thread it will call
+	 * nvmf_schedule_spdk_thread asking for the application to begin
+	 * polling it via spdk_thread_poll(). Each lightweight thread in
+	 * SPDK optionally allocates extra memory to be used by the application
+	 * framework. The size of the extra memory allocated is the second parameter.
+	 */
+	spdk_thread_lib_init(nvmf_schedule_spdk_thread, sizeof(struct nvmf_lw_thread));
+
+	/* Spawn one system thread per CPU core. The system thread is called a reactor.
+	 * SPDK will spawn lightweight threads that must be mapped to reactors in
+	 * nvmf_schedule_spdk_thread. Using a single system thread per CPU core is a
+	 * choice unique to this application. SPDK itself does not require this specific
+	 * threading model. For example, another viable threading model would be
+	 * dynamically scheduling the lightweight threads onto a thread pool using a
+	 * work queue.
+	 */
+	SPDK_ENV_FOREACH_CORE(i) {
+		nvmf_reactor = calloc(1, sizeof(struct nvmf_reactor));
+		if (!nvmf_reactor) {
+			fprintf(stderr, "failed to alloc nvmf reactor\n");
+			rc = -ENOMEM;
+			goto err_exit;
+		}
+
+		nvmf_reactor->core = i;
+		pthread_mutex_init(&nvmf_reactor->mutex, NULL);
+		TAILQ_INIT(&nvmf_reactor->threads);
+		TAILQ_INSERT_TAIL(&g_reactors, nvmf_reactor, link);
+
+		if (i == master_core) {
+			g_master_reactor = nvmf_reactor;
+			g_next_reactor = g_master_reactor;
+		} else {
+			rc = spdk_env_thread_launch_pinned(i,
+							   nvmf_reactor_run,
+							   nvmf_reactor);
+			if (rc) {
+				fprintf(stderr, "failed to launch pinned reactor thread\n");
+				goto err_exit;
+			}
+		}
+	}
+
+	/* Some SPDK libraries assume that there is at least some number of lightweight
+	 * threads that exist from the beginning of time. That assumption is currently
+	 * being removed from the SPDK libraries, but until that work is completed spawn
+	 * one lightweight thread per reactor here.
+	 */
+	SPDK_ENV_FOREACH_CORE(i) {
+		spdk_cpuset_zero(&cpumask);
+		spdk_cpuset_set_cpu(&cpumask, i, true);
+		snprintf(thread_name, sizeof(thread_name), "spdk_thread_%u", i);
+		thread = spdk_thread_create(thread_name, &cpumask);
+		if (!thread) {
+			fprintf(stderr, "failed to create spdk thread\n");
+			return -1;
+		}
+	}
+
+	fprintf(stdout, "nvmf threads initialized successfully\n");
+	return 0;
+
+err_exit:
+	return rc;
+}
+
+static void
+nvmf_destroy_threads(void)
+{
+	struct nvmf_reactor *nvmf_reactor, *tmp;
+
+	TAILQ_FOREACH_SAFE(nvmf_reactor, &g_reactors, link, tmp) {
+		pthread_mutex_destroy(&nvmf_reactor->mutex);
+		free(nvmf_reactor);
+	}
+
+	pthread_mutex_destroy(&g_mutex);
+	spdk_thread_lib_fini();
+	fprintf(stdout, "nvmf threads destroyed successfully\n");
+}
+
 int main(int argc, char **argv)
 {
 	int rc;
@@ -112,5 +303,13 @@ int main(int argc, char **argv)
 		return -EINVAL;
 	}
 
+	/* Initialize the threads */
+	rc = nvmf_init_threads();
+	assert(rc == 0);
+
+	g_reactors_exit = true;
+	nvmf_reactor_run(g_master_reactor);
+	spdk_env_thread_wait_all();
+	nvmf_destroy_threads();
 	return rc;
 }
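
For reference, the spdk_thread mechanics this patch builds on can be reduced to a
single pthread. The sketch below is not part of the patch and assumes the SPDK
release this diff targets (where the new-thread callback returns int and
spdk_thread_exit()/spdk_thread_destroy() may be called back to back, as the patch
itself does). It shows the cycle the patch distributes across reactors:
spdk_thread_lib_init() registers the scheduler callback, spdk_thread_create()
invokes it, a message queued with spdk_thread_send_msg() runs only once the owner
calls spdk_thread_poll().

    #include "spdk/stdinc.h"
    #include "spdk/env.h"
    #include "spdk/thread.h"

    static struct spdk_thread *g_demo_thread;

    /* Stand-in for nvmf_schedule_spdk_thread(): SPDK calls this from inside
     * spdk_thread_create(), and the application decides who will poll it. */
    static int
    demo_schedule(struct spdk_thread *thread)
    {
    	g_demo_thread = thread;
    	return 0;
    }

    static void
    demo_msg(void *ctx)
    {
    	printf("message ran on %s\n", spdk_thread_get_name(spdk_get_thread()));
    }

    int
    main(void)
    {
    	struct spdk_env_opts opts;
    	struct spdk_thread *thread;

    	/* The thread library allocates its message pool from the env layer,
    	 * so the env must come up first. */
    	spdk_env_opts_init(&opts);
    	opts.name = "thread_demo";
    	if (spdk_env_init(&opts) < 0) {
    		return 1;
    	}

    	spdk_thread_lib_init(demo_schedule, 0);
    	thread = spdk_thread_create("lw_thread", NULL);

    	/* The message is only queued here... */
    	spdk_thread_send_msg(thread, demo_msg, NULL);
    	/* ...and executes here, exactly as in nvmf_reactor_run(). */
    	spdk_thread_poll(thread, 0, 0);

    	/* Teardown mirrors the patch's cleanup loop. */
    	spdk_set_thread(thread);
    	spdk_thread_exit(thread);
    	spdk_thread_destroy(thread);
    	spdk_thread_lib_fini();
    	return 0;
    }

Passing 0 as the second argument of spdk_thread_poll() asks for the default
message batch per call, which is why each reactor keeps looping over its threads
rather than polling each one once.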
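
The round-robin walk in nvmf_schedule_spdk_thread() is easiest to see in
isolation. In the sketch below, pick_core() is a hypothetical helper written for
illustration, but spdk_cpuset_zero()/spdk_cpuset_set_cpu()/spdk_cpuset_get_cpu()
are the same calls the patch uses; the advancing cursor plays the role of
g_next_reactor.

    #include "spdk/stdinc.h"
    #include "spdk/cpuset.h"

    /* Hypothetical helper: scan up to num_cores candidates starting at the
     * round-robin cursor *next, and return the first core that is set in
     * cpumask. UINT32_MAX corresponds to the patch's "failed to schedule
     * spdk thread" case, where the mask excludes every core. */
    static uint32_t
    pick_core(const struct spdk_cpuset *cpumask, uint32_t num_cores, uint32_t *next)
    {
    	uint32_t i, core;

    	for (i = 0; i < num_cores; i++) {
    		core = (*next)++ % num_cores;
    		if (spdk_cpuset_get_cpu(cpumask, core)) {
    			return core;
    		}
    	}
    	return UINT32_MAX;
    }

    int
    main(void)
    {
    	struct spdk_cpuset mask;
    	uint32_t cursor = 0;

    	spdk_cpuset_zero(&mask);
    	spdk_cpuset_set_cpu(&mask, 2, true);	/* thread may only run on core 2 */

    	printf("scheduled on core %u\n", pick_core(&mask, 4, &cursor));
    	return 0;
    }

Because the cursor keeps advancing across calls, threads with permissive masks
spread evenly over the reactors, while a restrictive mask still lands on an
allowed core.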
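
Both cleanup paths in the patch rely on TAILQ_FOREACH_SAFE, a BSD extension that
SPDK ships in spdk/queue.h for platforms whose sys/queue.h lacks it. A
self-contained illustration (again not part of the patch) of why the _SAFE
variant is required when the loop body unlinks and frees the current element:

    #include "spdk/stdinc.h"
    #include "spdk/queue.h"

    struct item {
    	int value;
    	TAILQ_ENTRY(item) link;
    };

    int
    main(void)
    {
    	TAILQ_HEAD(, item) head = TAILQ_HEAD_INITIALIZER(head);
    	struct item *it, *tmp;
    	int i;

    	for (i = 0; i < 3; i++) {
    		it = calloc(1, sizeof(*it));	/* allocation checks omitted */
    		it->value = i;
    		TAILQ_INSERT_TAIL(&head, it, link);
    	}

    	/* tmp caches the successor before it is unlinked and freed, so the
    	 * iteration survives the removal - the same pattern used by
    	 * nvmf_reactor_run() and nvmf_destroy_threads(). */
    	TAILQ_FOREACH_SAFE(it, &head, link, tmp) {
    		TAILQ_REMOVE(&head, it, link);
    		printf("freeing item %d\n", it->value);
    		free(it);
    	}
    	return 0;
    }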