event: simplify _spdk_event_queue_run_batch()

Store each reactor's per-socket event mempool in the spdk_reactor
structure to avoid calling rte_lcore_to_socket_id() on every iteration,
and make the function definition an internal, inlineable version
that takes the reactor pointer directly.

Change-Id: I841f7d7594308d7c572f5b7f609913c428bd13d7
Signed-off-by: Daniel Verkamp <daniel.verkamp@intel.com>
This commit is contained in:
Daniel Verkamp 2017-01-20 10:41:21 -07:00
parent 885a86a569
commit 93d635990a

View File

@ -98,6 +98,9 @@ struct spdk_reactor {
/* Logical core number for this reactor. */
uint32_t lcore;
/* Socket ID for this reactor. */
uint32_t socket_id;
/*
* Contains pollers actively running on this reactor. Pollers
* are run round-robin. The reactor takes one poller from the head
@ -113,6 +116,9 @@ struct spdk_reactor {
struct rte_ring *events;
/* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
struct spdk_mempool *event_mempool;
uint64_t max_delay_us;
} __attribute__((aligned(64)));
@ -176,19 +182,20 @@ spdk_event_call(struct spdk_event *event)
}
}
uint32_t
spdk_event_queue_run_batch(uint32_t lcore)
static inline uint32_t
_spdk_event_queue_run_batch(struct spdk_reactor *reactor)
{
struct spdk_reactor *reactor;
unsigned socket_id;
unsigned count, i;
void *events[SPDK_EVENT_BATCH_SIZE];
reactor = spdk_reactor_get(lcore);
assert(reactor->events != NULL);
socket_id = rte_lcore_to_socket_id(lcore);
assert(socket_id < SPDK_MAX_SOCKET);
#ifdef DEBUG
/*
* rte_ring_dequeue_burst() fills events and returns how many entries it wrote,
* so we will never actually read uninitialized data from events, but just to be sure
* (and to silence a static analyzer false positive), initialize the array to NULL pointers.
*/
memset(events, 0, sizeof(events));
#endif
count = rte_ring_dequeue_burst(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
if (count == 0) {
@ -198,14 +205,21 @@ spdk_event_queue_run_batch(uint32_t lcore)
for (i = 0; i < count; i++) {
struct spdk_event *event = events[i];
assert(event != NULL);
event->fn(event->arg1, event->arg2);
}
spdk_mempool_put_bulk(g_spdk_event_mempool[socket_id], events, count);
spdk_mempool_put_bulk(reactor->event_mempool, events, count);
return count;
}
uint32_t
spdk_event_queue_run_batch(uint32_t lcore)
{
return _spdk_event_queue_run_batch(spdk_reactor_get(lcore));
}
/**
\brief Set current reactor thread name to "reactor <cpu #>".
@ -304,8 +318,8 @@ _spdk_reactor_run(void *arg)
spdk_allocate_thread();
set_reactor_thread_name();
SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", rte_lcore_id(),
rte_lcore_to_socket_id(rte_lcore_id()));
SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore,
reactor->socket_id);
spin_cycles = SPDK_REACTOR_SPIN_TIME_US * spdk_get_ticks_hz() / 1000000ULL;
sleep_cycles = reactor->max_delay_us * spdk_get_ticks_hz() / 1000000ULL;
@ -314,7 +328,7 @@ _spdk_reactor_run(void *arg)
while (1) {
bool took_action = false;
event_count = spdk_event_queue_run_batch(rte_lcore_id());
event_count = _spdk_event_queue_run_batch(reactor);
if (event_count > 0) {
took_action = true;
}
@ -399,6 +413,8 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma
char ring_name[64];
reactor->lcore = lcore;
reactor->socket_id = rte_lcore_to_socket_id(lcore);
assert(reactor->socket_id < SPDK_MAX_SOCKET);
reactor->max_delay_us = max_delay_us;
TAILQ_INIT(&reactor->active_pollers);
@ -406,8 +422,10 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma
snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore);
reactor->events =
rte_ring_create(ring_name, 65536, rte_lcore_to_socket_id(lcore), RING_F_SC_DEQ);
rte_ring_create(ring_name, 65536, reactor->socket_id, RING_F_SC_DEQ);
assert(reactor->events != NULL);
reactor->event_mempool = g_spdk_event_mempool[reactor->socket_id];
}
static void
@ -574,14 +592,6 @@ spdk_reactors_init(const char *mask, unsigned int max_delay_us)
printf("Occupied cpu core mask is 0x%lx\n", spdk_app_get_core_mask());
RTE_LCORE_FOREACH(i) {
if (((1ULL << i) & spdk_app_get_core_mask())) {
reactor = spdk_reactor_get(i);
spdk_reactor_construct(reactor, i, max_delay_us);
g_reactor_count++;
}
}
socket_mask = spdk_reactor_get_socket_mask();
printf("Occupied cpu socket mask is 0x%lx\n", socket_mask);
@ -621,6 +631,14 @@ spdk_reactors_init(const char *mask, unsigned int max_delay_us)
}
}
RTE_LCORE_FOREACH(i) {
if (((1ULL << i) & spdk_app_get_core_mask())) {
reactor = spdk_reactor_get(i);
spdk_reactor_construct(reactor, i, max_delay_us);
g_reactor_count++;
}
}
g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
return rc;