From c45dfec4f602e0f43f5e01c624bd45a61c04bbdb Mon Sep 17 00:00:00 2001 From: Ben Walker Date: Fri, 25 Sep 2015 14:09:41 -0700 Subject: [PATCH] nvme: Add concurrency to nvme perf example. Change-Id: Ic565b70517bb2958b64fe7f2cf59a31e4b6250ef Signed-off-by: Ben Walker --- examples/nvme/perf/perf.c | 139 ++++++++++++++++++++++++++++++-------- 1 file changed, 112 insertions(+), 27 deletions(-) diff --git a/examples/nvme/perf/perf.c b/examples/nvme/perf/perf.c index 41269724d..1884c9b65 100644 --- a/examples/nvme/perf/perf.c +++ b/examples/nvme/perf/perf.c @@ -70,11 +70,18 @@ struct perf_task { void *buf; }; +struct worker_thread { + struct ns_entry *namespaces; + struct worker_thread *next; + unsigned lcore; +}; + struct rte_mempool *request_mempool; static struct rte_mempool *task_pool; static struct ctrlr_entry *g_controllers = NULL; -static struct ns_entry *g_namespaces = NULL; +static struct worker_thread *g_workers = NULL; +static struct worker_thread *g_current_worker = NULL; static uint64_t g_tsc_rate; @@ -84,16 +91,21 @@ static int g_is_random; static int g_queue_depth; static int g_time_in_sec; +static const char *g_core_mask; + static void register_ns(struct nvme_controller *ctrlr, struct pci_device *pci_dev, struct nvme_namespace *ns) { + struct worker_thread *worker; struct ns_entry *entry = malloc(sizeof(struct ns_entry)); const struct nvme_controller_data *cdata = nvme_ctrlr_get_data(ctrlr); + worker = g_current_worker; + entry->ctrlr = ctrlr; entry->ns = ns; - entry->next = g_namespaces; + entry->next = worker->namespaces; entry->io_completed = 0; entry->current_queue_depth = 0; entry->offset_in_ios = 0; @@ -103,7 +115,14 @@ register_ns(struct nvme_controller *ctrlr, struct pci_device *pci_dev, struct nv entry->is_draining = false; snprintf(entry->name, sizeof(cdata->mn), "%s", cdata->mn); - g_namespaces = entry; + printf("Assigning namespace %s to lcore %u\n", entry->name, worker->lcore); + worker->namespaces = entry; + + if (worker->next == NULL) { + g_current_worker = g_workers; + } else { + g_current_worker = worker->next; + } } static void @@ -221,12 +240,17 @@ static int work_fn(void *arg) { uint64_t tsc_end = rte_get_timer_cycles() + g_time_in_sec * g_tsc_rate; - struct ns_entry *entry = (struct ns_entry *)arg; + struct worker_thread *worker = (struct worker_thread *)arg; + struct ns_entry *entry = NULL; + + printf("Starting thread on core %u\n", worker->lcore); nvme_register_io_thread(); /* Submit initial I/O for each namespace. */ + entry = worker->namespaces; while (entry != NULL) { + submit_io(entry, g_queue_depth); entry = entry->next; } @@ -237,20 +261,18 @@ work_fn(void *arg) * I/O will be submitted in the io_complete callback * to replace each I/O that is completed. */ - entry = (struct ns_entry *)arg; + entry = worker->namespaces; while (entry != NULL) { check_io(entry); entry = entry->next; } - rte_delay_us(1); - if (rte_get_timer_cycles() > tsc_end) { break; } } - entry = (struct ns_entry *)arg; + entry = worker->namespaces; while (entry != NULL) { drain_io(entry); entry = entry->next; @@ -270,6 +292,8 @@ static void usage(char *program_name) printf("\t\t(read, write, randread, randwrite, rw, randrw)]\n"); printf("\t[-M rwmixread (100 for reads, 0 for writes)]\n"); printf("\t[-t time in seconds]\n"); + printf("\t[-m core mask for I/O submission/completion.]\n"); + printf("\t\t(default: 1)]\n"); } static void @@ -277,24 +301,28 @@ print_stats(void) { float io_per_second, mb_per_second; float total_io_per_second, total_mb_per_second; + struct worker_thread *worker; total_io_per_second = 0; total_mb_per_second = 0; - struct ns_entry *entry = g_namespaces; - while (entry != NULL) { - io_per_second = (float)entry->io_completed / - g_time_in_sec; - mb_per_second = io_per_second * g_io_size_bytes / - (1024 * 1024); - printf("%-.20s: %10.2f IO/s %10.2f MB/s\n", - entry->name, io_per_second, - mb_per_second); - total_io_per_second += io_per_second; - total_mb_per_second += mb_per_second; - entry = entry->next; + worker = g_workers; + while (worker != NULL) { + struct ns_entry *entry = worker->namespaces; + while (entry != NULL) { + io_per_second = (float)entry->io_completed / + g_time_in_sec; + mb_per_second = io_per_second * g_io_size_bytes / + (1024 * 1024); + printf("%-.20s: %10.2f IO/s %10.2f MB/s on lcore %u\n", + entry->name, io_per_second, + mb_per_second, worker->lcore); + total_io_per_second += io_per_second; + total_mb_per_second += mb_per_second; + entry = entry->next; + } + worker = worker->next; } - printf("=====================================================\n"); printf("%-20s: %10.2f IO/s %10.2f MB/s\n", "Total", total_io_per_second, total_mb_per_second); @@ -313,9 +341,13 @@ parse_args(int argc, char **argv) workload_type = NULL; g_time_in_sec = 0; g_rw_percentage = -1; + g_core_mask = NULL; - while ((op = getopt(argc, argv, "q:s:t:w:M:")) != -1) { + while ((op = getopt(argc, argv, "m:q:s:t:w:M:")) != -1) { switch (op) { + case 'm': + g_core_mask = optarg; + break; case 'q': g_queue_depth = atoi(optarg); break; @@ -409,6 +441,30 @@ parse_args(int argc, char **argv) return 0; } +static int +register_workers(void) +{ + unsigned lcore; + struct worker_thread *worker; + struct worker_thread *prev_worker; + + worker = malloc(sizeof(struct worker_thread)); + memset(worker, 0, sizeof(struct worker_thread)); + worker->lcore = rte_get_master_lcore(); + + g_workers = g_current_worker = worker; + + RTE_LCORE_FOREACH_SLAVE(lcore) { + prev_worker = worker; + worker = malloc(sizeof(struct worker_thread)); + memset(worker, 0, sizeof(struct worker_thread)); + worker->lcore = lcore; + prev_worker->next = worker; + } + + return 0; +} + static int register_controllers(void) { @@ -417,6 +473,8 @@ register_controllers(void) struct pci_id_match match; int rc; + printf("Initializing NVMe Controllers\n"); + pci_system_init(); match.vendor_id = PCI_MATCH_ANY; @@ -472,22 +530,28 @@ unregister_controllers(void) static const char *ealargs[] = { "perf", - "-c 0x1", + "-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */ "-n 4", }; int main(int argc, char **argv) { int rc; + char core_mask_arg[128]; + struct worker_thread *worker; rc = parse_args(argc, argv); if (rc != 0) { return rc; } + if (g_core_mask != NULL) { + snprintf(core_mask_arg, sizeof(core_mask_arg), "-c %s", g_core_mask); + ealargs[1] = strdup(core_mask_arg); + } + rc = rte_eal_init(sizeof(ealargs) / sizeof(ealargs[0]), (char **)(void *)(uintptr_t)ealargs); - if (rc < 0) { fprintf(stderr, "could not initialize dpdk\n"); return 1; @@ -510,12 +574,33 @@ int main(int argc, char **argv) g_tsc_rate = rte_get_timer_hz(); - rc = register_controllers(); + register_workers(); + register_controllers(); + + /* Launch all of the slave workers */ + worker = g_workers->next; + while (worker != NULL) { + if (worker->namespaces != NULL) { + rte_eal_remote_launch(work_fn, worker, worker->lcore); + } + worker = worker->next; + } + + work_fn(g_workers); + + worker = g_workers->next; + while (worker != NULL) { + if (worker->namespaces != NULL) { + if (rte_eal_wait_lcore(worker->lcore) < 0) { + return -1; + } + } + worker = worker->next; + } - work_fn(g_namespaces); print_stats(); unregister_controllers(); - return rc; + return 0; }