From f35ca8f7dfe4acd0857dcc8f3629b7acb4c06e6b Mon Sep 17 00:00:00 2001 From: Dariusz Stojaczyk Date: Tue, 17 Oct 2017 14:25:17 +0200 Subject: [PATCH] bdev_virtio: properly handle full-queue scenario In case of not enough free descriptors the bdev_virtio will fail the I/O with NOMEM status. Change-Id: I1cb0cd5453ff70468898bc8e414b53b9c64dbe50 Signed-off-by: Pawel Wodkowski Signed-off-by: Dariusz Stojaczyk Reviewed-on: https://review.gerrithub.io/382783 Reviewed-by: Jim Harris Reviewed-by: Daniel Verkamp Tested-by: SPDK Automated Test System --- lib/bdev/virtio/bdev_virtio.c | 76 ++++++++++++++++++++++--- lib/bdev/virtio/rte_virtio/virtio_dev.c | 19 ++++--- lib/bdev/virtio/rte_virtio/virtio_dev.h | 13 ++++- 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/lib/bdev/virtio/bdev_virtio.c b/lib/bdev/virtio/bdev_virtio.c index 641545b9c..c0b9c2598 100644 --- a/lib/bdev/virtio/bdev_virtio.c +++ b/lib/bdev/virtio/bdev_virtio.c @@ -176,13 +176,30 @@ bdev_virtio_init_tmf_vreq(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_ return vreq; } +static void +bdev_virtio_send_io(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) +{ + struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch); + struct virtio_scsi_io_ctx *io_ctx = (struct virtio_scsi_io_ctx *)bdev_io->driver_ctx; + struct virtio_req *vreq = &io_ctx->vreq; + int rc; + + rc = virtio_xmit_pkt(virtio_channel->vq, vreq); + if (spdk_likely(rc == 0)) { + return; + } else if (rc == -ENOMEM) { + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM); + } else { + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); + } +} + static void bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) { struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev); struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io); struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base; - struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch); vreq->iov = bdev_io->u.bdev.iovs; vreq->iovcnt = bdev_io->u.bdev.iovcnt; @@ -197,7 +214,7 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) to_be16(&req->cdb[7], bdev_io->u.bdev.num_blocks); } - virtio_xmit_pkts(virtio_channel->vq, vreq); + bdev_virtio_send_io(ch, bdev_io); } static void @@ -207,11 +224,17 @@ bdev_virtio_reset(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) struct virtio_req *vreq = bdev_virtio_init_tmf_vreq(ch, bdev_io); struct virtio_scsi_ctrl_tmf_req *tmf_req = vreq->iov_req.iov_base; struct spdk_ring *ctrlq_send_ring = virtio_ch->vdev->vqs[VIRTIO_SCSI_CONTROLQ]->poller_ctx; + size_t enqueued_count; tmf_req->type = VIRTIO_SCSI_T_TMF; tmf_req->subtype = VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET; - spdk_ring_enqueue(ctrlq_send_ring, (void **)&vreq, 1); + enqueued_count = spdk_ring_enqueue(ctrlq_send_ring, (void **)&vreq, 1); + if (spdk_likely(enqueued_count == 1)) { + return; + } else { + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM); + } } static void @@ -219,7 +242,6 @@ bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) { struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io); struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base; - struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch); struct spdk_scsi_unmap_bdesc *desc, *first_desc; uint8_t *buf; uint64_t offset_blocks, num_blocks; @@ -259,7 +281,7 @@ bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) to_be16(&buf[2], cmd_len - 8); /* length of block descriptors */ memset(&buf[4], 0, 4); /* reserved */ - virtio_xmit_pkts(virtio_channel->vq, vreq); + bdev_virtio_send_io(ch, bdev_io); } static int _bdev_virtio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) @@ -413,6 +435,38 @@ bdev_virtio_tmf_cpl(struct virtio_req *req) spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), bdev_virtio_tmf_cpl_cb, req); } +static void +bdev_virtio_tmf_abort_nomem_cb(void *ctx) +{ + struct spdk_bdev_io *bdev_io = ctx; + + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM); +} + +static void +bdev_virtio_tmf_abort_ioerr_cb(void *ctx) +{ + struct spdk_bdev_io *bdev_io = ctx; + + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); +} + +static void +bdev_virtio_tmf_abort(struct virtio_req *req, int status) +{ + struct virtio_scsi_io_ctx *io_ctx = SPDK_CONTAINEROF(req, struct virtio_scsi_io_ctx, vreq); + struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(io_ctx); + spdk_thread_fn fn; + + if (status == -ENOMEM) { + fn = bdev_virtio_tmf_abort_nomem_cb; + } else { + fn = bdev_virtio_tmf_abort_ioerr_cb; + } + + spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), fn, bdev_io); +} + static void bdev_virtio_ctrlq_poll(void *arg) { @@ -421,10 +475,14 @@ bdev_virtio_ctrlq_poll(void *arg) struct spdk_ring *send_ring = ctrlq->poller_ctx; struct virtio_req *req[16]; uint16_t i, cnt; + int rc; cnt = spdk_ring_dequeue(send_ring, (void **)req, SPDK_COUNTOF(req)); for (i = 0; i < cnt; ++i) { - virtio_xmit_pkts(ctrlq, req[i]); + rc = virtio_xmit_pkt(ctrlq, req[i]); + if (rc != 0) { + bdev_virtio_tmf_abort(req[i], rc); + } } cnt = virtio_recv_pkts(ctrlq, req, SPDK_COUNTOF(req)); @@ -557,7 +615,7 @@ send_read_cap_10(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v iov[0].iov_len = 8; req->cdb[0] = SPDK_SBC_READ_CAPACITY_10; - virtio_xmit_pkts(base->vq, vreq); + virtio_xmit_pkt(base->vq, vreq); } static void @@ -575,7 +633,7 @@ send_read_cap_16(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v req->cdb[1] = SPDK_SBC_SAI_READ_CAPACITY_16; to_be32(&req->cdb[10], iov[0].iov_len); - virtio_xmit_pkts(base->vq, vreq); + virtio_xmit_pkt(base->vq, vreq); } static int @@ -768,7 +826,7 @@ scan_target(struct virtio_scsi_scan_base *base) cdb->opcode = SPDK_SPC_INQUIRY; cdb->alloc_len[1] = 255; - virtio_xmit_pkts(base->vq, vreq); + virtio_xmit_pkt(base->vq, vreq); } static int diff --git a/lib/bdev/virtio/rte_virtio/virtio_dev.c b/lib/bdev/virtio/rte_virtio/virtio_dev.c index 6b3b59aef..1c3594925 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_dev.c +++ b/lib/bdev/virtio/rte_virtio/virtio_dev.c @@ -404,7 +404,7 @@ virtqueue_iov_to_desc(struct virtqueue *vq, uint16_t desc_idx, struct iovec *iov vq->vq_ring.desc[desc_idx].len = iov->iov_len; } -static inline void +static int virtqueue_enqueue_xmit(struct virtqueue *vq, struct virtio_req *req) { struct vq_desc_extra *dxp; @@ -418,7 +418,7 @@ virtqueue_enqueue_xmit(struct virtqueue *vq, struct virtio_req *req) SPDK_DEBUGLOG(SPDK_TRACE_VIRTIO_DEV, "not enough free descriptors. requested %"PRIu32", got %"PRIu16"\n", total_iovs, vq->vq_free_cnt); - return; + return -ENOMEM; } head_idx = vq->vq_desc_head_idx; @@ -463,6 +463,7 @@ virtqueue_enqueue_xmit(struct virtqueue *vq, struct virtio_req *req) } vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - total_iovs); vq_update_avail_ring(vq, head_idx); + return 0; } #define VIRTIO_MBUF_BURST_SZ 64 @@ -506,17 +507,21 @@ virtio_recv_pkts(struct virtqueue *vq, struct virtio_req **reqs, uint16_t nb_pkt return nb_rx; } -uint16_t -virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req) +int +virtio_xmit_pkt(struct virtqueue *vq, struct virtio_req *req) { struct virtio_dev *vdev = vq->vdev; + int rc; if (spdk_unlikely(vdev->started == 0)) - return 0; + return -EIO; virtio_rmb(); - virtqueue_enqueue_xmit(vq, req); + rc = virtqueue_enqueue_xmit(vq, req); + if (spdk_unlikely(rc != 0)) { + return rc; + } vq_update_avail_idx(vq); @@ -525,7 +530,7 @@ virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req) SPDK_DEBUGLOG(SPDK_TRACE_VIRTIO_DEV, "Notified backend after xmit\n"); } - return 1; + return 0; } int diff --git a/lib/bdev/virtio/rte_virtio/virtio_dev.h b/lib/bdev/virtio/rte_virtio/virtio_dev.h index 935c10485..accdc4853 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_dev.h +++ b/lib/bdev/virtio/rte_virtio/virtio_dev.h @@ -165,7 +165,18 @@ struct virtio_req { uint16_t virtio_recv_pkts(struct virtqueue *vq, struct virtio_req **reqs, uint16_t nb_pkts); -uint16_t virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req); +/** + * Put given request into the virtqueue. The virtio device owning + * the virtqueue must be started. This will also send an interrupt unless + * the host explicitly set VRING_USED_F_NO_NOTIFY in virtqueue flags. + * + * \param vq virtio queue + * \param req virtio request + * \return 0 on success, negative errno on error. In case the ring is full + * or no free descriptors are available -ENOMEM is returned. If virtio + * device owning the virtqueue is not started -EIO is returned. + */ +int virtio_xmit_pkt(struct virtqueue *vq, struct virtio_req *req); int virtio_dev_init(struct virtio_dev *hw, uint64_t req_features); void virtio_dev_free(struct virtio_dev *dev);