bdev, rbd: Remove the lock and introduce a polling strategy

This patch removes the lock in the RBD module. It requires a
librbd library that supports the rbd_poll_io_events function.

Change-Id: I040a7d8369ab4f69f41d1d0233115f885168f019
Signed-off-by: Ziye Yang <ziye.yang@intel.com>
This commit is contained in:
Ziye Yang 2016-10-08 13:09:01 +08:00 committed by Daniel Verkamp
parent 37a7fff634
commit 9dd0f89486

View File

@ -40,6 +40,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <inttypes.h> #include <inttypes.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <rbd/librbd.h> #include <rbd/librbd.h>
#include <rados/librados.h> #include <rados/librados.h>
@ -73,7 +75,6 @@ struct blockdev_rbd_io {
rbd_completion_t completion; rbd_completion_t completion;
rbd_cb_fn_t cb_fn; rbd_cb_fn_t cb_fn;
struct blockdev_rbd_io_channel *ch; struct blockdev_rbd_io_channel *ch;
struct blockdev_rbd_io *next;
}; };
struct blockdev_rbd { struct blockdev_rbd {
@ -88,9 +89,10 @@ struct blockdev_rbd {
struct blockdev_rbd_io_channel { struct blockdev_rbd_io_channel {
rados_ioctx_t io_ctx; rados_ioctx_t io_ctx;
rados_t cluster; rados_t cluster;
struct pollfd pfd;
rbd_image_t image; rbd_image_t image;
pthread_mutex_t lock; rbd_completion_t *comps;
struct blockdev_rbd_io *req_head; uint32_t queue_depth;
struct blockdev_rbd *disk; struct blockdev_rbd *disk;
struct spdk_poller *poller; struct spdk_poller *poller;
}; };
@ -175,37 +177,7 @@ blockdev_rbd_exit(rbd_image_t image)
static void static void
blockdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) blockdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{ {
struct blockdev_rbd_io *cmd = (struct blockdev_rbd_io *)arg; /* Doing nothing here */
int status;
struct blockdev_rbd_io_channel *ch = (struct blockdev_rbd_io_channel *)cmd->ch;
struct blockdev_rbd_io **req_head;
status = rbd_aio_get_return_value(cb);
if (cmd->direction == BLOCKDEV_RBD_READ) {
if ((int)cmd->len == status)
cmd->status = 0;
else
cmd->status = -1;
} else {
/* For write, 0 means success */
if (!status)
cmd->status = 0;
else
cmd->status = -1;
}
rbd_aio_release(cmd->completion);
/* We queue the IO to the disk list first and call the
* callback from polling thread, this will ensure
* all the IOs complete from the same lcore.
*/
pthread_mutex_lock(&ch->lock);
req_head = &ch->req_head;
cmd->next = *req_head;
*req_head = cmd;
pthread_mutex_unlock(&ch->lock);
} }
static int static int
@ -346,23 +318,66 @@ static void
blockdev_rbd_io_poll(void *arg) blockdev_rbd_io_poll(void *arg)
{ {
struct blockdev_rbd_io_channel *ch = arg; struct blockdev_rbd_io_channel *ch = arg;
struct blockdev_rbd_io **req_head = &ch->req_head;
struct blockdev_rbd_io *req; struct blockdev_rbd_io *req;
struct blockdev_rbd_io *req_next; int i, io_status, status, rc;
int status;
pthread_mutex_lock(&ch->lock); rc = poll(&ch->pfd, 1, 0);
req = *req_head;
*req_head = NULL; /* check the return value of poll since we have only one fd for each channel */
while (req != NULL) { if (rc != 1) {
req_next = req->next; return;
}
rc = rbd_poll_io_events(ch->image, ch->comps, ch->queue_depth);
for (i = 0; i < rc; i++) {
req = (struct blockdev_rbd_io *)rbd_aio_get_arg(ch->comps[i]);
io_status = rbd_aio_get_return_value(ch->comps[i]);
if (req->direction == BLOCKDEV_RBD_READ) {
if ((int)req->len == io_status) {
req->status = 0;
} else {
req->status = -1;
}
} else {
/* For others, 0 means success */
if (!io_status) {
req->status = 0;
} else {
req->status = -1;
}
}
status = req->status == 0 ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED; status = req->status == 0 ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
spdk_bdev_io_complete(spdk_bdev_io_from_ctx(req), status); spdk_bdev_io_complete(spdk_bdev_io_from_ctx(req), status);
req = req_next; rbd_aio_release(req->completion);
}
}
static void
blockdev_rbd_free_channel(struct blockdev_rbd_io_channel *ch)
{
if (!ch) {
return;
}
if (ch->image) {
blockdev_rbd_exit(ch->image);
}
if (ch->io_ctx) {
rados_ioctx_destroy(ch->io_ctx);
}
if (ch->cluster) {
rados_shutdown(ch->cluster);
}
if (ch->comps) {
free(ch->comps);
}
if (ch->pfd.fd >= 0) {
close(ch->pfd.fd);
} }
pthread_mutex_unlock(&ch->lock);
} }
static int static int
@ -375,28 +390,51 @@ blockdev_rbd_create_cb(void *io_device, uint32_t priority,
ch->disk = (struct blockdev_rbd *)io_device; ch->disk = (struct blockdev_rbd *)io_device;
pool_info = ch->disk->pool_info; pool_info = ch->disk->pool_info;
ch->req_head = NULL;
ch->image = NULL; ch->image = NULL;
ch->io_ctx = NULL; ch->io_ctx = NULL;
ch->pfd.fd = -1;
ret = blockdev_rados_context_init(pool_info->name, &ch->cluster, &ch->io_ctx); ret = blockdev_rados_context_init(pool_info->name, &ch->cluster, &ch->io_ctx);
if (ret < 0) { if (ret < 0) {
SPDK_ERRLOG("Failed to create rados context for rbd_pool=%s\n", SPDK_ERRLOG("Failed to create rados context for rbd_pool=%s\n",
pool_info->name); pool_info->name);
return -1; goto err;
} }
ret = rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL); ret = rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL);
if (ret < 0) { if (ret < 0) {
SPDK_ERRLOG("Failed to open specified rbd device\n"); SPDK_ERRLOG("Failed to open specified rbd device\n");
return -1; goto err;
}
ch->pfd.fd = eventfd(0, EFD_NONBLOCK);
if (ch->pfd.fd < 0) {
SPDK_ERRLOG("Failed to get eventfd\n");
goto err;
}
ch->pfd.events = POLLIN;
ret = rbd_set_image_notification(ch->image, ch->pfd.fd, EVENT_TYPE_EVENTFD);
if (ret < 0) {
SPDK_ERRLOG("Failed to set rbd image notification\n");
goto err;
}
ch->queue_depth = 128;
ch->comps = calloc(sizeof(rbd_completion_t), ch->queue_depth);
if (!ch->comps) {
SPDK_ERRLOG("Failed to allocate rbd completion array\n");
goto err;
} }
pthread_mutex_init(&ch->lock, NULL);
spdk_poller_register(&ch->poller, blockdev_rbd_io_poll, ch, spdk_poller_register(&ch->poller, blockdev_rbd_io_poll, ch,
spdk_app_get_current_core(), NULL, 0); spdk_app_get_current_core(), NULL, 0);
return 0; return 0;
err:
blockdev_rbd_free_channel(ch);
return -1;
} }
static void static void
@ -404,17 +442,7 @@ blockdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{ {
struct blockdev_rbd_io_channel *io_channel = ctx_buf; struct blockdev_rbd_io_channel *io_channel = ctx_buf;
if (io_channel->image) { blockdev_rbd_free_channel(io_channel);
blockdev_rbd_exit(io_channel->image);
}
if (io_channel->io_ctx) {
rados_ioctx_destroy(io_channel->io_ctx);
}
if (io_channel->cluster) {
rados_shutdown(io_channel->cluster);
}
spdk_poller_unregister(&io_channel->poller, NULL); spdk_poller_unregister(&io_channel->poller, NULL);
} }