event: Use a single event pool instead of one per socket

Previously, a pool of events per socket was created for NUMA
locality. However, when passing a message between threads,
the event would be taken from one pool and put back into another,
resulting in imbalances.

At this time, I do not see an efficient way to allocate the events
so that they remain NUMA local. We'll work on that over time.
However, for correctness right now, go down to a single global
pool of events.

Change-Id: I09f8b0c5c928777e8274c53c6dce21b9c346e2a5
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/433519
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Darek Stojaczyk <dariusz.stojaczyk@intel.com>
Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
This commit is contained in:
Ben Walker 2018-11-15 09:51:02 -07:00 committed by Darek Stojaczyk
parent e956be96eb
commit e1ec5c60ca
2 changed files with 17 additions and 139 deletions

View File

@ -42,8 +42,6 @@
#include "spdk/env.h" #include "spdk/env.h"
#include "spdk/util.h" #include "spdk/util.h"
#define SPDK_MAX_SOCKET 64
#define SPDK_EVENT_BATCH_SIZE 8 #define SPDK_EVENT_BATCH_SIZE 8
enum spdk_reactor_state { enum spdk_reactor_state {
@ -58,9 +56,6 @@ struct spdk_reactor {
/* Logical core number for this reactor. */ /* Logical core number for this reactor. */
uint32_t lcore; uint32_t lcore;
/* Socket ID for this reactor. */
uint32_t socket_id;
/* Poller for get the rusage for the reactor. */ /* Poller for get the rusage for the reactor. */
struct spdk_poller *rusage_poller; struct spdk_poller *rusage_poller;
@ -74,9 +69,6 @@ struct spdk_reactor {
struct spdk_ring *events; struct spdk_ring *events;
/* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
struct spdk_mempool *event_mempool;
uint64_t max_delay_us; uint64_t max_delay_us;
} __attribute__((aligned(64))); } __attribute__((aligned(64)));
@ -89,7 +81,7 @@ static bool g_context_switch_monitor_enabled = true;
static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore, static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore,
uint64_t max_delay_us); uint64_t max_delay_us);
static struct spdk_mempool *g_spdk_event_mempool[SPDK_MAX_SOCKET]; static struct spdk_mempool *g_spdk_event_mempool = NULL;
static struct spdk_cpuset *g_spdk_app_core_mask; static struct spdk_cpuset *g_spdk_app_core_mask;
@ -112,7 +104,7 @@ spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
return NULL; return NULL;
} }
event = spdk_mempool_get(reactor->event_mempool); event = spdk_mempool_get(g_spdk_event_mempool);
if (event == NULL) { if (event == NULL) {
assert(false); assert(false);
return NULL; return NULL;
@ -168,7 +160,7 @@ _spdk_event_queue_run_batch(struct spdk_reactor *reactor)
event->fn(event->arg1, event->arg2); event->fn(event->arg1, event->arg2);
} }
spdk_mempool_put_bulk(reactor->event_mempool, events, count); spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
return count; return count;
} }
@ -215,24 +207,6 @@ _spdk_reactor_context_switch_monitor_stop(void *arg1, void *arg2)
} }
} }
static size_t
_spdk_reactor_get_max_event_cnt(uint8_t socket_count)
{
size_t cnt;
/* Try to make event ring fill at most 2MB of memory,
* as some ring implementations may require physical address
* contiguity. We don't want to introduce a requirement of
* at least 2 physically contiguous 2MB hugepages.
*/
cnt = spdk_min(262144 / socket_count, 262144 / 2);
/* Take into account one extra element required by
* some ring implementations.
*/
cnt -= 1;
return cnt;
}
void void
spdk_reactor_enable_context_switch_monitor(bool enable) spdk_reactor_enable_context_switch_monitor(bool enable)
{ {
@ -312,8 +286,7 @@ _spdk_reactor_run(void *arg)
if (!thread) { if (!thread) {
return -1; return -1;
} }
SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore, SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
reactor->socket_id);
sleep_cycles = reactor->max_delay_us * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; sleep_cycles = reactor->max_delay_us * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
if (g_context_switch_monitor_enabled) { if (g_context_switch_monitor_enabled) {
@ -378,21 +351,10 @@ static void
spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t max_delay_us) spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t max_delay_us)
{ {
reactor->lcore = lcore; reactor->lcore = lcore;
reactor->socket_id = spdk_env_get_socket_id(lcore);
assert(reactor->socket_id < SPDK_MAX_SOCKET);
reactor->max_delay_us = max_delay_us; reactor->max_delay_us = max_delay_us;
reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, reactor->socket_id); reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
if (!reactor->events) {
SPDK_NOTICELOG("Ring creation failed on preferred socket %d. Try other sockets.\n",
reactor->socket_id);
reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536,
SPDK_ENV_SOCKET_ID_ANY);
}
assert(reactor->events != NULL); assert(reactor->events != NULL);
reactor->event_mempool = g_spdk_event_mempool[reactor->socket_id];
} }
int int
@ -418,22 +380,6 @@ spdk_app_get_core_mask(void)
return g_spdk_app_core_mask; return g_spdk_app_core_mask;
} }
static uint64_t
spdk_reactor_get_socket_mask(void)
{
uint32_t i;
uint32_t socket_id;
uint64_t socket_info = 0;
SPDK_ENV_FOREACH_CORE(i) {
socket_id = spdk_env_get_socket_id(i);
socket_info |= (1ULL << socket_id);
}
return socket_info;
}
void void
spdk_reactors_start(void) spdk_reactors_start(void)
{ {
@ -479,63 +425,21 @@ int
spdk_reactors_init(unsigned int max_delay_us) spdk_reactors_init(unsigned int max_delay_us)
{ {
int rc; int rc;
uint32_t i, j, last_core; uint32_t i, last_core;
struct spdk_reactor *reactor; struct spdk_reactor *reactor;
uint64_t socket_mask = 0x0;
uint8_t socket_count = 0;
char mempool_name[32]; char mempool_name[32];
socket_mask = spdk_reactor_get_socket_mask(); snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
SPDK_NOTICELOG("Occupied cpu socket mask is 0x%lx\n", socket_mask); g_spdk_event_mempool = spdk_mempool_create(mempool_name,
262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
for (i = 0; i < SPDK_MAX_SOCKET; i++) {
if ((1ULL << i) & socket_mask) {
socket_count++;
}
}
if (socket_count == 0) {
SPDK_ERRLOG("No sockets occupied (internal error)\n");
return -1;
}
for (i = 0; i < SPDK_MAX_SOCKET; i++) {
if ((1ULL << i) & socket_mask) {
snprintf(mempool_name, sizeof(mempool_name), "evtpool%d_%d", i, getpid());
g_spdk_event_mempool[i] = spdk_mempool_create(mempool_name,
_spdk_reactor_get_max_event_cnt(socket_count),
sizeof(struct spdk_event),
SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, i);
if (g_spdk_event_mempool[i] == NULL) {
SPDK_NOTICELOG("Event_mempool creation failed on preferred socket %d.\n", i);
/*
* Instead of failing the operation directly, try to create
* the mempool on any available sockets in the case that
* memory is not evenly installed on all sockets. If still
* fails, free all allocated memory and exits.
*/
g_spdk_event_mempool[i] = spdk_mempool_create(
mempool_name,
_spdk_reactor_get_max_event_cnt(socket_count),
sizeof(struct spdk_event), sizeof(struct spdk_event),
SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
SPDK_ENV_SOCKET_ID_ANY); SPDK_ENV_SOCKET_ID_ANY);
if (g_spdk_event_mempool[i] == NULL) { if (g_spdk_event_mempool == NULL) {
for (j = i - 1; j < i; j--) {
if (g_spdk_event_mempool[j] != NULL) {
spdk_mempool_free(g_spdk_event_mempool[j]);
}
}
SPDK_ERRLOG("spdk_event_mempool creation failed\n"); SPDK_ERRLOG("spdk_event_mempool creation failed\n");
return -1; return -1;
} }
}
} else {
g_spdk_event_mempool[i] = NULL;
}
}
/* struct spdk_reactor must be aligned on 64 byte boundary */ /* struct spdk_reactor must be aligned on 64 byte boundary */
last_core = spdk_env_get_last_core(); last_core = spdk_env_get_last_core();
@ -544,11 +448,7 @@ spdk_reactors_init(unsigned int max_delay_us)
if (rc != 0) { if (rc != 0) {
SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
last_core + 1); last_core + 1);
for (i = 0; i < SPDK_MAX_SOCKET; i++) { spdk_mempool_free(g_spdk_event_mempool);
if (g_spdk_event_mempool[i] != NULL) {
spdk_mempool_free(g_spdk_event_mempool[i]);
}
}
return -1; return -1;
} }
@ -577,11 +477,7 @@ spdk_reactors_fini(void)
} }
} }
for (i = 0; i < SPDK_MAX_SOCKET; i++) { spdk_mempool_free(g_spdk_event_mempool);
if (g_spdk_event_mempool[i] != NULL) {
spdk_mempool_free(g_spdk_event_mempool[i]);
}
}
free(g_reactors); free(g_reactors);
g_reactors = NULL; g_reactors = NULL;

View File

@ -160,24 +160,6 @@ _set_thread_name(const char *thread_name)
#endif #endif
} }
static size_t
_spdk_thread_lib_get_max_msg_cnt(uint8_t socket_count)
{
size_t cnt;
/* Try to make message ring fill at most 2MB of memory,
* as some ring implementations may require physical address
* contiguity. We don't want to introduce a requirement of
* at least 2 physically contiguous 2MB hugepages.
*/
cnt = spdk_min(262144 / socket_count, 262144 / 2);
/* Take into account one extra element required by
* some ring implementations.
*/
cnt -= 1;
return cnt;
}
int int
spdk_thread_lib_init(void) spdk_thread_lib_init(void)
{ {
@ -185,7 +167,7 @@ spdk_thread_lib_init(void)
snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
g_spdk_msg_mempool = spdk_mempool_create(mempool_name, g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
_spdk_thread_lib_get_max_msg_cnt(1), 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
sizeof(struct spdk_msg), sizeof(struct spdk_msg),
SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
SPDK_ENV_SOCKET_ID_ANY); SPDK_ENV_SOCKET_ID_ANY);