diff --git a/include/spdk/thread.h b/include/spdk/thread.h index 3f1e73006..e74967eab 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -68,6 +68,26 @@ struct spdk_io_channel_iter; */ typedef int (*spdk_new_thread_fn)(struct spdk_thread *thread); +/** + * SPDK thread operation type. + */ +enum spdk_thread_op { + /* Called each time a new thread is created. The implementor of this operation + * should frequently call spdk_thread_poll() on the thread provided. + */ + SPDK_THREAD_OP_NEW, +}; + +/** + * Function to be called for SPDK thread operation. + */ +typedef int (*spdk_thread_op_fn)(struct spdk_thread *thread, enum spdk_thread_op op); + +/** + * Function to check whether the SPDK thread operation is supported. + */ +typedef bool (*spdk_thread_op_supported_fn)(enum spdk_thread_op op); + /** * A function that will be called on the target thread. * @@ -194,6 +214,23 @@ struct spdk_io_channel { */ int spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz); +/** + * Initialize the threading library. Must be called once prior to allocating any threads + * + * Both thread_op_fn and thread_op_type_supported_fn have to be specified or not + * specified together. + * + * \param thread_op_fn Called for SPDK thread operation. + * \param thread_op_supported_fn Called to check whether the SPDK thread operation is supported. + * \param ctx_sz For each thread allocated, for use by the thread scheduler. A pointer + * to this region may be obtained by calling spdk_thread_get_ctx(). + * + * \return 0 on success. Negated errno on failure. + */ +int spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, + spdk_thread_op_supported_fn thread_op_supported_fn, + size_t ctx_sz); + /** * Release all resources associated with this library. */ diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 600ef75d9..9c1b89746 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -50,6 +50,8 @@ static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; static spdk_new_thread_fn g_new_thread_fn = NULL; +static spdk_thread_op_fn g_thread_op_fn = NULL; +static spdk_thread_op_supported_fn g_thread_op_supported_fn; static size_t g_ctx_sz = 0; /* Monotonic increasing ID is set to each created thread beginning at 1. Once the * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting @@ -174,14 +176,11 @@ _get_thread(void) return tls_thread; } -int -spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) +static int +_thread_lib_init(size_t ctx_sz) { char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; - assert(g_new_thread_fn == NULL); - g_new_thread_fn = new_thread_fn; - g_ctx_sz = ctx_sz; snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); @@ -198,6 +197,36 @@ spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) return 0; } +int +spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) +{ + assert(g_new_thread_fn == NULL); + assert(g_thread_op_fn == NULL); + g_new_thread_fn = new_thread_fn; + + return _thread_lib_init(ctx_sz); +} + +int +spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, + spdk_thread_op_supported_fn thread_op_supported_fn, + size_t ctx_sz) +{ + assert(g_new_thread_fn == NULL); + assert(g_thread_op_fn == NULL); + assert(g_thread_op_supported_fn == NULL); + + if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { + SPDK_ERRLOG("Both must be defined or undefined together.\n"); + return -EINVAL; + } + + g_thread_op_fn = thread_op_fn; + g_thread_op_supported_fn = thread_op_supported_fn; + + return _thread_lib_init(ctx_sz); +} + void spdk_thread_lib_fini(void) { @@ -213,6 +242,8 @@ spdk_thread_lib_fini(void) } g_new_thread_fn = NULL; + g_thread_op_fn = NULL; + g_thread_op_supported_fn = NULL; g_ctx_sz = 0; } @@ -280,7 +311,7 @@ spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) { struct spdk_thread *thread; struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; - int rc, i; + int rc = 0, i; thread = calloc(1, sizeof(*thread) + g_ctx_sz); if (!thread) { @@ -344,10 +375,13 @@ spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) if (g_new_thread_fn) { rc = g_new_thread_fn(thread); - if (rc != 0) { - _free_thread(thread); - return NULL; - } + } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { + rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); + } + + if (rc != 0) { + _free_thread(thread); + return NULL; } return thread; diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index d161bc21e..a563f8bdd 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -48,6 +48,28 @@ _thread_schedule(struct spdk_thread *thread) return g_sched_rc; } +static bool +_thread_op_supported(enum spdk_thread_op op) +{ + switch (op) { + case SPDK_THREAD_OP_NEW: + return true; + default: + return false; + } +} + +static int +_thread_op(struct spdk_thread *thread, enum spdk_thread_op op) +{ + switch (op) { + case SPDK_THREAD_OP_NEW: + return _thread_schedule(thread); + default: + return -ENOTSUP; + } +} + static void thread_alloc(void) { @@ -79,6 +101,24 @@ thread_alloc(void) SPDK_CU_ASSERT_FATAL(thread == NULL); spdk_thread_lib_fini(); + + /* Scheduling callback exists with extended thread library initialization. */ + spdk_thread_lib_init_ext(_thread_op, _thread_op_supported, 0); + + /* Scheduling succeeds */ + g_sched_rc = 0; + thread = spdk_thread_create(NULL, NULL); + SPDK_CU_ASSERT_FATAL(thread != NULL); + spdk_set_thread(thread); + spdk_thread_exit(thread); + spdk_thread_destroy(thread); + + /* Scheduling fails */ + g_sched_rc = -1; + thread = spdk_thread_create(NULL, NULL); + SPDK_CU_ASSERT_FATAL(thread == NULL); + + spdk_thread_lib_fini(); } static void