diff --git a/include/spdk/thread.h b/include/spdk/thread.h index a29d807df..505016d9c 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -417,6 +417,26 @@ struct spdk_poller *spdk_poller_register(spdk_poller_fn fn, */ void spdk_poller_unregister(struct spdk_poller **ppoller); +/** + * Pause a poller on the current thread. + * + * The poller is not run until it is resumed with spdk_poller_resume(). It is + * perfectly fine to pause an already paused poller. + * + * \param poller The poller to pause. + */ +void spdk_poller_pause(struct spdk_poller *poller); + +/** + * Resume a poller on the current thread. + * + * Resumes a poller paused with spdk_poller_pause(). It is perfectly fine to + * resume an unpaused poller. + * + * \param poller The poller to resume. + */ +void spdk_poller_resume(struct spdk_poller *poller); + /** * Register the opaque io_device context as an I/O device. * diff --git a/lib/thread/thread.c b/lib/thread/thread.c index a7ae97255..6de618f05 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -88,6 +88,16 @@ enum spdk_poller_state { /* The poller was unregistered during the execution of its fn. */ SPDK_POLLER_STATE_UNREGISTERED, + + /* The poller is in the process of being paused. It will be paused + * during the next time it's supposed to be executed. + */ + SPDK_POLLER_STATE_PAUSING, + + /* The poller is registered but currently paused. It's on the + * paused_pollers list. + */ + SPDK_POLLER_STATE_PAUSED, }; struct spdk_poller { @@ -127,6 +137,13 @@ struct spdk_thread { */ TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; + /* + * Contains paused pollers. Pollers on this queue are waiting until + * they are resumed (in which case they're put onto the active/timer + * queues) or unregistered. + */ + TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; + struct spdk_ring *messages; SLIST_HEAD(, spdk_msg) msg_cache; @@ -202,7 +219,7 @@ _free_thread(struct spdk_thread *thread) } TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { - if (poller->state == SPDK_POLLER_STATE_WAITING) { + if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { SPDK_WARNLOG("poller %p still registered at thread exit\n", poller); } @@ -213,7 +230,7 @@ _free_thread(struct spdk_thread *thread) TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) { - if (poller->state == SPDK_POLLER_STATE_WAITING) { + if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { SPDK_WARNLOG("poller %p still registered at thread exit\n", poller); } @@ -222,6 +239,12 @@ _free_thread(struct spdk_thread *thread) free(poller); } + TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { + SPDK_WARNLOG("poller %p still registered at thread exit\n", poller); + TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); + free(poller); + } + pthread_mutex_lock(&g_devlist_mutex); assert(g_thread_count > 0); g_thread_count--; @@ -267,6 +290,7 @@ spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) TAILQ_INIT(&thread->io_channels); TAILQ_INIT(&thread->active_pollers); TAILQ_INIT(&thread->timer_pollers); + TAILQ_INIT(&thread->paused_pollers); SLIST_INIT(&thread->msg_cache); thread->msg_cache_count = 0; @@ -444,6 +468,16 @@ _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); } +static void +_spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) +{ + if (poller->period_ticks) { + _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); + } else { + TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); + } +} + int spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) { @@ -476,6 +510,11 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) TAILQ_REMOVE(&thread->active_pollers, poller, tailq); free(poller); continue; + } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_PAUSED; + continue; } poller->state = SPDK_POLLER_STATE_RUNNING; @@ -485,10 +524,10 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) TAILQ_REMOVE(&thread->active_pollers, poller, tailq); free(poller); continue; + } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { + poller->state = SPDK_POLLER_STATE_WAITING; } - poller->state = SPDK_POLLER_STATE_WAITING; - #ifdef DEBUG if (poller_rc == -1) { SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); @@ -512,6 +551,11 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); free(poller); continue; + } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { + TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); + TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_PAUSED; + continue; } if (now < poller->next_run_tick) { @@ -530,7 +574,7 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); free(poller); - } else { + } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { poller->state = SPDK_POLLER_STATE_WAITING; TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); _spdk_poller_insert_timer(thread, poller, now); @@ -574,8 +618,8 @@ spdk_thread_has_active_pollers(struct spdk_thread *thread) return !TAILQ_EMPTY(&thread->active_pollers); } -bool -spdk_thread_has_pollers(struct spdk_thread *thread) +static bool +_spdk_thread_has_unpaused_pollers(struct spdk_thread *thread) { if (TAILQ_EMPTY(&thread->active_pollers) && TAILQ_EMPTY(&thread->timer_pollers)) { @@ -585,11 +629,22 @@ spdk_thread_has_pollers(struct spdk_thread *thread) return true; } +bool +spdk_thread_has_pollers(struct spdk_thread *thread) +{ + if (!_spdk_thread_has_unpaused_pollers(thread) && + TAILQ_EMPTY(&thread->paused_pollers)) { + return false; + } + + return true; +} + bool spdk_thread_is_idle(struct spdk_thread *thread) { if (spdk_ring_count(thread->messages) || - spdk_thread_has_pollers(thread)) { + _spdk_thread_has_unpaused_pollers(thread)) { return false; } @@ -723,11 +778,7 @@ spdk_poller_register(spdk_poller_fn fn, poller->period_ticks = 0; } - if (poller->period_ticks) { - _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); - } else { - TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); - } + _spdk_thread_insert_poller(thread, poller); return poller; } @@ -751,12 +802,87 @@ spdk_poller_unregister(struct spdk_poller **ppoller) return; } + /* If the poller was paused, put it on the active_pollers list so that + * its unregistration can be processed by spdk_thread_poll(). + */ + if (poller->state == SPDK_POLLER_STATE_PAUSED) { + TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); + TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); + poller->period_ticks = 0; + } + /* Simply set the state to unregistered. The poller will get cleaned up * in a subsequent call to spdk_thread_poll(). */ poller->state = SPDK_POLLER_STATE_UNREGISTERED; } +void +spdk_poller_pause(struct spdk_poller *poller) +{ + struct spdk_thread *thread; + + if (poller->state == SPDK_POLLER_STATE_PAUSED || + poller->state == SPDK_POLLER_STATE_PAUSING) { + return; + } + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } + + /* If a poller is paused from within itself, we can immediately move it + * on the paused_pollers list. Otherwise we just set its state to + * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It + * allows a poller to be paused from another one's context without + * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. + */ + if (poller->state != SPDK_POLLER_STATE_RUNNING) { + poller->state = SPDK_POLLER_STATE_PAUSING; + } else { + if (poller->period_ticks > 0) { + TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); + } else { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + } + + TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_PAUSED; + } +} + +void +spdk_poller_resume(struct spdk_poller *poller) +{ + struct spdk_thread *thread; + + if (poller->state != SPDK_POLLER_STATE_PAUSED && + poller->state != SPDK_POLLER_STATE_PAUSING) { + return; + } + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } + + /* If a poller is paused it has to be removed from the paused pollers + * list and put on the active / timer list depending on its + * period_ticks. If a poller is still in the process of being paused, + * we just need to flip its state back to waiting, as it's already on + * the appropriate list. + */ + if (poller->state == SPDK_POLLER_STATE_PAUSED) { + TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); + _spdk_thread_insert_poller(thread, poller); + } + + poller->state = SPDK_POLLER_STATE_WAITING; +} + struct call_thread { struct spdk_thread *cur_thread; spdk_msg_fn fn; diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index 8c593c494..9da4c42d1 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -179,6 +179,169 @@ thread_poller(void) free_threads(); } +struct poller_ctx { + struct spdk_poller *poller; + bool run; +}; + +static int +poller_run_pause(void *ctx) +{ + struct poller_ctx *poller_ctx = ctx; + + poller_ctx->run = true; + spdk_poller_pause(poller_ctx->poller); + + return 0; +} + +static void +poller_msg_pause_cb(void *ctx) +{ + struct spdk_poller *poller = ctx; + + spdk_poller_pause(poller); +} + +static void +poller_msg_resume_cb(void *ctx) +{ + struct spdk_poller *poller = ctx; + + spdk_poller_resume(poller); +} + +static void +poller_pause(void) +{ + struct poller_ctx poller_ctx = {}; + unsigned int delay[] = { 0, 1000 }; + unsigned int i; + + allocate_threads(1); + set_thread(0); + + /* Register a poller that pauses itself */ + poller_ctx.poller = spdk_poller_register(poller_run_pause, &poller_ctx, 0); + CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller); + + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + + spdk_poller_unregister(&poller_ctx.poller); + CU_ASSERT_PTR_NULL(poller_ctx.poller); + + /* Verify that resuming an unpaused poller doesn't do anything */ + poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, 0); + CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller); + + spdk_poller_resume(poller_ctx.poller); + + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + /* Verify that pausing the same poller twice works too */ + spdk_poller_pause(poller_ctx.poller); + + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + + spdk_poller_pause(poller_ctx.poller); + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + + spdk_poller_resume(poller_ctx.poller); + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + /* Verify that a poller is run when it's resumed immediately after pausing */ + poller_ctx.run = false; + spdk_poller_pause(poller_ctx.poller); + spdk_poller_resume(poller_ctx.poller); + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + spdk_poller_unregister(&poller_ctx.poller); + CU_ASSERT_PTR_NULL(poller_ctx.poller); + + /* Poll the thread to make sure the previous poller gets unregistered */ + poll_threads(); + CU_ASSERT_EQUAL(spdk_thread_has_pollers(spdk_get_thread()), false); + + /* Verify that it's possible to unregister a paused poller */ + poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, 0); + CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller); + + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + spdk_poller_pause(poller_ctx.poller); + + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + + spdk_poller_unregister(&poller_ctx.poller); + + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + CU_ASSERT_EQUAL(spdk_thread_has_pollers(spdk_get_thread()), false); + + /* Register pollers with 0 and 1000us wait time and pause/resume them */ + for (i = 0; i < SPDK_COUNTOF(delay); ++i) { + poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, delay[i]); + CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller); + + spdk_delay_us(delay[i]); + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + spdk_poller_pause(poller_ctx.poller); + + spdk_delay_us(delay[i]); + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + + spdk_poller_resume(poller_ctx.poller); + + spdk_delay_us(delay[i]); + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, true); + + /* Verify that the poller can be paused/resumed from spdk_thread_send_msg */ + spdk_thread_send_msg(spdk_get_thread(), poller_msg_pause_cb, poller_ctx.poller); + + spdk_delay_us(delay[i]); + poller_ctx.run = false; + poll_threads(); + CU_ASSERT_EQUAL(poller_ctx.run, false); + + spdk_thread_send_msg(spdk_get_thread(), poller_msg_resume_cb, poller_ctx.poller); + + poll_threads(); + if (delay[i] > 0) { + spdk_delay_us(delay[i]); + poll_threads(); + } + CU_ASSERT_EQUAL(poller_ctx.run, true); + + spdk_poller_unregister(&poller_ctx.poller); + CU_ASSERT_PTR_NULL(poller_ctx.poller); + } + + free_threads(); +} + static void for_each_cb(void *ctx) { @@ -579,6 +742,7 @@ main(int argc, char **argv) CU_add_test(suite, "thread_alloc", thread_alloc) == NULL || CU_add_test(suite, "thread_send_msg", thread_send_msg) == NULL || CU_add_test(suite, "thread_poller", thread_poller) == NULL || + CU_add_test(suite, "poller_pause", poller_pause) == NULL || CU_add_test(suite, "thread_for_each", thread_for_each) == NULL || CU_add_test(suite, "for_each_channel_remove", for_each_channel_remove) == NULL || CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL ||