diff --git a/lib/bdev/virtio/rte_virtio/virtio.c b/lib/bdev/virtio/rte_virtio/virtio.c index 53f26be6e..d9c031dd0 100644 --- a/lib/bdev/virtio/rte_virtio/virtio.c +++ b/lib/bdev/virtio/rte_virtio/virtio.c @@ -140,6 +140,8 @@ virtio_init_vring(struct virtqueue *vq) vq->vq_avail_idx = 0; vq->vq_desc_tail_idx = (uint16_t)(vq->vq_nentries - 1); vq->vq_free_cnt = vq->vq_nentries; + vq->req_start = VQ_RING_DESC_CHAIN_END; + vq->req_end = VQ_RING_DESC_CHAIN_END; memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries); vring_desc_init(vr->desc, size); @@ -429,78 +431,143 @@ virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct virtio_req **rx_pkts, return i; } -static inline void -virtqueue_iov_to_desc(struct virtqueue *vq, uint16_t desc_idx, struct iovec *iov) +int +virtqueue_req_start(struct virtqueue *vq, void *cookie, int iovcnt) { - if (!vq->vdev->is_hw) { - vq->vq_ring.desc[desc_idx].addr = (uintptr_t)iov->iov_base; - } else { - vq->vq_ring.desc[desc_idx].addr = spdk_vtophys(iov->iov_base); + struct vring_desc *desc; + struct vq_desc_extra *dxp; + + assert(virtio_dev_get_status(vq->vdev) & VIRTIO_CONFIG_S_DRIVER_OK); + + if (iovcnt > vq->vq_free_cnt) { + return -ENOSPC; } - vq->vq_ring.desc[desc_idx].len = iov->iov_len; + if (vq->req_start != VQ_RING_DESC_CHAIN_END) { + desc = &vq->vq_ring.desc[vq->req_end]; + desc->flags &= ~VRING_DESC_F_NEXT; + } + + vq->req_start = vq->vq_desc_head_idx; + dxp = &vq->vq_descx[vq->req_start]; + dxp->cookie = cookie; + dxp->ndescs = 0; + + return 0; +} + +void +virtqueue_req_flush(struct virtqueue *vq) +{ + struct vring_desc *desc; + + if (vq->req_start == VQ_RING_DESC_CHAIN_END) { + /* no requests have been started */ + return; + } + + desc = &vq->vq_ring.desc[vq->req_end]; + desc->flags &= ~VRING_DESC_F_NEXT; + + vq_update_avail_ring(vq, vq->req_start); + vq->req_start = VQ_RING_DESC_CHAIN_END; + vq_update_avail_idx(vq); + if (spdk_unlikely(virtqueue_kick_prepare(vq))) { + virtio_dev_backend_ops(vq->vdev)->notify_queue(vq->vdev, vq); + SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "Notified backend after xmit\n"); + } +} + +void +virtqueue_req_abort(struct virtqueue *vq) +{ + struct vring_desc *desc; + + if (vq->req_start == VQ_RING_DESC_CHAIN_END) { + /* no requests have been started */ + return; + } + + desc = &vq->vq_ring.desc[vq->req_end]; + desc->flags &= ~VRING_DESC_F_NEXT; + + vq_ring_free_chain(vq, vq->req_start); + vq->req_start = VQ_RING_DESC_CHAIN_END; +} + +void +virtqueue_req_add_iovs(struct virtqueue *vq, struct iovec *iovs, uint16_t iovcnt, + enum spdk_virtio_desc_type desc_type) +{ + struct vring_desc *desc; + struct vq_desc_extra *dxp; + uint16_t i, prev_head, new_head; + + assert(vq->req_start != VQ_RING_DESC_CHAIN_END); + assert(iovcnt <= vq->vq_free_cnt); + + /* TODO use indirect descriptors if iovcnt is high enough + * or the caller specifies SPDK_VIRTIO_DESC_F_INDIRECT + */ + + prev_head = new_head = vq->vq_desc_head_idx; + for (i = 0; i < iovcnt; ++i) { + desc = &vq->vq_ring.desc[new_head]; + + if (!vq->vdev->is_hw) { + desc->addr = (uintptr_t)iovs[i].iov_base; + } else { + desc->addr = spdk_vtophys(iovs[i].iov_base); + } + + desc->len = iovs[i].iov_len; + /* always set NEXT flag. unset it on the last descriptor + * in the request-ending function. 
+		 */
+		desc->flags = desc_type | VRING_DESC_F_NEXT;
+
+		prev_head = new_head;
+		new_head = desc->next;
+	}
+
+	dxp = &vq->vq_descx[vq->req_start];
+	dxp->ndescs += iovcnt;
+
+	vq->req_end = prev_head;
+	vq->vq_desc_head_idx = new_head;
+	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
+		assert(vq->vq_free_cnt == 0);
+		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
+	}
+	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - iovcnt);
 }
 
 static int
 virtqueue_enqueue_xmit(struct virtqueue *vq, struct virtio_req *req)
 {
-	struct vq_desc_extra *dxp;
-	struct vring_desc *descs;
-	uint32_t i;
-	uint16_t head_idx, idx;
-	uint32_t total_iovs = req->iovcnt + 2;
-	struct iovec *iov = req->iov;
+	int rc;
 
-	if (total_iovs > vq->vq_free_cnt) {
-		SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV,
-			      "not enough free descriptors. requested %"PRIu32", got %"PRIu16"\n",
-			      total_iovs, vq->vq_free_cnt);
-		return -ENOMEM;
+	rc = virtqueue_req_start(vq, req, req->iovcnt + 2);
+	if (rc != 0) {
+		SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV,
+			      "not enough free descriptors. requested %"PRIu32", got %"PRIu16"\n",
+			      req->iovcnt + 2, vq->vq_free_cnt);
+		return rc;
 	}
 
-	head_idx = vq->vq_desc_head_idx;
-	idx = head_idx;
-	dxp = &vq->vq_descx[idx];
-	dxp->cookie = (void *)req;
-	dxp->ndescs = total_iovs;
-
-	descs = vq->vq_ring.desc;
-
-	virtqueue_iov_to_desc(vq, idx, &req->iov_req);
-	descs[idx].flags = VRING_DESC_F_NEXT;
-	idx = descs[idx].next;
-
-	if (req->is_write || req->iovcnt == 0) {
-		for (i = 0; i < req->iovcnt; i++) {
-			virtqueue_iov_to_desc(vq, idx, &iov[i]);
-			descs[idx].flags = VRING_DESC_F_NEXT;
-			idx = descs[idx].next;
-		}
-
-		virtqueue_iov_to_desc(vq, idx, &req->iov_resp);
-		descs[idx].flags = VRING_DESC_F_WRITE;
-		idx = descs[idx].next;
+	virtqueue_req_add_iovs(vq, &req->iov_req, 1, SPDK_VIRTIO_DESC_RO);
+	if (req->is_write) {
+		virtqueue_req_add_iovs(vq, req->iov, req->iovcnt, SPDK_VIRTIO_DESC_RO);
+		virtqueue_req_add_iovs(vq, &req->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
 	} else {
-		virtqueue_iov_to_desc(vq, idx, &req->iov_resp);
-		descs[idx].flags = VRING_DESC_F_WRITE | VRING_DESC_F_NEXT;
-		idx = descs[idx].next;
-
-		for (i = 0; i < req->iovcnt; i++) {
-			virtqueue_iov_to_desc(vq, idx, &iov[i]);
-			descs[idx].flags = VRING_DESC_F_WRITE;
-			descs[idx].flags |= (i + 1) != req->iovcnt ? VRING_DESC_F_NEXT : 0;
-			idx = descs[idx].next;
-		}
+		virtqueue_req_add_iovs(vq, &req->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
+		virtqueue_req_add_iovs(vq, req->iov, req->iovcnt, SPDK_VIRTIO_DESC_WR);
 	}
 
-	vq->vq_desc_head_idx = idx;
-	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
-		assert(vq->vq_free_cnt == 0);
-		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
-	}
-	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - total_iovs);
-	vq_update_avail_ring(vq, head_idx);
 
 	return 0;
 }
 
 #define VIRTIO_MBUF_BURST_SZ 64
@@ -546,10 +613,9 @@ virtio_recv_pkts(struct virtqueue *vq, struct virtio_req **reqs, uint16_t nb_pkt
 int
 virtio_xmit_pkt(struct virtqueue *vq, struct virtio_req *req)
 {
-	struct virtio_dev *vdev = vq->vdev;
 	int rc;
 
-	assert(virtio_dev_get_status(vdev) & VIRTIO_CONFIG_S_DRIVER_OK);
+	assert(virtio_dev_get_status(vq->vdev) & VIRTIO_CONFIG_S_DRIVER_OK);
 	virtio_rmb();
 
 	rc = virtqueue_enqueue_xmit(vq, req);
@@ -557,13 +623,7 @@ virtio_xmit_pkt(struct virtqueue *vq, struct virtio_req *req)
 		return rc;
 	}
 
-	vq_update_avail_idx(vq);
-
-	if (spdk_unlikely(virtqueue_kick_prepare(vq))) {
-		virtio_dev_backend_ops(vdev)->notify_queue(vdev, vq);
-		SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "Notified backend after xmit\n");
-	}
-
+	virtqueue_req_flush(vq);
 	return 0;
 }
 
diff --git a/lib/bdev/virtio/rte_virtio/virtio.h b/lib/bdev/virtio/rte_virtio/virtio.h
index f8851513e..b9b45b3ca 100644
--- a/lib/bdev/virtio/rte_virtio/virtio.h
+++ b/lib/bdev/virtio/rte_virtio/virtio.h
@@ -169,9 +169,20 @@ struct virtqueue {
 	/** Context for response poller. */
 	void *poller_ctx;
 
+	/** Start (desc index) of the currently built request; VQ_RING_DESC_CHAIN_END if none. */
+	uint16_t req_start;
+	/** Last descriptor added to the currently built request. */
+	uint16_t req_end;
+
 	struct vq_desc_extra vq_descx[0];
 };
 
+enum spdk_virtio_desc_type {
+	SPDK_VIRTIO_DESC_RO = 0, /**< Read only */
+	SPDK_VIRTIO_DESC_WR = VRING_DESC_F_WRITE, /**< Write only */
+	/* TODO VIRTIO_DESC_INDIRECT */
+};
+
 struct virtio_req {
 	struct iovec *iov;
 	struct iovec iov_req;
@@ -205,6 +214,57 @@ typedef int (*virtio_pci_create_cb)(struct virtio_pci_ctx *pci_ctx);
 uint16_t virtio_recv_pkts(struct virtqueue *vq, struct virtio_req **reqs,
 		uint16_t nb_pkts);
 
+/**
+ * Start a new request on the current vring head position. The request will
+ * be bound to the given opaque cookie object. All previous requests are
+ * kept in the ring until they are flushed or the last request is aborted.
+ * If the previous request is empty (no descriptors have been added), this
+ * call will overwrite it. The device owning the given virtqueue must be started.
+ *
+ * \param vq virtio queue
+ * \param cookie opaque object to bind with this request. Once the request
+ * is sent, processed and a response is received, the same object will be
+ * returned to the user calling the virtio poll API.
+ * \param iovcnt number of required iovectors for the request. This can be
+ * higher than the actual number of descriptors to be added.
+ * \return 0 on success or negative errno otherwise. If there are not enough
+ * free descriptors available, -ENOSPC is returned.
+ */
+int virtqueue_req_start(struct virtqueue *vq, void *cookie, int iovcnt);
+
+/**
+ * Flush a virtqueue. This will make the host device see and process all
+ * previously queued requests. An interrupt might be automatically sent if
+ * the host device expects it. The device owning the given virtqueue must be started.
+ *
+ * \param vq virtio queue
+ */
+void virtqueue_req_flush(struct virtqueue *vq);
+
+/**
+ * Abort the very last request in a virtqueue. This will restore virtqueue
+ * state to the point before the last request was created. Note that this
+ * is only effective if the queue hasn't been flushed yet. The device owning
+ * the given virtqueue must be started.
+ *
+ * \param vq virtio queue
+ */
+void virtqueue_req_abort(struct virtqueue *vq);
+
+/**
+ * Add an iovec chain to the last created request. This call does not provide
+ * any error-checking. The caller has to ensure they don't add more iovs than
+ * the number reserved when the request was started. The device owning the
+ * given virtqueue must be started.
+ *
+ * \param vq virtio queue
+ * \param iovs iovec array
+ * \param iovcnt number of iovs in the iovec array
+ * \param desc_type type of all given iovectors
+ */
+void virtqueue_req_add_iovs(struct virtqueue *vq, struct iovec *iovs, uint16_t iovcnt,
+		enum spdk_virtio_desc_type desc_type);
+
 /**
  * Put given request into the virtqueue. The virtio device owning
  * the virtqueue must be started. This will also send an interrupt unless
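
Usage note (not part of the patch): a minimal sketch of how a caller can drive
the new request API, modeled on virtqueue_enqueue_xmit() above. The
my_pin_payload() helper and its -EIO failure path are hypothetical, included
only to show where virtqueue_req_abort() fits in; everything else uses the
functions introduced by this patch.

/* Hypothetical helper - assumed to prepare req->iov for DMA; not a real SPDK API. */
static int my_pin_payload(struct virtio_req *req);

/* Hypothetical usage sketch - not part of this patch. */
static int
my_enqueue_req(struct virtqueue *vq, struct virtio_req *req)
{
	int rc;

	/* reserve descriptors for the request header, payload and response */
	rc = virtqueue_req_start(vq, req, req->iovcnt + 2);
	if (rc != 0) {
		return rc; /* -ENOSPC; retry once earlier requests complete */
	}

	/* the request header is always host-read-only */
	virtqueue_req_add_iovs(vq, &req->iov_req, 1, SPDK_VIRTIO_DESC_RO);

	if (my_pin_payload(req) != 0) {
		/* roll the vring back to its state before virtqueue_req_start() */
		virtqueue_req_abort(vq);
		return -EIO;
	}

	if (req->is_write) {
		virtqueue_req_add_iovs(vq, req->iov, req->iovcnt, SPDK_VIRTIO_DESC_RO);
		virtqueue_req_add_iovs(vq, &req->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
	} else {
		virtqueue_req_add_iovs(vq, &req->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
		virtqueue_req_add_iovs(vq, req->iov, req->iovcnt, SPDK_VIRTIO_DESC_WR);
	}

	/* publish the descriptor chain and kick the device if it expects it */
	virtqueue_req_flush(vq);
	return 0;
}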
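
On the completion side nothing changes with this patch: virtio_recv_pkts()
still hands back the cookies bound via virtqueue_req_start(), which in this
file are the struct virtio_req pointers themselves. A hypothetical polling
sketch, with my_complete_req() as a made-up callback:

/* Hypothetical completion callback - not a real SPDK API. */
static void my_complete_req(struct virtio_req *req);

/* Hypothetical polling sketch - not part of this patch. */
static void
my_poll_completions(struct virtqueue *vq)
{
	struct virtio_req *done[VIRTIO_MBUF_BURST_SZ];
	uint16_t i, cnt;

	/* returns the number of completed requests written into done[] */
	cnt = virtio_recv_pkts(vq, done, VIRTIO_MBUF_BURST_SZ);
	for (i = 0; i < cnt; i++) {
		my_complete_req(done[i]);
	}
}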