diff --git a/lib/blob/blobstore.c b/lib/blob/blobstore.c index 72907f73b..d5f6f56a4 100644 --- a/lib/blob/blobstore.c +++ b/lib/blob/blobstore.c @@ -1549,59 +1549,106 @@ _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t page, ui } } +struct op_split_ctx { + struct spdk_blob *blob; + struct spdk_io_channel *channel; + uint64_t page_offset; + uint64_t pages_remaining; + void *curr_payload; + enum spdk_blob_op_type op_type; + spdk_bs_sequence_t *seq; +}; + +static void +_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno) +{ + struct op_split_ctx *ctx = cb_arg; + struct spdk_blob *blob = ctx->blob; + struct spdk_io_channel *ch = ctx->channel; + enum spdk_blob_op_type op_type = ctx->op_type; + uint8_t *buf = ctx->curr_payload; + uint64_t offset = ctx->page_offset; + uint64_t length = ctx->pages_remaining; + uint64_t op_length; + + if (bserrno != 0 || ctx->pages_remaining == 0) { + spdk_bs_sequence_finish(ctx->seq, bserrno); + free(ctx); + return; + } + + op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset)); + + /* Update length and payload for next operation */ + ctx->pages_remaining -= op_length; + ctx->page_offset += op_length; + if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { + ctx->curr_payload += op_length; + } + + switch (op_type) { + case SPDK_BLOB_READ: + spdk_blob_io_read(blob, ch, buf, offset, op_length, + _spdk_blob_request_submit_op_split_next, ctx); + break; + case SPDK_BLOB_WRITE: + spdk_blob_io_write(blob, ch, buf, offset, op_length, + _spdk_blob_request_submit_op_split_next, ctx); + break; + case SPDK_BLOB_UNMAP: + spdk_blob_io_unmap(blob, ch, offset, op_length, + _spdk_blob_request_submit_op_split_next, ctx); + break; + case SPDK_BLOB_WRITE_ZEROES: + spdk_blob_io_write_zeroes(blob, ch, offset, op_length, + _spdk_blob_request_submit_op_split_next, ctx); + break; + case SPDK_BLOB_READV: + case SPDK_BLOB_WRITEV: + SPDK_ERRLOG("readv/write not valid for %s\n", __func__); + spdk_bs_sequence_finish(ctx->seq, -EINVAL); + free(ctx); + break; + } +} + static void _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob, void *payload, uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) { - spdk_bs_batch_t *batch; - struct spdk_bs_cpl cpl; - uint64_t op_length; - uint8_t *buf; + struct op_split_ctx *ctx; + spdk_bs_sequence_t *seq; + struct spdk_bs_cpl cpl; assert(blob != NULL); + ctx = calloc(1, sizeof(struct op_split_ctx)); + if (ctx == NULL) { + cb_fn(cb_arg, -ENOMEM); + return; + } + cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; cpl.u.blob_basic.cb_fn = cb_fn; cpl.u.blob_basic.cb_arg = cb_arg; - batch = spdk_bs_batch_open(ch, &cpl); - if (!batch) { + seq = spdk_bs_sequence_start(ch, &cpl); + if (!seq) { + free(ctx); cb_fn(cb_arg, -ENOMEM); return; } - buf = payload; - while (length > 0) { - op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset)); + ctx->blob = blob; + ctx->channel = ch; + ctx->curr_payload = payload; + ctx->page_offset = offset; + ctx->pages_remaining = length; + ctx->op_type = op_type; + ctx->seq = seq; - switch (op_type) { - case SPDK_BLOB_READ: - spdk_bs_batch_read_blob(batch, blob, buf, offset, op_length); - break; - case SPDK_BLOB_WRITE: - spdk_bs_batch_write_blob(batch, blob, buf, offset, op_length); - break; - case SPDK_BLOB_UNMAP: - spdk_bs_batch_unmap_blob(batch, blob, offset, op_length); - break; - case SPDK_BLOB_WRITE_ZEROES: - spdk_bs_batch_write_zeroes_blob(batch, blob, offset, op_length); - break; - case SPDK_BLOB_READV: - case SPDK_BLOB_WRITEV: - SPDK_ERRLOG("readv/write not valid for %s\n", __func__); - break; - } - - length -= op_length; - offset += op_length; - if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { - buf += op_length * SPDK_BS_PAGE_SIZE; - } - } - - spdk_bs_batch_close(batch); + _spdk_blob_request_submit_op_split_next(ctx, 0); } static void