util/pipe: Track full condition with a separate bool

This allows the pipe to fill up entirely, instead of reserving 1 byte.
This tends to keep copies from the pipe aligned over time.

Change-Id: I4801d62fa839165efef61ea1c83f602931ee7018
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16990
Reviewed-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
This commit is contained in:
Ben Walker 2023-02-28 12:05:08 -07:00
parent fe0344ec74
commit 6648ea0fe6
3 changed files with 97 additions and 78 deletions

View File

@ -12,6 +12,7 @@ struct spdk_pipe {
uint32_t write;
uint32_t read;
bool full;
};
struct spdk_pipe *
@ -46,12 +47,16 @@ spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struc
read = pipe->read;
write = pipe->write;
if (read <= write) {
requested_sz = spdk_min(requested_sz, ((read + pipe->sz) - write - 1));
if (pipe->full || requested_sz == 0) {
iovs[0].iov_base = NULL;
iovs[0].iov_len = 0;
return 0;
}
if (read <= write) {
sz = spdk_min(requested_sz, pipe->sz - write);
iovs[0].iov_base = (sz == 0) ? NULL : (pipe->buf + write);
iovs[0].iov_base = pipe->buf + write;
iovs[0].iov_len = sz;
requested_sz -= sz;
@ -66,7 +71,7 @@ spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struc
iovs[1].iov_len = 0;
}
} else {
sz = spdk_min(requested_sz, read - write - 1);
sz = spdk_min(requested_sz, read - write);
iovs[0].iov_base = (sz == 0) ? NULL : (pipe->buf + write);
iovs[0].iov_len = sz;
@ -87,38 +92,37 @@ spdk_pipe_writer_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
read = pipe->read;
write = pipe->write;
if (requested_sz > pipe->sz - 1) {
if (requested_sz > pipe->sz || pipe->full) {
return -EINVAL;
}
if (read <= write) {
if (requested_sz > (read + pipe->sz) - write) {
if (requested_sz > (pipe->sz - write) + read) {
return -EINVAL;
}
sz = spdk_min(requested_sz, pipe->sz - write);
write += sz;
if (write > pipe->sz - 1) {
if (write == pipe->sz) {
write = 0;
}
requested_sz -= sz;
if (requested_sz > 0) {
if (requested_sz >= read) {
return -EINVAL;
}
write = requested_sz;
}
} else {
if (requested_sz > (read - write - 1)) {
if (requested_sz > (read - write)) {
return -EINVAL;
}
write += requested_sz;
}
if (read == write) {
pipe->full = true;
}
pipe->write = write;
return 0;
@ -133,11 +137,13 @@ spdk_pipe_reader_bytes_available(struct spdk_pipe *pipe)
read = pipe->read;
write = pipe->write;
if (read <= write) {
if (read == write && !pipe->full) {
return 0;
} else if (read < write) {
return write - read;
} else {
return (pipe->sz - read) + write;
}
return (write + pipe->sz) - read;
}
int
@ -150,7 +156,12 @@ spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struc
read = pipe->read;
write = pipe->write;
if (read <= write) {
if (read == write && !pipe->full) {
iovs[0].iov_base = NULL;
iovs[0].iov_len = 0;
iovs[1].iov_base = NULL;
iovs[1].iov_len = 0;
} else if (read < write) {
sz = spdk_min(requested_sz, write - read);
iovs[0].iov_base = (sz == 0) ? NULL : (pipe->buf + read);
@ -188,7 +199,11 @@ spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
read = pipe->read;
write = pipe->write;
if (read <= write) {
if (requested_sz == 0) {
return 0;
}
if (read < write) {
if (requested_sz > (write - read)) {
return -EINVAL;
}
@ -198,7 +213,7 @@ spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
sz = spdk_min(requested_sz, pipe->sz - read);
read += sz;
if (read > pipe->sz - 1) {
if (read == pipe->sz) {
read = 0;
}
requested_sz -= sz;
@ -212,6 +227,8 @@ spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
}
}
/* We know we advanced at least one byte, so the pipe isn't full. */
pipe->full = false;
pipe->read = read;
return 0;

View File

@ -293,13 +293,13 @@ uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
}
/* Round up to next 64 byte multiple */
new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
new_buf = calloc(SPDK_ALIGN_CEIL(sz, 64), sizeof(uint8_t));
if (!new_buf) {
SPDK_ERRLOG("socket recv buf allocation failed\n");
return -ENOMEM;
}
new_pipe = spdk_pipe_create(new_buf, sz + 1);
new_pipe = spdk_pipe_create(new_buf, sz);
if (new_pipe == NULL) {
SPDK_ERRLOG("socket pipe allocation failed\n");
free(new_buf);

View File

@ -58,22 +58,10 @@ test_write_get_buffer(void)
memset(iovs, 0, sizeof(iovs));
/* Get all available memory */
rc = spdk_pipe_writer_get_buffer(pipe, 9, iovs);
CU_ASSERT(rc == 9);
CU_ASSERT(iovs[0].iov_base == mem);
CU_ASSERT(iovs[0].iov_len == 9);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 0);
CU_ASSERT(pipe->read == 0);
memset(iovs, 0, sizeof(iovs));
/* Get the full size of the data buffer backing the pipe, which isn't allowed */
rc = spdk_pipe_writer_get_buffer(pipe, 10, iovs);
CU_ASSERT(rc == 9);
CU_ASSERT(rc == 10);
CU_ASSERT(iovs[0].iov_base == mem);
CU_ASSERT(iovs[0].iov_len == 9);
CU_ASSERT(iovs[0].iov_len == 10);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 0);
@ -85,10 +73,10 @@ test_write_get_buffer(void)
pipe->write = 7;
/* Get all of the available memory. */
rc = spdk_pipe_writer_get_buffer(pipe, 2, iovs);
CU_ASSERT(rc == 2);
rc = spdk_pipe_writer_get_buffer(pipe, 3, iovs);
CU_ASSERT(rc == 3);
CU_ASSERT(iovs[0].iov_base == (mem + 7));
CU_ASSERT(iovs[0].iov_len == 2);
CU_ASSERT(iovs[0].iov_len == 3);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 7);
@ -97,10 +85,10 @@ test_write_get_buffer(void)
memset(iovs, 0, sizeof(iovs));
/* Get more than the available memory */
rc = spdk_pipe_writer_get_buffer(pipe, 3, iovs);
CU_ASSERT(rc == 2);
rc = spdk_pipe_writer_get_buffer(pipe, 4, iovs);
CU_ASSERT(rc == 3);
CU_ASSERT(iovs[0].iov_base == (mem + 7));
CU_ASSERT(iovs[0].iov_len == 2);
CU_ASSERT(iovs[0].iov_len == 3);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 7);
@ -112,24 +100,24 @@ test_write_get_buffer(void)
pipe->read = 3;
/* Get all of the available memory. */
rc = spdk_pipe_writer_get_buffer(pipe, 5, iovs);
CU_ASSERT(rc == 5);
rc = spdk_pipe_writer_get_buffer(pipe, 6, iovs);
CU_ASSERT(rc == 6);
CU_ASSERT(iovs[0].iov_base == (mem + 7));
CU_ASSERT(iovs[0].iov_len == 3);
CU_ASSERT(iovs[1].iov_base == mem);
CU_ASSERT(iovs[1].iov_len == 2);
CU_ASSERT(iovs[1].iov_len == 3);
CU_ASSERT(pipe->write == 7);
CU_ASSERT(pipe->read == 3);
memset(iovs, 0, sizeof(iovs));
/* Get more than the available memory */
rc = spdk_pipe_writer_get_buffer(pipe, 6, iovs);
CU_ASSERT(rc == 5);
rc = spdk_pipe_writer_get_buffer(pipe, 7, iovs);
CU_ASSERT(rc == 6);
CU_ASSERT(iovs[0].iov_base == (mem + 7));
CU_ASSERT(iovs[0].iov_len == 3);
CU_ASSERT(iovs[1].iov_base == mem);
CU_ASSERT(iovs[1].iov_len == 2);
CU_ASSERT(iovs[1].iov_len == 3);
CU_ASSERT(pipe->write == 7);
CU_ASSERT(pipe->read == 3);
@ -139,10 +127,10 @@ test_write_get_buffer(void)
pipe->read = 9;
/* Get all of the available memory. */
rc = spdk_pipe_writer_get_buffer(pipe, 1, iovs);
CU_ASSERT(rc == 1);
rc = spdk_pipe_writer_get_buffer(pipe, 2, iovs);
CU_ASSERT(rc == 2);
CU_ASSERT(iovs[0].iov_base == (mem + 7));
CU_ASSERT(iovs[0].iov_len == 1);
CU_ASSERT(iovs[0].iov_len == 2);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 7);
@ -151,10 +139,10 @@ test_write_get_buffer(void)
memset(iovs, 0, sizeof(iovs));
/* Get more than the available memory */
rc = spdk_pipe_writer_get_buffer(pipe, 2, iovs);
CU_ASSERT(rc == 1);
rc = spdk_pipe_writer_get_buffer(pipe, 3, iovs);
CU_ASSERT(rc == 2);
CU_ASSERT(iovs[0].iov_base == (mem + 7));
CU_ASSERT(iovs[0].iov_len == 1);
CU_ASSERT(iovs[0].iov_len == 2);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 7);
@ -163,7 +151,8 @@ test_write_get_buffer(void)
memset(iovs, 0, sizeof(iovs));
/* Fill the pipe */
pipe->write = 8;
pipe->write = 9;
pipe->full = true;
/* Get data while the pipe is full */
rc = spdk_pipe_writer_get_buffer(pipe, 1, iovs);
@ -172,7 +161,7 @@ test_write_get_buffer(void)
CU_ASSERT(iovs[0].iov_len == 0);
CU_ASSERT(iovs[1].iov_base == NULL);
CU_ASSERT(iovs[1].iov_len == 0);
CU_ASSERT(pipe->write == 8);
CU_ASSERT(pipe->write == 9);
CU_ASSERT(pipe->read == 9);
spdk_pipe_destroy(pipe);
@ -193,60 +182,73 @@ test_write_advance(void)
CU_ASSERT(rc == 0);
CU_ASSERT(pipe->write == 5);
CU_ASSERT(pipe->read == 0);
CU_ASSERT(!pipe->full);
pipe->write = 0;
pipe->full = false;
/* Advance to the end of the pipe */
rc = spdk_pipe_writer_advance(pipe, 9);
rc = spdk_pipe_writer_advance(pipe, 10);
CU_ASSERT(rc == 0);
CU_ASSERT(pipe->write == 9);
CU_ASSERT(pipe->write == 0);
CU_ASSERT(pipe->read == 0);
CU_ASSERT(pipe->full);
pipe->write = 0;
pipe->full = false;
/* Advance beyond the end */
rc = spdk_pipe_writer_advance(pipe, 10);
rc = spdk_pipe_writer_advance(pipe, 11);
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(pipe->write == 0);
CU_ASSERT(pipe->read == 0);
CU_ASSERT(!pipe->full);
/* Move the read pointer forward */
pipe->write = 0;
pipe->read = 5;
pipe->full = false;
/* Advance to the end of the pipe */
rc = spdk_pipe_writer_advance(pipe, 4);
rc = spdk_pipe_writer_advance(pipe, 5);
CU_ASSERT(rc == 0);
CU_ASSERT(pipe->write == 4);
CU_ASSERT(pipe->write == 5);
CU_ASSERT(pipe->read == 5);
CU_ASSERT(pipe->full);
pipe->write = 0;
pipe->read = 5;
/* Advance beyond the end */
rc = spdk_pipe_writer_advance(pipe, 5);
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(pipe->write == 0);
CU_ASSERT(pipe->read == 5);
/* Test wrap around */
pipe->write = 7;
pipe->read = 3;
/* Advance to the end of the pipe */
rc = spdk_pipe_writer_advance(pipe, 5);
CU_ASSERT(rc == 0);
CU_ASSERT(pipe->write == 2);
CU_ASSERT(pipe->read == 3);
pipe->write = 7;
pipe->read = 3;
pipe->full = false;
/* Advance beyond the end */
rc = spdk_pipe_writer_advance(pipe, 6);
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(pipe->write == 0);
CU_ASSERT(pipe->read == 5);
CU_ASSERT(!pipe->full);
/* Test wrap around */
pipe->write = 7;
pipe->read = 3;
pipe->full = false;
/* Advance to the end of the pipe */
rc = spdk_pipe_writer_advance(pipe, 6);
CU_ASSERT(rc == 0);
CU_ASSERT(pipe->write == 3);
CU_ASSERT(pipe->read == 3);
CU_ASSERT(pipe->full);
pipe->write = 7;
pipe->read = 3;
pipe->full = false;
/* Advance beyond the end */
rc = spdk_pipe_writer_advance(pipe, 7);
CU_ASSERT(rc == -EINVAL);
CU_ASSERT(pipe->write == 7);
CU_ASSERT(pipe->read == 3);
CU_ASSERT(!pipe->full);
spdk_pipe_destroy(pipe);
}