From 29e8013d0543b56639d6d5d2c0e751505813b66e Mon Sep 17 00:00:00 2001
From: Tomasz Zawadzki
Date: Fri, 16 Mar 2018 04:53:42 -0400
Subject: [PATCH] blobstore: process split operation sequentially

Previously, when an operation for the blobstore spanned multiple
clusters, it was split into multiple operations processed in a batch.
This made it possible to max out channel ops when using a large
enough operation. It was encountered when issuing unmap for the
whole device.

Now a single large operation is processed sequentially, making it
take at most one op from the channel.

Fixes #251
Fixes #250

Change-Id: I132309877ba3b2d7217332daf1025fb5f9ee74d0
Signed-off-by: Tomasz Zawadzki
Reviewed-on: https://review.gerrithub.io/403306
Reviewed-by: Jim Harris
Reviewed-by: Shuhei Matsumoto
Tested-by: SPDK Automated Test System
Reviewed-by: Daniel Verkamp
---
 lib/blob/blobstore.c | 119 ++++++++++++++++++++++++++++++-------------
 1 file changed, 83 insertions(+), 36 deletions(-)

diff --git a/lib/blob/blobstore.c b/lib/blob/blobstore.c
index 72907f73b..d5f6f56a4 100644
--- a/lib/blob/blobstore.c
+++ b/lib/blob/blobstore.c
@@ -1549,59 +1549,106 @@ _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t page, ui
 	}
 }
 
+struct op_split_ctx {
+	struct spdk_blob *blob;
+	struct spdk_io_channel *channel;
+	uint64_t page_offset;
+	uint64_t pages_remaining;
+	void *curr_payload;
+	enum spdk_blob_op_type op_type;
+	spdk_bs_sequence_t *seq;
+};
+
+static void
+_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
+{
+	struct op_split_ctx *ctx = cb_arg;
+	struct spdk_blob *blob = ctx->blob;
+	struct spdk_io_channel *ch = ctx->channel;
+	enum spdk_blob_op_type op_type = ctx->op_type;
+	uint8_t *buf = ctx->curr_payload;
+	uint64_t offset = ctx->page_offset;
+	uint64_t length = ctx->pages_remaining;
+	uint64_t op_length;
+
+	if (bserrno != 0 || ctx->pages_remaining == 0) {
+		spdk_bs_sequence_finish(ctx->seq, bserrno);
+		free(ctx);
+		return;
+	}
+
+	op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));
+
+	/* Update length and payload for next operation */
+	ctx->pages_remaining -= op_length;
+	ctx->page_offset += op_length;
+	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
+		ctx->curr_payload += op_length;
+	}
+
+	switch (op_type) {
+	case SPDK_BLOB_READ:
+		spdk_blob_io_read(blob, ch, buf, offset, op_length,
+				  _spdk_blob_request_submit_op_split_next, ctx);
+		break;
+	case SPDK_BLOB_WRITE:
+		spdk_blob_io_write(blob, ch, buf, offset, op_length,
+				   _spdk_blob_request_submit_op_split_next, ctx);
+		break;
+	case SPDK_BLOB_UNMAP:
+		spdk_blob_io_unmap(blob, ch, offset, op_length,
+				   _spdk_blob_request_submit_op_split_next, ctx);
+		break;
+	case SPDK_BLOB_WRITE_ZEROES:
+		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
+					  _spdk_blob_request_submit_op_split_next, ctx);
+		break;
+	case SPDK_BLOB_READV:
+	case SPDK_BLOB_WRITEV:
+		SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
+		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
+		free(ctx);
+		break;
+	}
+}
+
 static void
 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
 				   void *payload, uint64_t offset, uint64_t length,
 				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
 {
-	spdk_bs_batch_t *batch;
-	struct spdk_bs_cpl cpl;
-	uint64_t op_length;
-	uint8_t *buf;
+	struct op_split_ctx *ctx;
+	spdk_bs_sequence_t *seq;
+	struct spdk_bs_cpl cpl;
 
 	assert(blob != NULL);
 
+	ctx = calloc(1, sizeof(struct op_split_ctx));
+	if (ctx == NULL) {
+		cb_fn(cb_arg, -ENOMEM);
+		return;
+	}
+
 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
 	cpl.u.blob_basic.cb_fn = cb_fn;
 	cpl.u.blob_basic.cb_arg = cb_arg;
 
-	batch = spdk_bs_batch_open(ch, &cpl);
-	if (!batch) {
+	seq = spdk_bs_sequence_start(ch, &cpl);
+	if (!seq) {
+		free(ctx);
 		cb_fn(cb_arg, -ENOMEM);
 		return;
 	}
 
-	buf = payload;
-	while (length > 0) {
-		op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));
+	ctx->blob = blob;
+	ctx->channel = ch;
+	ctx->curr_payload = payload;
+	ctx->page_offset = offset;
+	ctx->pages_remaining = length;
+	ctx->op_type = op_type;
+	ctx->seq = seq;
 
-		switch (op_type) {
-		case SPDK_BLOB_READ:
-			spdk_bs_batch_read_blob(batch, blob, buf, offset, op_length);
-			break;
-		case SPDK_BLOB_WRITE:
-			spdk_bs_batch_write_blob(batch, blob, buf, offset, op_length);
-			break;
-		case SPDK_BLOB_UNMAP:
-			spdk_bs_batch_unmap_blob(batch, blob, offset, op_length);
-			break;
-		case SPDK_BLOB_WRITE_ZEROES:
-			spdk_bs_batch_write_zeroes_blob(batch, blob, offset, op_length);
-			break;
-		case SPDK_BLOB_READV:
-		case SPDK_BLOB_WRITEV:
-			SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
-			break;
-		}
-
-		length -= op_length;
-		offset += op_length;
-		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
-			buf += op_length * SPDK_BS_PAGE_SIZE;
-		}
-	}
-
-	spdk_bs_batch_close(batch);
+	_spdk_blob_request_submit_op_split_next(ctx, 0);
 }
 
 static void