diff --git a/lib/reduce/reduce.c b/lib/reduce/reduce.c index 487d8625c..7b538c618 100644 --- a/lib/reduce/reduce.c +++ b/lib/reduce/reduce.c @@ -40,6 +40,7 @@ #include "spdk/bit_array.h" #include "spdk/util.h" #include "spdk/log.h" +#include "spdk/memory.h" #include "libpmem.h" @@ -334,17 +335,67 @@ struct reduce_init_load_ctx { void *path; }; +static inline bool +_addr_crosses_huge_page(const void *addr, size_t *size) +{ + size_t _size; + uint64_t rc; + + assert(size); + + _size = *size; + rc = spdk_vtophys(addr, size); + + return rc == SPDK_VTOPHYS_ERROR || _size != *size; +} + +static inline int +_set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size) +{ + uint8_t *addr; + size_t size_tmp = buffer_size; + + addr = *_addr; + + /* Verify that addr + buffer_size doesn't cross huge page boundary */ + if (_addr_crosses_huge_page(addr, &size_tmp)) { + /* Memory start is aligned on 2MiB, so buffer should be located at the end of the page. + * Skip remaining bytes and continue from the beginning of the next page */ + addr += size_tmp; + } + + if (addr + buffer_size > addr_range) { + SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range); + return -ERANGE; + } + + *vol_buffer = addr; + *_addr = addr + buffer_size; + + return 0; +} + static int _allocate_vol_requests(struct spdk_reduce_vol *vol) { struct spdk_reduce_vol_request *req; - int i; + uint32_t reqs_in_2mb_page, huge_pages_needed; + uint8_t *buffer, *buffer_end; + int i = 0; + int rc = 0; - /* Allocate 2x since we need buffers for both read/write and compress/decompress - * intermediate buffers. - */ - vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, - 64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + /* It is needed to allocate comp and decomp buffers so that they do not cross physical + * page boundaries. Assume that the system uses default 2MiB pages and chunk_size is not + * necessarily power of 2 + * Allocate 2x since we need buffers for both read/write and compress/decompress + * intermediate buffers. */ + reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); + if (!reqs_in_2mb_page) { + return -EINVAL; + } + huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); + + vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); if (vol->buf_mem == NULL) { return -ENOMEM; } @@ -369,16 +420,39 @@ _allocate_vol_requests(struct spdk_reduce_vol *vol) return -ENOMEM; } + buffer = vol->buf_mem; + buffer_end = buffer + VALUE_2MB * huge_pages_needed; + for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { req = &vol->request_mem[i]; TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; - req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size; req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; - req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size; + + rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); + if (rc) { + SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, + vol->buf_mem, buffer_end); + break; + } + rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); + if (rc) { + SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, + vol->buf_mem, buffer_end); + break; + } } - return 0; + if (rc) { + free(vol->buf_iov_mem); + free(vol->request_mem); + spdk_free(vol->buf_mem); + vol->buf_mem = NULL; + vol->buf_iov_mem = NULL; + vol->request_mem = NULL; + } + + return rc; } static void diff --git a/test/unit/lib/reduce/reduce.c/reduce_ut.c b/test/unit/lib/reduce/reduce.c/reduce_ut.c index a61b0e750..e9f12e4e6 100644 --- a/test/unit/lib/reduce/reduce.c/reduce_ut.c +++ b/test/unit/lib/reduce/reduce.c/reduce_ut.c @@ -38,7 +38,9 @@ #include "reduce/reduce.c" #include "spdk_internal/mock.h" +#define UNIT_TEST_NO_VTOPHYS #include "common/lib/test_env.c" +#undef UNIT_TEST_NO_VTOPHYS static struct spdk_reduce_vol *g_vol; static int g_reduce_errno; @@ -53,6 +55,23 @@ static int g_decompressed_len; #define TEST_MD_PATH "/tmp" +uint64_t +spdk_vtophys(const void *buf, uint64_t *size) +{ + /* add + 1 to buf addr for cases where buf is the start of the page, that will give us correct end of the page */ + const uint8_t *page_2mb_end = (const uint8_t *)SPDK_ALIGN_CEIL((uintptr_t)buf + 1, VALUE_2MB); + uint64_t bytes_to_page_end = page_2mb_end - (const uint8_t *)buf; + uint64_t _size; + + if (*size) { + _size = *size; + _size = spdk_min(_size, bytes_to_page_end); + *size = _size; + } + + return (uintptr_t)buf; +} + enum ut_reduce_bdev_io_type { UT_REDUCE_IO_READV = 1, UT_REDUCE_IO_WRITEV = 2, @@ -1735,6 +1754,30 @@ static void test_reduce_decompress_chunk(void) CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req); } +static void test_allocate_vol_requests(void) +{ + struct spdk_reduce_vol *vol; + /* include chunk_sizes which are not power of 2 */ + uint32_t chunk_sizes[] = {8192, 8320, 16384, 16416, 32768}; + uint32_t io_unit_sizes[] = {512, 520, 4096, 4104, 4096}; + uint32_t i; + + for (i = 0; i < 4; i++) { + vol = calloc(1, sizeof(*vol)); + SPDK_CU_ASSERT_FATAL(vol); + + vol->params.chunk_size = chunk_sizes[i]; + vol->params.logical_block_size = io_unit_sizes[i]; + vol->params.backing_io_unit_size = io_unit_sizes[i]; + vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; + vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; + + CU_ASSERT(_validate_vol_params(&vol->params) == 0); + CU_ASSERT(_allocate_vol_requests(vol) == 0); + _init_load_cleanup(vol, NULL); + } +} + int main(int argc, char **argv) { @@ -1761,6 +1804,7 @@ main(int argc, char **argv) CU_ADD_TEST(suite, compress_algorithm); CU_ADD_TEST(suite, test_prepare_compress_chunk); CU_ADD_TEST(suite, test_reduce_decompress_chunk); + CU_ADD_TEST(suite, test_allocate_vol_requests); g_unlink_path = g_path; g_unlink_callback = unlink_cb;