Spdk/lib/thread/thread.c

/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (C) 2016 Intel Corporation.
* All rights reserved.
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*/
#include "spdk/stdinc.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/trace.h"
#include "spdk/util.h"
#include "spdk/fd_group.h"
#include "spdk/log.h"
#include "spdk_internal/thread.h"
#include "spdk_internal/usdt.h"
#include "thread_internal.h"
#include "spdk_internal/trace_defs.h"
#ifdef __linux__
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#include <execinfo.h>
#endif
#ifdef __FreeBSD__
#include <execinfo.h>
#endif
#define SPDK_MSG_BATCH_SIZE 8
#define SPDK_MAX_DEVICE_NAME_LEN 256
#define SPDK_THREAD_EXIT_TIMEOUT_SEC 5
#define SPDK_MAX_POLLER_NAME_LEN 256
#define SPDK_MAX_THREAD_NAME_LEN 256
static struct spdk_thread *g_app_thread;
struct spdk_interrupt {
int efd;
struct spdk_thread *thread;
spdk_interrupt_fn fn;
void *arg;
char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};
enum spdk_poller_state {
/* The poller is registered with a thread but not currently executing its fn. */
SPDK_POLLER_STATE_WAITING,
/* The poller is currently running its fn. */
SPDK_POLLER_STATE_RUNNING,
/* The poller was unregistered during the execution of its fn. */
SPDK_POLLER_STATE_UNREGISTERED,
/* The poller is in the process of being paused. It will be paused
* the next time it is due to be executed.
*/
SPDK_POLLER_STATE_PAUSING,
/* The poller is registered but currently paused. It's on the
* paused_pollers list.
*/
SPDK_POLLER_STATE_PAUSED,
};
struct spdk_poller {
TAILQ_ENTRY(spdk_poller) tailq;
RB_ENTRY(spdk_poller) node;
/* Current state of the poller; should only be accessed from the poller's thread. */
enum spdk_poller_state state;
uint64_t period_ticks;
uint64_t next_run_tick;
uint64_t run_count;
uint64_t busy_count;
uint64_t id;
spdk_poller_fn fn;
void *arg;
struct spdk_thread *thread;
struct spdk_interrupt *intr;
spdk_poller_set_interrupt_mode_cb set_intr_cb_fn;
void *set_intr_cb_arg;
char name[SPDK_MAX_POLLER_NAME_LEN + 1];
};
enum spdk_thread_state {
/* The thread is processing poller and message by spdk_thread_poll(). */
SPDK_THREAD_STATE_RUNNING,
/* The thread is in the process of termination. It reaps unregistering
* pollers and releases I/O channels.
*/
SPDK_THREAD_STATE_EXITING,
/* The thread is exited. It is ready to call spdk_thread_destroy(). */
SPDK_THREAD_STATE_EXITED,
};
struct spdk_thread {
uint64_t tsc_last;
struct spdk_thread_stats stats;
/*
* Contains pollers actively running on this thread. Pollers
* are run round-robin. The thread takes one poller from the head
* of the ring, executes it, then puts it back at the tail of
* the ring.
*/
TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
/**
* Contains pollers running on this thread with a periodic timer.
*/
RB_HEAD(timed_pollers_tree, spdk_poller) timed_pollers;
struct spdk_poller *first_timed_poller;
/*
* Contains paused pollers. Pollers on this queue are waiting until
* they are resumed (in which case they're put onto the active/timer
* queues) or unregistered.
*/
TAILQ_HEAD(paused_pollers_head, spdk_poller) paused_pollers;
struct spdk_ring *messages;
int msg_fd;
SLIST_HEAD(, spdk_msg) msg_cache;
size_t msg_cache_count;
spdk_msg_fn critical_msg;
uint64_t id;
uint64_t next_poller_id;
enum spdk_thread_state state;
int pending_unregister_count;
RB_HEAD(io_channel_tree, spdk_io_channel) io_channels;
TAILQ_ENTRY(spdk_thread) tailq;
char name[SPDK_MAX_THREAD_NAME_LEN + 1];
struct spdk_cpuset cpumask;
uint64_t exit_timeout_tsc;
int32_t lock_count;
/* Indicates whether this spdk_thread currently runs in interrupt. */
bool in_interrupt;
bool poller_unregistered;
struct spdk_fd_group *fgrp;
/* User context allocated at the end */
uint8_t ctx[0];
};
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
static size_t g_ctx_sz = 0;
/* A monotonically increasing ID is assigned to each created thread, beginning at 1.
* Once the ID wraps past UINT64_MAX, further thread creation is not allowed and the
* SPDK application must be restarted.
*/
static uint64_t g_thread_id = 1;
enum spin_error {
SPIN_ERR_NONE,
/* Trying to use an SPDK lock while not on an SPDK thread */
SPIN_ERR_NOT_SPDK_THREAD,
/* Trying to lock a lock already held by this SPDK thread */
SPIN_ERR_DEADLOCK,
/* Trying to unlock a lock not held by this SPDK thread */
SPIN_ERR_WRONG_THREAD,
/* pthread_spin_*() returned an error */
SPIN_ERR_PTHREAD,
/* Trying to destroy a lock that is held */
SPIN_ERR_LOCK_HELD,
/* lock_count is invalid */
SPIN_ERR_LOCK_COUNT,
/*
* An spdk_thread may migrate to another pthread. A spinlock held across migration leads to
* undefined behavior. A spinlock held when an SPDK thread goes off CPU would lead to
* deadlock when another SPDK thread on the same pthread tries to take that lock.
*/
SPIN_ERR_HOLD_DURING_SWITCH,
/* Trying to use a lock that was destroyed (but not re-initialized) */
SPIN_ERR_DESTROYED,
/* Trying to use a lock that is not initialized */
SPIN_ERR_NOT_INITIALIZED,
/* Must be last, not an actual error code */
SPIN_ERR_LAST
};
static const char *spin_error_strings[] = {
[SPIN_ERR_NONE] = "No error",
[SPIN_ERR_NOT_SPDK_THREAD] = "Not an SPDK thread",
[SPIN_ERR_DEADLOCK] = "Deadlock detected",
[SPIN_ERR_WRONG_THREAD] = "Unlock on wrong SPDK thread",
[SPIN_ERR_PTHREAD] = "Error from pthread_spinlock",
[SPIN_ERR_LOCK_HELD] = "Destroying a held spinlock",
[SPIN_ERR_LOCK_COUNT] = "Lock count is invalid",
[SPIN_ERR_HOLD_DURING_SWITCH] = "Lock(s) held while SPDK thread going off CPU",
[SPIN_ERR_DESTROYED] = "Lock has been destroyed",
[SPIN_ERR_NOT_INITIALIZED] = "Lock has not been initialized",
};
#define SPIN_ERROR_STRING(err) (err < 0 || err >= SPDK_COUNTOF(spin_error_strings)) \
? "Unknown error" : spin_error_strings[err]
static void
__posix_abort(enum spin_error err)
{
abort();
}
typedef void (*spin_abort)(enum spin_error err);
spin_abort g_spin_abort_fn = __posix_abort;
#define SPIN_ASSERT_IMPL(cond, err, extra_log, ret) \
do { \
if (spdk_unlikely(!(cond))) { \
SPDK_ERRLOG("unrecoverable spinlock error %d: %s (%s)\n", err, \
SPIN_ERROR_STRING(err), #cond); \
extra_log; \
g_spin_abort_fn(err); \
ret; \
} \
} while (0)
#define SPIN_ASSERT_LOG_STACKS(cond, err, sspin) \
SPIN_ASSERT_IMPL(cond, err, sspin_stacks_print(sspin), return)
#define SPIN_ASSERT_RETURN(cond, err, ret) SPIN_ASSERT_IMPL(cond, err, , return ret)
#define SPIN_ASSERT(cond, err) SPIN_ASSERT_IMPL(cond, err, ,)
struct io_device {
void *io_device;
char name[SPDK_MAX_DEVICE_NAME_LEN + 1];
spdk_io_channel_create_cb create_cb;
spdk_io_channel_destroy_cb destroy_cb;
spdk_io_device_unregister_cb unregister_cb;
struct spdk_thread *unregister_thread;
uint32_t ctx_size;
uint32_t for_each_count;
RB_ENTRY(io_device) node;
uint32_t refcnt;
bool pending_unregister;
bool unregistered;
};
static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);
static int
io_device_cmp(struct io_device *dev1, struct io_device *dev2)
{
return (dev1->io_device < dev2->io_device ? -1 : dev1->io_device > dev2->io_device);
}
RB_GENERATE_STATIC(io_device_tree, io_device, node, io_device_cmp);
static int
io_channel_cmp(struct spdk_io_channel *ch1, struct spdk_io_channel *ch2)
{
return (ch1->dev < ch2->dev ? -1 : ch1->dev > ch2->dev);
}
RB_GENERATE_STATIC(io_channel_tree, spdk_io_channel, node, io_channel_cmp);
struct spdk_msg {
spdk_msg_fn fn;
void *arg;
SLIST_ENTRY(spdk_msg) link;
};
#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;
static __thread struct spdk_thread *tls_thread = NULL;
SPDK_TRACE_REGISTER_FN(thread_trace, "thread", TRACE_GROUP_THREAD)
{
spdk_trace_register_description("THREAD_IOCH_GET",
TRACE_THREAD_IOCH_GET,
OWNER_NONE, OBJECT_NONE, 0,
SPDK_TRACE_ARG_TYPE_INT, "refcnt");
spdk_trace_register_description("THREAD_IOCH_PUT",
TRACE_THREAD_IOCH_PUT,
OWNER_NONE, OBJECT_NONE, 0,
SPDK_TRACE_ARG_TYPE_INT, "refcnt");
}
/*
* If this compare function returns zero when two next_run_ticks are equal,
* the macro RB_INSERT() returns a pointer to the element with the same
* next_run_tick.
*
* Fortunately, the macro RB_REMOVE() takes not a key but a pointer to the element
* to remove as a parameter.
*
* Hence we allow RB_INSERT() to insert elements with the same keys on the right
* side by returning 1 when two next_run_ticks are equal.
*/
static inline int
timed_poller_compare(struct spdk_poller *poller1, struct spdk_poller *poller2)
{
if (poller1->next_run_tick < poller2->next_run_tick) {
return -1;
} else {
return 1;
}
}
RB_GENERATE_STATIC(timed_pollers_tree, spdk_poller, node, timed_poller_compare);
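/*
 * Illustration (not part of the implementation): because timed_poller_compare()
 * never returns 0, RB_INSERT() always places a new poller even when its
 * next_run_tick matches an existing one; the new element simply lands to the
 * right of the old one and RB_INSERT() returns NULL in both cases:
 *
 *     first.next_run_tick = 100;
 *     second.next_run_tick = 100;
 *     RB_INSERT(timed_pollers_tree, &thread->timed_pollers, &first);   // returns NULL
 *     RB_INSERT(timed_pollers_tree, &thread->timed_pollers, &second);  // also returns NULL
 *
 * Removal later passes the element pointer itself to RB_REMOVE(), so duplicate
 * keys never need to be disambiguated.
 */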
static inline struct spdk_thread *
_get_thread(void)
{
return tls_thread;
}
static int
_thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
{
char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
g_ctx_sz = ctx_sz;
snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
g_spdk_msg_mempool = spdk_mempool_create(mempool_name, msg_mempool_sz,
sizeof(struct spdk_msg),
0, /* No cache. We do our own. */
SPDK_ENV_SOCKET_ID_ANY);
if (!g_spdk_msg_mempool) {
SPDK_ERRLOG("spdk_msg_mempool creation failed\n");
return -ENOMEM;
}
SPDK_DEBUGLOG(thread, "spdk_msg_mempool was created with size: %zu\n",
msg_mempool_sz);
return 0;
}
static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);
static void
_free_thread(struct spdk_thread *thread)
{
struct spdk_io_channel *ch;
struct spdk_msg *msg;
struct spdk_poller *poller, *ptmp;
RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
thread->name, ch->dev->name);
}
TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
SPDK_WARNLOG("active_poller %s still registered at thread exit\n",
poller->name);
}
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
free(poller);
}
RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, ptmp) {
if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
SPDK_WARNLOG("timed_poller %s still registered at thread exit\n",
poller->name);
}
RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
free(poller);
}
TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
SPDK_WARNLOG("paused_poller %s still registered at thread exit\n", poller->name);
TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
free(poller);
}
pthread_mutex_lock(&g_devlist_mutex);
assert(g_thread_count > 0);
g_thread_count--;
TAILQ_REMOVE(&g_threads, thread, tailq);
pthread_mutex_unlock(&g_devlist_mutex);
msg = SLIST_FIRST(&thread->msg_cache);
while (msg != NULL) {
SLIST_REMOVE_HEAD(&thread->msg_cache, link);
assert(thread->msg_cache_count > 0);
thread->msg_cache_count--;
spdk_mempool_put(g_spdk_msg_mempool, msg);
msg = SLIST_FIRST(&thread->msg_cache);
}
assert(thread->msg_cache_count == 0);
if (spdk_interrupt_mode_is_enabled()) {
thread_interrupt_destroy(thread);
}
spdk_ring_free(thread->messages);
free(thread);
}
int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
assert(g_new_thread_fn == NULL);
assert(g_thread_op_fn == NULL);
if (new_thread_fn == NULL) {
SPDK_INFOLOG(thread, "new_thread_fn was not specified at spdk_thread_lib_init\n");
} else {
g_new_thread_fn = new_thread_fn;
}
return _thread_lib_init(ctx_sz, SPDK_DEFAULT_MSG_MEMPOOL_SIZE);
}
int
spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
spdk_thread_op_supported_fn thread_op_supported_fn,
size_t ctx_sz, size_t msg_mempool_sz)
{
assert(g_new_thread_fn == NULL);
assert(g_thread_op_fn == NULL);
assert(g_thread_op_supported_fn == NULL);
if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
SPDK_ERRLOG("Both must be defined or undefined together.\n");
return -EINVAL;
}
if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
SPDK_INFOLOG(thread, "thread_op_fn and thread_op_supported_fn were not specified\n");
} else {
g_thread_op_fn = thread_op_fn;
g_thread_op_supported_fn = thread_op_supported_fn;
}
return _thread_lib_init(ctx_sz, msg_mempool_sz);
}
void
spdk_thread_lib_fini(void)
{
struct io_device *dev;
RB_FOREACH(dev, io_device_tree, &g_io_devices) {
SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
}
g_new_thread_fn = NULL;
g_thread_op_fn = NULL;
g_thread_op_supported_fn = NULL;
g_ctx_sz = 0;
if (g_app_thread != NULL) {
_free_thread(g_app_thread);
g_app_thread = NULL;
}
if (g_spdk_msg_mempool) {
spdk_mempool_free(g_spdk_msg_mempool);
g_spdk_msg_mempool = NULL;
}
}
struct spdk_thread *
spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
{
struct spdk_thread *thread, *null_thread;
struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
int rc = 0, i;
thread = calloc(1, sizeof(*thread) + g_ctx_sz);
if (!thread) {
SPDK_ERRLOG("Unable to allocate memory for thread\n");
return NULL;
}
if (cpumask) {
spdk_cpuset_copy(&thread->cpumask, cpumask);
} else {
spdk_cpuset_negate(&thread->cpumask);
}
RB_INIT(&thread->io_channels);
TAILQ_INIT(&thread->active_pollers);
RB_INIT(&thread->timed_pollers);
TAILQ_INIT(&thread->paused_pollers);
SLIST_INIT(&thread->msg_cache);
thread->msg_cache_count = 0;
thread->tsc_last = spdk_get_ticks();
/* A monotonically increasing ID is assigned to each registered poller, beginning at 1.
* If the ID wraps past UINT64_MAX, a warning message is logged and IDs restart at 1.
*/
thread->next_poller_id = 1;
thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
if (!thread->messages) {
SPDK_ERRLOG("Unable to allocate memory for message ring\n");
free(thread);
return NULL;
}
/* Fill the local message pool cache. */
rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
if (rc == 0) {
/* If we can't populate the cache it's ok. The cache will get filled
* up organically as messages are passed to the thread. */
for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
thread->msg_cache_count++;
}
}
if (name) {
snprintf(thread->name, sizeof(thread->name), "%s", name);
} else {
snprintf(thread->name, sizeof(thread->name), "%p", thread);
}
pthread_mutex_lock(&g_devlist_mutex);
if (g_thread_id == 0) {
SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
pthread_mutex_unlock(&g_devlist_mutex);
_free_thread(thread);
return NULL;
}
thread->id = g_thread_id++;
TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
g_thread_count++;
pthread_mutex_unlock(&g_devlist_mutex);
SPDK_DEBUGLOG(thread, "Allocating new thread (%" PRIu64 ", %s)\n",
thread->id, thread->name);
if (spdk_interrupt_mode_is_enabled()) {
thread->in_interrupt = true;
rc = thread_interrupt_create(thread);
if (rc != 0) {
_free_thread(thread);
return NULL;
}
}
if (g_new_thread_fn) {
rc = g_new_thread_fn(thread);
} else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
}
if (rc != 0) {
_free_thread(thread);
return NULL;
}
thread->state = SPDK_THREAD_STATE_RUNNING;
/* If this is the first thread, save it as the app thread. Use an atomic
* compare + exchange to guard against crazy users who might try to
* call spdk_thread_create() simultaneously on multiple threads.
*/
null_thread = NULL;
__atomic_compare_exchange_n(&g_app_thread, &null_thread, thread, false,
__ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
return thread;
}
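/*
 * Minimal usage sketch (illustrative only, not part of this file): a framework
 * loop that creates a thread, drives it with spdk_thread_poll() and tears it
 * down once it has exited. Assumes spdk_thread_lib_init() was already called.
 *
 *     struct spdk_thread *thread = spdk_thread_create("worker", NULL);
 *
 *     while (!spdk_thread_is_exited(thread)) {
 *         spdk_thread_poll(thread, 0, 0);
 *     }
 *     spdk_thread_destroy(thread);
 *
 * Note that spdk_thread_exit() must be called on the thread itself (e.g. from
 * a message or poller running on it) for the loop above to terminate.
 */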
struct spdk_thread *
spdk_thread_get_app_thread(void)
{
return g_app_thread;
}
void
spdk_set_thread(struct spdk_thread *thread)
{
tls_thread = thread;
}
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
struct spdk_poller *poller;
struct spdk_io_channel *ch;
if (now >= thread->exit_timeout_tsc) {
SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
thread->name);
goto exited;
}
if (spdk_ring_count(thread->messages) > 0) {
SPDK_INFOLOG(thread, "thread %s still has messages\n", thread->name);
return;
}
TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
SPDK_INFOLOG(thread,
"thread %s still has active poller %s\n",
thread->name, poller->name);
return;
}
}
RB_FOREACH(poller, timed_pollers_tree, &thread->timed_pollers) {
if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
SPDK_INFOLOG(thread,
"thread %s still has active timed poller %s\n",
thread->name, poller->name);
return;
}
}
TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
SPDK_INFOLOG(thread,
"thread %s still has paused poller %s\n",
thread->name, poller->name);
return;
}
RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
SPDK_INFOLOG(thread,
"thread %s still has channel for io_device %s\n",
thread->name, ch->dev->name);
return;
}
if (thread->pending_unregister_count > 0) {
SPDK_INFOLOG(thread,
"thread %s is still unregistering io_devices\n",
thread->name);
return;
}
exited:
thread->state = SPDK_THREAD_STATE_EXITED;
if (spdk_unlikely(thread->in_interrupt)) {
g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
}
}
static void _thread_exit(void *ctx);
int
spdk_thread_exit(struct spdk_thread *thread)
{
SPDK_DEBUGLOG(thread, "Exit thread %s\n", thread->name);
assert(tls_thread == thread);
if (thread->state >= SPDK_THREAD_STATE_EXITING) {
SPDK_INFOLOG(thread,
"thread %s is already exiting\n",
thread->name);
return 0;
}
thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
SPDK_THREAD_EXIT_TIMEOUT_SEC);
thread->state = SPDK_THREAD_STATE_EXITING;
if (spdk_interrupt_mode_is_enabled()) {
spdk_thread_send_msg(thread, _thread_exit, thread);
}
return 0;
}
bool
spdk_thread_is_running(struct spdk_thread *thread)
{
return thread->state == SPDK_THREAD_STATE_RUNNING;
}
bool
spdk_thread_is_exited(struct spdk_thread *thread)
{
return thread->state == SPDK_THREAD_STATE_EXITED;
}
void
spdk_thread_destroy(struct spdk_thread *thread)
{
assert(thread != NULL);
SPDK_DEBUGLOG(thread, "Destroy thread %s\n", thread->name);
assert(thread->state == SPDK_THREAD_STATE_EXITED);
if (tls_thread == thread) {
tls_thread = NULL;
}
/* To be safe, do not free the app thread until spdk_thread_lib_fini(). */
if (thread != g_app_thread) {
_free_thread(thread);
}
}
void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
if (g_ctx_sz > 0) {
return thread->ctx;
}
return NULL;
}
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
return &thread->cpumask;
}
int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
struct spdk_thread *thread;
if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
SPDK_ERRLOG("Framework does not support reschedule operation.\n");
assert(false);
return -ENOTSUP;
}
thread = spdk_get_thread();
if (!thread) {
SPDK_ERRLOG("Called from non-SPDK thread\n");
assert(false);
return -EINVAL;
}
spdk_cpuset_copy(&thread->cpumask, cpumask);
/* Invoke framework's reschedule operation. If this function is called multiple times
* in a single spdk_thread_poll() context, the last cpumask will be used in the
* reschedule operation.
*/
g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);
return 0;
}
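/*
 * Usage sketch (illustrative only): spdk_thread_set_cpumask() may only be
 * called on the current SPDK thread and only if the framework supports
 * SPDK_THREAD_OP_RESCHED, e.g. from within a poller or message callback:
 *
 *     struct spdk_cpuset cpuset;
 *
 *     spdk_cpuset_zero(&cpuset);
 *     spdk_cpuset_set_cpu(&cpuset, 3, true);
 *     if (spdk_thread_set_cpumask(&cpuset) != 0) {
 *         SPDK_ERRLOG("reschedule is not supported or not on an SPDK thread\n");
 *     }
 */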
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
if (ctx == NULL) {
assert(false);
return NULL;
}
assert(g_ctx_sz > 0);
return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
unsigned count, i;
void *messages[SPDK_MSG_BATCH_SIZE];
uint64_t notify = 1;
int rc;
#ifdef DEBUG
/*
* spdk_ring_dequeue() fills messages and returns how many entries it wrote,
* so we will never actually read uninitialized data from events, but just to be sure
* (and to silence a static analyzer false positive), initialize the array to NULL pointers.
*/
memset(messages, 0, sizeof(messages));
#endif
if (max_msgs > 0) {
max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
} else {
max_msgs = SPDK_MSG_BATCH_SIZE;
}
count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
if (spdk_unlikely(thread->in_interrupt) &&
spdk_ring_count(thread->messages) != 0) {
rc = write(thread->msg_fd, &notify, sizeof(notify));
if (rc < 0) {
SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
}
}
if (count == 0) {
return 0;
}
for (i = 0; i < count; i++) {
struct spdk_msg *msg = messages[i];
assert(msg != NULL);
SPDK_DTRACE_PROBE2(msg_exec, msg->fn, msg->arg);
msg->fn(msg->arg);
SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
/* Insert the messages at the head. We want to re-use the hot
* ones. */
SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
thread->msg_cache_count++;
} else {
spdk_mempool_put(g_spdk_msg_mempool, msg);
}
}
return count;
}
static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
struct spdk_poller *tmp __attribute__((unused));
poller->next_run_tick = now + poller->period_ticks;
/*
* Insert poller in the thread's timed_pollers tree by next scheduled run time
* as its key.
*/
tmp = RB_INSERT(timed_pollers_tree, &thread->timed_pollers, poller);
assert(tmp == NULL);
/* Update the cache only if it is empty or the inserted poller is earlier than it.
* RB_MIN() is not necessary here because all pollers that have exactly the same
* next_run_tick as an existing poller are inserted on the right side.
*/
if (thread->first_timed_poller == NULL ||
poller->next_run_tick < thread->first_timed_poller->next_run_tick) {
thread->first_timed_poller = poller;
}
}
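/*
 * Note (illustrative only): first_timed_poller mirrors RB_MIN() of the tree.
 * For example, with pollers already queued for ticks 100 and 200, inserting a
 * poller whose next_run_tick is 50 updates the cache, while inserting one for
 * tick 150 leaves the cache untouched.
 */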
static inline void
poller_remove_timer(struct spdk_thread *thread, struct spdk_poller *poller)
{
struct spdk_poller *tmp __attribute__((unused));
tmp = RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
assert(tmp != NULL);
/* This function is not used in any case that is performance critical.
* Update the cache simply by RB_MIN() if it needs to be changed.
*/
if (thread->first_timed_poller == poller) {
thread->first_timed_poller = RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}
}
static void
thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
if (poller->period_ticks) {
poller_insert_timer(thread, poller, spdk_get_ticks());
} else {
TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
}
}
static inline void
thread_update_stats(struct spdk_thread *thread, uint64_t end,
uint64_t start, int rc)
{
if (rc == 0) {
/* Poller status idle */
thread->stats.idle_tsc += end - start;
} else if (rc > 0) {
/* Poller status busy */
thread->stats.busy_tsc += end - start;
}
/* Store end time to use it as start time of the next spdk_thread_poll(). */
thread->tsc_last = end;
}
static inline int
thread_execute_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
int rc;
switch (poller->state) {
case SPDK_POLLER_STATE_UNREGISTERED:
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
free(poller);
return 0;
case SPDK_POLLER_STATE_PAUSING:
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
poller->state = SPDK_POLLER_STATE_PAUSED;
return 0;
case SPDK_POLLER_STATE_WAITING:
break;
default:
assert(false);
break;
}
poller->state = SPDK_POLLER_STATE_RUNNING;
rc = poller->fn(poller->arg);
SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
poller->run_count++;
if (rc > 0) {
poller->busy_count++;
}
#ifdef DEBUG
if (rc == -1) {
SPDK_DEBUGLOG(thread, "Poller %s returned -1\n", poller->name);
}
#endif
switch (poller->state) {
case SPDK_POLLER_STATE_UNREGISTERED:
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
free(poller);
break;
case SPDK_POLLER_STATE_PAUSING:
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
poller->state = SPDK_POLLER_STATE_PAUSED;
break;
case SPDK_POLLER_STATE_PAUSED:
case SPDK_POLLER_STATE_WAITING:
break;
case SPDK_POLLER_STATE_RUNNING:
poller->state = SPDK_POLLER_STATE_WAITING;
break;
default:
assert(false);
break;
}
return rc;
}
static inline int
thread_execute_timed_poller(struct spdk_thread *thread, struct spdk_poller *poller,
uint64_t now)
{
int rc;
switch (poller->state) {
case SPDK_POLLER_STATE_UNREGISTERED:
free(poller);
return 0;
case SPDK_POLLER_STATE_PAUSING:
TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
poller->state = SPDK_POLLER_STATE_PAUSED;
return 0;
case SPDK_POLLER_STATE_WAITING:
break;
default:
assert(false);
break;
}
poller->state = SPDK_POLLER_STATE_RUNNING;
rc = poller->fn(poller->arg);
SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
poller->run_count++;
if (rc > 0) {
poller->busy_count++;
}
#ifdef DEBUG
if (rc == -1) {
SPDK_DEBUGLOG(thread, "Timed poller %s returned -1\n", poller->name);
}
#endif
switch (poller->state) {
case SPDK_POLLER_STATE_UNREGISTERED:
free(poller);
break;
case SPDK_POLLER_STATE_PAUSING:
TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
poller->state = SPDK_POLLER_STATE_PAUSED;
break;
case SPDK_POLLER_STATE_PAUSED:
break;
case SPDK_POLLER_STATE_RUNNING:
poller->state = SPDK_POLLER_STATE_WAITING;
/* fallthrough */
case SPDK_POLLER_STATE_WAITING:
poller_insert_timer(thread, poller, now);
break;
default:
assert(false);
break;
}
return rc;
}
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
uint32_t msg_count;
struct spdk_poller *poller, *tmp;
spdk_msg_fn critical_msg;
int rc = 0;
thread->tsc_last = now;
critical_msg = thread->critical_msg;
if (spdk_unlikely(critical_msg != NULL)) {
critical_msg(NULL);
thread->critical_msg = NULL;
rc = 1;
}
msg_count = msg_queue_run_batch(thread, max_msgs);
if (msg_count) {
rc = 1;
}
TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
active_pollers_head, tailq, tmp) {
int poller_rc;
poller_rc = thread_execute_poller(thread, poller);
if (poller_rc > rc) {
rc = poller_rc;
}
}
poller = thread->first_timed_poller;
while (poller != NULL) {
int timer_rc = 0;
if (now < poller->next_run_tick) {
break;
}
tmp = RB_NEXT(timed_pollers_tree, &thread->timed_pollers, poller);
RB_REMOVE(timed_pollers_tree, &thread->timed_pollers, poller);
/* Update the cache to the next timed poller in the list
* only if the current poller is still the closest, otherwise,
* do nothing because the cache has been already updated.
*/
if (thread->first_timed_poller == poller) {
thread->first_timed_poller = tmp;
}
timer_rc = thread_execute_timed_poller(thread, poller, now);
if (timer_rc > rc) {
rc = timer_rc;
}
poller = tmp;
}
return rc;
}
static void
_thread_remove_pollers(void *ctx)
{
struct spdk_thread *thread = ctx;
struct spdk_poller *poller, *tmp;
TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
active_pollers_head, tailq, tmp) {
if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
free(poller);
}
}
RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
poller_remove_timer(thread, poller);
free(poller);
}
}
thread->poller_unregistered = false;
}
static void
_thread_exit(void *ctx)
{
struct spdk_thread *thread = ctx;
assert(thread->state == SPDK_THREAD_STATE_EXITING);
thread_exit(thread, spdk_get_ticks());
}
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
struct spdk_thread *orig_thread;
int rc;
orig_thread = _get_thread();
tls_thread = thread;
if (now == 0) {
now = spdk_get_ticks();
}
if (spdk_likely(!thread->in_interrupt)) {
rc = thread_poll(thread, max_msgs, now);
if (spdk_unlikely(thread->in_interrupt)) {
/* The thread transitioned to interrupt mode during the above poll.
* Poll it one more time in case a message was received during the
* transition without a notification.
*/
rc = thread_poll(thread, max_msgs, now);
}
if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
thread_exit(thread, now);
}
} else {
/* Non-block wait on thread's fd_group */
rc = spdk_fd_group_wait(thread->fgrp, 0);
}
thread_update_stats(thread, spdk_get_ticks(), now, rc);
tls_thread = orig_thread;
return rc;
}
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
struct spdk_poller *poller;
poller = thread->first_timed_poller;
if (poller) {
return poller->next_run_tick;
}
return 0;
}
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
return !TAILQ_EMPTY(&thread->active_pollers);
}
static bool
thread_has_unpaused_pollers(struct spdk_thread *thread)
{
if (TAILQ_EMPTY(&thread->active_pollers) &&
RB_EMPTY(&thread->timed_pollers)) {
return false;
}
return true;
}
bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
if (!thread_has_unpaused_pollers(thread) &&
TAILQ_EMPTY(&thread->paused_pollers)) {
return false;
}
return true;
}
bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
if (spdk_ring_count(thread->messages) ||
thread_has_unpaused_pollers(thread) ||
thread->critical_msg != NULL) {
return false;
}
return true;
}
uint32_t
spdk_thread_get_count(void)
{
/*
* Return cached value of the current thread count. We could acquire the
* lock and iterate through the TAILQ of threads to count them, but that
* count could still be invalidated after we release the lock.
*/
return g_thread_count;
}
struct spdk_thread *
spdk_get_thread(void)
{
return _get_thread();
}
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
return thread->name;
}
uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
return thread->id;
}
struct spdk_thread *
spdk_thread_get_by_id(uint64_t id)
{
struct spdk_thread *thread;
if (id == 0 || id >= g_thread_id) {
SPDK_ERRLOG("invalid thread id: %" PRIu64 ".\n", id);
return NULL;
}
pthread_mutex_lock(&g_devlist_mutex);
TAILQ_FOREACH(thread, &g_threads, tailq) {
if (thread->id == id) {
break;
}
}
pthread_mutex_unlock(&g_devlist_mutex);
return thread;
}
int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
struct spdk_thread *thread;
thread = _get_thread();
if (!thread) {
SPDK_ERRLOG("No thread allocated\n");
return -EINVAL;
}
if (stats == NULL) {
return -EINVAL;
}
*stats = thread->stats;
return 0;
}
uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
if (thread == NULL) {
thread = _get_thread();
}
return thread->tsc_last;
}
static inline int
thread_send_msg_notification(const struct spdk_thread *target_thread)
{
uint64_t notify = 1;
int rc;
/* Not necessary to do notification if interrupt facility is not enabled */
if (spdk_likely(!spdk_interrupt_mode_is_enabled())) {
return 0;
}
/* Since each spdk_thread can switch between poll and interrupt mode dynamically,
* it is necessary, after sending a thread msg, to check whether the target thread
* runs in interrupt mode and then decide whether to do the event notification.
*/
if (spdk_unlikely(target_thread->in_interrupt)) {
rc = write(target_thread->msg_fd, &notify, sizeof(notify));
if (rc < 0) {
SPDK_ERRLOG("failed to notify msg_queue: %s.\n", spdk_strerror(errno));
return -EIO;
}
}
return 0;
}
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
struct spdk_thread *local_thread;
struct spdk_msg *msg;
int rc;
assert(thread != NULL);
if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
return -EIO;
}
local_thread = _get_thread();
msg = NULL;
if (local_thread != NULL) {
if (local_thread->msg_cache_count > 0) {
msg = SLIST_FIRST(&local_thread->msg_cache);
assert(msg != NULL);
SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
local_thread->msg_cache_count--;
}
}
if (msg == NULL) {
msg = spdk_mempool_get(g_spdk_msg_mempool);
if (!msg) {
SPDK_ERRLOG("msg could not be allocated\n");
return -ENOMEM;
}
}
msg->fn = fn;
msg->arg = ctx;
rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
if (rc != 1) {
SPDK_ERRLOG("msg could not be enqueued\n");
spdk_mempool_put(g_spdk_msg_mempool, msg);
return -EIO;
}
return thread_send_msg_notification(thread);
}
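/*
 * Usage sketch (illustrative only; say_hello is a hypothetical callback):
 * sending work to another SPDK thread. The callback runs later, on the target
 * thread, from msg_queue_run_batch().
 *
 *     static void
 *     say_hello(void *ctx)
 *     {
 *         SPDK_NOTICELOG("hello from %s\n", spdk_thread_get_name(spdk_get_thread()));
 *     }
 *
 *     ...
 *     rc = spdk_thread_send_msg(target_thread, say_hello, NULL);
 *     if (rc != 0) {
 *         // -ENOMEM or -EIO: the message was not queued
 *     }
 *
 * If the target thread is in interrupt mode, thread_send_msg_notification()
 * additionally writes to its msg_fd so its fd_group wakes up.
 */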
int
spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
{
spdk_msg_fn expected = NULL;
if (!__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST)) {
return -EIO;
}
return thread_send_msg_notification(thread);
}
#ifdef __linux__
static int
interrupt_timerfd_process(void *arg)
{
struct spdk_poller *poller = arg;
uint64_t exp;
int rc;
/* clear the level of interval timer */
rc = read(poller->intr->efd, &exp, sizeof(exp));
if (rc < 0) {
/* read() returns -1 and sets errno on failure; EAGAIN just means there
 * was nothing pending on the non-blocking timerfd.
 */
if (errno == EAGAIN) {
return 0;
}
return rc;
}
SPDK_DTRACE_PROBE2(timerfd_exec, poller->fn, poller->arg);
return poller->fn(poller->arg);
}
static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
int timerfd;
SPDK_DEBUGLOG(thread, "timerfd init for periodic poller %s\n", poller->name);
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd < 0) {
return -errno;
}
poller->intr = spdk_interrupt_register(timerfd, interrupt_timerfd_process, poller, poller->name);
if (poller->intr == NULL) {
close(timerfd);
return -1;
}
return 0;
}
static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
int timerfd;
uint64_t now_tick = spdk_get_ticks();
uint64_t ticks = spdk_get_ticks_hz();
int ret;
struct itimerspec new_tv = {};
struct itimerspec old_tv = {};
assert(poller->intr != NULL);
assert(poller->period_ticks != 0);
timerfd = poller->intr->efd;
assert(timerfd >= 0);
SPDK_DEBUGLOG(thread, "timerfd set poller %s into %s mode\n", poller->name,
interrupt_mode ? "interrupt" : "poll");
if (interrupt_mode) {
/* Set repeated timer expiration */
new_tv.it_interval.tv_sec = poller->period_ticks / ticks;
new_tv.it_interval.tv_nsec = poller->period_ticks % ticks * SPDK_SEC_TO_NSEC / ticks;
/* Update next timer expiration */
if (poller->next_run_tick == 0) {
poller->next_run_tick = now_tick + poller->period_ticks;
} else if (poller->next_run_tick < now_tick) {
poller->next_run_tick = now_tick;
}
new_tv.it_value.tv_sec = (poller->next_run_tick - now_tick) / ticks;
new_tv.it_value.tv_nsec = (poller->next_run_tick - now_tick) % ticks * SPDK_SEC_TO_NSEC / ticks;
ret = timerfd_settime(timerfd, 0, &new_tv, NULL);
if (ret < 0) {
SPDK_ERRLOG("Failed to arm timerfd: error(%d)\n", errno);
assert(false);
}
} else {
/* Disarm the timer */
ret = timerfd_settime(timerfd, 0, &new_tv, &old_tv);
if (ret < 0) {
/* timerfd_settime's failure indicates that the timerfd is in error */
SPDK_ERRLOG("Failed to disarm timerfd: error(%d)\n", errno);
assert(false);
}
/* In order to reuse poller_insert_timer, fix now_tick, so next_run_tick would be
* now_tick + ticks * old_tv.it_value.tv_sec + (ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC
*/
now_tick = now_tick - poller->period_ticks + ticks * old_tv.it_value.tv_sec + \
(ticks * old_tv.it_value.tv_nsec) / SPDK_SEC_TO_NSEC;
poller_remove_timer(poller->thread, poller);
poller_insert_timer(poller->thread, poller, now_tick);
}
}
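/*
 * Worked example (illustrative only) for the tick-to-itimerspec conversion
 * above: with a 2 GHz TSC (ticks == 2,000,000,000) and
 * period_ticks == 3,000,000,000 (a 1.5 s period),
 *
 *     it_interval.tv_sec  = 3,000,000,000 / 2,000,000,000 = 1
 *     it_interval.tv_nsec = (3,000,000,000 % 2,000,000,000)
 *                           * SPDK_SEC_TO_NSEC / 2,000,000,000 = 500,000,000
 *
 * so the timerfd fires every 1.5 seconds, matching the tick-based period.
 */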
static void
poller_interrupt_fini(struct spdk_poller *poller)
{
int fd;
SPDK_DEBUGLOG(thread, "interrupt fini for poller %s\n", poller->name);
assert(poller->intr != NULL);
fd = poller->intr->efd;
spdk_interrupt_unregister(&poller->intr);
close(fd);
}
static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
int busy_efd;
SPDK_DEBUGLOG(thread, "busy_efd init for busy poller %s\n", poller->name);
busy_efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (busy_efd < 0) {
SPDK_ERRLOG("Failed to create eventfd for Poller(%s).\n", poller->name);
return -errno;
}
poller->intr = spdk_interrupt_register(busy_efd, poller->fn, poller->arg, poller->name);
if (poller->intr == NULL) {
close(busy_efd);
return -1;
}
return 0;
}
static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
int busy_efd = poller->intr->efd;
uint64_t notify = 1;
int rc __attribute__((unused));
assert(busy_efd >= 0);
if (interrupt_mode) {
/* Write without read on eventfd will get it repeatedly triggered. */
if (write(busy_efd, &notify, sizeof(notify)) < 0) {
SPDK_ERRLOG("Failed to set busy wait for Poller(%s).\n", poller->name);
}
} else {
/* Read on eventfd will clear its level triggering. */
rc = read(busy_efd, &notify, sizeof(notify));
}
}
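/*
 * Note (illustrative only): the trick above relies on the eventfd being level
 * triggered. Leaving a non-zero counter in the eventfd keeps it readable, so
 * the thread's fd_group reports it on every wait and the "busy" poller keeps
 * running even in interrupt mode; reading the counter back clears that state.
 */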
#else
static int
period_poller_interrupt_init(struct spdk_poller *poller)
{
return -ENOTSUP;
}
static void
period_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}
static void
poller_interrupt_fini(struct spdk_poller *poller)
{
}
static int
busy_poller_interrupt_init(struct spdk_poller *poller)
{
return -ENOTSUP;
}
static void
busy_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}
#endif
void
spdk_poller_register_interrupt(struct spdk_poller *poller,
spdk_poller_set_interrupt_mode_cb cb_fn,
void *cb_arg)
{
assert(poller != NULL);
assert(cb_fn != NULL);
assert(spdk_get_thread() == poller->thread);
if (!spdk_interrupt_mode_is_enabled()) {
return;
}
/* If this poller already had an interrupt, clean the old one up. */
if (poller->intr != NULL) {
poller_interrupt_fini(poller);
}
poller->set_intr_cb_fn = cb_fn;
poller->set_intr_cb_arg = cb_arg;
/* Set poller into interrupt mode if thread is in interrupt. */
if (poller->thread->in_interrupt) {
poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
}
}
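/* Convert a poller period in microseconds to TSC ticks. The conversion is
 * split into whole seconds and a sub-second remainder so that the
 * intermediate multiplication cannot overflow 64 bits for large periods.
 */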
static uint64_t
convert_us_to_ticks(uint64_t us)
{
uint64_t quotient, remainder, ticks;
if (us) {
quotient = us / SPDK_SEC_TO_USEC;
remainder = us % SPDK_SEC_TO_USEC;
ticks = spdk_get_ticks_hz();
return ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
} else {
return 0;
}
}
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
void *arg,
uint64_t period_microseconds,
const char *name)
{
struct spdk_thread *thread;
struct spdk_poller *poller;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return NULL;
}
if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
return NULL;
}
poller = calloc(1, sizeof(*poller));
if (poller == NULL) {
SPDK_ERRLOG("Poller memory allocation failed\n");
return NULL;
}
if (name) {
snprintf(poller->name, sizeof(poller->name), "%s", name);
} else {
snprintf(poller->name, sizeof(poller->name), "%p", fn);
}
poller->state = SPDK_POLLER_STATE_WAITING;
poller->fn = fn;
poller->arg = arg;
poller->thread = thread;
poller->intr = NULL;
if (thread->next_poller_id == 0) {
SPDK_WARNLOG("Poller ID rolled over. Poller ID is duplicated.\n");
thread->next_poller_id = 1;
}
poller->id = thread->next_poller_id++;
poller->period_ticks = convert_us_to_ticks(period_microseconds);
if (spdk_interrupt_mode_is_enabled()) {
int rc;
if (period_microseconds) {
rc = period_poller_interrupt_init(poller);
if (rc < 0) {
SPDK_ERRLOG("Failed to register interruptfd for periodic poller: %s\n", spdk_strerror(-rc));
free(poller);
return NULL;
}
poller->set_intr_cb_fn = period_poller_set_interrupt_mode;
poller->set_intr_cb_arg = NULL;
} else {
/* If the poller doesn't have a period, create an interrupt fd that is
 * always ready, so the poller is treated as busy whenever the thread
 * runs in interrupt mode.
 */
rc = busy_poller_interrupt_init(poller);
if (rc < 0) {
SPDK_ERRLOG("Failed to register interruptfd for busy poller: %s\n", spdk_strerror(-rc));
free(poller);
return NULL;
}
poller->set_intr_cb_fn = busy_poller_set_interrupt_mode;
poller->set_intr_cb_arg = NULL;
}
/* Set poller into interrupt mode if thread is in interrupt. */
if (poller->thread->in_interrupt) {
poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, true);
}
}
thread_insert_poller(thread, poller);
return poller;
}
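/* Usage sketch (hypothetical callback and variable names): a poller callback
 * returns SPDK_POLLER_BUSY when it did work and SPDK_POLLER_IDLE otherwise.
 *
 *   static int my_poll(void *ctx) { ... return SPDK_POLLER_IDLE; }
 *
 *   struct spdk_poller *p = spdk_poller_register(my_poll, ctx, 100);
 *   ...
 *   spdk_poller_unregister(&p);
 */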
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
void *arg,
uint64_t period_microseconds)
{
return poller_register(fn, arg, period_microseconds, NULL);
}
struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
void *arg,
uint64_t period_microseconds,
const char *name)
{
return poller_register(fn, arg, period_microseconds, name);
}
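/* Report an API call made from a thread other than the one that owns the
 * object; aborts if the owning thread pointer is NULL, otherwise logs and
 * asserts in debug builds.
 */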
static void
wrong_thread(const char *func, const char *name, struct spdk_thread *thread,
struct spdk_thread *curthread)
{
if (thread == NULL) {
SPDK_ERRLOG("%s(%s) called with NULL thread\n", func, name);
abort();
}
SPDK_ERRLOG("%s(%s) called from wrong thread %s:%" PRIu64 " (should be "
"%s:%" PRIu64 ")\n", func, name, curthread->name, curthread->id,
thread->name, thread->id);
assert(false);
}
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
struct spdk_thread *thread;
struct spdk_poller *poller;
poller = *ppoller;
if (poller == NULL) {
return;
}
*ppoller = NULL;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return;
}
if (poller->thread != thread) {
wrong_thread(__func__, poller->name, poller->thread, thread);
return;
}
if (spdk_interrupt_mode_is_enabled()) {
/* Release the interrupt resource for period or busy poller */
if (poller->intr != NULL) {
poller_interrupt_fini(poller);
}
/* If there is not already a pending poller removal, generate
* a message to go process removals. */
if (!thread->poller_unregistered) {
thread->poller_unregistered = true;
spdk_thread_send_msg(thread, _thread_remove_pollers, thread);
}
}
/* If the poller was paused, put it on the active_pollers list so that
* its unregistration can be processed by spdk_thread_poll().
*/
if (poller->state == SPDK_POLLER_STATE_PAUSED) {
TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
poller->period_ticks = 0;
}
/* Simply set the state to unregistered. The poller will get cleaned up
* in a subsequent call to spdk_thread_poll().
*/
poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
void
spdk_poller_pause(struct spdk_poller *poller)
{
struct spdk_thread *thread;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return;
}
if (poller->thread != thread) {
wrong_thread(__func__, poller->name, poller->thread, thread);
return;
}
/* We just set its state to SPDK_POLLER_STATE_PAUSING and let
* spdk_thread_poll() move it. It allows a poller to be paused from
* another one's context without breaking the TAILQ_FOREACH_REVERSE_SAFE
* iteration, or from within itself without breaking the logic to always
* remove the closest timed poller in the TAILQ_FOREACH_SAFE iteration.
*/
switch (poller->state) {
case SPDK_POLLER_STATE_PAUSED:
case SPDK_POLLER_STATE_PAUSING:
break;
case SPDK_POLLER_STATE_RUNNING:
case SPDK_POLLER_STATE_WAITING:
poller->state = SPDK_POLLER_STATE_PAUSING;
break;
default:
assert(false);
break;
}
}
void
spdk_poller_resume(struct spdk_poller *poller)
{
struct spdk_thread *thread;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return;
}
if (poller->thread != thread) {
wrong_thread(__func__, poller->name, poller->thread, thread);
return;
}
/* If a poller is paused it has to be removed from the paused pollers
* list and put on the active list or timer tree depending on its
* period_ticks. If a poller is still in the process of being paused,
* we just need to flip its state back to waiting, as it's already on
* the appropriate list or tree.
*/
switch (poller->state) {
case SPDK_POLLER_STATE_PAUSED:
TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
thread_insert_poller(thread, poller);
/* fallthrough */
case SPDK_POLLER_STATE_PAUSING:
poller->state = SPDK_POLLER_STATE_WAITING;
break;
case SPDK_POLLER_STATE_RUNNING:
case SPDK_POLLER_STATE_WAITING:
break;
default:
assert(false);
break;
}
}
const char *
spdk_poller_get_name(struct spdk_poller *poller)
{
return poller->name;
}
uint64_t
spdk_poller_get_id(struct spdk_poller *poller)
{
return poller->id;
}
const char *
spdk_poller_get_state_str(struct spdk_poller *poller)
{
switch (poller->state) {
case SPDK_POLLER_STATE_WAITING:
return "waiting";
case SPDK_POLLER_STATE_RUNNING:
return "running";
case SPDK_POLLER_STATE_UNREGISTERED:
return "unregistered";
case SPDK_POLLER_STATE_PAUSING:
return "pausing";
case SPDK_POLLER_STATE_PAUSED:
return "paused";
default:
return NULL;
}
}
uint64_t
spdk_poller_get_period_ticks(struct spdk_poller *poller)
{
return poller->period_ticks;
}
void
spdk_poller_get_stats(struct spdk_poller *poller, struct spdk_poller_stats *stats)
{
stats->run_count = poller->run_count;
stats->busy_count = poller->busy_count;
}
struct spdk_poller *
spdk_thread_get_first_active_poller(struct spdk_thread *thread)
{
return TAILQ_FIRST(&thread->active_pollers);
}
struct spdk_poller *
spdk_thread_get_next_active_poller(struct spdk_poller *prev)
{
return TAILQ_NEXT(prev, tailq);
}
struct spdk_poller *
spdk_thread_get_first_timed_poller(struct spdk_thread *thread)
{
return RB_MIN(timed_pollers_tree, &thread->timed_pollers);
}
struct spdk_poller *
spdk_thread_get_next_timed_poller(struct spdk_poller *prev)
{
return RB_NEXT(timed_pollers_tree, &thread->timed_pollers, prev);
}
struct spdk_poller *
spdk_thread_get_first_paused_poller(struct spdk_thread *thread)
{
return TAILQ_FIRST(&thread->paused_pollers);
}
struct spdk_poller *
spdk_thread_get_next_paused_poller(struct spdk_poller *prev)
{
return TAILQ_NEXT(prev, tailq);
}
struct spdk_io_channel *
spdk_thread_get_first_io_channel(struct spdk_thread *thread)
{
return RB_MIN(io_channel_tree, &thread->io_channels);
}
struct spdk_io_channel *
spdk_thread_get_next_io_channel(struct spdk_io_channel *prev)
{
return RB_NEXT(io_channel_tree, &thread->io_channels, prev);
}
struct call_thread {
struct spdk_thread *cur_thread;
spdk_msg_fn fn;
void *ctx;
struct spdk_thread *orig_thread;
spdk_msg_fn cpl;
};
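/* spdk_for_each_thread() walks the global thread list by message passing:
 * _on_thread() runs fn on the current thread, then forwards the same context
 * to the next running thread, and finally sends cpl back to the originating
 * thread once the list is exhausted.
 */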
static void
_on_thread(void *ctx)
{
struct call_thread *ct = ctx;
int rc __attribute__((unused));
ct->fn(ct->ctx);
pthread_mutex_lock(&g_devlist_mutex);
ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
while (ct->cur_thread && ct->cur_thread->state != SPDK_THREAD_STATE_RUNNING) {
SPDK_DEBUGLOG(thread, "thread %s is not running but still not destroyed.\n",
ct->cur_thread->name);
ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
}
pthread_mutex_unlock(&g_devlist_mutex);
if (!ct->cur_thread) {
SPDK_DEBUGLOG(thread, "Completed thread iteration\n");
rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
free(ctx);
} else {
SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
ct->cur_thread->name);
rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
}
assert(rc == 0);
}
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
struct call_thread *ct;
struct spdk_thread *thread;
int rc __attribute__((unused));
ct = calloc(1, sizeof(*ct));
if (!ct) {
SPDK_ERRLOG("Unable to perform thread iteration\n");
cpl(ctx);
return;
}
ct->fn = fn;
ct->ctx = ctx;
ct->cpl = cpl;
thread = _get_thread();
if (!thread) {
SPDK_ERRLOG("No thread allocated\n");
free(ct);
cpl(ctx);
return;
}
ct->orig_thread = thread;
pthread_mutex_lock(&g_devlist_mutex);
ct->cur_thread = TAILQ_FIRST(&g_threads);
pthread_mutex_unlock(&g_devlist_mutex);
SPDK_DEBUGLOG(thread, "Starting thread iteration from %s\n",
ct->orig_thread->name);
rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
assert(rc == 0);
}
static inline void
poller_set_interrupt_mode(struct spdk_poller *poller, bool interrupt_mode)
{
if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
return;
}
if (!poller->set_intr_cb_fn) {
SPDK_ERRLOG("Poller(%s) doesn't support set interrupt mode.\n", poller->name);
assert(false);
return;
}
poller->set_intr_cb_fn(poller, poller->set_intr_cb_arg, interrupt_mode);
}
void
spdk_thread_set_interrupt_mode(bool enable_interrupt)
{
struct spdk_thread *thread = _get_thread();
struct spdk_poller *poller, *tmp;
assert(thread);
assert(spdk_interrupt_mode_is_enabled());
SPDK_NOTICELOG("Set spdk_thread (%s) to %s mode from %s mode.\n",
thread->name, enable_interrupt ? "intr" : "poll",
thread->in_interrupt ? "intr" : "poll");
if (thread->in_interrupt == enable_interrupt) {
return;
}
/* Set pollers to expected mode */
RB_FOREACH_SAFE(poller, timed_pollers_tree, &thread->timed_pollers, tmp) {
poller_set_interrupt_mode(poller, enable_interrupt);
}
TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, tmp) {
poller_set_interrupt_mode(poller, enable_interrupt);
}
/* All paused pollers will go to work in interrupt mode */
TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, tmp) {
poller_set_interrupt_mode(poller, enable_interrupt);
}
thread->in_interrupt = enable_interrupt;
return;
}
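/* Look up a registered io_device by the user-supplied pointer. Callers in
 * this file hold g_devlist_mutex around the lookup.
 */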
static struct io_device *
io_device_get(void *io_device)
{
struct io_device find = {};
find.io_device = io_device;
return RB_FIND(io_device_tree, &g_io_devices, &find);
}
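/* Usage sketch of the io_device / io_channel pattern (hypothetical names):
 * an io_device is identified by any unique pointer, and each SPDK thread
 * that asks for a channel gets its own ctx_size-byte context built by
 * create_cb.
 *
 *   spdk_io_device_register(&g_my_dev, my_ch_create, my_ch_destroy,
 *                           sizeof(struct my_ch_ctx), "my_dev");
 *
 *   struct spdk_io_channel *ch = spdk_get_io_channel(&g_my_dev);
 *   struct my_ch_ctx *ctx = spdk_io_channel_get_ctx(ch);
 *   ...
 *   spdk_put_io_channel(ch);
 *   spdk_io_device_unregister(&g_my_dev, NULL);
 */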
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
const char *name)
{
struct io_device *dev, *tmp;
struct spdk_thread *thread;
assert(io_device != NULL);
assert(create_cb != NULL);
assert(destroy_cb != NULL);
thread = spdk_get_thread();
if (!thread) {
SPDK_ERRLOG("called from non-SPDK thread\n");
assert(false);
return;
}
dev = calloc(1, sizeof(struct io_device));
if (dev == NULL) {
SPDK_ERRLOG("could not allocate io_device\n");
return;
}
dev->io_device = io_device;
if (name) {
snprintf(dev->name, sizeof(dev->name), "%s", name);
} else {
snprintf(dev->name, sizeof(dev->name), "%p", dev);
}
dev->create_cb = create_cb;
dev->destroy_cb = destroy_cb;
dev->unregister_cb = NULL;
dev->ctx_size = ctx_size;
dev->for_each_count = 0;
dev->unregistered = false;
dev->refcnt = 0;
SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
dev->name, dev->io_device, thread->name);
pthread_mutex_lock(&g_devlist_mutex);
tmp = RB_INSERT(io_device_tree, &g_io_devices, dev);
if (tmp != NULL) {
SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
io_device, tmp->name, dev->name);
free(dev);
}
pthread_mutex_unlock(&g_devlist_mutex);
}
static void
_finish_unregister(void *arg)
{
struct io_device *dev = arg;
struct spdk_thread *thread;
thread = spdk_get_thread();
assert(thread == dev->unregister_thread);
SPDK_DEBUGLOG(thread, "Finishing unregistration of io_device %s (%p) on thread %s\n",
dev->name, dev->io_device, thread->name);
assert(thread->pending_unregister_count > 0);
thread->pending_unregister_count--;
dev->unregister_cb(dev->io_device);
free(dev);
}
static void
io_device_free(struct io_device *dev)
{
int rc __attribute__((unused));
if (dev->unregister_cb == NULL) {
free(dev);
} else {
assert(dev->unregister_thread != NULL);
SPDK_DEBUGLOG(thread, "io_device %s (%p) needs to unregister from thread %s\n",
dev->name, dev->io_device, dev->unregister_thread->name);
rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
assert(rc == 0);
}
}
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
struct io_device *dev;
uint32_t refcnt;
struct spdk_thread *thread;
thread = spdk_get_thread();
if (!thread) {
SPDK_ERRLOG("called from non-SPDK thread\n");
assert(false);
return;
}
pthread_mutex_lock(&g_devlist_mutex);
dev = io_device_get(io_device);
if (!dev) {
SPDK_ERRLOG("io_device %p not found\n", io_device);
assert(false);
pthread_mutex_unlock(&g_devlist_mutex);
return;
}
/* The for_each_count check differentiates the user attempting to unregister the
* device a second time, from the internal call to this function that occurs
* after the for_each_count reaches 0.
*/
if (dev->pending_unregister && dev->for_each_count > 0) {
SPDK_ERRLOG("io_device %p already has a pending unregister\n", io_device);
assert(false);
pthread_mutex_unlock(&g_devlist_mutex);
return;
}
dev->unregister_cb = unregister_cb;
dev->unregister_thread = thread;
if (dev->for_each_count > 0) {
SPDK_WARNLOG("io_device %s (%p) has %u for_each calls outstanding\n",
dev->name, io_device, dev->for_each_count);
dev->pending_unregister = true;
pthread_mutex_unlock(&g_devlist_mutex);
return;
}
dev->unregistered = true;
RB_REMOVE(io_device_tree, &g_io_devices, dev);
refcnt = dev->refcnt;
pthread_mutex_unlock(&g_devlist_mutex);
SPDK_DEBUGLOG(thread, "Unregistering io_device %s (%p) from thread %s\n",
dev->name, dev->io_device, thread->name);
if (unregister_cb) {
thread->pending_unregister_count++;
}
if (refcnt > 0) {
/* defer deletion */
return;
}
io_device_free(dev);
}
const char *
spdk_io_device_get_name(struct io_device *dev)
{
return dev->name;
}
static struct spdk_io_channel *
thread_get_io_channel(struct spdk_thread *thread, struct io_device *dev)
{
struct spdk_io_channel find = {};
find.dev = dev;
return RB_FIND(io_channel_tree, &thread->io_channels, &find);
}
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
struct spdk_io_channel *ch;
struct spdk_thread *thread;
struct io_device *dev;
int rc;
pthread_mutex_lock(&g_devlist_mutex);
dev = io_device_get(io_device);
if (dev == NULL) {
SPDK_ERRLOG("could not find io_device %p\n", io_device);
pthread_mutex_unlock(&g_devlist_mutex);
return NULL;
}
thread = _get_thread();
if (!thread) {
SPDK_ERRLOG("No thread allocated\n");
pthread_mutex_unlock(&g_devlist_mutex);
return NULL;
}
if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
pthread_mutex_unlock(&g_devlist_mutex);
return NULL;
}
ch = thread_get_io_channel(thread, dev);
if (ch != NULL) {
ch->ref++;
SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
ch, dev->name, dev->io_device, thread->name, ch->ref);
/*
* An I/O channel already exists for this device on this
* thread, so return it.
*/
pthread_mutex_unlock(&g_devlist_mutex);
spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0,
(uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
return ch;
}
ch = calloc(1, sizeof(*ch) + dev->ctx_size);
if (ch == NULL) {
SPDK_ERRLOG("could not calloc spdk_io_channel\n");
pthread_mutex_unlock(&g_devlist_mutex);
return NULL;
}
ch->dev = dev;
ch->destroy_cb = dev->destroy_cb;
ch->thread = thread;
ch->ref = 1;
ch->destroy_ref = 0;
RB_INSERT(io_channel_tree, &thread->io_channels, ch);
SPDK_DEBUGLOG(thread, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
ch, dev->name, dev->io_device, thread->name, ch->ref);
dev->refcnt++;
pthread_mutex_unlock(&g_devlist_mutex);
rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
if (rc != 0) {
pthread_mutex_lock(&g_devlist_mutex);
RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
dev->refcnt--;
free(ch);
pthread_mutex_unlock(&g_devlist_mutex);
return NULL;
}
spdk_trace_record(TRACE_THREAD_IOCH_GET, 0, 0, (uint64_t)spdk_io_channel_get_ctx(ch), 1);
return ch;
}
static void
put_io_channel(void *arg)
{
struct spdk_io_channel *ch = arg;
bool do_remove_dev = true;
struct spdk_thread *thread;
thread = spdk_get_thread();
if (!thread) {
SPDK_ERRLOG("called from non-SPDK thread\n");
assert(false);
return;
}
SPDK_DEBUGLOG(thread,
"Releasing io_channel %p for io_device %s (%p) on thread %s\n",
ch, ch->dev->name, ch->dev->io_device, thread->name);
assert(ch->thread == thread);
ch->destroy_ref--;
if (ch->ref > 0 || ch->destroy_ref > 0) {
/*
* Another reference to the associated io_device was requested
* after this message was sent but before it had a chance to
* execute.
*/
return;
}
pthread_mutex_lock(&g_devlist_mutex);
RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
pthread_mutex_unlock(&g_devlist_mutex);
/* Don't hold the devlist mutex while the destroy_cb is called. */
ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
pthread_mutex_lock(&g_devlist_mutex);
ch->dev->refcnt--;
if (!ch->dev->unregistered) {
do_remove_dev = false;
}
if (ch->dev->refcnt > 0) {
do_remove_dev = false;
}
pthread_mutex_unlock(&g_devlist_mutex);
if (do_remove_dev) {
io_device_free(ch->dev);
}
free(ch);
}
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
struct spdk_thread *thread;
int rc __attribute__((unused));
spdk_trace_record(TRACE_THREAD_IOCH_PUT, 0, 0,
(uint64_t)spdk_io_channel_get_ctx(ch), ch->ref);
thread = spdk_get_thread();
if (!thread) {
SPDK_ERRLOG("called from non-SPDK thread\n");
assert(false);
return;
}
if (ch->thread != thread) {
wrong_thread(__func__, "ch", ch->thread, thread);
return;
}
SPDK_DEBUGLOG(thread,
"Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
ch->ref--;
if (ch->ref == 0) {
ch->destroy_ref++;
rc = spdk_thread_send_msg(thread, put_io_channel, ch);
assert(rc == 0);
}
}
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
return ch->thread;
}
void *
spdk_io_channel_get_io_device(struct spdk_io_channel *ch)
{
return ch->dev->io_device;
}
const char *
spdk_io_channel_get_io_device_name(struct spdk_io_channel *ch)
{
return spdk_io_device_get_name(ch->dev);
}
int
spdk_io_channel_get_ref_count(struct spdk_io_channel *ch)
{
return ch->ref;
}
struct spdk_io_channel_iter {
void *io_device;
struct io_device *dev;
spdk_channel_msg fn;
int status;
void *ctx;
struct spdk_io_channel *ch;
struct spdk_thread *cur_thread;
struct spdk_thread *orig_thread;
spdk_channel_for_each_cpl cpl;
};
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
return i->io_device;
}
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
return i->ch;
}
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
return i->ctx;
}
static void
_call_completion(void *ctx)
{
struct spdk_io_channel_iter *i = ctx;
if (i->cpl != NULL) {
i->cpl(i, i->status);
}
free(i);
}
static void
_call_channel(void *ctx)
{
struct spdk_io_channel_iter *i = ctx;
struct spdk_io_channel *ch;
/*
* It is possible that the channel was deleted before this
* message had a chance to execute. If so, skip calling
* the fn() on this thread.
*/
pthread_mutex_lock(&g_devlist_mutex);
ch = thread_get_io_channel(i->cur_thread, i->dev);
pthread_mutex_unlock(&g_devlist_mutex);
if (ch) {
i->fn(i);
} else {
spdk_for_each_channel_continue(i, 0);
}
}
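/* Usage sketch (hypothetical names): run a callback on every thread that
 * currently holds a channel for a device, then a completion on the calling
 * thread. Each per-channel callback must eventually call
 * spdk_for_each_channel_continue() to advance the iteration.
 *
 *   static void my_ch_msg(struct spdk_io_channel_iter *i) {
 *           struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *           ...
 *           spdk_for_each_channel_continue(i, 0);
 *   }
 *
 *   static void my_done(struct spdk_io_channel_iter *i, int status) { ... }
 *
 *   spdk_for_each_channel(&g_my_dev, my_ch_msg, my_ctx, my_done);
 */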
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
spdk_channel_for_each_cpl cpl)
{
struct spdk_thread *thread;
struct spdk_io_channel *ch;
struct spdk_io_channel_iter *i;
int rc __attribute__((unused));
i = calloc(1, sizeof(*i));
if (!i) {
SPDK_ERRLOG("Unable to allocate iterator\n");
assert(false);
return;
}
i->io_device = io_device;
i->fn = fn;
i->ctx = ctx;
i->cpl = cpl;
i->orig_thread = _get_thread();
pthread_mutex_lock(&g_devlist_mutex);
i->dev = io_device_get(io_device);
if (i->dev == NULL) {
SPDK_ERRLOG("could not find io_device %p\n", io_device);
assert(false);
i->status = -ENODEV;
goto end;
}
/* Do not allow new for_each operations if we are already waiting to unregister
* the device for other for_each operations to complete.
*/
if (i->dev->pending_unregister) {
SPDK_ERRLOG("io_device %p has a pending unregister\n", io_device);
i->status = -ENODEV;
goto end;
}
TAILQ_FOREACH(thread, &g_threads, tailq) {
ch = thread_get_io_channel(thread, i->dev);
if (ch != NULL) {
ch->dev->for_each_count++;
i->cur_thread = thread;
i->ch = ch;
pthread_mutex_unlock(&g_devlist_mutex);
rc = spdk_thread_send_msg(thread, _call_channel, i);
assert(rc == 0);
return;
}
}
end:
pthread_mutex_unlock(&g_devlist_mutex);
rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
assert(rc == 0);
}
static void
__pending_unregister(void *arg)
{
struct io_device *dev = arg;
assert(dev->pending_unregister);
assert(dev->for_each_count == 0);
spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}
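/* Advance the iteration started by spdk_for_each_channel(): find the next
 * thread that has a channel for the device and message it, or, if the list
 * is exhausted or status is non-zero, send the completion back to the
 * originating thread and kick any unregister that was deferred while the
 * for_each was in flight.
 */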
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
struct spdk_thread *thread;
struct spdk_io_channel *ch;
struct io_device *dev;
int rc __attribute__((unused));
assert(i->cur_thread == spdk_get_thread());
i->status = status;
pthread_mutex_lock(&g_devlist_mutex);
dev = i->dev;
if (status) {
goto end;
}
thread = TAILQ_NEXT(i->cur_thread, tailq);
while (thread) {
ch = thread_get_io_channel(thread, dev);
if (ch != NULL) {
i->cur_thread = thread;
i->ch = ch;
pthread_mutex_unlock(&g_devlist_mutex);
rc = spdk_thread_send_msg(thread, _call_channel, i);
assert(rc == 0);
return;
}
thread = TAILQ_NEXT(thread, tailq);
}
end:
dev->for_each_count--;
i->ch = NULL;
pthread_mutex_unlock(&g_devlist_mutex);
rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
assert(rc == 0);
pthread_mutex_lock(&g_devlist_mutex);
if (dev->pending_unregister && dev->for_each_count == 0) {
rc = spdk_thread_send_msg(dev->unregister_thread, __pending_unregister, dev);
assert(rc == 0);
}
pthread_mutex_unlock(&g_devlist_mutex);
}
static void
thread_interrupt_destroy(struct spdk_thread *thread)
{
struct spdk_fd_group *fgrp = thread->fgrp;
SPDK_INFOLOG(thread, "destroy fgrp for thread (%s)\n", thread->name);
if (thread->msg_fd < 0) {
return;
}
spdk_fd_group_remove(fgrp, thread->msg_fd);
close(thread->msg_fd);
thread->msg_fd = -1;
spdk_fd_group_destroy(fgrp);
thread->fgrp = NULL;
}
#ifdef __linux__
static int
thread_interrupt_msg_process(void *arg)
{
struct spdk_thread *thread = arg;
struct spdk_thread *orig_thread;
uint32_t msg_count;
spdk_msg_fn critical_msg;
int rc = 0;
uint64_t notify = 1;
assert(spdk_interrupt_mode_is_enabled());
orig_thread = spdk_get_thread();
spdk_set_thread(thread);
/* There may be a race between msg_acknowledge and another producer's msg_notify,
 * so apply msg_acknowledge first and then check this thread's own messages.
 * This avoids missing a msg notification.
 */
rc = read(thread->msg_fd, &notify, sizeof(notify));
if (rc < 0 && errno != EAGAIN) {
SPDK_ERRLOG("failed to acknowledge msg event: %s.\n", spdk_strerror(errno));
}
critical_msg = thread->critical_msg;
if (spdk_unlikely(critical_msg != NULL)) {
critical_msg(NULL);
thread->critical_msg = NULL;
rc = 1;
}
msg_count = msg_queue_run_batch(thread, 0);
if (msg_count) {
rc = 1;
}
SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
if (spdk_unlikely(!thread->in_interrupt)) {
/* The thread transitioned to poll mode in a msg during the above processing.
* Clear msg_fd since thread messages will be polled directly in poll mode.
*/
rc = read(thread->msg_fd, &notify, sizeof(notify));
if (rc < 0 && errno != EAGAIN) {
SPDK_ERRLOG("failed to acknowledge msg queue: %s.\n", spdk_strerror(errno));
}
}
spdk_set_thread(orig_thread);
return rc;
}
static int
thread_interrupt_create(struct spdk_thread *thread)
{
int rc;
SPDK_INFOLOG(thread, "Create fgrp for thread (%s)\n", thread->name);
rc = spdk_fd_group_create(&thread->fgrp);
if (rc) {
return rc;
}
thread->msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (thread->msg_fd < 0) {
rc = -errno;
spdk_fd_group_destroy(thread->fgrp);
thread->fgrp = NULL;
return rc;
}
return SPDK_FD_GROUP_ADD(thread->fgrp, thread->msg_fd,
thread_interrupt_msg_process, thread);
}
#else
static int
thread_interrupt_create(struct spdk_thread *thread)
{
return -ENOTSUP;
}
#endif
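/* Run a registered interrupt callback with its owning SPDK thread set as the
 * current thread for the duration of the call.
 */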
static int
_interrupt_wrapper(void *ctx)
{
struct spdk_interrupt *intr = ctx;
struct spdk_thread *orig_thread, *thread;
int rc;
orig_thread = spdk_get_thread();
thread = intr->thread;
spdk_set_thread(thread);
SPDK_DTRACE_PROBE4(interrupt_fd_process, intr->name, intr->efd,
intr->fn, intr->arg);
rc = intr->fn(intr->arg);
SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);
spdk_set_thread(orig_thread);
return rc;
}
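/* Usage sketch (hypothetical names): attach an existing file descriptor,
 * e.g. an eventfd, to the current thread's fd group so that my_intr_fn runs
 * when the fd becomes readable while the thread is in interrupt mode.
 *
 *   struct spdk_interrupt *intr =
 *           spdk_interrupt_register(fd, my_intr_fn, my_ctx, "my_intr");
 *   ...
 *   spdk_interrupt_unregister(&intr);
 */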
struct spdk_interrupt *
spdk_interrupt_register(int efd, spdk_interrupt_fn fn,
void *arg, const char *name)
{
struct spdk_thread *thread;
struct spdk_interrupt *intr;
int ret;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return NULL;
}
if (spdk_unlikely(thread->state != SPDK_THREAD_STATE_RUNNING)) {
SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
return NULL;
}
intr = calloc(1, sizeof(*intr));
if (intr == NULL) {
SPDK_ERRLOG("Interrupt handler allocation failed\n");
return NULL;
}
if (name) {
snprintf(intr->name, sizeof(intr->name), "%s", name);
} else {
snprintf(intr->name, sizeof(intr->name), "%p", fn);
}
intr->efd = efd;
intr->thread = thread;
intr->fn = fn;
intr->arg = arg;
ret = spdk_fd_group_add(thread->fgrp, efd, _interrupt_wrapper, intr, intr->name);
if (ret != 0) {
SPDK_ERRLOG("thread %s: failed to add fd %d: %s\n",
thread->name, efd, spdk_strerror(-ret));
free(intr);
return NULL;
}
return intr;
}
void
spdk_interrupt_unregister(struct spdk_interrupt **pintr)
{
struct spdk_thread *thread;
struct spdk_interrupt *intr;
intr = *pintr;
if (intr == NULL) {
return;
}
*pintr = NULL;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return;
}
if (intr->thread != thread) {
wrong_thread(__func__, intr->name, intr->thread, thread);
return;
}
spdk_fd_group_remove(thread->fgrp, intr->efd);
free(intr);
}
int
spdk_interrupt_set_event_types(struct spdk_interrupt *intr,
enum spdk_interrupt_event_types event_types)
{
struct spdk_thread *thread;
thread = spdk_get_thread();
if (!thread) {
assert(false);
return -EINVAL;
}
if (intr->thread != thread) {
wrong_thread(__func__, intr->name, intr->thread, thread);
return -EINVAL;
}
return spdk_fd_group_event_modify(thread->fgrp, intr->efd, event_types);
}
int
spdk_thread_get_interrupt_fd(struct spdk_thread *thread)
{
return spdk_fd_group_get_fd(thread->fgrp);
}
struct spdk_fd_group *
spdk_thread_get_interrupt_fd_group(struct spdk_thread *thread)
{
return thread->fgrp;
}
static bool g_interrupt_mode = false;
int
spdk_interrupt_mode_enable(void)
{
/* This must be called once, before the threading library is initialized.
 * g_spdk_msg_mempool is only valid once the thread library has been initialized.
 */
if (g_spdk_msg_mempool) {
SPDK_ERRLOG("Failed due to threading library is already initialized.\n");
return -1;
}
#ifdef __linux__
SPDK_NOTICELOG("Set SPDK running in interrupt mode.\n");
g_interrupt_mode = true;
return 0;
#else
SPDK_ERRLOG("SPDK interrupt mode supports only Linux platform now.\n");
g_interrupt_mode = false;
return -ENOTSUP;
#endif
}
bool
spdk_interrupt_mode_is_enabled(void)
{
return g_interrupt_mode;
}
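/* Debug-build support for SPDK spinlocks: capture a short backtrace at
 * init, lock, and unlock time so that lock misuse reported by the
 * SPIN_ASSERT_* checks below can be traced back to its call sites.
 */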
#define SSPIN_DEBUG_STACK_FRAMES 16
struct sspin_stack {
void *addrs[SSPIN_DEBUG_STACK_FRAMES];
uint32_t depth;
};
struct spdk_spinlock_internal {
struct sspin_stack init_stack;
struct sspin_stack lock_stack;
struct sspin_stack unlock_stack;
};
static void
sspin_init_internal(struct spdk_spinlock *sspin)
{
#ifdef DEBUG
sspin->internal = calloc(1, sizeof(*sspin->internal));
#endif
}
static void
sspin_fini_internal(struct spdk_spinlock *sspin)
{
#ifdef DEBUG
free(sspin->internal);
sspin->internal = NULL;
#endif
}
#ifdef DEBUG
#define SSPIN_GET_STACK(sspin, which) \
do { \
if (sspin->internal != NULL) { \
struct sspin_stack *stack = &sspin->internal->which ## _stack; \
stack->depth = backtrace(stack->addrs, SPDK_COUNTOF(stack->addrs)); \
} \
} while (0)
#else
#define SSPIN_GET_STACK(sspin, which) do { } while (0)
#endif
static void
sspin_stack_print(const char *title, const struct sspin_stack *sspin_stack)
{
char **stack;
size_t i;
stack = backtrace_symbols(sspin_stack->addrs, sspin_stack->depth);
if (stack == NULL) {
SPDK_ERRLOG("Out of memory while allocate stack for %s\n", title);
return;
}
SPDK_ERRLOG(" %s:\n", title);
for (i = 0; i < sspin_stack->depth; i++) {
/*
* This does not print line numbers. In gdb, use something like "list *0x444b6b" or
* "list *sspin_stack->addrs[0]". Or more conveniently, load the spdk gdb macros
* and use "print *sspin" or "print sspin->internal.lock_stack". See
* gdb_macros.md in the docs directory for details.
*/
SPDK_ERRLOG(" #%" PRIu64 ": %s\n", i, stack[i]);
}
free(stack);
}
static void
sspin_stacks_print(const struct spdk_spinlock *sspin)
{
if (sspin->internal == NULL) {
return;
}
SPDK_ERRLOG("spinlock %p\n", sspin);
sspin_stack_print("Lock initalized at", &sspin->internal->init_stack);
sspin_stack_print("Last locked at", &sspin->internal->lock_stack);
sspin_stack_print("Last unlocked at", &sspin->internal->unlock_stack);
}
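/* Usage sketch: an spdk_spinlock protects short critical sections shared
 * between SPDK threads. It must be taken and released on an SPDK thread and
 * must not be held across a message or poller boundary (lock_count is
 * asserted to be zero at those switch points above).
 *
 *   struct spdk_spinlock lock;
 *   spdk_spin_init(&lock);
 *   spdk_spin_lock(&lock);
 *   ...
 *   spdk_spin_unlock(&lock);
 *   spdk_spin_destroy(&lock);
 */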
void
spdk_spin_init(struct spdk_spinlock *sspin)
{
int rc;
memset(sspin, 0, sizeof(*sspin));
rc = pthread_spin_init(&sspin->spinlock, PTHREAD_PROCESS_PRIVATE);
SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin);
sspin_init_internal(sspin);
SSPIN_GET_STACK(sspin, init);
sspin->initialized = true;
}
void
spdk_spin_destroy(struct spdk_spinlock *sspin)
{
int rc;
SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin);
SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin);
SPIN_ASSERT_LOG_STACKS(sspin->thread == NULL, SPIN_ERR_LOCK_HELD, sspin);
rc = pthread_spin_destroy(&sspin->spinlock);
SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin);
sspin_fini_internal(sspin);
sspin->initialized = false;
sspin->destroyed = true;
}
void
spdk_spin_lock(struct spdk_spinlock *sspin)
{
struct spdk_thread *thread = spdk_get_thread();
int rc;
SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin);
SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin);
SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin);
SPIN_ASSERT_LOG_STACKS(thread != sspin->thread, SPIN_ERR_DEADLOCK, sspin);
rc = pthread_spin_lock(&sspin->spinlock);
SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin);
sspin->thread = thread;
sspin->thread->lock_count++;
SSPIN_GET_STACK(sspin, lock);
}
void
spdk_spin_unlock(struct spdk_spinlock *sspin)
{
struct spdk_thread *thread = spdk_get_thread();
int rc;
SPIN_ASSERT_LOG_STACKS(!sspin->destroyed, SPIN_ERR_DESTROYED, sspin);
SPIN_ASSERT_LOG_STACKS(sspin->initialized, SPIN_ERR_NOT_INITIALIZED, sspin);
SPIN_ASSERT_LOG_STACKS(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, sspin);
SPIN_ASSERT_LOG_STACKS(thread == sspin->thread, SPIN_ERR_WRONG_THREAD, sspin);
SPIN_ASSERT_LOG_STACKS(thread->lock_count > 0, SPIN_ERR_LOCK_COUNT, sspin);
thread->lock_count--;
sspin->thread = NULL;
SSPIN_GET_STACK(sspin, unlock);
rc = pthread_spin_unlock(&sspin->spinlock);
SPIN_ASSERT_LOG_STACKS(rc == 0, SPIN_ERR_PTHREAD, sspin);
}
bool
spdk_spin_held(struct spdk_spinlock *sspin)
{
struct spdk_thread *thread = spdk_get_thread();
SPIN_ASSERT_RETURN(thread != NULL, SPIN_ERR_NOT_SPDK_THREAD, false);
return sspin->thread == thread;
}
SPDK_LOG_REGISTER_COMPONENT(thread)