Spdk/lib/thread/thread.c
Shuhei Matsumoto 3d1995c35b thread: Use not malloc'ed but fixed size string for IO device name
256 bytes will be enough but not too large for the name of SPDK IO
device. Use fixed size string for the name of SPDK IO device and
reduce the potential malloc failure. If the length of passed name
is longer than 256, it will be cut off without error.

Signed-off-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Change-Id: I618b82a1d07769df7c775280fbf364cbcfdde403
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/459721
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Seth Howell <seth.howell5141@gmail.com>
Reviewed-by: Darek Stojaczyk <dariusz.stojaczyk@intel.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
2019-07-02 07:04:58 +00:00

1285 lines
28 KiB
C

/*-
* BSD LICENSE
*
* Copyright (c) Intel Corporation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Intel Corporation nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "spdk/stdinc.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"
#include "spdk_internal/thread.h"
/* Maximum number of messages dequeued and run per call to _spdk_msg_queue_run_batch(). */
#define SPDK_MSG_BATCH_SIZE 8

/* Fixed capacity (excluding the NUL terminator) for IO device and thread names.
 * Longer names are silently truncated by snprintf() at registration time. */
#define SPDK_MAX_DEVICE_NAME_LEN 256
#define SPDK_MAX_THREAD_NAME_LEN 256

/* Protects the global device list (g_io_devices) and thread list (g_threads). */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Optional hook invoked for every thread created by spdk_thread_create();
 * set once in spdk_thread_lib_init(). */
static spdk_new_thread_fn g_new_thread_fn = NULL;

/* Size of the user context area appended to each struct spdk_thread. */
static size_t g_ctx_sz = 0;
/* Internal bookkeeping for one registered IO device. */
struct io_device {
	/* Caller-supplied pointer that uniquely identifies the device. */
	void				*io_device;
	/* Fixed-size copy of the registered name; truncated if longer than
	 * SPDK_MAX_DEVICE_NAME_LEN. */
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	/* Completion callback for spdk_io_device_unregister(); may be NULL. */
	spdk_io_device_unregister_cb	unregister_cb;
	/* Thread that called spdk_io_device_unregister(); unregister_cb runs there. */
	struct spdk_thread		*unregister_thread;
	/* Size of the per-channel context allocated after struct spdk_io_channel. */
	uint32_t			ctx_size;
	/* Number of outstanding spdk_for_each_channel() iterations. */
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;
	/* Number of io_channels currently referencing this device. */
	uint32_t			refcnt;
	/* Set once unregistration starts; device is freed when refcnt drops to 0. */
	bool				unregistered;
};

/* All registered IO devices; protected by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
/* One cross-thread message: a function pointer plus its argument. */
struct spdk_msg {
	spdk_msg_fn	fn;
	void		*arg;
	SLIST_ENTRY(spdk_msg) link;
};

/* Maximum number of spdk_msg objects cached locally per thread. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024
/* Global pool backing all spdk_msg allocations; created in spdk_thread_lib_init(). */
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	/* Timer period in ticks; 0 means the poller runs every iteration. */
	uint64_t			period_ticks;
	/* Next tick at which a timed poller is due to run. */
	uint64_t			next_run_tick;
	spdk_poller_fn			fn;
	void				*arg;
};
struct spdk_thread {
	/* IO channels opened on this thread. */
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)	tailq;
	/* Fixed-size name; truncated if longer than SPDK_MAX_THREAD_NAME_LEN. */
	char				name[SPDK_MAX_THREAD_NAME_LEN + 1];
	/* Set by spdk_thread_exit(); checked to stop processing early. */
	bool				exit;
	struct spdk_cpuset		*cpumask;
	/* Tick value at the end of the previous spdk_thread_poll() call. */
	uint64_t			tsc_last;
	struct spdk_thread_stats	stats;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 * are run round-robin. The thread takes one poller from the head
	 * of the ring, executes it, then puts it back at the tail of
	 * the ring.
	 */
	TAILQ_HEAD(active_pollers_head, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* MP/SC ring other threads enqueue spdk_msgs onto. */
	struct spdk_ring		*messages;

	/* Local free-list of spdk_msgs to avoid hitting the global mempool. */
	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;

	/* User context allocated at the end */
	uint8_t				ctx[0];
};

/* All live threads; protected by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The spdk_thread currently associated with this OS thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
/* Return the spdk_thread bound to the calling OS thread, or NULL. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
/*
 * Initialize the thread library: record the new-thread hook and user
 * context size, and create the global message mempool.
 *
 * \param new_thread_fn Optional callback invoked for each created thread.
 * \param ctx_sz Size of the per-thread user context area.
 * \return 0 on success, -1 if the message mempool could not be created.
 */
int
spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
{
	char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];

	assert(g_new_thread_fn == NULL);
	g_new_thread_fn = new_thread_fn;

	g_ctx_sz = ctx_sz;

	snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
		/* Roll back the globals so a later init attempt starts from a
		 * clean state (mirrors spdk_thread_lib_fini()); previously a
		 * failed init left g_new_thread_fn/g_ctx_sz set. */
		g_new_thread_fn = NULL;
		g_ctx_sz = 0;
		return -1;
	}

	return 0;
}
void
spdk_thread_lib_fini(void)
{
struct io_device *dev;
TAILQ_FOREACH(dev, &g_io_devices, tailq) {
SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
}
if (g_spdk_msg_mempool) {
spdk_mempool_free(g_spdk_msg_mempool);
g_spdk_msg_mempool = NULL;
}
g_new_thread_fn = NULL;
g_ctx_sz = 0;
}
/*
 * Release everything a thread owns: leftover pollers, its place on the
 * global thread list, the cpumask, the cached messages, the message ring,
 * and finally the thread structure itself.  Channels still open here are
 * a caller bug and are only logged.
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			/* Poller was never unregistered - warn but free it anyway. */
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		free(poller);
	}

	/* Drop the thread from the global list under the shared lock. */
	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_cpuset_free(thread->cpumask);

	/* Return every locally cached message to the global mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
/*
 * Allocate and register a new spdk_thread.
 *
 * \param name Optional thread name; truncated to SPDK_MAX_THREAD_NAME_LEN.
 *             If NULL, the thread's own address is used as the name.
 * \param cpumask Optional CPU mask to copy; if NULL the mask is set to
 *                all CPUs (negation of the empty set).
 * \return The new thread, or NULL on allocation failure or if the
 *         g_new_thread_fn hook rejects the thread.
 */
struct spdk_thread *
spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc, i;

	/* One allocation covers the thread plus the user context area. */
	thread = calloc(1, sizeof(*thread) + g_ctx_sz);
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		return NULL;
	}

	thread->cpumask = spdk_cpuset_alloc();
	if (!thread->cpumask) {
		free(thread);
		SPDK_ERRLOG("Unable to allocate memory for CPU mask\n");
		return NULL;
	}

	if (cpumask) {
		spdk_cpuset_copy(thread->cpumask, cpumask);
	} else {
		/* No mask given: negate the (empty) mask, i.e. allow all CPUs. */
		spdk_cpuset_negate(thread->cpumask);
	}

	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->tsc_last = spdk_get_ticks();

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
		SPDK_ERRLOG("Unable to allocate memory for message ring\n");
		spdk_cpuset_free(thread->cpumask);
		free(thread);
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		/* snprintf truncates silently if name exceeds the fixed buffer. */
		snprintf(thread->name, sizeof(thread->name), "%s", name);
	} else {
		snprintf(thread->name, sizeof(thread->name), "%p", thread);
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	pthread_mutex_unlock(&g_devlist_mutex);

	if (g_new_thread_fn) {
		rc = g_new_thread_fn(thread);
		if (rc != 0) {
			/* Hook rejected the thread; undo everything. */
			_free_thread(thread);
			return NULL;
		}
	}

	return thread;
}
/* Bind (or unbind, with NULL) an spdk_thread to the calling OS thread. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
/*
 * Mark a thread as exiting.  Must be called from the thread itself;
 * actual teardown happens later in spdk_thread_destroy().
 */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	thread->exit = true;
}
/*
 * Free a thread that has already been marked as exiting via
 * spdk_thread_exit().  Clears the TLS binding if this is the current thread.
 */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);

	assert(thread->exit == true);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
/*
 * Return the user context area of a thread, or NULL when the library
 * was initialized with a zero context size.
 */
void *
spdk_thread_get_ctx(struct spdk_thread *thread)
{
	return (g_ctx_sz > 0) ? thread->ctx : NULL;
}
/* Return the CPU mask associated with a thread (owned by the thread). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return thread->cpumask;
}
/*
 * Map a user context pointer (as returned by spdk_thread_get_ctx())
 * back to its owning spdk_thread.  Passing NULL is a caller bug.
 */
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (!ctx) {
		assert(false);
		return NULL;
	}

	/* A context only exists when a non-zero ctx size was configured. */
	assert(g_ctx_sz > 0);

	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
/*
 * Dequeue up to max_msgs messages (capped at SPDK_MSG_BATCH_SIZE, or the
 * full batch size if max_msgs is 0) from the thread's ring and run them.
 * Returns the number of messages executed.  Stops early if a message
 * marks the thread as exiting.
 */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->exit) {
			break;
		}

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			/* Cache is full; return the message to the global pool. */
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
/*
 * Schedule a timed poller: set its next_run_tick and insert it into the
 * thread's timer_pollers list, which is kept sorted ascending by
 * next_run_tick.  Scanning in reverse finds the insertion point quickly
 * for the common case of appending near the tail.
 */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}
/*
 * Run one iteration of a thread's event loop: drain up to max_msgs
 * messages, run every active poller once, then run any timed pollers
 * that are due at 'now' (0 means "read the TSC here").  Also accumulates
 * busy/idle tick statistics.
 *
 * Temporarily installs 'thread' as the calling OS thread's current
 * thread and restores the previous binding before returning.
 *
 * \return 0 if nothing ran (idle), >0 if any work was done (busy);
 *         the maximum positive poller return value wins.
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Reverse iteration gives round-robin fairness across calls. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			/* Deferred cleanup from spdk_poller_unregister(). */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* The poller may have unregistered itself inside fn(). */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* List is sorted by expiration, so the first not-yet-due
		 * poller means the rest are not due either. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;
		/* Re-insert at the position matching its next expiration. */
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
/*
 * Return the tick at which the thread's earliest timed poller is due,
 * or 0 if no timed pollers are registered.  timer_pollers is sorted by
 * next_run_tick, so the list head is always the soonest.
 */
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *first;

	first = TAILQ_FIRST(&thread->timer_pollers);
	return first ? first->next_run_tick : 0;
}
/* Return non-zero if the thread has any non-timed (active) pollers. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}
/* Return true if the thread has any pollers registered, active or timed. */
bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers) ||
	       !TAILQ_EMPTY(&thread->timer_pollers);
}
/* A thread is idle when it has no queued messages and no pollers. */
bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	return spdk_ring_count(thread->messages) == 0 &&
	       !spdk_thread_has_pollers(thread);
}
/* Return the (unsynchronized) number of currently registered threads. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
/*
 * Return the calling OS thread's bound spdk_thread, logging an error
 * (and returning NULL) when none is bound.
 */
struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread = _get_thread();

	if (thread == NULL) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}
/* Return the thread's name (owned by the thread; do not free). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
/*
 * Copy the calling thread's busy/idle tick statistics into *stats.
 *
 * \return 0 on success; -EINVAL if no thread is bound to the caller
 *         or stats is NULL.
 */
int
spdk_thread_get_stats(struct spdk_thread_stats *stats)
{
	struct spdk_thread *thread = _get_thread();

	if (thread == NULL) {
		SPDK_ERRLOG("No thread allocated\n");
		return -EINVAL;
	}

	if (stats == NULL) {
		return -EINVAL;
	}

	/* Snapshot the counters by struct copy. */
	*stats = thread->stats;

	return 0;
}
/*
 * Send fn(ctx) to run on another thread.  The spdk_msg carrying the call
 * is taken from the sending thread's local cache when possible, falling
 * back to the global mempool.  Failure to allocate or enqueue is treated
 * as fatal in debug builds (assert) and silently dropped otherwise.
 */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			/* Reuse a cached message from the sender's free-list. */
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			assert(false);
			return;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		/* Ring full; drop the message and return it to the pool. */
		assert(false);
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}
/*
 * Register a poller on the calling thread.
 *
 * \param fn Poller function; its return value feeds spdk_thread_poll()'s rc.
 * \param arg Opaque argument passed to fn.
 * \param period_microseconds 0 for an active (every-iteration) poller,
 *        otherwise the timer period.
 * \return The new poller, or NULL on allocation failure or if called
 *         from a non-SPDK thread.
 */
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		/* Convert microseconds to ticks in two parts (whole seconds
		 * plus remainder) to limit intermediate overflow. */
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}
/*
 * Unregister a poller and clear the caller's pointer.  The poller is not
 * freed here; it is marked UNREGISTERED and reclaimed by a later
 * spdk_thread_poll() pass.  Must be called from the poller's thread.
 */
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
/* Iteration state for spdk_for_each_thread(). */
struct call_thread {
	/* Thread currently executing fn; advanced by spdk_on_thread(). */
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	/* Thread that started the iteration; cpl runs there. */
	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};
/*
 * Per-thread step of spdk_for_each_thread(): run fn on the current
 * thread, then forward the iterator to the next thread on the global
 * list, or send the completion back to the originating thread and free
 * the iterator when the list is exhausted.
 */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
/*
 * Run fn(ctx) once on every registered thread, then run cpl(ctx) on the
 * calling thread.  On allocation failure or when called from a non-SPDK
 * thread, cpl is invoked synchronously instead.
 */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		/* No threads registered at all.  Previously this fell through
		 * to spdk_thread_send_msg(NULL, ...), which asserts and leaks
		 * ct.  Complete synchronously instead, consistent with the
		 * other error paths above. */
		free(ct);
		cpl(ctx);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
/*
 * Register an IO device keyed by the io_device pointer.
 *
 * \param io_device Unique key; registering the same pointer twice is an
 *        error (logged, new registration dropped).
 * \param create_cb / destroy_cb Channel context constructor/destructor.
 * \param ctx_size Bytes of per-channel context to allocate.
 * \param name Optional name, truncated to SPDK_MAX_DEVICE_NAME_LEN; if
 *        NULL the device struct's address is used.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		/* Copied into a fixed buffer; truncated silently if too long. */
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	pthread_mutex_lock(&g_devlist_mutex);
	/* Reject duplicate registrations of the same io_device pointer. */
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
/*
 * Final step of device unregistration, run as a message on the thread
 * that called spdk_io_device_unregister(): invoke the user's callback
 * and free the device.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}
/*
 * Free an unregistered io_device.  If the user supplied an unregister
 * callback, the free is deferred to _finish_unregister() on the thread
 * that requested the unregistration.
 */
static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb != NULL) {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	} else {
		free(dev);
	}
}
/*
 * Begin unregistering an IO device.  The device is removed from the
 * global list immediately; the actual free (and optional unregister_cb)
 * happens once the last channel referencing it is released.  Refuses to
 * proceed while for_each_channel iterations are outstanding.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; it decides immediate vs deferred free. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
/*
 * Get (or create) the calling thread's channel for an IO device.  If a
 * channel already exists on this thread its refcount is bumped and it is
 * returned; otherwise a new channel plus ctx_size bytes of user context
 * is allocated and the device's create_cb is invoked (outside the lock).
 *
 * \return The channel, or NULL if the device is unknown, the caller is
 *         not an SPDK thread, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Channel header and per-channel context live in one allocation. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* create_cb runs without the lock; roll everything back on failure. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
/*
 * Deferred release of an io_channel, run as a message on the channel's
 * owning thread.  Skips the destroy if the channel was re-referenced in
 * the meantime; otherwise destroys the channel and, if this was the last
 * reference to an unregistered device, frees the device too.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Free the device only if it was unregistered and this was its
	 * last remaining channel. */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
/*
 * Drop one reference on an io_channel.  When the count reaches zero the
 * actual destroy is deferred to _spdk_put_io_channel() on the channel's
 * owning thread (a new get may still revive it before that runs).
 */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	/* Catch double-puts: decrementing past zero would wrap the unsigned
	 * refcount and the channel would never be destroyed. */
	assert(ch->ref > 0);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}
/*
 * Recover the channel header from a per-channel context pointer.  The
 * context is laid out immediately after struct spdk_io_channel (see
 * spdk_get_io_channel()), so step back by the header size.
 */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	uint8_t *base = (uint8_t *)ctx - sizeof(struct spdk_io_channel);

	return (struct spdk_io_channel *)base;
}
/* Return the thread that owns this io_channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
/* Iteration state for spdk_for_each_channel(). */
struct spdk_io_channel_iter {
	/* Device key being iterated. */
	void *io_device;
	struct io_device *dev;
	/* Per-channel callback; must call spdk_for_each_channel_continue(). */
	spdk_channel_msg fn;
	/* Last status passed to spdk_for_each_channel_continue(). */
	int status;
	void *ctx;
	/* Channel/thread currently being visited. */
	struct spdk_io_channel *ch;
	struct spdk_thread *cur_thread;

	/* Thread that started the iteration; cpl runs there. */
	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};
/* Accessor: the io_device key this iteration runs over. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

/* Accessor: the channel currently being visited (NULL after completion). */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

/* Accessor: the caller-supplied context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
static void
_call_completion(void *ctx)
{
struct spdk_io_channel_iter *i = ctx;
if (i->cpl != NULL) {
i->cpl(i, i->status);
}
free(i);
}
/*
 * Per-thread step of spdk_for_each_channel(): re-validate that the
 * target channel still exists on this thread, then run the user's fn
 * (which must call spdk_for_each_channel_continue()), or skip ahead if
 * the channel disappeared in the meantime.
 */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}
/*
 * Run fn on every thread that has a channel for io_device, one thread
 * at a time, then run cpl on the calling thread.  The iteration is
 * driven by messages: fn must call spdk_for_each_channel_continue() to
 * advance.  Bumps the device's for_each_count while active, which
 * blocks spdk_io_device_unregister().
 *
 * NOTE(review): on iterator allocation failure this returns without
 * ever invoking cpl, so the caller never hears back — presumably
 * callers are expected to treat ENOMEM here as fatal; verify.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device and
	 * start the iteration there. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device; complete immediately. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
/*
 * Advance a channel iteration started by spdk_for_each_channel().  Must
 * be called from the thread currently being visited.  A non-zero status
 * aborts the iteration; otherwise the next thread with a matching
 * channel is messaged.  When done (or aborted), the device's
 * for_each_count is dropped and the completion is sent to the
 * originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Non-zero status aborts the iteration early. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)