lib/event: Support SPDK_THREAD_OP_RESCHED operation in reactor

Add a flag resched to check if reschedule operation is requested
to struct spdk_lw_thread. Add _reactor_resquest_thread_reschedule()
to set the resched flag, and add it to the case SPDK_THREAD_OP_RESCHED
in spdk_reactor_thread_op(), and return true in the case
SPDK_THREAD_OP_RESCHED in spdk_reactor_thread_op_supported().

Then _spdk_reactor_run() checks if the resched flag is true for each
thread. If true, set the resched flag to false, and remove the
thread and call _reactor_schedule_thread(). Add continue to avoid
use-after-free issue for both reschedule and terminate cases.

This idea follows voluntary thread termination and will remove our
worries for all complicated rare cases.

Add unit test case to verify this update.

Signed-off-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Change-Id: I656872d32dbb469ae70f771cd0419a77236bfe18
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/500
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
This commit is contained in:
Shuhei Matsumoto 2020-02-23 18:33:14 -05:00 committed by Tomasz Zawadzki
parent bad25cad1b
commit a4335feb6d
3 changed files with 107 additions and 0 deletions

View File

@ -62,6 +62,7 @@ enum spdk_reactor_state {
struct spdk_lw_thread {
TAILQ_ENTRY(spdk_lw_thread) link;
bool resched;
};
struct spdk_reactor {

View File

@ -305,6 +305,8 @@ _set_thread_name(const char *thread_name)
#endif
}
static int _reactor_schedule_thread(struct spdk_thread *thread);
static int
_spdk_reactor_run(void *arg)
{
@ -339,10 +341,18 @@ _spdk_reactor_run(void *arg)
thread = spdk_thread_get_from_ctx(lw_thread);
spdk_thread_poll(thread, 0, now);
if (spdk_unlikely(lw_thread->resched)) {
lw_thread->resched = false;
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
_reactor_schedule_thread(thread);
continue;
}
if (spdk_unlikely(spdk_thread_is_exited(thread) &&
spdk_thread_is_idle(thread))) {
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
spdk_thread_destroy(thread);
continue;
}
}
@ -516,12 +526,27 @@ _reactor_schedule_thread(struct spdk_thread *thread)
return 0;
}
static void
_reactor_request_thread_reschedule(struct spdk_thread *thread)
{
struct spdk_lw_thread *lw_thread;
assert(thread == spdk_get_thread());
lw_thread = spdk_thread_get_ctx(thread);
lw_thread->resched = true;
}
static int
spdk_reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
switch (op) {
case SPDK_THREAD_OP_NEW:
return _reactor_schedule_thread(thread);
case SPDK_THREAD_OP_RESCHED:
_reactor_request_thread_reschedule(thread);
return 0;
default:
return -ENOTSUP;
}
@ -532,6 +557,7 @@ spdk_reactor_thread_op_supported(enum spdk_thread_op op)
{
switch (op) {
case SPDK_THREAD_OP_NEW:
case SPDK_THREAD_OP_RESCHED:
return true;
default:
return false;

View File

@ -155,6 +155,85 @@ test_schedule_thread(void)
free_cores();
}
static void
test_reschedule_thread(void)
{
struct spdk_cpuset cpuset = {};
struct spdk_thread *thread;
struct spdk_reactor *reactor;
struct spdk_lw_thread *lw_thread;
allocate_cores(3);
CU_ASSERT(spdk_reactors_init() == 0);
spdk_cpuset_set_cpu(&g_reactor_core_mask, 0, true);
spdk_cpuset_set_cpu(&g_reactor_core_mask, 1, true);
spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true);
g_next_core = 0;
/* Create and schedule the thread to core 1. */
spdk_cpuset_set_cpu(&cpuset, 1, true);
thread = spdk_thread_create(NULL, &cpuset);
CU_ASSERT(thread != NULL);
lw_thread = spdk_thread_get_ctx(thread);
reactor = spdk_reactor_get(1);
CU_ASSERT(reactor != NULL);
MOCK_SET(spdk_env_get_current_core, 1);
CU_ASSERT(_spdk_event_queue_run_batch(reactor) == 1);
CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread);
spdk_set_thread(thread);
/* Call spdk_thread_set_cpumask() twice with different cpumask values.
* The cpumask of the 2nd call will be used in reschedule operation.
*/
spdk_cpuset_zero(&cpuset);
spdk_cpuset_set_cpu(&cpuset, 0, true);
CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0);
spdk_cpuset_zero(&cpuset);
spdk_cpuset_set_cpu(&cpuset, 2, true);
CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0);
CU_ASSERT(lw_thread->resched == true);
_spdk_reactor_run(reactor);
CU_ASSERT(lw_thread->resched == false);
CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
reactor = spdk_reactor_get(0);
CU_ASSERT(reactor != NULL);
MOCK_SET(spdk_env_get_current_core, 0);
CU_ASSERT(_spdk_event_queue_run_batch(reactor) == 0);
reactor = spdk_reactor_get(2);
CU_ASSERT(reactor != NULL);
MOCK_SET(spdk_env_get_current_core, 2);
CU_ASSERT(_spdk_event_queue_run_batch(reactor) == 1);
CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread);
MOCK_CLEAR(spdk_env_get_current_core);
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
spdk_set_thread(thread);
spdk_thread_exit(thread);
spdk_thread_destroy(thread);
spdk_set_thread(NULL);
spdk_reactors_fini();
free_cores();
}
static void
for_each_reactor_done(void *arg1, void *arg2)
{
@ -239,6 +318,7 @@ main(int argc, char **argv)
CU_add_test(suite, "test_init_reactors", test_init_reactors) == NULL ||
CU_add_test(suite, "test_event_call", test_event_call) == NULL ||
CU_add_test(suite, "test_schedule_thread", test_schedule_thread) == NULL ||
CU_add_test(suite, "test_reschedule_thread", test_reschedule_thread) == NULL ||
CU_add_test(suite, "test_for_each_reactor", test_for_each_reactor) == NULL
) {
CU_cleanup_registry();