diff --git a/include/spdk/thread.h b/include/spdk/thread.h index 63cb7b31b..2752bee6c 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -182,8 +182,11 @@ void spdk_thread_lib_fini(void); * pointer (spdk_thread_fn) that must be called on the same thread that spdk_allocate_thread * was called from. * \param start_poller_fn Function to be called to start a poller for the thread. + * DEPRECATED. Only used in tests. Pass NULL for this parameter. * \param stop_poller_fn Function to be called to stop a poller for the thread. - * \param thread_ctx Context that will be passed to fn, start_poller_fn and spdk_stop_poller. + * DEPRECATED. Only used in tests. Pass NULL for this parameter. + * \param thread_ctx Context that will be passed to msg_fn, start_poller_fn, and stop_poller_fn. + * DEPRECATED. Only used in tests. Pass NULL for this parameter. * \param name Human-readable name for the thread; can be retrieved with spdk_thread_get_name(). * The string is copied, so the pointed-to data only needs to be valid during the * spdk_allocate_thread() call. May be NULL to specify no name. @@ -204,6 +207,27 @@ struct spdk_thread *spdk_allocate_thread(spdk_thread_pass_msg msg_fn, */ void spdk_free_thread(void); +/** + * Perform one iteration worth of processing on the thread. This currently only + * executes pollers. + * + * \param thread The thread to process + * + * \return 1 if work was done. 0 if no work was done. -1 if unknown. + */ +int spdk_thread_poll(struct spdk_thread *thread); + +/** + * Return the number of ticks until the next timed poller + * would expire. Timed pollers are pollers for which + * period_microseconds is greater than 0. + * + * \param thread The thread to check poller expiration times on + * + * \return Number of ticks. If no timed pollers, return 0. + */ +uint64_t spdk_thread_next_poller_expiration(struct spdk_thread *thread); + /** * Get count of allocated threads. */ diff --git a/lib/event/reactor.c b/lib/event/reactor.c index d9ba9f6be..a3dfef718 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -46,30 +46,6 @@ #define SPDK_EVENT_BATCH_SIZE 8 -enum spdk_poller_state { - /* The poller is registered with a reactor but not currently executing its fn. */ - SPDK_POLLER_STATE_WAITING, - - /* The poller is currently running its fn. */ - SPDK_POLLER_STATE_RUNNING, - - /* The poller was unregistered during the execution of its fn. */ - SPDK_POLLER_STATE_UNREGISTERED, -}; - -struct spdk_poller { - TAILQ_ENTRY(spdk_poller) tailq; - uint32_t lcore; - - /* Current state of the poller; should only be accessed from the poller's thread. */ - enum spdk_poller_state state; - - uint64_t period_ticks; - uint64_t next_run_tick; - spdk_poller_fn fn; - void *arg; -}; - enum spdk_reactor_state { SPDK_REACTOR_STATE_INVALID = 0, SPDK_REACTOR_STATE_INITIALIZED = 1, @@ -96,19 +72,6 @@ struct spdk_reactor { /* The last known rusage values */ struct rusage rusage; - /* - * Contains pollers actively running on this reactor. Pollers - * are run round-robin. The reactor takes one poller from the head - * of the ring, executes it, then puts it back at the tail of - * the ring. - */ - TAILQ_HEAD(, spdk_poller) active_pollers; - - /** - * Contains pollers running on this reactor with a periodic timer. - */ - TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; - struct spdk_ring *events; /* Pointer to the per-socket g_spdk_event_mempool for this reactor. */ @@ -231,99 +194,6 @@ _spdk_reactor_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) spdk_event_call(event); } -static void -_spdk_poller_insert_timer(struct spdk_reactor *reactor, struct spdk_poller *poller, uint64_t now) -{ - struct spdk_poller *iter; - uint64_t next_run_tick; - - next_run_tick = now + poller->period_ticks; - poller->next_run_tick = next_run_tick; - - /* - * Insert poller in the reactor's timer_pollers list in sorted order by next scheduled - * run time. - */ - TAILQ_FOREACH_REVERSE(iter, &reactor->timer_pollers, timer_pollers_head, tailq) { - if (iter->next_run_tick <= next_run_tick) { - TAILQ_INSERT_AFTER(&reactor->timer_pollers, iter, poller, tailq); - return; - } - } - - /* No earlier pollers were found, so this poller must be the new head */ - TAILQ_INSERT_HEAD(&reactor->timer_pollers, poller, tailq); -} - -static struct spdk_poller * -_spdk_reactor_start_poller(void *thread_ctx, - spdk_poller_fn fn, - void *arg, - uint64_t period_microseconds) -{ - struct spdk_poller *poller; - struct spdk_reactor *reactor; - uint64_t quotient, remainder, ticks; - - reactor = thread_ctx; - - poller = calloc(1, sizeof(*poller)); - if (poller == NULL) { - SPDK_ERRLOG("Poller memory allocation failed\n"); - return NULL; - } - - poller->lcore = reactor->lcore; - poller->state = SPDK_POLLER_STATE_WAITING; - poller->fn = fn; - poller->arg = arg; - - if (period_microseconds) { - quotient = period_microseconds / SPDK_SEC_TO_USEC; - remainder = period_microseconds % SPDK_SEC_TO_USEC; - ticks = spdk_get_ticks_hz(); - - poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; - } else { - poller->period_ticks = 0; - } - - if (poller->period_ticks) { - _spdk_poller_insert_timer(reactor, poller, spdk_get_ticks()); - } else { - TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq); - } - - return poller; -} - -static void -_spdk_reactor_stop_poller(struct spdk_poller *poller, void *thread_ctx) -{ - struct spdk_reactor *reactor; - - reactor = thread_ctx; - - assert(poller->lcore == spdk_env_get_current_core()); - - if (poller->state == SPDK_POLLER_STATE_RUNNING) { - /* - * We are being called from the poller_fn, so set the state to unregistered - * and let the reactor loop free the poller. - */ - poller->state = SPDK_POLLER_STATE_UNREGISTERED; - } else { - /* Poller is not running currently, so just free it. */ - if (poller->period_ticks) { - TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq); - } else { - TAILQ_REMOVE(&reactor->active_pollers, poller, tailq); - } - - free(poller); - } -} - static int get_rusage(void *arg) { @@ -446,32 +316,11 @@ spdk_reactor_get_tsc_stats(struct spdk_reactor_tsc_stats *tsc_stats, uint32_t co return 0; } -/** - * - * \brief This is the main function of the reactor thread. - * - * \code - * - * while (1) - * if (events to run) - * dequeue and run a batch of events - * - * if (active pollers) - * run the first poller in the list and move it to the back - * - * if (first timer poller has expired) - * run the first timer poller and reinsert it in the timer list - * - * if (no action taken and sleep enabled) - * sleep until next timer poller is scheduled to expire - * \endcode - * - */ static int _spdk_reactor_run(void *arg) { struct spdk_reactor *reactor = arg; - struct spdk_poller *poller; + struct spdk_thread *thread; uint32_t event_count; uint64_t now; uint64_t sleep_cycles; @@ -480,10 +329,8 @@ _spdk_reactor_run(void *arg) char thread_name[32]; snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); - if (spdk_allocate_thread(_spdk_reactor_send_msg, - _spdk_reactor_start_poller, - _spdk_reactor_stop_poller, - reactor, thread_name) == NULL) { + thread = spdk_allocate_thread(_spdk_reactor_send_msg, NULL, NULL, reactor, thread_name); + if (!thread) { return -1; } SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore, @@ -507,66 +354,29 @@ _spdk_reactor_run(void *arg) took_action = true; } - poller = TAILQ_FIRST(&reactor->active_pollers); - if (poller) { - TAILQ_REMOVE(&reactor->active_pollers, poller, tailq); - poller->state = SPDK_POLLER_STATE_RUNNING; - rc = poller->fn(poller->arg); + rc = spdk_thread_poll(thread); + if (rc != 0) { now = spdk_get_ticks(); spdk_reactor_add_tsc_stats(reactor, rc, now); - if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { - free(poller); - } else { - poller->state = SPDK_POLLER_STATE_WAITING; - TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq); - } took_action = true; } - poller = TAILQ_FIRST(&reactor->timer_pollers); - if (poller) { - if (took_action == false) { - now = spdk_get_ticks(); - } - - if (now >= poller->next_run_tick) { - uint64_t tmp_timer_tsc; - - TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq); - poller->state = SPDK_POLLER_STATE_RUNNING; - rc = poller->fn(poller->arg); - /* Save the tsc value from before poller->fn was executed. We want to - * use the current time for idle/busy tsc value accounting, but want to - * use the older time to reinsert to the timer poller below. */ - tmp_timer_tsc = now; - now = spdk_get_ticks(); - spdk_reactor_add_tsc_stats(reactor, rc, now); - if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { - free(poller); - } else { - poller->state = SPDK_POLLER_STATE_WAITING; - _spdk_poller_insert_timer(reactor, poller, tmp_timer_tsc); - } - took_action = true; - } - } - /* Determine if the thread can sleep */ if (sleep_cycles && !took_action) { + uint64_t next_run_tick; + now = spdk_get_ticks(); sleep_us = reactor->max_delay_us; + next_run_tick = spdk_thread_next_poller_expiration(thread); - poller = TAILQ_FIRST(&reactor->timer_pollers); - if (poller) { - /* There are timers registered, so don't sleep beyond - * when the next timer should fire */ - if (poller->next_run_tick < (now + sleep_cycles)) { - if (poller->next_run_tick <= now) { - sleep_us = 0; - } else { - sleep_us = ((poller->next_run_tick - now) * - SPDK_SEC_TO_USEC) / spdk_get_ticks_hz(); - } + /* There are timers registered, so don't sleep beyond + * when the next timer should fire */ + if (next_run_tick > 0 && next_run_tick < (now + sleep_cycles)) { + if (next_run_tick <= now) { + sleep_us = 0; + } else { + sleep_us = ((next_run_tick - now) * + SPDK_SEC_TO_USEC) / spdk_get_ticks_hz(); } } @@ -593,9 +403,6 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma assert(reactor->socket_id < SPDK_MAX_SOCKET); reactor->max_delay_us = max_delay_us; - TAILQ_INIT(&reactor->active_pollers); - TAILQ_INIT(&reactor->timer_pollers); - reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, reactor->socket_id); if (!reactor->events) { SPDK_NOTICELOG("Ring creation failed on preferred socket %d. Try other sockets.\n", diff --git a/lib/thread/thread.c b/lib/thread/thread.c index 3f0257689..a68d286b7 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -33,8 +33,11 @@ #include "spdk/stdinc.h" +#include "spdk/env.h" +#include "spdk/queue.h" #include "spdk/string.h" #include "spdk/thread.h" +#include "spdk/util.h" #include "spdk_internal/log.h" @@ -66,6 +69,29 @@ struct io_device { static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); +enum spdk_poller_state { + /* The poller is registered with a thread but not currently executing its fn. */ + SPDK_POLLER_STATE_WAITING, + + /* The poller is currently running its fn. */ + SPDK_POLLER_STATE_RUNNING, + + /* The poller was unregistered during the execution of its fn. */ + SPDK_POLLER_STATE_UNREGISTERED, +}; + +struct spdk_poller { + TAILQ_ENTRY(spdk_poller) tailq; + + /* Current state of the poller; should only be accessed from the poller's thread. */ + enum spdk_poller_state state; + + uint64_t period_ticks; + uint64_t next_run_tick; + spdk_poller_fn fn; + void *arg; +}; + struct spdk_thread { pthread_t thread_id; spdk_thread_pass_msg msg_fn; @@ -75,6 +101,19 @@ struct spdk_thread { TAILQ_HEAD(, spdk_io_channel) io_channels; TAILQ_ENTRY(spdk_thread) tailq; char *name; + + /* + * Contains pollers actively running on this thread. Pollers + * are run round-robin. The thread takes one poller from the head + * of the ring, executes it, then puts it back at the tail of + * the ring. + */ + TAILQ_HEAD(, spdk_poller) active_pollers; + + /** + * Contains pollers running on this thread with a periodic timer. + */ + TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; }; static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); @@ -138,6 +177,12 @@ spdk_allocate_thread(spdk_thread_pass_msg msg_fn, return NULL; } + if ((start_poller_fn != NULL && stop_poller_fn == NULL) || + (start_poller_fn == NULL && stop_poller_fn != NULL)) { + SPDK_ERRLOG("start_poller_fn and stop_poller_fn must either both be NULL or both be non-NULL\n"); + return NULL; + } + thread = calloc(1, sizeof(*thread)); if (!thread) { SPDK_ERRLOG("Unable to allocate memory for thread\n"); @@ -152,6 +197,10 @@ spdk_allocate_thread(spdk_thread_pass_msg msg_fn, thread->thread_ctx = thread_ctx; TAILQ_INIT(&thread->io_channels); TAILQ_INSERT_TAIL(&g_threads, thread, tailq); + + TAILQ_INIT(&thread->active_pollers); + TAILQ_INIT(&thread->timer_pollers); + g_thread_count++; if (name) { _set_thread_name(name); @@ -192,6 +241,99 @@ spdk_free_thread(void) pthread_mutex_unlock(&g_devlist_mutex); } +static void +_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) +{ + struct spdk_poller *iter; + + poller->next_run_tick = now + poller->period_ticks; + + /* + * Insert poller in the thread's timer_pollers list in sorted order by next scheduled + * run time. + */ + TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) { + if (iter->next_run_tick <= poller->next_run_tick) { + TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq); + return; + } + } + + /* No earlier pollers were found, so this poller must be the new head */ + TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq); +} + +int +spdk_thread_poll(struct spdk_thread *thread) +{ + struct spdk_poller *poller; + int rc = 0; + + poller = TAILQ_FIRST(&thread->active_pollers); + if (poller) { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_RUNNING; + rc = poller->fn(poller->arg); + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { + free(poller); + } else { + poller->state = SPDK_POLLER_STATE_WAITING; + TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); + } + +#ifdef DEBUG + if (rc == -1) { + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller); + } +#endif + } + + poller = TAILQ_FIRST(&thread->timer_pollers); + if (poller) { + uint64_t now = spdk_get_ticks(); + + if (now >= poller->next_run_tick) { + int timer_rc = 0; + + TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_RUNNING; + timer_rc = poller->fn(poller->arg); + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { + free(poller); + } else { + poller->state = SPDK_POLLER_STATE_WAITING; + _spdk_poller_insert_timer(thread, poller, now); + } + +#ifdef DEBUG + if (timer_rc == -1) { + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller); + } +#endif + + if (timer_rc > rc) { + rc = timer_rc; + + } + } + } + + return rc; +} + +uint64_t +spdk_thread_next_poller_expiration(struct spdk_thread *thread) +{ + struct spdk_poller *poller; + + poller = TAILQ_FIRST(&thread->timer_pollers); + if (poller) { + return poller->next_run_tick; + } + + return 0; +} + uint32_t spdk_thread_get_count(void) { @@ -240,6 +382,7 @@ spdk_poller_register(spdk_poller_fn fn, { struct spdk_thread *thread; struct spdk_poller *poller; + uint64_t quotient, remainder, ticks; thread = spdk_get_thread(); if (!thread) { @@ -247,17 +390,34 @@ spdk_poller_register(spdk_poller_fn fn, return NULL; } - if (!thread->start_poller_fn || !thread->stop_poller_fn) { - SPDK_ERRLOG("No related functions to start requested poller\n"); - assert(false); + if (thread->start_poller_fn) { + return thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds); + } + + poller = calloc(1, sizeof(*poller)); + if (poller == NULL) { + SPDK_ERRLOG("Poller memory allocation failed\n"); return NULL; } - poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds); - if (!poller) { - SPDK_ERRLOG("Unable to start requested poller\n"); - assert(false); - return NULL; + poller->state = SPDK_POLLER_STATE_WAITING; + poller->fn = fn; + poller->arg = arg; + + if (period_microseconds) { + quotient = period_microseconds / SPDK_SEC_TO_USEC; + remainder = period_microseconds % SPDK_SEC_TO_USEC; + ticks = spdk_get_ticks_hz(); + + poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; + } else { + poller->period_ticks = 0; + } + + if (poller->period_ticks) { + _spdk_poller_insert_timer(thread, poller, spdk_get_ticks()); + } else { + TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); } return poller; @@ -277,9 +437,31 @@ spdk_poller_unregister(struct spdk_poller **ppoller) *ppoller = NULL; thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } - if (thread) { + if (thread->stop_poller_fn) { thread->stop_poller_fn(poller, thread->thread_ctx); + return; + } + + if (poller->state == SPDK_POLLER_STATE_RUNNING) { + /* + * We are being called from the poller_fn, so set the state to unregistered + * and let the thread poll loop free the poller. + */ + poller->state = SPDK_POLLER_STATE_UNREGISTERED; + } else { + /* Poller is not running currently, so just free it. */ + if (poller->period_ticks) { + TAILQ_REMOVE(&thread->timer_pollers, poller, tailq); + } else { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + } + + free(poller); } }