diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 815bf3cda..c07fbe15d 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -38,6 +38,7 @@ #include "spdk/queue.h" #include "spdk/string.h" #include "spdk/thread.h" +#include "spdk/tree.h" #include "spdk/util.h" #include "spdk/fd_group.h" @@ -78,6 +79,7 @@ enum spdk_poller_state { struct spdk_poller { TAILQ_ENTRY(spdk_poller) tailq; + RB_ENTRY(spdk_poller) node; /* Current state of the poller; should only be accessed from the poller's thread. */ enum spdk_poller_state state; @@ -122,7 +124,7 @@ struct spdk_thread { /** * Contains pollers running on this thread with a periodic timer. */ - TAILQ_HEAD(timed_pollers_head, spdk_poller) timed_pollers; + RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers; struct spdk_poller *first_timed_poller; /* * Contains paused pollers. Pollers on this queue are waiting until @@ -199,6 +201,29 @@ static uint32_t g_thread_count = 0; static __thread struct spdk_thread *tls_thread = NULL; +/* + * If this compare function returns zero when two next_run_ticks are equal, + * the macro RB_INSERT() returns a pointer to the element with the same + * next_run_tick. + * + * Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element + * to remove as a parameter. + * + * Hence we allow RB_INSERT() to insert elements with the same keys on the right + * side by returning 1 when two next_run_ticks are equal. + */ +static inline int +timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2) +{ + if (poller1->next_run_tick < poller2->next_run_tick) { + return -1; + } else { + return 1; + } +} + +RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare); + static inline struct spdk_thread * _get_thread(void) { @@ -310,12 +335,12 @@ _free_thread(struct spdk_thread *thread) free(poller); } - TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { + RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) { if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { SPDK_WARNLOG("timed_poller %s still registered at thread exit\n", poller->name); } - TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); free(poller); } @@ -373,7 +398,7 @@ spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) TAILQ_INIT(&thread->io_channels); TAILQ_INIT(&thread->active_pollers); - TAILQ_INIT(&thread->timed_pollers); + RB_INIT(&thread->timed_pollers); TAILQ_INIT(&thread->paused_pollers); SLIST_INIT(&thread->msg_cache); thread->msg_cache_count = 0; @@ -471,7 +496,7 @@ thread_exit(struct spdk_thread *thread, uint64_t now) } } - TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { + RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) { if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { SPDK_INFOLOG(thread, "thread %s still has active timed poller %s\n", @@ -660,35 +685,41 @@ msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) static void poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) { - struct spdk_poller *iter; + struct spdk_poller *tmp __attribute__((unused)); poller->next_run_tick = now + poller->period_ticks; /* - * Insert poller in the thread's timed_pollers list in sorted order by next scheduled - * run time. + * Insert poller in the thread's timed_pollers tree by next scheduled run time + * as its key. */ - TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { - if (iter->next_run_tick <= poller->next_run_tick) { - TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); - return; - } + tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller); + assert(tmp == NULL); + + /* Update the cache only if it is empty or the inserted poller is earlier than it. + * RB_MIN() is not necessary here because all pollers, which has exactly the same + * next_run_tick as the existing poller, are inserted on the right side. + */ + if (thread->first_timed_poller == NULL || + poller->next_run_tick < thread->first_timed_poller->next_run_tick) { + thread->first_timed_poller = poller; } - - /* No earlier pollers were found, so this poller must be the new head */ - TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); - - thread->first_timed_poller = poller; } #ifdef __linux__ static inline void poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller) { - TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + struct spdk_poller *tmp __attribute__((unused)); + tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); + assert(tmp != NULL); + + /* This function is not used in any case that is performance critical. + * Update the cache simply by RB_MIN() if it needs to be changed. + */ if (thread->first_timed_poller == poller) { - thread->first_timed_poller = TAILQ_FIRST(&thread->timed_pollers); + thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers); } } #endif @@ -877,8 +908,8 @@ thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) break; } - tmp = TAILQ_NEXT(poller, tailq); - TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller); + RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller); /* Update the cache to the next timed poller in the list * only if the current poller is still the closest, otherwise, @@ -971,7 +1002,7 @@ static bool thread_has_unpaused_pollers(struct spdk_thread *thread) { if (TAILQ_EMPTY(&thread->active_pollers) && - TAILQ_EMPTY(&thread->timed_pollers)) { + RB_EMPTY(&thread->timed_pollers)) { return false; } @@ -1585,10 +1616,10 @@ spdk_poller_resume(struct spdk_poller *poller) } /* If a poller is paused it has to be removed from the paused pollers - * list and put on the active / timer list depending on its + * list and put on the active list or timer tree depending on its * period_ticks. If a poller is still in the process of being paused, * we just need to flip its state back to waiting, as it's already on - * the appropriate list. + * the appropriate list or tree. */ switch (poller->state) { case SPDK_POLLER_STATE_PAUSED: @@ -1660,13 +1691,13 @@ spdk_thread_get_next_active_poller(struct spdk_poller *prev) struct spdk_poller * spdk_thread_get_first_timed_poller(struct spdk_thread *thread) { - return TAILQ_FIRST(&thread->active_pollers); + return RB_MIN(timed_pollers_tree, &thread->timed_pollers); } struct spdk_poller * spdk_thread_get_next_timed_poller(struct spdk_poller *prev) { - return TAILQ_NEXT(prev, tailq); + return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev); } struct spdk_poller * @@ -1796,7 +1827,7 @@ spdk_thread_set_interrupt_mode(bool enable_interrupt) } /* Set pollers to expected mode */ - TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { + RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) { poller_set_interrupt_mode(poller, enable_interrupt); } TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) { diff --git a/test/unit/lib/thread/thread.c/thread_ut.c b/test/unit/lib/thread/thread.c/thread_ut.c index d292459cf..150aaed84 100644 --- a/test/unit/lib/thread/thread.c/thread_ut.c +++ b/test/unit/lib/thread/thread.c/thread_ut.c @@ -1446,13 +1446,13 @@ cache_closest_timed_poller(void) * have the closest timed poller. */ CU_ASSERT(thread->first_timed_poller == poller1); - CU_ASSERT(TAILQ_FIRST(&thread->timed_pollers) == poller1); + CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller1); spdk_delay_us(1000); poll_threads(); CU_ASSERT(thread->first_timed_poller == poller2); - CU_ASSERT(TAILQ_FIRST(&thread->timed_pollers) == poller2); + CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller2); /* If we unregister a timed poller by spdk_poller_unregister() * when it is waiting, it is marked as being unregistereed and @@ -1470,13 +1470,13 @@ cache_closest_timed_poller(void) poll_threads(); CU_ASSERT(thread->first_timed_poller == tmp); - CU_ASSERT(TAILQ_FIRST(&thread->timed_pollers) == tmp); + CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == tmp); spdk_delay_us(1); poll_threads(); CU_ASSERT(thread->first_timed_poller == poller3); - CU_ASSERT(TAILQ_FIRST(&thread->timed_pollers) == poller3); + CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller3); /* If we pause a timed poller by spdk_poller_pause() when it is waiting, * it is marked as being paused and is actually paused when it is expired. @@ -1490,13 +1490,13 @@ cache_closest_timed_poller(void) poll_threads(); CU_ASSERT(thread->first_timed_poller == poller3); - CU_ASSERT(TAILQ_FIRST(&thread->timed_pollers) == poller3); + CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller3); spdk_delay_us(1); poll_threads(); CU_ASSERT(thread->first_timed_poller == poller1); - CU_ASSERT(TAILQ_FIRST(&thread->timed_pollers) == poller1); + CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller1); /* After unregistering all timed pollers, the cache should * be NULL. @@ -1508,7 +1508,7 @@ cache_closest_timed_poller(void) poll_threads(); CU_ASSERT(thread->first_timed_poller == NULL); - CU_ASSERT(TAILQ_EMPTY(&thread->timed_pollers)); + CU_ASSERT(RB_EMPTY(&thread->timed_pollers)); free_threads(); } @@ -1604,7 +1604,7 @@ multi_timed_pollers_have_same_expiration(void) poll_threads(); CU_ASSERT(thread->first_timed_poller == NULL); - CU_ASSERT(TAILQ_EMPTY(&thread->timed_pollers)); + CU_ASSERT(RB_EMPTY(&thread->timed_pollers)); /* * case 2: unregister timed pollers while multiple timed pollers are registered. @@ -1671,7 +1671,7 @@ multi_timed_pollers_have_same_expiration(void) poll_threads(); CU_ASSERT(thread->first_timed_poller == NULL); - CU_ASSERT(TAILQ_EMPTY(&thread->timed_pollers)); + CU_ASSERT(RB_EMPTY(&thread->timed_pollers)); free_threads(); }