diff --git a/lib/bdev/virtio/bdev_virtio.c b/lib/bdev/virtio/bdev_virtio.c index b276ab6fb..4c479d2d9 100644 --- a/lib/bdev/virtio/bdev_virtio.c +++ b/lib/bdev/virtio/bdev_virtio.c @@ -70,7 +70,9 @@ struct virtio_scsi_io_ctx { struct virtio_scsi_scan_base { struct virtio_dev *vdev; - struct spdk_bdev_poller *scan_poller; + + /** Virtqueue used for the scan I/O. */ + struct virtqueue *vq; /* Currently queried target */ unsigned target; @@ -94,7 +96,9 @@ struct virtio_scsi_disk { struct bdev_virtio_io_channel { struct virtio_dev *vdev; - struct spdk_bdev_poller *poller; + + /** Virtqueue exclusively assigned to this channel. */ + struct virtqueue *vq; }; static void scan_target(struct virtio_scsi_scan_base *base); @@ -144,6 +148,7 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev); struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io); struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base; + struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch); vreq->iov = bdev_io->u.bdev.iovs; vreq->iovcnt = bdev_io->u.bdev.iovcnt; @@ -158,15 +163,15 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) to_be16(&req->cdb[7], bdev_io->u.bdev.num_blocks); } - virtio_xmit_pkts(disk->vdev->vqs[2], vreq); + virtio_xmit_pkts(virtio_channel->vq, vreq); } static void bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) { - struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev); struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io); struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base; + struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch); struct spdk_scsi_unmap_bdesc *desc, *first_desc; uint8_t *buf; uint64_t offset_blocks, num_blocks; @@ -206,7 +211,7 @@ bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) to_be16(&buf[2], cmd_len - 8); /* length of block descriptors */ memset(&buf[4], 0, 4); /* reserved */ - virtio_xmit_pkts(disk->vdev->vqs[2], vreq); + virtio_xmit_pkts(virtio_channel->vq, vreq); } static int _bdev_virtio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) @@ -331,7 +336,7 @@ bdev_virtio_poll(void *arg) struct virtio_req *req[32]; uint16_t i, cnt; - cnt = virtio_recv_pkts(ch->vdev->vqs[2], req, SPDK_COUNTOF(req)); + cnt = virtio_recv_pkts(ch->vq, req, SPDK_COUNTOF(req)); for (i = 0; i < cnt; ++i) { bdev_virtio_io_cpl(req[i]); } @@ -340,12 +345,26 @@ bdev_virtio_poll(void *arg) static int bdev_virtio_create_cb(void *io_device, void *ctx_buf) { - struct virtio_dev **vdev = io_device; + struct virtio_dev **vdev_ptr = io_device; + struct virtio_dev *vdev = *vdev_ptr; struct bdev_virtio_io_channel *ch = ctx_buf; + struct virtqueue *vq; + int32_t queue_idx; + + queue_idx = virtio_dev_find_and_acquire_queue(vdev, 2); + if (queue_idx < 0) { + SPDK_ERRLOG("Couldn't get an unused queue for the io_channel.\n"); + return queue_idx; + } + + vq = vdev->vqs[queue_idx]; + + ch->vdev = vdev; + ch->vq = vq; + + spdk_bdev_poller_start(&vq->poller, bdev_virtio_poll, ch, + vq->owner_lcore, 0); - ch->vdev = *vdev; - spdk_bdev_poller_start(&ch->poller, bdev_virtio_poll, ch, - spdk_env_get_current_core(), 0); return 0; } @@ -353,8 +372,11 @@ static void bdev_virtio_destroy_cb(void *io_device, void *ctx_buf) { struct bdev_virtio_io_channel *io_channel = ctx_buf; + struct virtio_dev *vdev = io_channel->vdev; + struct virtqueue *vq = io_channel->vq; - spdk_bdev_poller_stop(&io_channel->poller); + spdk_bdev_poller_stop(&vq->poller); + virtio_dev_release_queue(vdev, vq->vq_queue_index); } static void @@ -368,7 +390,8 @@ scan_target_finish(struct virtio_scsi_scan_base *base) return; } - spdk_bdev_poller_stop(&base->scan_poller); + spdk_bdev_poller_stop(&base->vq->poller); + virtio_dev_release_queue(base->vdev, base->vq->vq_queue_index); while ((disk = TAILQ_FIRST(&base->found_disks))) { TAILQ_REMOVE(&base->found_disks, disk, link); @@ -400,7 +423,7 @@ send_read_cap_10(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v iov[0].iov_len = 8; req->cdb[0] = SPDK_SBC_READ_CAPACITY_10; - virtio_xmit_pkts(base->vdev->vqs[2], vreq); + virtio_xmit_pkts(base->vq, vreq); } static void @@ -418,7 +441,7 @@ send_read_cap_16(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v req->cdb[1] = SPDK_SBC_SAI_READ_CAPACITY_16; to_be32(&req->cdb[10], iov[0].iov_len); - virtio_xmit_pkts(base->vdev->vqs[2], vreq); + virtio_xmit_pkts(base->vq, vreq); } static int @@ -556,7 +579,7 @@ bdev_scan_poll(void *arg) struct virtio_req *req; uint16_t cnt; - cnt = virtio_recv_pkts(base->vdev->vqs[2], &req, 1); + cnt = virtio_recv_pkts(base->vq, &req, 1); if (cnt > 0) { process_scan_resp(base, req); } @@ -596,7 +619,7 @@ scan_target(struct virtio_scsi_scan_base *base) cdb->opcode = SPDK_SPC_INQUIRY; cdb->alloc_len[1] = 255; - virtio_xmit_pkts(base->vdev->vqs[2], vreq); + virtio_xmit_pkts(base->vq, vreq); } static int @@ -606,6 +629,7 @@ bdev_virtio_process_config(void) struct virtio_dev *vdev = NULL; char *path; unsigned vdev_num; + int num_queues; bool enable_pci; int rc = 0; @@ -628,7 +652,12 @@ bdev_virtio_process_config(void) goto out; } - vdev = virtio_user_dev_init(path, 512); + num_queues = spdk_conf_section_get_intval(sp, "Queues"); + if (num_queues < 1) { + num_queues = 1; + } + + vdev = virtio_user_dev_init(path, num_queues + 2, 512); if (vdev == NULL) { rc = -1; goto out; @@ -659,6 +688,7 @@ bdev_virtio_initialize(void) { struct virtio_scsi_scan_base *base; struct virtio_dev *vdev = NULL; + struct virtqueue *vq; int rc = 0; rc = bdev_virtio_process_config(); @@ -692,9 +722,16 @@ bdev_virtio_initialize(void) base->vdev = vdev; TAILQ_INIT(&base->found_disks); - spdk_bdev_poller_start(&base->scan_poller, bdev_scan_poll, base, - spdk_env_get_current_core(), 0); + rc = virtio_dev_acquire_queue(vdev, 2); + if (rc != 0) { + SPDK_ERRLOG("Couldn't acquire requestq for the target scan.\n"); + goto out; + } + vq = vdev->vqs[2]; + base->vq = vq; + spdk_bdev_poller_start(&vq->poller, bdev_scan_poll, base, + vq->owner_lcore, 0); scan_target(base); } diff --git a/lib/bdev/virtio/rte_virtio/virtio_dev.c b/lib/bdev/virtio/rte_virtio/virtio_dev.c index 1da8350da..e7873ef55 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_dev.c +++ b/lib/bdev/virtio/rte_virtio/virtio_dev.c @@ -165,6 +165,9 @@ virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx) vq->mz = mz; + vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED; + vq->poller = NULL; + if (vtpci_ops(dev)->setup_queue(dev, vq) < 0) { SPDK_ERRLOG("setup_queue failed\n"); return -EINVAL; @@ -276,13 +279,6 @@ virtio_dev_init(struct virtio_dev *dev, uint64_t req_features) if (virtio_negotiate_features(dev, req_features) < 0) return -1; - /* FIXME - * Hardcode num_queues to 3 until we add proper - * mutli-queue support. This value should be limited - * by number of cores assigned to SPDK - */ - dev->max_queues = 3; - ret = virtio_alloc_queues(dev); if (ret < 0) return ret; @@ -532,4 +528,78 @@ virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req) return 1; } +int +virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index) +{ + struct virtqueue *vq = NULL; + + if (index >= vdev->max_queues) { + SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n", + index, vdev->max_queues); + return -1; + } + + pthread_mutex_lock(&vdev->mutex); + vq = vdev->vqs[index]; + if (vq == NULL || vq->owner_lcore != SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) { + pthread_mutex_unlock(&vdev->mutex); + return -1; + } + + assert(vq->poller == NULL); + vq->owner_lcore = spdk_env_get_current_core(); + pthread_mutex_unlock(&vdev->mutex); + return 0; +} + +int32_t +virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index) +{ + struct virtqueue *vq = NULL; + uint16_t i; + + pthread_mutex_lock(&vdev->mutex); + for (i = start_index; i < vdev->max_queues; ++i) { + vq = vdev->vqs[i]; + if (vq != NULL && vq->owner_lcore == SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) { + break; + } + } + + if (vq == NULL || i == vdev->max_queues) { + SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index); + pthread_mutex_unlock(&vdev->mutex); + return -1; + } + + assert(vq->poller == NULL); + vq->owner_lcore = spdk_env_get_current_core(); + pthread_mutex_unlock(&vdev->mutex); + return i; +} + +void +virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index) +{ + struct virtqueue *vq = NULL; + + if (index >= vdev->max_queues) { + SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n", + index, vdev->max_queues); + return; + } + + pthread_mutex_lock(&vdev->mutex); + vq = vdev->vqs[index]; + if (vq == NULL) { + SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index); + return; + } + + assert(vq->poller == NULL); + assert(vq->owner_lcore == spdk_env_get_current_core()); + vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED; + pthread_mutex_unlock(&vdev->mutex); +} + SPDK_LOG_REGISTER_TRACE_FLAG("virtio_dev", SPDK_TRACE_VIRTIO_DEV) diff --git a/lib/bdev/virtio/rte_virtio/virtio_dev.h b/lib/bdev/virtio/rte_virtio/virtio_dev.h index 61f56a091..7540bd4c3 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_dev.h +++ b/lib/bdev/virtio/rte_virtio/virtio_dev.h @@ -67,6 +67,11 @@ */ #define VQ_RING_DESC_CHAIN_END 32768 +/* This is a work-around for fio-plugin bug, where each + * fio job thread returns local lcore id = -1 + */ +#define SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED (UINT32_MAX - 1) + struct virtio_dev { struct virtqueue **vqs; uint16_t started; @@ -85,6 +90,9 @@ struct virtio_dev { /** Modern/legacy virtio device flag. */ uint8_t modern; + /** Mutex for asynchronous virtqueue-changing operations. */ + pthread_mutex_t mutex; + TAILQ_ENTRY(virtio_dev) tailq; }; @@ -128,6 +136,12 @@ struct virtqueue { uint16_t vq_queue_index; /**< PCI queue index */ uint16_t *notify_addr; + /** Logical CPU ID that's polling this queue. */ + uint32_t owner_lcore; + + /** Response poller. */ + struct spdk_bdev_poller *poller; + struct vq_desc_extra vq_descx[0]; }; @@ -212,4 +226,40 @@ virtqueue_kick_prepare(struct virtqueue *vq) return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY); } +/** + * Bind a virtqueue with given index to the current CPU core. + * + * This function is thread-safe. + * + * \param vdev vhost device + * \param index virtqueue index + * \return 0 on success, -1 in case a virtqueue with given index either + * does not exists or is already acquired. + */ +int virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index); + +/** + * Look for unused queue and bind it to the current CPU core. This will + * scan the queues in range from *start_index* (inclusive) up to + * vdev->max_queues (exclusive). + * + * This function is thread-safe. + * + * \param vdev vhost device + * \param start_index virtqueue index to start looking from + * \return index of acquired queue or -1 in case no unused queue in given range + * has been found + */ +int32_t virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index); + +/** + * Release previously acquired queue. + * + * This function must be called from the thread that acquired the queue. + * + * \param vdev vhost device + * \param index index of virtqueue to release + */ +void virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index); + #endif /* _VIRTIO_DEV_H_ */ diff --git a/lib/bdev/virtio/rte_virtio/virtio_pci.c b/lib/bdev/virtio/rte_virtio/virtio_pci.c index ba5ef5a58..5c2475e2a 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_pci.c +++ b/lib/bdev/virtio/rte_virtio/virtio_pci.c @@ -714,6 +714,7 @@ vtpci_init(struct virtio_dev *vdev, const struct virtio_pci_ops *ops) } vdev->id = vdev_num; + pthread_mutex_init(&vdev->mutex, NULL); g_virtio_driver.internal[vdev_num].vtpci_ops = ops; return 0; diff --git a/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.c b/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.c index 11b20538b..cd6d4eadb 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.c +++ b/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.c @@ -174,7 +174,7 @@ virtio_user_dev_setup(struct virtio_user_dev *dev) } struct virtio_dev * -virtio_user_dev_init(char *path, int queue_size) +virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size) { struct virtio_dev *vdev; struct virtio_user_dev *dev; @@ -206,12 +206,13 @@ virtio_user_dev_init(char *path, int queue_size) goto err; } - if (max_queues >= VIRTIO_MAX_VIRTQUEUES) { - SPDK_ERRLOG("invalid get_queue_num value: %"PRIu64"\n", max_queues); + if (requested_queues > max_queues) { + SPDK_ERRLOG("requested %"PRIu16" queues but only %"PRIu64" available\n", + requested_queues, max_queues); goto err; } - vdev->max_queues = max_queues; + vdev->max_queues = requested_queues; if (dev->ops->send_request(dev, VHOST_USER_SET_OWNER, NULL) < 0) { SPDK_ERRLOG("set_owner fails: %s\n", strerror(errno)); diff --git a/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.h b/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.h index af218302e..cec72756d 100644 --- a/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.h +++ b/lib/bdev/virtio/rte_virtio/virtio_user/virtio_user_dev.h @@ -62,7 +62,7 @@ struct virtio_user_dev { int virtio_user_start_device(struct virtio_user_dev *dev); int virtio_user_stop_device(struct virtio_user_dev *dev); -struct virtio_dev *virtio_user_dev_init(char *path, int queue_size); +struct virtio_dev *virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size); void virtio_user_dev_uninit(struct virtio_user_dev *dev); #endif diff --git a/test/vhost/initiator/bdev.conf.in b/test/vhost/initiator/bdev.conf.in index 971cfc9ff..fb159980d 100644 --- a/test/vhost/initiator/bdev.conf.in +++ b/test/vhost/initiator/bdev.conf.in @@ -1,5 +1,6 @@ [VirtioUser0] Path /tmp/vhost.0 + Queues 16 [Rpc] Enable Yes