bdev/rbd: Implement the group polling policy.

This patch is used to implement the group polling
policy instead of each rbd has one poller.

Signed-off-by: Ziye Yang <ziye.yang@intel.com>
Change-Id: Ieb975e656240bcdaf2657410f010d72b156639ed
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3698
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
This commit is contained in:
Ziye Yang 2020-08-10 03:16:06 +08:00 committed by Tomasz Zawadzki
parent 5d097aa7fd
commit 6cbbc68296

View File

@ -38,6 +38,7 @@
#include <rbd/librbd.h>
#include <rados/librados.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include "spdk/conf.h"
#include "spdk/env.h"
@ -51,6 +52,7 @@
#include "spdk_internal/log.h"
#define SPDK_RBD_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 128
static int bdev_rbd_count = 0;
@ -68,13 +70,18 @@ struct bdev_rbd {
struct spdk_bdev_io *reset_bdev_io;
};
struct bdev_rbd_group_channel {
struct spdk_poller *poller;
int epoll_fd;
};
struct bdev_rbd_io_channel {
rados_ioctx_t io_ctx;
rados_t cluster;
struct pollfd pfd;
int pfd;
rbd_image_t image;
struct bdev_rbd *disk;
struct spdk_poller *poller;
struct bdev_rbd_group_channel *group_ch;
};
struct bdev_rbd_io {
@ -265,6 +272,8 @@ bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io,
static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);
static int
bdev_rbd_get_ctx_size(void)
{
@ -274,6 +283,7 @@ bdev_rbd_get_ctx_size(void)
static struct spdk_bdev_module rbd_if = {
.name = "rbd",
.module_init = bdev_rbd_library_init,
.module_fini = bdev_rbd_library_fini,
.get_ctx_size = bdev_rbd_get_ctx_size,
};
@ -421,23 +431,15 @@ bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
}
}
static int
bdev_rbd_io_poll(void *arg)
static void
bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
{
struct bdev_rbd_io_channel *ch = arg;
int i, io_status, rc;
rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
struct spdk_bdev_io *bdev_io;
struct bdev_rbd_io *rbd_io;
enum spdk_bdev_io_status bio_status;
rc = poll(&ch->pfd, 1, 0);
/* check the return value of poll since we have only one fd for each channel */
if (rc != 1) {
return SPDK_POLLER_BUSY;
}
rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
for (i = 0; i < rc; i++) {
bdev_io = rbd_aio_get_arg(comps[i]);
@ -460,8 +462,6 @@ bdev_rbd_io_poll(void *arg)
spdk_bdev_io_complete(bdev_io, bio_status);
}
return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}
static void
@ -483,8 +483,8 @@ bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
rados_shutdown(ch->cluster);
}
if (ch->pfd.fd >= 0) {
close(ch->pfd.fd);
if (ch->pfd >= 0) {
close(ch->pfd);
}
}
@ -519,30 +519,41 @@ bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
struct bdev_rbd_io_channel *ch = ctx_buf;
int ret;
struct epoll_event event;
ch->disk = io_device;
ch->image = NULL;
ch->io_ctx = NULL;
ch->pfd.fd = -1;
ch->pfd = -1;
if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
goto err;
}
ch->pfd.fd = eventfd(0, EFD_NONBLOCK);
if (ch->pfd.fd < 0) {
ch->pfd = eventfd(0, EFD_NONBLOCK);
if (ch->pfd < 0) {
SPDK_ERRLOG("Failed to get eventfd\n");
goto err;
}
ch->pfd.events = POLLIN;
ret = rbd_set_image_notification(ch->image, ch->pfd.fd, EVENT_TYPE_EVENTFD);
ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
if (ret < 0) {
SPDK_ERRLOG("Failed to set rbd image notification\n");
goto err;
}
ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_io_poll, ch, BDEV_RBD_POLL_US);
ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
assert(ch->group_ch != NULL);
memset(&event, 0, sizeof(event));
event.events = EPOLLIN;
event.data.ptr = ch;
ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
if (ret < 0) {
SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
ch->group_ch);
goto err;
}
return 0;
@ -555,10 +566,17 @@ static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
struct bdev_rbd_io_channel *io_channel = ctx_buf;
int rc;
rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
io_channel->pfd, NULL);
if (rc < 0) {
SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
io_channel, io_channel->group_ch);
}
bdev_rbd_free_channel(io_channel);
spdk_poller_unregister(&io_channel->poller);
spdk_put_io_channel(spdk_io_channel_from_ctx(io_channel->group_ch));
}
static struct spdk_io_channel *
@ -783,6 +801,54 @@ bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
return rc;
}
static int
bdev_rbd_group_poll(void *arg)
{
struct bdev_rbd_group_channel *group_ch = arg;
struct epoll_event events[MAX_EVENTS_PER_POLL];
int num_events, i;
num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
if (num_events <= 0) {
return SPDK_POLLER_IDLE;
}
for (i = 0; i < num_events; i++) {
bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
}
return SPDK_POLLER_BUSY;
}
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
struct bdev_rbd_group_channel *ch = ctx_buf;
ch->epoll_fd = epoll_create1(0);
if (ch->epoll_fd < 0) {
SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
return -1;
}
ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, BDEV_RBD_POLL_US);
return 0;
}
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
struct bdev_rbd_group_channel *ch = ctx_buf;
if (ch->epoll_fd >= 0) {
close(ch->epoll_fd);
}
spdk_poller_unregister(&ch->poller);
}
static int
bdev_rbd_library_init(void)
{
@ -793,9 +859,13 @@ bdev_rbd_library_init(void)
struct spdk_bdev *bdev;
uint32_t block_size;
long int tmp;
struct spdk_conf_section *sp;
struct spdk_conf_section *sp = spdk_conf_find_section(NULL, "Ceph");
spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
sizeof(struct bdev_rbd_group_channel),
"bdev_rbd_poll_groups");
sp = spdk_conf_find_section(NULL, "Ceph");
if (sp == NULL) {
/*
* Ceph section not found. Do not initialize any rbd LUNS.
@ -855,4 +925,10 @@ end:
return rc;
}
static void
bdev_rbd_library_fini(void)
{
spdk_io_device_unregister(&rbd_if, NULL);
}
SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD)