idxd: add batch capability to accel framework and IDXD back-end

This patch only includes the basic framework for batching and the
ability to batch one type of command, copy. Follow-on patches will
add the ability to batch other commands and include an example of
how to do so via the accel perf tool. SW engine support for batching
and documentation will follow in future patches.

Batching allows the application to submit a list of independent
descriptors to DSA with a single "batch" descriptor. This is beneficial
when the application has several operations ready at once; batching
saves the overhead of submitting each one separately.

The way batching works in SPDK is as follows:

1) The app gets a handle to a new batch with spdk_accel_batch_create()
2) The app uses that handle to prepare a command to be included in the
batch. For copy, the command is spdk_accel_batch_prep_copy(). The
app may continue to prep commands for the batch, up to the max reported
by spdk_accel_batch_get_max()
3) The app then submits the batch with spdk_accel_batch_submit()
4) The callback provided for each command in the batch will be called as
each command completes; the callback provided to the batch submit itself
will be called when the entire batch is done (see the sketch below).
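
A minimal sketch of this flow (task, buffer, and callback names here are
hypothetical; "ch" is assumed to be an accel engine I/O channel and error
handling is elided):

    struct spdk_accel_batch *batch = spdk_accel_batch_create(ch);
    uint32_t i, max = spdk_accel_batch_get_max(ch);

    /* Prep up to the max number of commands, one accel task per command. */
    for (i = 0; i < num_copies && i < max; i++) {
        spdk_accel_batch_prep_copy(&copy_tasks[i], ch, batch,
                                   dsts[i], srcs[i], nbytes, copy_done_cb);
    }

    /* Submit the batch; batch_done_cb fires once everything completes. */
    spdk_accel_batch_submit(&batch_task, ch, batch, batch_done_cb);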

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: I4102e9291fe59a245cedde6888f42a923b6dbafd
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/2248
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
paul luse 2020-05-07 14:45:15 -04:00 committed by Tomasz Zawadzki
parent cf83cac493
commit fc250841ca
10 changed files with 650 additions and 88 deletions


@@ -9,6 +9,10 @@
Function | Description
--------------------------------------- | -----------
spdk_idxd_probe() | @copybrief spdk_idxd_probe()
spdk_idxd_batch_get_max() | @copybrief spdk_idxd_batch_get_max()
spdk_idxd_batch_create() | @copybrief spdk_idxd_batch_create()
spdk_idxd_batch_prep_copy() | @copybrief spdk_idxd_batch_prep_copy()
spdk_idxd_batch_submit() | @copybrief spdk_idxd_batch_submit()
spdk_idxd_submit_copy() | @copybrief spdk_idxd_submit_copy()
spdk_idxd_submit_compare() | @copybrief spdk_idxd_submit_compare()
spdk_idxd_submit_crc32c() | @copybrief spdk_idxd_submit_crc32c()
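
The low-level batch flow mirrors the framework-level one. A minimal sketch,
assuming `chan` is a configured IDXD channel, with hypothetical buffer and
callback names (note a batch must contain at least two operations before it
can be submitted):

    struct idxd_batch *batch = spdk_idxd_batch_create(chan);
    uint32_t i, max = spdk_idxd_batch_get_max();

    for (i = 0; i < num_copies && i < max; i++) {
        spdk_idxd_batch_prep_copy(chan, batch, dsts[i], srcs[i], nbytes,
                                  copy_done_cb, copy_ctx[i]);
    }
    spdk_idxd_batch_submit(chan, batch, batch_done_cb, batch_ctx);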


@@ -73,6 +73,8 @@ struct spdk_io_channel;
struct spdk_accel_task;
struct spdk_accel_batch;
/**
* Initialize the acceleration engine.
*
@@ -112,8 +114,7 @@ struct spdk_io_channel *spdk_accel_engine_get_io_channel(void);
/**
* Retrieve accel engine capabilities.
*
* \param ch I/O channel to submit request to the accel engine. This channel can
* be obtained by the function spdk_accel_engine_get_io_channel().
* \param ch I/O channel associated with this call.
*
* \return bitmap of capabilities defined by enum accel_capability.
*/
@@ -123,8 +124,7 @@ uint64_t spdk_accel_get_capabilities(struct spdk_io_channel *ch);
* Submit a copy request.
*
* \param accel_req Accel request task.
* \param ch I/O channel to submit request to the accel engine. This channel can
* be obtained by the function spdk_accel_engine_get_io_channel().
* \param ch I/O channel to submit request to the accel engine.
* \param dst Destination to copy to.
* \param src Source to copy from.
* \param nbytes Length in bytes to copy.
@@ -135,12 +135,64 @@ uint64_t spdk_accel_get_capabilities(struct spdk_io_channel *ch);
int spdk_accel_submit_copy(struct spdk_accel_task *accel_req, struct spdk_io_channel *ch, void *dst,
void *src, uint64_t nbytes, spdk_accel_completion_cb cb);
/**
* Synchronous call to get batch size. This is the maximum number of
* descriptors that a batch can contain. Once this limit is reached the batch
* should be processed with spdk_accel_batch_submit().
*
* \param ch I/O channel associated with this call.
*
* \return max number of descriptors per batch.
*/
uint32_t spdk_accel_batch_get_max(struct spdk_io_channel *ch);
/**
* Synchronous call to create a batch sequence.
*
* \param ch I/O channel associated with this call.
*
* \return handle to use for subsequent batch requests, NULL on failure.
*/
struct spdk_accel_batch *spdk_accel_batch_create(struct spdk_io_channel *ch);
/**
* Asynchronous call to submit a batch sequence.
*
* \param accel_req Accel request task.
* \param ch I/O channel associated with this call.
* \param batch Handle provided when the batch was started with spdk_accel_batch_create().
* \param cb Called when this operation completes.
*
* \return 0 on success, negative errno on failure.
*/
int spdk_accel_batch_submit(struct spdk_accel_task *accel_req, struct spdk_io_channel *ch,
struct spdk_accel_batch *batch, spdk_accel_completion_cb cb);
/**
* Synchronous call to prepare a copy request into a previously initialized batch
* created with spdk_accel_batch_create(). The callback will be called when the copy
* completes after the batch has been submitted by an asynchronous call to
* spdk_accel_batch_submit().
*
* \param accel_req Accel request task.
* \param ch I/O channel associated with this call.
* \param batch Handle provided when the batch was started with spdk_accel_batch_create().
* \param dst Destination to copy to.
* \param src Source to copy from.
* \param nbytes Length in bytes to copy.
* \param cb Called when this operation completes.
*
* \return 0 on success, negative errno on failure.
*/
int spdk_accel_batch_prep_copy(struct spdk_accel_task *accel_req, struct spdk_io_channel *ch,
struct spdk_accel_batch *batch, void *dst, void *src,
uint64_t nbytes, spdk_accel_completion_cb cb);
/**
* Submit a dual cast copy request.
*
* \param accel_req Accel request task.
* \param ch I/O channel to submit request to the accel engine. This channel can
* be obtained by the function spdk_accel_engine_get_io_channel().
* \param ch I/O channel to submit request to the accel engine.
* \param dst1 First destination to copy to (must be 4K aligned).
* \param dst2 Second destination to copy to (must be 4K aligned).
* \param src Source to copy from.
@@ -157,8 +209,7 @@ int spdk_accel_submit_dualcast(struct spdk_accel_task *accel_req, struct spdk_io
* Submit a compare request.
*
* \param accel_req Accel request task.
* \param ch I/O channel to submit request to the accel engine. This channel can
* be obtained by the function spdk_accel_engine_get_io_channel().
* \param ch I/O channel to submit request to the accel engine.
* \param src1 First location to perform compare on.
* \param src2 Second location to perform compare on.
* \param nbytes Length in bytes to compare.
@@ -176,8 +227,7 @@ int spdk_accel_submit_compare(struct spdk_accel_task *accel_req, struct spdk_io_
* This operation will fill the destination buffer with the specified value.
*
* \param accel_req Accel request task.
* \param ch I/O channel to submit request to the accel engine. This channel can
* be obtained by the function spdk_accel_engine_get_io_channel().
* \param ch I/O channel to submit request to the accel engine.
* \param dst Destination to fill.
* \param fill Constant byte to fill to the destination.
* \param nbytes Length in bytes to fill.
@@ -194,8 +244,7 @@ int spdk_accel_submit_fill(struct spdk_accel_task *accel_req, struct spdk_io_cha
* This operation will calculate the 4 byte CRC32-C for the given data.
*
* \param accel_req Accel request task.
* \param ch I/O channel to submit request to the accel engine. This channel can
* be obtained by the function spdk_accel_engine_get_io_channel().
* \param ch I/O channel to submit request to the accel engine.
* \param dst Destination to write the CRC-32C to.
* \param src The source address for the data.
* \param seed Four byte seed value.


@@ -56,6 +56,11 @@ struct spdk_idxd_io_channel;
*/
struct spdk_idxd_device;
/**
* Opaque handle for batching.
*/
struct idxd_batch;
/**
* Signature for configuring a channel
*
@@ -138,7 +143,57 @@ void spdk_idxd_detach(struct spdk_idxd_device *idxd);
void spdk_idxd_set_config(uint32_t config_number);
/**
* Build and submit a accel engine memory copy request.
* Return the max number of descriptors per batch for IDXD.
*
* \return max number of descriptors per batch.
*/
uint32_t spdk_idxd_batch_get_max(void);
/**
* Create a batch sequence.
*
* \param chan IDXD channel to submit request.
*
* \return handle to use for subsequent batch requests, NULL on failure.
*/
struct idxd_batch *spdk_idxd_batch_create(struct spdk_idxd_io_channel *chan);
/**
* Submit a batch sequence.
*
* \param chan IDXD channel to submit request.
* \param batch Handle provided when the batch was started with spdk_idxd_batch_create().
* \param cb_fn Callback function which will be called when the request is complete.
* \param cb_arg Opaque value which will be passed back as the arg parameter in
* the completion callback.
*
* \return 0 on success, negative errno on failure.
*/
int spdk_idxd_batch_submit(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
spdk_idxd_req_cb cb_fn, void *cb_arg);
/**
* Synchronous call to prepare a copy request into a previously initialized batch
* created with spdk_idxd_batch_create(). The callback will be called when the copy
* completes after the batch has been submitted by an asynchronous call to
* spdk_idxd_batch_submit().
*
* \param chan IDXD channel to submit request.
* \param batch Handle provided when the batch was started with spdk_idxd_batch_create().
* \param dst Destination virtual address.
* \param src Source virtual address.
* \param nbytes Number of bytes to copy.
* \param cb_fn Callback function which will be called when the request is complete.
* \param cb_arg Opaque value which will be passed back as the arg parameter in
* the completion callback.
*
* \return 0 on success, negative errno on failure.
*/
int spdk_idxd_batch_prep_copy(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
void *dst, const void *src, uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg);
/**
* Build and submit an idxd memory copy request.
*
* This function will build the copy descriptor and then immediately submit
* by writing to the proper device portal.
@@ -158,7 +213,7 @@ int spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan,
spdk_idxd_req_cb cb_fn, void *cb_arg);
/**
* Build and submit an accel engine dual cast copy request.
* Build and submit an idxd dual cast copy request.
*
* This function will build the dual cast descriptor and then immediately submit
* by writing to the proper device portal.
@@ -179,7 +234,7 @@ int spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan,
spdk_idxd_req_cb cb_fn, void *cb_arg);
/**
* Build and submit a memory compare request.
* Build and submit an idxd memory compare request.
*
* This function will build the compare descriptor and then immediately submit
* by writing to the proper device portal.
@@ -199,7 +254,7 @@ int spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan,
spdk_idxd_req_cb cb_fn, void *cb_arg);
/**
* Build and submit a accel engine memory fill request.
* Build and submit an idxd memory fill request.
*
* This function will build the fill descriptor and then immediately submit
* by writing to the proper device portal.
@@ -219,7 +274,7 @@ int spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan,
spdk_idxd_req_cb cb_fn, void *cb_arg);
/**
* Build and submit a memory CRC32-C request.
* Build and submit an idxd memory CRC32-C request.
*
* This function will build the CRC-32C descriptor and then immediately submit
* by writing to the proper device portal.


@@ -50,6 +50,12 @@ struct spdk_accel_engine {
uint64_t nbytes, spdk_accel_completion_cb cb);
int (*dualcast)(void *cb_arg, struct spdk_io_channel *ch, void *dst1, void *dst2, void *src,
uint64_t nbytes, spdk_accel_completion_cb cb);
uint32_t (*batch_get_max)(void);
struct spdk_accel_batch *(*batch_create)(struct spdk_io_channel *ch);
int (*batch_prep_copy)(void *cb_arg, struct spdk_io_channel *ch, struct spdk_accel_batch *batch,
void *dst, void *src, uint64_t nbytes, spdk_accel_completion_cb cb);
int (*batch_submit)(void *cb_arg, struct spdk_io_channel *ch, struct spdk_accel_batch *batch,
spdk_accel_completion_cb cb);
int (*compare)(void *cb_arg, struct spdk_io_channel *ch, void *src1, void *src2,
uint64_t nbytes, spdk_accel_completion_cb cb);
int (*fill)(void *cb_arg, struct spdk_io_channel *ch, void *dst, uint8_t fill,


@@ -140,6 +140,50 @@ spdk_accel_submit_dualcast(struct spdk_accel_task *accel_req, struct spdk_io_cha
_accel_engine_done);
}
/* Accel framework public API for batch_create function */
struct spdk_accel_batch *
spdk_accel_batch_create(struct spdk_io_channel *ch)
{
struct accel_io_channel *accel_ch = spdk_io_channel_get_ctx(ch);
return accel_ch->engine->batch_create(accel_ch->ch);
}
/* Accel framework public API for batch_submit function */
int
spdk_accel_batch_submit(struct spdk_accel_task *accel_req, struct spdk_io_channel *ch,
struct spdk_accel_batch *batch, spdk_accel_completion_cb cb)
{
struct accel_io_channel *accel_ch = spdk_io_channel_get_ctx(ch);
accel_req->cb = cb;
return accel_ch->engine->batch_submit(accel_req->offload_ctx, accel_ch->ch, batch,
_accel_engine_done);
}
/* Accel framework public API for getting max batch */
uint32_t
spdk_accel_batch_get_max(struct spdk_io_channel *ch)
{
struct accel_io_channel *accel_ch = spdk_io_channel_get_ctx(ch);
return accel_ch->engine->batch_get_max();
}
/* Accel framework public API for batch prep_copy function */
int
spdk_accel_batch_prep_copy(struct spdk_accel_task *accel_req, struct spdk_io_channel *ch,
struct spdk_accel_batch *batch, void *dst, void *src, uint64_t nbytes,
spdk_accel_completion_cb cb)
{
struct accel_io_channel *accel_ch = spdk_io_channel_get_ctx(ch);
accel_req->cb = cb;
return accel_ch->engine->batch_prep_copy(accel_req->offload_ctx, accel_ch->ch, batch, dst, src,
nbytes,
_accel_engine_done);
}
/* Accel framework public API for compare function */
int
spdk_accel_submit_compare(struct spdk_accel_task *accel_req, struct spdk_io_channel *ch,
@@ -419,6 +463,10 @@ static struct spdk_accel_engine sw_accel_engine = {
.get_capabilities = sw_accel_get_capabilities,
.copy = sw_accel_submit_copy,
.dualcast = sw_accel_submit_dualcast,
.batch_get_max = NULL, /* TODO */
.batch_create = NULL, /* TODO */
.batch_prep_copy = NULL, /* TODO */
.batch_submit = NULL, /* TODO */
.compare = sw_accel_submit_compare,
.fill = sw_accel_submit_fill,
.crc32c = sw_accel_submit_crc32c,


@@ -8,6 +8,10 @@
spdk_accel_engine_module_finish;
spdk_accel_engine_get_io_channel;
spdk_accel_get_capabilities;
spdk_accel_batch_get_max;
spdk_accel_batch_create;
spdk_accel_batch_prep_copy;
spdk_accel_batch_submit;
spdk_accel_submit_copy;
spdk_accel_submit_dualcast;
spdk_accel_submit_compare;


@@ -104,6 +104,8 @@ struct spdk_idxd_io_channel *
spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
{
struct spdk_idxd_io_channel *chan;
struct idxd_batch *batch;
int i;
chan = calloc(1, sizeof(struct spdk_idxd_io_channel));
if (chan == NULL) {
@@ -112,6 +114,22 @@ spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
}
chan->idxd = idxd;
TAILQ_INIT(&chan->batches);
TAILQ_INIT(&chan->batch_pool);
for (i = 0 ; i < NUM_BATCHES ; i++) {
batch = calloc(1, sizeof(struct idxd_batch));
if (batch == NULL) {
SPDK_ERRLOG("Failed to allocate batch\n");
while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
TAILQ_REMOVE(&chan->batch_pool, batch, link);
free(batch);
}
return NULL;
}
TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
}
return chan;
}
@@ -125,7 +143,9 @@ int
spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan)
{
uint32_t num_ring_slots;
int rc;
/* Round robin the WQ selection for the chan on this IDXD device. */
chan->idxd->wq_id++;
if (chan->idxd->wq_id == g_dev_cfg->total_wqs) {
chan->idxd->wq_id = 0;
@@ -148,13 +168,13 @@ spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan)
/* Store the original size of the ring. */
chan->ring_ctrl.ring_size = num_ring_slots;
chan->ring_ctrl.data_desc = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_hw_desc),
0x40, NULL,
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
if (chan->ring_ctrl.data_desc == NULL) {
chan->ring_ctrl.desc = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_hw_desc),
0x40, NULL,
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
if (chan->ring_ctrl.desc == NULL) {
SPDK_ERRLOG("Failed to allocate descriptor memory\n");
spdk_bit_array_free(&chan->ring_ctrl.ring_slots);
return -ENOMEM;
rc = -ENOMEM;
goto err_desc;
}
chan->ring_ctrl.completions = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_comp),
@@ -162,13 +182,79 @@ spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan)
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
if (chan->ring_ctrl.completions == NULL) {
SPDK_ERRLOG("Failed to allocate completion memory\n");
spdk_bit_array_free(&chan->ring_ctrl.ring_slots);
spdk_free(chan->ring_ctrl.data_desc);
return -ENOMEM;
rc = -ENOMEM;
goto err_comp;
}
chan->ring_ctrl.user_desc = spdk_zmalloc(TOTAL_USER_DESC * sizeof(struct idxd_hw_desc),
0x40, NULL,
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
if (chan->ring_ctrl.user_desc == NULL) {
SPDK_ERRLOG("Failed to allocate batch descriptor memory\n");
rc = -ENOMEM;
goto err_user_desc;
}
/* Each slot on the ring reserves DESC_PER_BATCH elements in user_desc. */
chan->ring_ctrl.user_ring_slots = spdk_bit_array_create(NUM_BATCHES);
if (chan->ring_ctrl.user_ring_slots == NULL) {
SPDK_ERRLOG("Failed to allocate bit array for user ring\n");
rc = -ENOMEM;
goto err_user_ring;
}
chan->ring_ctrl.user_completions = spdk_zmalloc(TOTAL_USER_DESC * sizeof(struct idxd_comp),
0x40, NULL,
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
if (chan->ring_ctrl.user_completions == NULL) {
SPDK_ERRLOG("Failed to allocate user completion memory\n");
rc = -ENOMEM;
goto err_user_comp;
}
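/* Each work queue has its own portal; offset into the mapped portals by this channel's wq_id. */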
chan->ring_ctrl.portal = (char *)chan->idxd->portals + chan->idxd->wq_id * PORTAL_SIZE;
return 0;
err_user_comp:
spdk_bit_array_free(&chan->ring_ctrl.user_ring_slots);
err_user_ring:
spdk_free(chan->ring_ctrl.user_desc);
err_user_desc:
spdk_free(chan->ring_ctrl.completions);
err_comp:
spdk_free(chan->ring_ctrl.desc);
err_desc:
spdk_bit_array_free(&chan->ring_ctrl.ring_slots);
return rc;
}
/* Used for control commands, not for descriptor submission. */
static int
idxd_wait_cmd(struct spdk_idxd_device *idxd, int _timeout)
{
uint32_t timeout = _timeout;
union idxd_cmdsts_reg cmd_status = {};
cmd_status.raw = _idxd_read_4(idxd, IDXD_CMDSTS_OFFSET);
while (cmd_status.active && --timeout) {
usleep(1);
cmd_status.raw = _idxd_read_4(idxd, IDXD_CMDSTS_OFFSET);
}
/* Check for timeout */
if (timeout == 0 && cmd_status.active) {
SPDK_ERRLOG("Command timeout, waited %u\n", _timeout);
return -EBUSY;
}
/* Check for error */
if (cmd_status.err) {
SPDK_ERRLOG("Command status reg reports error 0x%x\n", cmd_status.err);
return -EINVAL;
}
return 0;
}
@@ -178,10 +264,6 @@ _idxd_drain(struct spdk_idxd_io_channel *chan)
uint32_t index;
int set = 0;
/*
* TODO this is a temp solution to drain until getting the drain cmd to work, this
* provides equivalent functionality but just doesn't use the device to do it.
*/
do {
spdk_idxd_process_events(chan);
set = 0;
@@ -196,6 +278,7 @@ spdk_idxd_reconfigure_chan(struct spdk_idxd_io_channel *chan, uint32_t num_chann
{
uint32_t num_ring_slots;
int rc;
struct idxd_batch *batch;
_idxd_drain(chan);
@@ -203,8 +286,15 @@
if (num_channels == 0) {
spdk_free(chan->ring_ctrl.completions);
spdk_free(chan->ring_ctrl.data_desc);
spdk_free(chan->ring_ctrl.desc);
spdk_bit_array_free(&chan->ring_ctrl.ring_slots);
spdk_free(chan->ring_ctrl.user_completions);
spdk_free(chan->ring_ctrl.user_desc);
spdk_bit_array_free(&chan->ring_ctrl.user_ring_slots);
while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
TAILQ_REMOVE(&chan->batch_pool, batch, link);
free(batch);
}
return 0;
}
@@ -219,6 +309,12 @@ spdk_idxd_reconfigure_chan(struct spdk_idxd_io_channel *chan, uint32_t num_chann
chan->ring_ctrl.max_ring_slots = num_ring_slots;
/*
* Note: The batch descriptor ring does not change with the
* number of channels as descriptors on this ring do not
* "count" for flow control.
*/
return rc;
}
@@ -286,34 +382,6 @@ idxd_map_pci_bars(struct spdk_idxd_device *idxd)
return 0;
}
/* Used for control commands, not for descriptor submission. */
static int
idxd_wait_cmd(struct spdk_idxd_device *idxd, int _timeout)
{
uint32_t timeout = _timeout;
union idxd_cmdsts_reg cmd_status = {};
cmd_status.raw = _idxd_read_4(idxd, IDXD_CMDSTS_OFFSET);
while (cmd_status.active && --timeout) {
usleep(1);
cmd_status.raw = _idxd_read_4(idxd, IDXD_CMDSTS_OFFSET);
}
/* Check for timeout */
if (timeout == 0 && cmd_status.active) {
SPDK_ERRLOG("Command timeout, waited %u\n", _timeout);
return -EBUSY;
}
/* Check for error */
if (cmd_status.err) {
SPDK_ERRLOG("Command status reg reports error 0x%x\n", cmd_status.err);
return -EINVAL;
}
return 0;
}
static int
idxd_reset_dev(struct spdk_idxd_device *idxd)
{
@@ -639,8 +707,8 @@ spdk_idxd_detach(struct spdk_idxd_device *idxd)
}
static struct idxd_hw_desc *
_idxd_prep_command(struct spdk_idxd_io_channel *chan,
spdk_idxd_req_cb cb_fn, void *cb_arg)
_idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
void *cb_arg, struct idxd_batch *batch)
{
uint32_t index;
struct idxd_hw_desc *desc;
@@ -654,26 +722,30 @@ _idxd_prep_command(struct spdk_idxd_io_channel *chan,
spdk_bit_array_set(chan->ring_ctrl.ring_slots, index);
desc = &chan->ring_ctrl.data_desc[index];
desc = &chan->ring_ctrl.desc[index];
comp = &chan->ring_ctrl.completions[index];
desc->flags = IDXD_FLAG_COMPLETION_ADDR_VALID | IDXD_FLAG_REQUEST_COMPLETION;
desc->completion_addr = (uintptr_t)&comp->hw;
comp->cb_arg = cb_arg;
comp->cb_fn = cb_fn;
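/* When prepping the batch descriptor itself, link its completion record
 * to the batch and remember which data-ring slot the batch descriptor
 * occupies so _free_batch() can release it later. */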
if (batch) {
assert(comp->batch == NULL);
comp->batch = batch;
batch->batch_desc_index = index;
}
return desc;
}
int
spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
uint64_t nbytes,
spdk_idxd_req_cb cb_fn, void *cb_arg)
uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
struct idxd_hw_desc *desc;
/* Common prep. */
desc = _idxd_prep_command(chan, cb_fn, cb_arg);
desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
if (desc == NULL) {
return -EBUSY;
}
@@ -703,7 +775,7 @@ spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *d
}
/* Common prep. */
desc = _idxd_prep_command(chan, cb_fn, cb_arg);
desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
if (desc == NULL) {
return -EBUSY;
}
@@ -729,7 +801,7 @@ spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan, void *src1, const vo
struct idxd_hw_desc *desc;
/* Common prep. */
desc = _idxd_prep_command(chan, cb_fn, cb_arg);
desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
if (desc == NULL) {
return -EBUSY;
}
@@ -754,7 +826,7 @@ spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan, void *dst, uint64_t fil
struct idxd_hw_desc *desc;
/* Common prep. */
desc = _idxd_prep_command(chan, cb_fn, cb_arg);
desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
if (desc == NULL) {
return -EBUSY;
}
@@ -779,7 +851,7 @@ spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void *
struct idxd_hw_desc *desc;
/* Common prep. */
desc = _idxd_prep_command(chan, cb_fn, cb_arg);
desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
if (desc == NULL) {
return -EBUSY;
}
@@ -798,6 +870,141 @@ spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void *
return 0;
}
uint32_t
spdk_idxd_batch_get_max(void)
{
return DESC_PER_BATCH; /* TODO maybe add startup RPC to set this */
}
struct idxd_batch *
spdk_idxd_batch_create(struct spdk_idxd_io_channel *chan)
{
struct idxd_batch *batch = NULL;
if (!TAILQ_EMPTY(&chan->batch_pool)) {
batch = TAILQ_FIRST(&chan->batch_pool);
TAILQ_REMOVE(&chan->batch_pool, batch, link);
} else {
/* The application needs to handle this. */
return NULL;
}
batch->batch_num = spdk_bit_array_find_first_clear(chan->ring_ctrl.user_ring_slots, 0);
if (batch->batch_num == UINT32_MAX) {
/* Ran out of ring slots; the application needs to handle this. */
TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
return NULL;
}
spdk_bit_array_set(chan->ring_ctrl.user_ring_slots, batch->batch_num);
/*
* Find the first descriptor address for the given batch. The
* descriptor ring used for user descriptors is allocated in
* units of DESC_PER_BATCH. The actual index is in units of
* one descriptor.
*/
batch->start_index = batch->cur_index = batch->batch_num * DESC_PER_BATCH;
TAILQ_INSERT_TAIL(&chan->batches, batch, link);
SPDK_DEBUGLOG(SPDK_LOG_IDXD, "New batch %p num %u\n", batch, batch->batch_num);
return batch;
}
static bool
_does_batch_exist(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
{
bool found = false;
struct idxd_batch *cur_batch;
TAILQ_FOREACH(cur_batch, &chan->batches, link) {
if (cur_batch == batch) {
found = true;
break;
}
}
return found;
}
int
spdk_idxd_batch_submit(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
spdk_idxd_req_cb cb_fn, void *cb_arg)
{
struct idxd_hw_desc *desc;
if (_does_batch_exist(batch, chan) == false) {
SPDK_ERRLOG("Attempt to submit a batch that doesn't exist\n.");
return -EINVAL;
}
/* Common prep. */
desc = _idxd_prep_command(chan, cb_fn, cb_arg, batch);
if (desc == NULL) {
SPDK_DEBUGLOG(SPDK_LOG_IDXD, "Can't submit batch %p busy batch num %u\n", batch, batch->batch_num);
return -EBUSY;
}
/* Command specific. */
desc->opcode = IDXD_OPCODE_BATCH;
desc->desc_list_addr = (uint64_t)&chan->ring_ctrl.user_desc[batch->start_index];
desc->desc_count = batch->cur_index - batch->start_index;
assert(desc->desc_count <= DESC_PER_BATCH);
if (desc->desc_count < MIN_USER_DESC_COUNT) {
SPDK_ERRLOG("Attempt to submit a batch without at least %u operations.\n",
MIN_USER_DESC_COUNT);
return -EINVAL;
}
/* Total completions for the batch = num desc plus 1 for the batch desc itself. */
batch->remaining = desc->desc_count + 1;
/* Submit operation. */
movdir64b(chan->ring_ctrl.portal, desc);
return 0;
}
int
spdk_idxd_batch_prep_copy(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
void *dst, const void *src, uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
struct idxd_hw_desc *desc;
struct idxd_comp *comp;
if (_does_batch_exist(batch, chan) == false) {
SPDK_ERRLOG("Attempt to add to a batch that doesn't exist\n.");
return -EINVAL;
}
if ((batch->cur_index - batch->start_index) == DESC_PER_BATCH) {
SPDK_ERRLOG("Attempt to add to a batch that is already full\n.");
return -ENOMEM;
}
desc = &chan->ring_ctrl.user_desc[batch->cur_index];
comp = &chan->ring_ctrl.user_completions[batch->cur_index];
SPDK_DEBUGLOG(SPDK_LOG_IDXD, "Prep batch %p index %u\n", batch, batch->cur_index);
batch->cur_index++;
assert(batch->cur_index > batch->start_index);
desc->flags = IDXD_FLAG_COMPLETION_ADDR_VALID | IDXD_FLAG_REQUEST_COMPLETION;
desc->opcode = IDXD_OPCODE_MEMMOVE;
desc->src_addr = (uint64_t)src;
desc->dst_addr = (uint64_t)dst;
desc->xfer_size = nbytes;
desc->completion_addr = (uint64_t)&comp->hw;
comp->cb_arg = cb_arg;
comp->cb_fn = cb_fn;
comp->batch = batch;
return 0;
}
static void
_dump_error_reg(struct spdk_idxd_io_channel *chan)
{
@@ -817,6 +1024,77 @@ _dump_error_reg(struct spdk_idxd_io_channel *chan)
SPDK_NOTICELOG("SW Error Operation: %u\n", (uint8_t)(sw_error_0 >> 32));
}
static void
_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan,
struct idxd_comp *comp)
{
TAILQ_REMOVE(&chan->batches, batch, link);
TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
comp->batch = NULL;
spdk_bit_array_clear(chan->ring_ctrl.user_ring_slots, batch->batch_num);
spdk_bit_array_clear(chan->ring_ctrl.ring_slots, batch->batch_desc_index);
}
static void
_spdk_idxd_process_batch_events(struct spdk_idxd_io_channel *chan)
{
uint16_t index;
struct idxd_comp *comp;
uint64_t sw_error_0;
int status = 0;
struct idxd_batch *batch;
/*
* We don't check the bit array for user completions as there's only
* one bit per batch.
*/
for (index = 0; index < TOTAL_USER_DESC; index++) {
comp = &chan->ring_ctrl.user_completions[index];
if (comp->hw.status == 1) {
struct idxd_hw_desc *desc;
sw_error_0 = _idxd_read_8(chan->idxd, IDXD_SWERR_OFFSET);
if (sw_error_0 & 0x1) {
_dump_error_reg(chan);
status = -EINVAL;
}
desc = &chan->ring_ctrl.user_desc[index];
switch (desc->opcode) {
case IDXD_OPCODE_CRC32C_GEN:
*(uint32_t *)desc->dst_addr = comp->hw.crc32c_val;
*(uint32_t *)desc->dst_addr ^= ~0;
break;
case IDXD_OPCODE_COMPARE:
if (status == 0) {
status = comp->hw.result;
}
break;
case IDXD_OPCODE_MEMMOVE:
break;
default:
assert(false);
break;
}
/* The hw will complete all user desc before the batch
 * desc (see spec for configuration exceptions); however,
 * because of the order in which the poller checks for
 * completions, we may "see" them in a different order than
 * they actually completed in.
 */
batch = comp->batch;
assert(batch->remaining > 0);
if (--batch->remaining == 0) {
_free_batch(batch, chan, comp);
}
comp->cb_fn((void *)comp->cb_arg, status);
comp->hw.status = status = 0;
}
}
}
/*
* TODO: Experiment with different methods of reaping completions for performance
* once we have real silicon.
@@ -828,6 +1106,11 @@ spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
struct idxd_comp *comp;
uint64_t sw_error_0;
int status = 0;
struct idxd_batch *batch;
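/* Reap completions for user descriptors submitted via batches first. */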
if (!TAILQ_EMPTY(&chan->batches)) {
_spdk_idxd_process_batch_events(chan);
}
for (index = 0; index < chan->ring_ctrl.max_ring_slots; index++) {
if (spdk_bit_array_get(chan->ring_ctrl.ring_slots, index)) {
@@ -841,8 +1124,21 @@ spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
status = -EINVAL;
}
desc = &chan->ring_ctrl.data_desc[index];
desc = &chan->ring_ctrl.desc[index];
switch (desc->opcode) {
case IDXD_OPCODE_BATCH:
/* The hw will complete all user desc before the batch
 * desc (see spec for configuration exceptions); however,
 * because of the order in which the poller checks for
 * completions, we may "see" them in a different order than
 * they actually completed in.
 */
batch = comp->batch;
assert(batch->remaining > 0);
if (--batch->remaining == 0) {
_free_batch(batch, chan, comp);
}
break;
case IDXD_OPCODE_CRC32C_GEN:
*(uint32_t *)desc->dst_addr = comp->hw.crc32c_val;
*(uint32_t *)desc->dst_addr ^= ~0;
@@ -856,7 +1152,9 @@ spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
comp->cb_fn(comp->cb_arg, status);
comp->hw.status = status = 0;
spdk_bit_array_clear(chan->ring_ctrl.ring_slots, index);
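/* The ring slot for a batch descriptor is released in _free_batch() once
 * the entire batch has completed. */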
if (desc->opcode != IDXD_OPCODE_BATCH) {
spdk_bit_array_clear(chan->ring_ctrl.ring_slots, index);
}
}
}
}


@@ -57,6 +57,7 @@ static inline void movdir64b(void *dst, const void *src)
}
#define IDXD_REGISTER_TIMEOUT_US 50
#define IDXD_DRAIN_TIMEOUT_US 500000
/* TODO: make some of these RPC selectable */
#define WQ_MODE_DEDICATED 1
@@ -66,6 +67,19 @@ static inline void movdir64b(void *dst, const void *src)
#define WQ_PRIORITY_1 1
#define IDXD_MAX_QUEUES 64
#define TOTAL_USER_DESC (1 << LOG2_WQ_MAX_BATCH)
#define DESC_PER_BATCH 16 /* TODO maybe make this a startup RPC */
#define NUM_BATCHES (TOTAL_USER_DESC / DESC_PER_BATCH)
#define MIN_USER_DESC_COUNT 2
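/* Example: if LOG2_WQ_MAX_BATCH were 5, TOTAL_USER_DESC would be 32 and
 * NUM_BATCHES would be 2. */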
struct idxd_batch {
uint32_t batch_desc_index;
uint32_t batch_num;
uint32_t cur_index;
uint32_t start_index;
uint32_t remaining;
TAILQ_ENTRY(idxd_batch) link;
};
struct device_config {
uint8_t config_num;
@@ -83,24 +97,33 @@ struct idxd_ring_control {
/*
* Rings for this channel, one for descriptors and one
* for completions, share the same index. Future will
* include a separate ring for batch descriptors once
* the batch interface is completed.
* for completions, share the same index. Batch descriptors
* are managed independently from data descriptors.
*/
struct idxd_hw_desc *data_desc;
struct idxd_hw_desc *desc;
struct idxd_comp *completions;
struct idxd_hw_desc *user_desc;
struct idxd_comp *user_completions;
/*
* We use one bit array to track ring slots for both
* data_desc and completions.
* desc and completions.
*/
struct spdk_bit_array *ring_slots;
uint32_t max_ring_slots;
/*
* We use a separate bit array to track ring slots for
* descriptors submitted via the user in a batch.
*/
struct spdk_bit_array *user_ring_slots;
};
struct spdk_idxd_io_channel {
struct spdk_idxd_device *idxd;
struct idxd_ring_control ring_ctrl;
TAILQ_HEAD(, idxd_batch) batch_pool; /* free batches */
TAILQ_HEAD(, idxd_batch) batches; /* in use batches */
};
struct pci_dev_id {
@@ -130,7 +153,7 @@ struct idxd_comp {
struct idxd_hw_comp_record hw;
void *cb_arg;
spdk_idxd_req_cb cb_fn;
uint64_t pad1;
struct idxd_batch *batch;
uint64_t pad2;
} __attribute__((packed));
SPDK_STATIC_ASSERT(sizeof(struct idxd_comp) == 64, "size mismatch");


@@ -6,6 +6,10 @@
spdk_idxd_reconfigure_chan;
spdk_idxd_probe;
spdk_idxd_detach;
spdk_idxd_batch_prep_copy;
spdk_idxd_batch_submit;
spdk_idxd_batch_create;
spdk_idxd_batch_get_max;
spdk_idxd_set_config;
spdk_idxd_submit_compare;
spdk_idxd_submit_crc32c;


@@ -94,6 +94,7 @@ struct idxd_op {
uint64_t fill_pattern;
uint32_t op_code;
uint64_t nbytes;
struct idxd_batch *batch;
TAILQ_ENTRY(idxd_op) link;
};
@@ -121,6 +122,7 @@ idxd_select_device(void)
* We allow channels to share underlying devices,
* selection is round-robin based.
*/
g_next_dev = TAILQ_NEXT(g_next_dev, tailq);
if (g_next_dev == NULL) {
g_next_dev = TAILQ_FIRST(&g_idxd_devices);
@@ -144,7 +146,6 @@ idxd_poll(void *arg)
while (!TAILQ_EMPTY(&chan->queued_ops)) {
op = TAILQ_FIRST(&chan->queued_ops);
TAILQ_REMOVE(&chan->queued_ops, op, link);
switch (op->op_code) {
case IDXD_OPCODE_MEMMOVE:
@@ -167,16 +168,19 @@
rc = spdk_idxd_submit_crc32c(op->chan, op->dst, op->src, op->seed, op->nbytes,
op->cb_fn, op->cb_arg);
break;
case IDXD_OPCODE_BATCH:
rc = spdk_idxd_batch_submit(op->chan, op->batch, op->cb_fn, op->cb_arg);
break;
default:
/* Should never get here */
assert(false);
break;
}
if (rc == 0) {
TAILQ_REMOVE(&chan->queued_ops, op, link);
free(op);
} else {
/* Busy, resubmit to try again later */
TAILQ_INSERT_HEAD(&chan->queued_ops, op, link);
break;
}
}
@@ -262,8 +266,7 @@ idxd_submit_copy(void *cb_arg, struct spdk_io_channel *ch, void *dst, void *src,
static int
idxd_submit_dualcast(void *cb_arg, struct spdk_io_channel *ch, void *dst1, void *dst2, void *src,
uint64_t nbytes,
spdk_accel_completion_cb cb)
uint64_t nbytes, spdk_accel_completion_cb cb)
{
struct idxd_task *idxd_task = (struct idxd_task *)cb_arg;
struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
@@ -304,8 +307,7 @@ idxd_submit_dualcast(void *cb_arg, struct spdk_io_channel *ch, void *dst1, void
static int
idxd_submit_compare(void *cb_arg, struct spdk_io_channel *ch, void *src1, void *src2,
uint64_t nbytes,
spdk_accel_completion_cb cb)
uint64_t nbytes, spdk_accel_completion_cb cb)
{
struct idxd_task *idxd_task = (struct idxd_task *)cb_arg;
struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
@@ -430,12 +432,83 @@ static uint64_t
idxd_get_capabilities(void)
{
return ACCEL_COPY | ACCEL_FILL | ACCEL_CRC32C | ACCEL_COMPARE |
ACCEL_DUALCAST;
ACCEL_DUALCAST | ACCEL_BATCH;
}
static uint32_t
idxd_batch_get_max(void)
{
return spdk_idxd_batch_get_max();
}
static struct spdk_accel_batch *
idxd_batch_start(struct spdk_io_channel *ch)
{
struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
return (struct spdk_accel_batch *)spdk_idxd_batch_create(chan->chan);
}
static int
idxd_batch_submit(void *cb_arg, struct spdk_io_channel *ch, struct spdk_accel_batch *_batch,
spdk_accel_completion_cb cb)
{
struct idxd_task *idxd_task = (struct idxd_task *)cb_arg;
struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
struct idxd_batch *batch = (struct idxd_batch *)_batch;
int rc = 0;
idxd_task->cb = cb;
if (chan->state == IDXD_CHANNEL_ACTIVE) {
rc = spdk_idxd_batch_submit(chan->chan, batch, idxd_done, idxd_task);
}
if (chan->state == IDXD_CHANNEL_PAUSED || rc == -EBUSY) {
struct idxd_op *op_to_queue;
/* Common prep. */
op_to_queue = _prep_queue_command(chan, idxd_done, idxd_task);
if (op_to_queue == NULL) {
return -ENOMEM;
}
/* Command specific. */
op_to_queue->batch = batch;
op_to_queue->op_code = IDXD_OPCODE_BATCH;
/* Queue the operation. */
TAILQ_INSERT_TAIL(&chan->queued_ops, op_to_queue, link);
return 0;
} else if (chan->state == IDXD_CHANNEL_ERROR) {
return -EINVAL;
}
return rc;
}
static int
idxd_batch_prep_copy(void *cb_arg, struct spdk_io_channel *ch, struct spdk_accel_batch *_batch,
void *dst, void *src, uint64_t nbytes, spdk_accel_completion_cb cb)
{
struct idxd_task *idxd_task = (struct idxd_task *)cb_arg;
struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
struct idxd_batch *batch = (struct idxd_batch *)_batch;
idxd_task->cb = cb;
return spdk_idxd_batch_prep_copy(chan->chan, batch, dst, src, nbytes,
idxd_done, idxd_task);
}
static struct spdk_accel_engine idxd_accel_engine = {
.get_capabilities = idxd_get_capabilities,
.copy = idxd_submit_copy,
.batch_get_max = idxd_batch_get_max,
.batch_create = idxd_batch_start,
.batch_prep_copy = idxd_batch_prep_copy,
.batch_submit = idxd_batch_submit,
.dualcast = idxd_submit_dualcast,
.compare = idxd_submit_compare,
.fill = idxd_submit_fill,
@@ -489,8 +562,7 @@ _pause_chan(struct spdk_io_channel_iter *i)
static void
_pause_chan_done(struct spdk_io_channel_iter *i, int status)
{
spdk_for_each_channel(&idxd_accel_engine, _config_max_desc, NULL,
NULL);
spdk_for_each_channel(&idxd_accel_engine, _config_max_desc, NULL, NULL);
}
static int
@@ -550,8 +622,7 @@ static void
_pause_chan_destroy_done(struct spdk_io_channel_iter *i, int status)
{
/* Rebalance the rings with the smaller number of remaining channels. */
spdk_for_each_channel(&idxd_accel_engine, _config_max_desc, NULL,
NULL);
spdk_for_each_channel(&idxd_accel_engine, _config_max_desc, NULL, NULL);
}
static void