ocf: switch to dynamic queues

Use the new queue API to manage OCF queues dynamically.
Queues are now created and deleted on get_ and put_ io channel.

Queues no longer depend on the number of cores in SPDK.

The queue-to-pollers mapping list is removed, as are the locks
"q_ocf_lock" and "vbdev->_lock", since they became redundant.

Change-Id: I5069e1f8535f505816184a333db876afb925ac44
Signed-off-by: Vitaliy Mysak <vitaliy.mysak@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/446841
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Darek Stojaczyk <dariusz.stojaczyk@intel.com>
Vitaliy Mysak, 2019-03-01 21:47:31 +00:00 (committed by Jim Harris)
parent 900f0c978b
commit ca1b5c418d
5 files changed, 35 insertions(+), 112 deletions(-)
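
In outline, the change ties one dynamically created OCF queue to each SPDK
IO channel. A condensed sketch assembled from the hunks below (cache, queue
and qctx stand for the vbdev's OCF cache, the per-channel queue and the
per-channel context):

    /* on spdk_get_io_channel(): create a queue bound to this channel */
    rc = ocf_queue_create(cache, &queue, &queue_ops);
    if (rc == 0) {
        ocf_queue_set_priv(queue, qctx);   /* queue -> channel context */
    }

    /* on spdk_put_io_channel(): drop the reference; OCF frees the queue */
    ocf_queue_put(queue);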


@@ -287,28 +287,6 @@ vbdev_ocf_ctx_data_secure_erase(ctx_data_t *ctx_data)
     }
 }
 
-/* OCF queue initialization procedure
- * Called during ocf_cache_start */
-static int
-vbdev_ocf_ctx_queue_init(ocf_queue_t q)
-{
-    return 0;
-}
-
-/* Called during ocf_submit_io, ocf_purge*
- * and any other requests that need to submit io */
-static void
-vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
-{
-}
-
-/* OCF queue deinitialization
- * Called at ocf_cache_stop */
-static void
-vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
-{
-}
-
 static int
 vbdev_ocf_ctx_cleaner_init(ocf_cleaner_t c)
 {
@@ -374,12 +352,6 @@ static const struct ocf_ctx_config vbdev_ocf_ctx_cfg = {
         .secure_erase = vbdev_ocf_ctx_data_secure_erase,
     },
 
-    .queue = {
-        .init = vbdev_ocf_ctx_queue_init,
-        .kick = vbdev_ocf_ctx_queue_kick,
-        .stop = vbdev_ocf_ctx_queue_stop,
-    },
-
     .metadata_updater = {
         .init = vbdev_ocf_volume_updater_init,
         .stop = vbdev_ocf_volume_updater_stop,
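
With the static .queue ops gone from vbdev_ocf_ctx_cfg, queue behavior
travels with each queue instead of with the OCF context. A minimal sketch
of the new pattern, using the ops struct and create call added in the next
file:

    const struct ocf_queue_ops queue_ops = {
        .kick_sync = vbdev_ocf_ctx_queue_kick,
        .kick = vbdev_ocf_ctx_queue_kick,
        .stop = vbdev_ocf_ctx_queue_stop,
    };

    /* ops are now supplied per queue at creation time */
    rc = ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);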


@@ -50,10 +50,6 @@
 static struct spdk_bdev_module ocf_if;
 
-/* Set number of OCF queues to maximum numbers of cores
- * that SPDK supports, so we never run out of them */
-static int g_queues_count = SPDK_CPUSET_SIZE;
-
 static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
     = TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);
@@ -66,7 +62,6 @@ free_vbdev(struct vbdev_ocf *vbdev)
         return;
     }
 
-    pthread_mutex_destroy(&vbdev->_lock);
     free(vbdev->name);
     free(vbdev->cache.name);
     free(vbdev->core.name);
@@ -327,7 +322,7 @@ io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
         goto fail;
     }
 
-    ocf_io_set_queue(io, ocf_queue_get_id(qctx->queue));
+    ocf_io_set_queue(io, qctx->queue);
 
     data = vbdev_ocf_data_from_spdk_io(bdev_io);
     if (!data) {
@@ -537,58 +532,40 @@ static int queue_poll(void *opaque)
     }
 }
 
-/* Find queue index that is not taken */
-static int
-get_free_queue_id(struct vbdev_ocf *vbdev)
-{
-    struct vbdev_ocf_qcxt *qctx;
-    int i, tmp;
-
-    for (i = 1; i < (int)vbdev->cfg.cache.io_queues; i++) {
-        tmp = i;
-
-        TAILQ_FOREACH(qctx, &vbdev->queues, tailq) {
-            tmp = ocf_queue_get_id(qctx->queue);
-            if (tmp == i) {
-                tmp = -1;
-                break;
-            }
-        }
-
-        if (tmp > 0) {
-            return i;
-        }
-    }
-
-    return -1;
-}
-
-/* Called on cache vbdev creation at every thread
- * We determine on which OCF queue IOs from this thread will be running
- * and allocate resources for that queue
- * This is also where queue poller gets registered */
+/* Called during ocf_submit_io, ocf_purge*
+ * and any other requests that need to submit io */
+static void
+vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
+{
+}
+
+/* OCF queue deinitialization
+ * Called at ocf_cache_stop */
+static void
+vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
+{
+}
+
+/* Queue ops is an interface for running queue thread
+ * stop() operation is called just before queue gets destroyed */
+const struct ocf_queue_ops queue_ops = {
+    .kick_sync = vbdev_ocf_ctx_queue_kick,
+    .kick = vbdev_ocf_ctx_queue_kick,
+    .stop = vbdev_ocf_ctx_queue_stop,
+};
+
+/* Called on cache vbdev creation at every thread
+ * We allocate OCF queues here and SPDK poller for it */
 static int
 io_device_create_cb(void *io_device, void *ctx_buf)
 {
     struct vbdev_ocf *vbdev = io_device;
     struct vbdev_ocf_qcxt *qctx = ctx_buf;
-    int queue_id = 0, rc;
+    int rc;
 
-    /* Modifying state of vbdev->queues needs to be synchronous
-     * We use vbdev private lock to achive that */
-    pthread_mutex_lock(&vbdev->_lock);
-
-    queue_id = get_free_queue_id(vbdev);
-
-    if (queue_id < 0) {
-        SPDK_ERRLOG("OCF queues count is too small, try to allocate more than %d\n",
-                vbdev->cfg.cache.io_queues);
-        rc = -EINVAL;
-        goto end;
-    }
-
-    rc = ocf_cache_get_queue(vbdev->ocf_cache, queue_id, &qctx->queue);
+    rc = ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
     if (rc) {
-        SPDK_ERRLOG("Could not get OCF queue #%d\n", queue_id);
-        goto end;
+        return rc;
     }
 
     ocf_queue_set_priv(qctx->queue, qctx);
@@ -598,10 +575,6 @@ io_device_create_cb(void *io_device, void *ctx_buf)
     qctx->core_ch = spdk_bdev_get_io_channel(vbdev->core.desc);
     qctx->poller = spdk_poller_register(queue_poll, qctx, 0);
 
-    TAILQ_INSERT_TAIL(&vbdev->queues, qctx, tailq);
-
-end:
-    pthread_mutex_unlock(&vbdev->_lock);
     return rc;
 }
@@ -616,10 +589,7 @@ io_device_destroy_cb(void *io_device, void *ctx_buf)
     spdk_put_io_channel(qctx->cache_ch);
     spdk_put_io_channel(qctx->core_ch);
     spdk_poller_unregister(&qctx->poller);
-
-    pthread_mutex_lock(&qctx->vbdev->_lock);
-    TAILQ_REMOVE(&qctx->vbdev->queues, qctx, tailq);
-    pthread_mutex_unlock(&qctx->vbdev->_lock);
+    ocf_queue_put(qctx->queue);
 }
 
 /* Start OCF cache and register vbdev_ocf at bdev layer */
@@ -697,11 +667,6 @@ init_vbdev_config(struct vbdev_ocf *vbdev)
     cfg->cache.backfill.max_queue_size = 65536;
     cfg->cache.backfill.queue_unblock_size = 60000;
 
-    /* At this moment OCF queues count is static
-     * so we choose some value for it
-     * It has to be bigger than SPDK thread count */
-    cfg->cache.io_queues = g_queues_count;
-
     /* TODO [cache line size] */
     cfg->device.cache_line_size = ocf_cache_line_size_4;
     cfg->device.force = true;
@@ -744,8 +709,6 @@ init_vbdev(const char *vbdev_name,
     vbdev->core.parent = vbdev;
     vbdev->cache.is_cache = true;
     vbdev->core.is_cache = false;
-    pthread_mutex_init(&vbdev->_lock, NULL);
-    TAILQ_INIT(&vbdev->queues);
 
     if (cache_mode_name) {
         vbdev->cfg.cache.cache_mode
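
For reference, these callbacks are driven by SPDK's io_device machinery;
this usage is not part of the diff, and the exact spdk_io_device_register()
signature shown is an assumption based on the SPDK thread API of this era:

    /* register vbdev as an io_device with per-channel qctx storage */
    spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
                sizeof(struct vbdev_ocf_qcxt), vbdev->name);

    /* each thread's first get creates its channel (and now its OCF queue) */
    struct spdk_io_channel *ch = spdk_get_io_channel(vbdev);

    /* the last put tears the channel down, releasing the queue */
    spdk_put_io_channel(ch);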


@@ -130,13 +130,6 @@ struct vbdev_ocf {
     /* Link to global list of this type structures */
     TAILQ_ENTRY(vbdev_ocf) tailq;
-
-    /* List of queues contexts
-     * New items are added at io_channel creation */
-    TAILQ_HEAD(, vbdev_ocf_qcxt) queues;
-
-    /* Private per-bdev lock */
-    pthread_mutex_t _lock;
 };
 
 int vbdev_ocf_construct(


@@ -191,7 +191,7 @@ vbdev_ocf_volume_submit_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *
             "base returned error on io submission: %d\n", io_ctx->error);
     }
 
-    if (io->io_queue == 0 && io_ctx->ch != NULL) {
+    if (io->io_queue == NULL && io_ctx->ch != NULL) {
         spdk_put_io_channel(io_ctx->ch);
     }
@@ -211,8 +211,8 @@ prepare_submit(struct ocf_io *io)
     struct ocf_io_ctx *io_ctx = ocf_get_io_ctx(io);
     struct vbdev_ocf_qcxt *qctx;
     struct vbdev_ocf_base *base;
-    ocf_queue_t q;
-    int rc;
+    ocf_queue_t q = io->io_queue;
+    int rc = 0;
 
     io_ctx->rq_cnt++;
     if (io_ctx->rq_cnt != 1) {
@@ -222,12 +222,9 @@ prepare_submit(struct ocf_io *io)
     vbdev_ocf_volume_io_get(io);
     base = *((struct vbdev_ocf_base **)ocf_volume_get_priv(io->volume));
 
-    if (io->io_queue == 0) {
-        /* In SPDK we never set queue id to 0
-         * but OCF sometimes gives it to us (not a bug)
-         * In such cases we cannot determine on which queue we are now
-         * So to get io channel that is usually passed as queue context
-         * we have to reallocate it using global method */
+    if (io->io_queue == NULL) {
+        /* In case IO is initiated by OCF, queue is unknown
+         * so we have to get io channel ourselves */
         io_ctx->ch = spdk_bdev_get_io_channel(base->desc);
         if (io_ctx->ch == NULL) {
             return -EPERM;
@@ -235,13 +232,11 @@ prepare_submit(struct ocf_io *io)
         return 0;
     }
 
-    rc = ocf_cache_get_queue(base->parent->ocf_cache, io->io_queue, &q);
-    if (rc) {
-        SPDK_ERRLOG("Could not get queue #%d\n", io->io_queue);
-        return rc;
+    qctx = ocf_queue_get_priv(q);
+    if (qctx == NULL) {
+        return -EFAULT;
     }
 
-    qctx = ocf_queue_get_priv(q);
-
     if (base->is_cache) {
         io_ctx->ch = qctx->cache_ch;
     } else {
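
Taken together with io_handle() above, queue lookup is now two pointer hops
instead of an id-indexed search under a lock; a sketch of the pairing
(names from the hunks above):

    /* submission side (io_handle): tag the io with this channel's queue */
    ocf_io_set_queue(io, qctx->queue);

    /* submit side (prepare_submit): recover the channel context straight
     * from the queue; io_queue == NULL means OCF-initiated IO, which falls
     * back to spdk_bdev_get_io_channel() */
    qctx = ocf_queue_get_priv(io->io_queue);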

ocf
@@ -1 +1 @@
-Subproject commit 276d91fcd7ca693fe093eb08d801b3c46df50cbf
+Subproject commit e235500472c18a9e0687608d94cc542eaeeeb7a2