diff --git a/CHANGELOG.md b/CHANGELOG.md index 96d6c0c23..5166b54ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ SPDK has switched to DPDK's rte_vhost library since 19.07 release, removed the i rte_vhost library which is used for DPDK older than 19.05, removed the experimental vhost nvme target which depends on the internal rte_vhost library. +### thread + +`fd_group` is applied to support interrupt mode. +New APIs were added to support an experimental interrupt mode. This allows modules or +libraries to selectively register file descriptors that the spdk_thread can wait on, +as an alternative to polling. In v20.10, this functionality is enabled in a very small +subset of SPDK libraries and modules. + ### util A new utility named `fd_group` was add. It is now diff --git a/include/spdk/thread.h b/include/spdk/thread.h index 841cf39a8..4b7e65051 100644 --- a/include/spdk/thread.h +++ b/include/spdk/thread.h @@ -729,6 +729,93 @@ void *spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i); */ void spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status); +/** + * A representative for registered interrupt file descriptor. + */ +struct spdk_interrupt; + +/** + * Callback function registered for interrupt file descriptor. + * + * \param ctx Context passed as arg to spdk_interrupt_register(). + * + * \return 0 to indicate that interrupt took place but no events were found; + * positive to indicate that interrupt took place and some events were processed; + * negative if no event information is provided. + */ +typedef int (*spdk_interrupt_fn)(void *ctx); + +/** + * Register an spdk_interrupt on the current thread. The provided function + * will be called any time the associated file descriptor is written to. + * + * \param efd File descriptor of the spdk_interrupt. + * \param fn Called each time there are events in spdk_interrupt. + * \param arg Function argument for fn. + * \param name Human readable name for the spdk_interrupt. Pointer of the spdk_interrupt + * name is set if NULL. + * + * \return a pointer to the spdk_interrupt registered on the current thread on success + * or NULL on failure. + */ +struct spdk_interrupt *spdk_interrupt_register(int efd, spdk_interrupt_fn fn, + void *arg, const char *name); + +/* + * \brief Register an spdk_interrupt on the current thread with setting its name + * to the string of the spdk_interrupt function name. + */ +#define SPDK_INTERRUPT_REGISTER(efd, fn, arg) \ + spdk_interrupt_register(efd, fn, arg, #fn) + +/** + * Unregister an spdk_interrupt on the current thread. + * + * \param pintr The spdk_interrupt to unregister. + */ +void spdk_interrupt_unregister(struct spdk_interrupt **pintr); + +enum spdk_interrupt_event_types { + SPDK_INTERRUPT_EVENT_IN = 0x001, + SPDK_INTERRUPT_EVENT_OUT = 0x004, + SPDK_INTERRUPT_EVENT_ET = 1u << 31 +}; + +/** + * Change the event_types associated with the spdk_interrupt on the current thread. + * + * \param intr The pointer to the spdk_interrupt registered on the current thread. + * \param event_types New event_types for the spdk_interrupt. + * + * \return 0 if success or -errno if failed. + */ +int spdk_interrupt_set_event_types(struct spdk_interrupt *intr, + enum spdk_interrupt_event_types event_types); + +/** + * Return a file descriptor that becomes ready whenever any of the registered + * interrupt file descriptors are ready + * + * \param thread The thread to get. + * + * \return The spdk_interrupt fd of thread itself. + */ +int spdk_thread_get_interrupt_fd(struct spdk_thread *thread); + +/** + * Set SPDK run as event driven mode + * + * \return 0 on success or -errno on failure + */ +int spdk_interrupt_mode_enable(void); + +/** + * Reports whether interrupt mode is set. + * + * \return True if interrupt mode is set, false otherwise. + */ +bool spdk_interrupt_mode_is_enabled(void); + #ifdef __cplusplus } #endif diff --git a/include/spdk_internal/event.h b/include/spdk_internal/event.h index d48e47a6b..37065a59c 100644 --- a/include/spdk_internal/event.h +++ b/include/spdk_internal/event.h @@ -82,6 +82,7 @@ struct spdk_reactor { uint64_t tsc_last; struct spdk_ring *events; + int events_fd; /* The last known rusage values */ struct rusage rusage; @@ -89,6 +90,10 @@ struct spdk_reactor { uint64_t busy_tsc; uint64_t idle_tsc; + + bool interrupt_mode; + struct spdk_fd_group *fgrp; + int resched_fd; } __attribute__((aligned(SPDK_CACHE_LINE_SIZE))); int spdk_reactors_init(void); diff --git a/include/spdk_internal/thread.h b/include/spdk_internal/thread.h index 10bc4824c..9f32898eb 100644 --- a/include/spdk_internal/thread.h +++ b/include/spdk_internal/thread.h @@ -74,6 +74,7 @@ struct spdk_poller { spdk_poller_fn fn; void *arg; struct spdk_thread *thread; + int timerfd; char name[SPDK_MAX_POLLER_NAME_LEN + 1]; }; @@ -112,6 +113,7 @@ struct spdk_thread { */ TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers; struct spdk_ring *messages; + int msg_fd; SLIST_HEAD(, spdk_msg) msg_cache; size_t msg_cache_count; spdk_msg_fn critical_msg; @@ -125,6 +127,9 @@ struct spdk_thread { struct spdk_cpuset cpumask; uint64_t exit_timeout_tsc; + bool interrupt_mode; + struct spdk_fd_group *fgrp; + /* User context allocated at the end */ uint8_t ctx[0]; }; diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 208f9352d..8f75fc65d 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -40,9 +40,12 @@ #include "spdk/thread.h" #include "spdk/env.h" #include "spdk/util.h" +#include "spdk/string.h" +#include "spdk/fd_group.h" #ifdef __linux__ #include +#include #endif #ifdef __FreeBSD__ @@ -59,6 +62,9 @@ static bool g_framework_context_switch_monitor_enabled = true; static struct spdk_mempool *g_spdk_event_mempool = NULL; +static int reactor_interrupt_init(struct spdk_reactor *reactor); +static void reactor_interrupt_fini(struct spdk_reactor *reactor); + static void reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) { @@ -70,6 +76,10 @@ reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); assert(reactor->events != NULL); + + if (spdk_interrupt_mode_is_enabled()) { + reactor_interrupt_init(reactor); + } } struct spdk_reactor * @@ -157,6 +167,10 @@ spdk_reactors_fini(void) if (reactor->events != NULL) { spdk_ring_free(reactor->events); } + + if (reactor->interrupt_mode) { + reactor_interrupt_fini(reactor); + } } spdk_mempool_free(g_spdk_event_mempool); @@ -205,6 +219,15 @@ spdk_event_call(struct spdk_event *event) if (rc != 1) { assert(false); } + + if (reactor->interrupt_mode) { + uint64_t notify = 1; + + rc = write(reactor->events_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); + } + } } static inline uint32_t @@ -224,7 +247,34 @@ event_queue_run_batch(struct spdk_reactor *reactor) memset(events, 0, sizeof(events)); #endif - count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); + if (reactor->interrupt_mode) { + uint64_t notify = 1; + int rc; + + /* There may be race between event_acknowledge and another producer's event_notify, + * so event_acknowledge should be applied ahead. And then check for self's event_notify. + * This can avoid event notification missing. + */ + rc = read(reactor->events_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to acknowledge event queue: %s.\n", spdk_strerror(errno)); + return -errno; + } + + count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); + + if (spdk_ring_count(reactor->events) != 0) { + /* Trigger new notification if there are still events in event-queue waiting for processing. */ + rc = write(reactor->events_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno)); + return -errno; + } + } + } else { + count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); + } + if (count == 0) { return 0; } @@ -313,12 +363,18 @@ static bool reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread) { struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); + int efd; if (spdk_unlikely(lw_thread->resched)) { lw_thread->resched = false; TAILQ_REMOVE(&reactor->threads, lw_thread, link); assert(reactor->thread_count > 0); reactor->thread_count--; + + if (reactor->interrupt_mode) { + efd = spdk_thread_get_interrupt_fd(thread); + spdk_fd_group_remove(reactor->fgrp, efd); + } _reactor_schedule_thread(thread); return true; } @@ -328,6 +384,11 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre TAILQ_REMOVE(&reactor->threads, lw_thread, link); assert(reactor->thread_count > 0); reactor->thread_count--; + + if (reactor->interrupt_mode) { + efd = spdk_thread_get_interrupt_fd(thread); + spdk_fd_group_remove(reactor->fgrp, efd); + } spdk_thread_destroy(thread); return true; } @@ -335,6 +396,16 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre return false; } +static void +reactor_interrupt_run(struct spdk_reactor *reactor) +{ + int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */ + + spdk_fd_group_wait(reactor->fgrp, block_timeout); + + /* TODO: add tsc records and g_framework_context_switch_monitor_enabled */ +} + static void _reactor_run(struct spdk_reactor *reactor) { @@ -387,7 +458,11 @@ reactor_run(void *arg) reactor->tsc_last = spdk_get_ticks(); while (1) { - _reactor_run(reactor); + if (spdk_unlikely(reactor->interrupt_mode)) { + reactor_interrupt_run(reactor); + } else { + _reactor_run(reactor); + } if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { break; @@ -408,6 +483,11 @@ reactor_run(void *arg) TAILQ_REMOVE(&reactor->threads, lw_thread, link); assert(reactor->thread_count > 0); reactor->thread_count--; + if (reactor->interrupt_mode) { + int efd = spdk_thread_get_interrupt_fd(thread); + + spdk_fd_group_remove(reactor->fgrp, efd); + } spdk_thread_destroy(thread); } else { spdk_thread_poll(thread, 0, 0); @@ -492,18 +572,44 @@ spdk_reactors_start(void) void spdk_reactors_stop(void *arg1) { + uint32_t i; + int rc; + struct spdk_reactor *reactor; + uint64_t notify = 1; + g_reactor_state = SPDK_REACTOR_STATE_EXITING; + + if (spdk_interrupt_mode_is_enabled()) { + SPDK_ENV_FOREACH_CORE(i) { + reactor = spdk_reactor_get(i); + + rc = write(reactor->events_fd, ¬ify, sizeof(notify)); + if (rc < 0) { + SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno)); + continue; + } + } + } } static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; static uint32_t g_next_core = UINT32_MAX; +static int +thread_process_interrupts(void *arg) +{ + struct spdk_thread *thread = arg; + + return spdk_thread_poll(thread, 0, 0); +} + static void _schedule_thread(void *arg1, void *arg2) { struct spdk_lw_thread *lw_thread = arg1; struct spdk_reactor *reactor; uint32_t current_core; + int efd; current_core = spdk_env_get_current_core(); @@ -512,6 +618,18 @@ _schedule_thread(void *arg1, void *arg2) TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); reactor->thread_count++; + + if (reactor->interrupt_mode) { + int rc; + struct spdk_thread *thread; + + thread = spdk_thread_get_from_ctx(lw_thread); + efd = spdk_thread_get_interrupt_fd(thread); + rc = spdk_fd_group_add(reactor->fgrp, efd, thread_process_interrupts, thread); + if (rc < 0) { + SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc)); + } + } } static int @@ -561,6 +679,8 @@ static void _reactor_request_thread_reschedule(struct spdk_thread *thread) { struct spdk_lw_thread *lw_thread; + struct spdk_reactor *reactor; + uint32_t current_core; assert(thread == spdk_get_thread()); @@ -569,6 +689,17 @@ _reactor_request_thread_reschedule(struct spdk_thread *thread) assert(lw_thread != NULL); lw_thread->resched = true; + + current_core = spdk_env_get_current_core(); + reactor = spdk_reactor_get(current_core); + assert(reactor != NULL); + if (reactor->interrupt_mode) { + uint64_t notify = 1; + + if (write(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { + SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno)); + } + } } static int @@ -660,4 +791,102 @@ spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cp spdk_event_call(evt); } +#ifdef __linux__ +static int +reactor_schedule_thread_event(void *arg) +{ + struct spdk_reactor *reactor = arg; + struct spdk_lw_thread *lw_thread, *tmp; + uint32_t count = 0; + uint64_t notify = 1; + + assert(reactor->interrupt_mode); + + if (read(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) { + SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno)); + return -errno; + } + + TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { + count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0; + } + + return count; +} + +static int +reactor_interrupt_init(struct spdk_reactor *reactor) +{ + int rc; + + rc = spdk_fd_group_create(&reactor->fgrp); + if (rc != 0) { + return rc; + } + + reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (reactor->resched_fd < 0) { + rc = -EBADF; + goto err; + } + + rc = spdk_fd_group_add(reactor->fgrp, reactor->resched_fd, reactor_schedule_thread_event, + reactor); + if (rc) { + close(reactor->resched_fd); + goto err; + } + + reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (reactor->events_fd < 0) { + spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd); + close(reactor->resched_fd); + + rc = -EBADF; + goto err; + } + + rc = spdk_fd_group_add(reactor->fgrp, reactor->events_fd, + (spdk_fd_fn)event_queue_run_batch, reactor); + if (rc) { + spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd); + close(reactor->resched_fd); + close(reactor->events_fd); + goto err; + } + + reactor->interrupt_mode = true; + return 0; + +err: + spdk_fd_group_destroy(reactor->fgrp); + return rc; +} +#else +static int +reactor_interrupt_init(struct spdk_reactor *reactor) +{ + return -ENOTSUP; +} +#endif + +static void +reactor_interrupt_fini(struct spdk_reactor *reactor) +{ + struct spdk_fd_group *fgrp = reactor->fgrp; + + if (!fgrp) { + return; + } + + spdk_fd_group_remove(fgrp, reactor->events_fd); + spdk_fd_group_remove(fgrp, reactor->resched_fd); + + close(reactor->events_fd); + close(reactor->resched_fd); + + spdk_fd_group_destroy(fgrp); + reactor->fgrp = NULL; +} + SPDK_LOG_REGISTER_COMPONENT(reactor) diff --git a/lib/thread/Makefile b/lib/thread/Makefile index ceb7a394e..26d0d950f 100644 --- a/lib/thread/Makefile +++ b/lib/thread/Makefile @@ -34,7 +34,7 @@ SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..) include $(SPDK_ROOT_DIR)/mk/spdk.common.mk -SO_VER := 3 +SO_VER := 4 SO_MINOR := 0 C_SRCS = thread.c diff --git a/lib/thread/spdk_thread.map b/lib/thread/spdk_thread.map index b71fa06eb..ea0024412 100644 --- a/lib/thread/spdk_thread.map +++ b/lib/thread/spdk_thread.map @@ -46,6 +46,12 @@ spdk_io_channel_iter_get_channel; spdk_io_channel_iter_get_ctx; spdk_for_each_channel_continue; + spdk_interrupt_register; + spdk_interrupt_unregister; + spdk_interrupt_set_event_types; + spdk_thread_get_interrupt_fd; + spdk_interrupt_mode_enable; + spdk_interrupt_mode_is_enabled; # internal functions in spdk_internal/thread.h spdk_poller_state_str; diff --git a/lib/thread/thread.c b/lib/thread/thread.c index d80687f0f..dffbc4211 100644 --- a/lib/thread/thread.c +++ b/lib/thread/thread.c @@ -39,10 +39,16 @@ #include "spdk/string.h" #include "spdk/thread.h" #include "spdk/util.h" +#include "spdk/fd_group.h" #include "spdk/log.h" #include "spdk_internal/thread.h" +#ifdef __linux__ +#include +#include +#endif + #define SPDK_MSG_BATCH_SIZE 8 #define SPDK_MAX_DEVICE_NAME_LEN 256 #define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 @@ -178,6 +184,9 @@ spdk_thread_lib_fini(void) g_ctx_sz = 0; } +static void thread_interrupt_destroy(struct spdk_thread *thread); +static int thread_interrupt_create(struct spdk_thread *thread); + static void _free_thread(struct spdk_thread *thread) { @@ -233,6 +242,10 @@ _free_thread(struct spdk_thread *thread) assert(thread->msg_cache_count == 0); + if (thread->interrupt_mode) { + thread_interrupt_destroy(thread); + } + spdk_ring_free(thread->messages); free(thread); } @@ -304,6 +317,15 @@ spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n", thread->id, thread->name); + if (spdk_interrupt_mode_is_enabled()) { + thread->interrupt_mode = true; + rc = thread_interrupt_create(thread); + if (rc != 0) { + _free_thread(thread); + return NULL; + } + } + if (g_new_thread_fn) { rc = g_new_thread_fn(thread); } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { @@ -477,6 +499,7 @@ msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) { unsigned count, i; void *messages[SPDK_MSG_BATCH_SIZE]; + uint64_t notify = 1; #ifdef DEBUG /* @@ -492,8 +515,18 @@ msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) } else { max_msgs = SPDK_MSG_BATCH_SIZE; } + if (thread->interrupt_mode) { + /* There may be race between msg_acknowledge and another producer's msg_notify, + * so msg_acknowledge should be applied ahead. And then check for self's msg_notify. + * This can avoid msg notification missing. + */ + read(thread->msg_fd, ¬ify, sizeof(notify)); + } count = spdk_ring_dequeue(thread->messages, messages, max_msgs); + if (thread->interrupt_mode && spdk_ring_count(thread->messages) != 0) { + write(thread->msg_fd, ¬ify, sizeof(notify)); + } if (count == 0) { return 0; } @@ -688,7 +721,13 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) now = spdk_get_ticks(); } - rc = thread_poll(thread, max_msgs, now); + if (!thread->interrupt_mode) { + rc = thread_poll(thread, max_msgs, now); + } else { + /* Non-block wait on thread's fd_group */ + rc = spdk_fd_group_wait(thread->fgrp, 0); + } + if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { thread_exit(thread, now); @@ -875,6 +914,12 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx return -EIO; } + if (thread->interrupt_mode) { + uint64_t notify = 1; + + write(thread->msg_fd, ¬ify, sizeof(notify)); + } + return 0; } @@ -885,12 +930,105 @@ spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { + if (thread->interrupt_mode) { + uint64_t notify = 1; + + write(thread->msg_fd, ¬ify, sizeof(notify)); + } + return 0; } return -EIO; } +#ifdef __linux__ +static int +interrupt_timerfd_prepare(uint64_t period_microseconds) +{ + int timerfd; + int ret; + struct itimerspec new_tv; + uint64_t period_seconds; + uint64_t period_nanoseconds; + + if (period_microseconds == 0) { + return -EINVAL; + } + + period_seconds = period_microseconds / SPDK_SEC_TO_USEC; + period_nanoseconds = period_microseconds % SPDK_SEC_TO_USEC * 1000; + + new_tv.it_value.tv_sec = period_seconds; + new_tv.it_value.tv_nsec = period_nanoseconds; + + new_tv.it_interval.tv_sec = period_seconds; + new_tv.it_interval.tv_nsec = period_nanoseconds; + + timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK & TFD_CLOEXEC); + if (timerfd < 0) { + return -errno; + } + + ret = timerfd_settime(timerfd, 0, &new_tv, NULL); + if (ret < 0) { + close(timerfd); + return -errno; + } + + return timerfd; +} +#else +static int +interrupt_timerfd_prepare(uint64_t period_microseconds) +{ + return -ENOTSUP; +} +#endif + +static int +interrupt_timerfd_process(void *arg) +{ + struct spdk_poller *poller = arg; + uint64_t exp; + int rc; + + /* clear the level of interval timer */ + rc = read(poller->timerfd, &exp, sizeof(exp)); + if (rc < 0) { + if (rc == -EAGAIN) { + return 0; + } + + return rc; + } + + return poller->fn(poller->arg); +} + +static int +thread_interrupt_register_timerfd(struct spdk_fd_group *fgrp, + uint64_t period_microseconds, + struct spdk_poller *poller) +{ + int timerfd; + int rc; + + timerfd = interrupt_timerfd_prepare(period_microseconds); + if (timerfd < 0) { + return timerfd; + } + + rc = spdk_fd_group_add(fgrp, timerfd, + interrupt_timerfd_process, poller); + if (rc < 0) { + close(timerfd); + return rc; + } + + return timerfd; +} + static struct spdk_poller * poller_register(spdk_poller_fn fn, void *arg, @@ -939,6 +1077,18 @@ poller_register(spdk_poller_fn fn, poller->period_ticks = 0; } + if (thread->interrupt_mode && period_microseconds != 0) { + int rc; + + rc = thread_interrupt_register_timerfd(thread->fgrp, period_microseconds, poller); + if (rc < 0) { + SPDK_ERRLOG("Failed to register timerfd for periodic poller: %s\n", spdk_strerror(-rc)); + free(poller); + return NULL; + } + poller->timerfd = rc; + } + thread_insert_poller(thread, poller); return poller; @@ -986,6 +1136,12 @@ spdk_poller_unregister(struct spdk_poller **ppoller) return; } + if (thread->interrupt_mode && poller->timerfd) { + spdk_fd_group_remove(thread->fgrp, poller->timerfd); + close(poller->timerfd); + poller->timerfd = 0; + } + /* If the poller was paused, put it on the active_pollers list so that * its unregistration can be processed by spdk_thread_poll(). */ @@ -1638,5 +1794,209 @@ end: assert(rc == 0); } +struct spdk_interrupt { + int efd; + struct spdk_thread *thread; + char name[SPDK_MAX_POLLER_NAME_LEN + 1]; +}; + +static void +thread_interrupt_destroy(struct spdk_thread *thread) +{ + struct spdk_fd_group *fgrp = thread->fgrp; + + SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name); + + if (thread->msg_fd <= 0) { + return; + } + + spdk_fd_group_remove(fgrp, thread->msg_fd); + close(thread->msg_fd); + + spdk_fd_group_destroy(fgrp); + thread->fgrp = NULL; +} + +#ifdef __linux__ +static int +thread_interrupt_msg_process(void *arg) +{ + struct spdk_thread *thread = arg; + struct spdk_thread *orig_thread; + uint32_t msg_count; + spdk_msg_fn critical_msg; + int rc = 0; + uint64_t now = spdk_get_ticks(); + + orig_thread = _get_thread(); + tls_thread = thread; + + critical_msg = thread->critical_msg; + if (spdk_unlikely(critical_msg != NULL)) { + critical_msg(NULL); + thread->critical_msg = NULL; + } + + msg_count = msg_queue_run_batch(thread, 0); + if (msg_count) { + rc = 1; + } + + thread_update_stats(thread, spdk_get_ticks(), now, rc); + + tls_thread = orig_thread; + + return rc; +} + +static int +thread_interrupt_create(struct spdk_thread *thread) +{ + int rc; + + SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name); + + rc = spdk_fd_group_create(&thread->fgrp); + if (rc) { + return rc; + } + + thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (thread->msg_fd < 0) { + rc = -errno; + spdk_fd_group_destroy(thread->fgrp); + thread->fgrp = NULL; + + return rc; + } + + return spdk_fd_group_add(thread->fgrp, thread->msg_fd, thread_interrupt_msg_process, thread); +} +#else +static int +thread_interrupt_create(struct spdk_thread *thread) +{ + return -ENOTSUP; +} +#endif + +struct spdk_interrupt * +spdk_interrupt_register(int efd, spdk_interrupt_fn fn, + void *arg, const char *name) +{ + struct spdk_thread *thread; + struct spdk_interrupt *intr; + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return NULL; + } + + if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) { + SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); + return NULL; + } + + if (spdk_fd_group_add(thread->fgrp, efd, fn, arg)) { + return NULL; + } + + intr = calloc(1, sizeof(*intr)); + if (intr == NULL) { + SPDK_ERRLOG("Interrupt handler allocation failed\n"); + return NULL; + } + + if (name) { + snprintf(intr->name, sizeof(intr->name), "%s", name); + } else { + snprintf(intr->name, sizeof(intr->name), "%p", fn); + } + + intr->efd = efd; + intr->thread = thread; + + return intr; +} + +void +spdk_interrupt_unregister(struct spdk_interrupt **pintr) +{ + struct spdk_thread *thread; + struct spdk_interrupt *intr; + + intr = *pintr; + if (intr == NULL) { + return; + } + + *pintr = NULL; + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } + + if (intr->thread != thread) { + SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); + assert(false); + return; + } + + spdk_fd_group_remove(thread->fgrp, intr->efd); + free(intr); +} + +int +spdk_interrupt_set_event_types(struct spdk_interrupt *intr, + enum spdk_interrupt_event_types event_types) +{ + struct spdk_thread *thread; + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return -EINVAL; + } + + if (intr->thread != thread) { + SPDK_ERRLOG("different from the thread that called spdk_interrupt_register()\n"); + assert(false); + return -EINVAL; + } + + return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types); +} + +int +spdk_thread_get_interrupt_fd(struct spdk_thread *thread) +{ + return spdk_fd_group_get_fd(thread->fgrp); +} + +static bool g_interrupt_mode = false; + +int +spdk_interrupt_mode_enable(void) +{ +#ifdef __linux__ + SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n"); + g_interrupt_mode = true; + return 0; +#else + SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n"); + g_interrupt_mode = false; + return -ENOTSUP; +#endif +} + +bool +spdk_interrupt_mode_is_enabled(void) +{ + return g_interrupt_mode; +} SPDK_LOG_REGISTER_COMPONENT(thread)