From 93d635990aaf679020f8beed13ab3bb5a66450ac Mon Sep 17 00:00:00 2001 From: Daniel Verkamp Date: Fri, 20 Jan 2017 10:41:21 -0700 Subject: [PATCH] event: simplify _spdk_event_queue_run_batch() Store each reactor's per-socket event mempool in the spdk_reactor structure to avoid calling rte_lcore_to_socket_id() on every iteration, and make the function definition an internal, inlineable version that takes the reactor pointer directly. Change-Id: I841f7d7594308d7c572f5b7f609913c428bd13d7 Signed-off-by: Daniel Verkamp --- lib/event/reactor.c | 62 +++++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 22b389820..459bfec3b 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -98,6 +98,9 @@ struct spdk_reactor { /* Logical core number for this reactor. */ uint32_t lcore; + /* Socket ID for this reactor. */ + uint32_t socket_id; + /* * Contains pollers actively running on this reactor. Pollers * are run round-robin. The reactor takes one poller from the head @@ -113,6 +116,9 @@ struct spdk_reactor { struct rte_ring *events; + /* Pointer to the per-socket g_spdk_event_mempool for this reactor. */ + struct spdk_mempool *event_mempool; + uint64_t max_delay_us; } __attribute__((aligned(64))); @@ -176,19 +182,20 @@ spdk_event_call(struct spdk_event *event) } } -uint32_t -spdk_event_queue_run_batch(uint32_t lcore) +static inline uint32_t +_spdk_event_queue_run_batch(struct spdk_reactor *reactor) { - struct spdk_reactor *reactor; - unsigned socket_id; unsigned count, i; void *events[SPDK_EVENT_BATCH_SIZE]; - reactor = spdk_reactor_get(lcore); - assert(reactor->events != NULL); - - socket_id = rte_lcore_to_socket_id(lcore); - assert(socket_id < SPDK_MAX_SOCKET); +#ifdef DEBUG + /* + * rte_ring_dequeue_burst() fills events and returns how many entries it wrote, + * so we will never actually read uninitialized data from events, but just to be sure + * (and to silence a static analyzer false positive), initialize the array to NULL pointers. + */ + memset(events, 0, sizeof(events)); +#endif count = rte_ring_dequeue_burst(reactor->events, events, SPDK_EVENT_BATCH_SIZE); if (count == 0) { @@ -198,14 +205,21 @@ spdk_event_queue_run_batch(uint32_t lcore) for (i = 0; i < count; i++) { struct spdk_event *event = events[i]; + assert(event != NULL); event->fn(event->arg1, event->arg2); } - spdk_mempool_put_bulk(g_spdk_event_mempool[socket_id], events, count); + spdk_mempool_put_bulk(reactor->event_mempool, events, count); return count; } +uint32_t +spdk_event_queue_run_batch(uint32_t lcore) +{ + return _spdk_event_queue_run_batch(spdk_reactor_get(lcore)); +} + /** \brief Set current reactor thread name to "reactor ". @@ -304,8 +318,8 @@ _spdk_reactor_run(void *arg) spdk_allocate_thread(); set_reactor_thread_name(); - SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", rte_lcore_id(), - rte_lcore_to_socket_id(rte_lcore_id())); + SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore, + reactor->socket_id); spin_cycles = SPDK_REACTOR_SPIN_TIME_US * spdk_get_ticks_hz() / 1000000ULL; sleep_cycles = reactor->max_delay_us * spdk_get_ticks_hz() / 1000000ULL; @@ -314,7 +328,7 @@ _spdk_reactor_run(void *arg) while (1) { bool took_action = false; - event_count = spdk_event_queue_run_batch(rte_lcore_id()); + event_count = _spdk_event_queue_run_batch(reactor); if (event_count > 0) { took_action = true; } @@ -399,6 +413,8 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma char ring_name[64]; reactor->lcore = lcore; + reactor->socket_id = rte_lcore_to_socket_id(lcore); + assert(reactor->socket_id < SPDK_MAX_SOCKET); reactor->max_delay_us = max_delay_us; TAILQ_INIT(&reactor->active_pollers); @@ -406,8 +422,10 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore); reactor->events = - rte_ring_create(ring_name, 65536, rte_lcore_to_socket_id(lcore), RING_F_SC_DEQ); + rte_ring_create(ring_name, 65536, reactor->socket_id, RING_F_SC_DEQ); assert(reactor->events != NULL); + + reactor->event_mempool = g_spdk_event_mempool[reactor->socket_id]; } static void @@ -574,14 +592,6 @@ spdk_reactors_init(const char *mask, unsigned int max_delay_us) printf("Occupied cpu core mask is 0x%lx\n", spdk_app_get_core_mask()); - RTE_LCORE_FOREACH(i) { - if (((1ULL << i) & spdk_app_get_core_mask())) { - reactor = spdk_reactor_get(i); - spdk_reactor_construct(reactor, i, max_delay_us); - g_reactor_count++; - } - } - socket_mask = spdk_reactor_get_socket_mask(); printf("Occupied cpu socket mask is 0x%lx\n", socket_mask); @@ -621,6 +631,14 @@ spdk_reactors_init(const char *mask, unsigned int max_delay_us) } } + RTE_LCORE_FOREACH(i) { + if (((1ULL << i) & spdk_app_get_core_mask())) { + reactor = spdk_reactor_get(i); + spdk_reactor_construct(reactor, i, max_delay_us); + g_reactor_count++; + } + } + g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; return rc;