From c4217183f3cadc85ecf5260220970d465b3bc3d4 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 03:32:24 -0500 Subject: [PATCH 01/25] Add io_uring interface based on liburing. --- examples/CMakeLists.txt | 2 +- src/environment/file_linux.cc | 96 ++++++++++++++++++++++++ src/environment/file_linux.h | 137 ++++++++++++++++++++++++++++++++++ test/CMakeLists.txt | 8 +- test/checkpoint_uring_test.cc | 22 ++++++ test/ingest_uring_test.cc | 22 ++++++ third_party/hopscotch-map | 2 +- 7 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 test/checkpoint_uring_test.cc create mode 100644 test/ingest_uring_test.cc diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 0aa6d93..04383a1 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,7 +1,7 @@ if(WIN32) set (FISHSTORE_BENCHMARK_LINK_LIBS fishstore rpcrt4 wsock32 Ws2_32) else() - set (FISHSTORE_BENCHMARK_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl) + set (FISHSTORE_BENCHMARK_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl uring) endif() #Function to automate building benchmark binaries diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index edb32b3..6a6bd62 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -194,6 +194,102 @@ Status QueueFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu return Status::Ok; } +bool UringIoHandler::TryComplete() { + struct io_uring_cqe* cqe = nullptr; + cq_lock_.Acquire(); + int res = io_uring_peek_cqe(ring_, &cqe); + if(res == 0 && cqe) { + int io_res = cqe->res; + auto *context = reinterpret_cast(io_uring_cqe_get_data(cqe)); + cq_lock_.Release(); + io_uring_cqe_seen(ring_, cqe); + Status return_status; + size_t byte_transferred; + if (io_res < 0) { + return_status = Status::IOError; + byte_transferred = 0; + } else { + return_status = Status::Ok; + byte_transferred = io_res; + } + context->callback(context->caller_context, return_status, byte_transferred); + lss_allocator.Free(context); + return true; + } else { + cq_lock_.Release(); + return false; + } +} + +Status UringFile::Open(FileCreateDisposition create_disposition, const FileOptions& options, + UringIoHandler* handler, bool* exists) { + int flags = 0; + if(options.unbuffered) { + flags |= O_DIRECT; + } + RETURN_NOT_OK(File::Open(flags, create_disposition, exists)); + if(exists && !*exists) { + return Status::Ok; + } + + ring_ = handler->io_uring(); + sq_lock_ = handler->sq_lock(); + return Status::Ok; +} + +Status UringFile::Read(size_t offset, uint32_t length, uint8_t* buffer, + IAsyncContext& context, AsyncIOCallback callback) const { + DCHECK_ALIGNMENT(offset, length, buffer); +#ifdef IO_STATISTICS + ++read_count_; + bytes_read_ += length; +#endif + return const_cast(this)->ScheduleOperation(FileOperationType::Read, buffer, + offset, length, context, callback); +} + +Status UringFile::Write(size_t offset, uint32_t length, const uint8_t* buffer, + IAsyncContext& context, AsyncIOCallback callback) { + DCHECK_ALIGNMENT(offset, length, buffer); +#ifdef IO_STATISTICS + bytes_written_ += length; +#endif + return ScheduleOperation(FileOperationType::Write, const_cast(buffer), offset, length, + context, callback); +} + +Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* buffer, + size_t offset, uint32_t length, IAsyncContext& context, + AsyncIOCallback callback) { + auto io_context = alloc_context(sizeof(UringIoHandler::IoCallbackContext)); + if (!io_context.get()) return Status::OutOfMemory; + + IAsyncContext* caller_context_copy; + RETURN_NOT_OK(context.DeepCopy(caller_context_copy)); + + new(io_context.get()) UringIoHandler::IoCallbackContext(caller_context_copy, callback); + + sq_lock_->Acquire(); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); + assert(sqe != 0); + + if (operationType == FileOperationType::Read) { + io_uring_prep_read(sqe, fd_, buffer, length, offset); + } else { + io_uring_prep_write(sqe, fd_, buffer, length, offset); + } + io_uring_sqe_set_data(sqe, io_context.get()); + + int res = io_uring_submit(ring_); + sq_lock_->Release(); + if (res != 1) { + return Status::IOError; + } + + io_context.release(); + return Status::Ok; +} + #undef DCHECK_ALIGNMENT } diff --git a/src/environment/file_linux.h b/src/environment/file_linux.h index ffb6656..243cdf5 100644 --- a/src/environment/file_linux.h +++ b/src/environment/file_linux.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,29 @@ namespace fishstore { namespace environment { +class alignas(64) SpinLock { +public: + SpinLock(): locked_(false) {} + + void Acquire() noexcept { + for (;;) { + if (!locked_.exchange(true, std::memory_order_acquire)) { + return; + } + + while (locked_.load(std::memory_order_relaxed)) { + __builtin_ia32_pause(); + } + } + } + + void Release() noexcept { + locked_.store(false, std::memory_order_release); + } +private: + std::atomic_bool locked_; +}; + constexpr const char* kPathSeparator = "/"; /// The File class encapsulates the OS file handle. @@ -250,5 +274,118 @@ class QueueFile : public File { io_context_t io_object_; }; + +class UringFile; + +/// The QueueIoHandler class encapsulates completions for async file I/O, where the completions +/// are put on the AIO completion queue. +class UringIoHandler { + public: + typedef UringFile async_file_t; + + private: + constexpr static int kMaxEvents = 128; + + public: + UringIoHandler() { + ring_ = new struct io_uring(); + int ret = io_uring_queue_init(kMaxEvents, ring_, 0); + assert(ret == 0); + } + + UringIoHandler(size_t max_threads) { + ring_ = new struct io_uring(); + int ret = io_uring_queue_init(kMaxEvents, ring_, 0); + assert(ret == 0); + } + + /// Move constructor + UringIoHandler(UringIoHandler&& other) { + ring_ = other.ring_; + other.ring_ = 0; + } + + ~UringIoHandler() { + if (ring_ != 0) { + io_uring_queue_exit(ring_); + delete ring_; + } + } + + /* + /// Invoked whenever a Linux AIO completes. + static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2); + */ + struct IoCallbackContext { + IoCallbackContext(IAsyncContext* context_, AsyncIOCallback callback_) + : caller_context{ context_ } + , callback{ callback_ } {} + + /// Caller callback context. + IAsyncContext* caller_context; + + /// The caller's asynchronous callback function + AsyncIOCallback callback; + }; + + inline struct io_uring* io_uring() const { + return ring_; + } + + inline SpinLock* sq_lock() { + return &sq_lock_; + } + + /// Try to execute the next IO completion on the queue, if any. + bool TryComplete(); + + private: + /// The io_uring for all the I/Os + struct io_uring* ring_; + SpinLock sq_lock_, cq_lock_; +}; + +/// The UringFile class encapsulates asynchronous reads and writes, using the specified +/// io_uring +class UringFile : public File { + public: + UringFile() + : File() + , ring_{ nullptr } { + } + UringFile(const std::string& filename) + : File(filename) + , ring_{ nullptr } { + } + /// Move constructor + UringFile(UringFile&& other) + : File(std::move(other)) + , ring_{ other.ring_ } + , sq_lock_{ other.sq_lock_ } { + } + /// Move assignment operator. + UringFile& operator=(UringFile&& other) { + File::operator=(std::move(other)); + ring_ = other.ring_; + sq_lock_ = other.sq_lock_; + return *this; + } + + Status Open(FileCreateDisposition create_disposition, const FileOptions& options, + UringIoHandler* handler, bool* exists = nullptr); + + Status Read(size_t offset, uint32_t length, uint8_t* buffer, + IAsyncContext& context, AsyncIOCallback callback) const; + Status Write(size_t offset, uint32_t length, const uint8_t* buffer, + IAsyncContext& context, AsyncIOCallback callback); + + private: + Status ScheduleOperation(FileOperationType operationType, uint8_t* buffer, size_t offset, + uint32_t length, IAsyncContext& context, AsyncIOCallback callback); + + struct io_uring* ring_; + SpinLock* sq_lock_; +}; + } } // namespace FASTER::environment diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 03ffcfb..4e70b60 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,7 +1,7 @@ if(WIN32) set (FISHSTORE_TEST_LINK_LIBS fishstore rpcrt4 gtest) else() - set (FISHSTORE_TEST_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl gtest) + set (FISHSTORE_TEST_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl gtest uring) endif() if(MSVC) @@ -22,11 +22,17 @@ ENDFUNCTION() ADD_FISHSTORE_TEST(in_memory_test "") ADD_FISHSTORE_TEST(ingest_queue_test "ingest_test.h") +if (NOT MSVC) +ADD_FISHSTORE_TEST(ingest_uring_test "ingest_test.h") +endif() if (MSVC) ADD_FISHSTORE_TEST(ingest_threadpool_test "ingest_test.h") endif() ADD_FISHSTORE_TEST(register_test "") ADD_FISHSTORE_TEST(checkpoint_queue_test "checkpoint_test.h") +if (NOT MSVC) +ADD_FISHSTORE_TEST(checkpoint_uring_test "checkpoint_test.h") +endif() if (MSVC) ADD_FISHSTORE_TEST(checkpoint_threadpool_test "checkpoint_test.h") endif() diff --git a/test/checkpoint_uring_test.cc b/test/checkpoint_uring_test.cc new file mode 100644 index 0000000..20aa823 --- /dev/null +++ b/test/checkpoint_uring_test.cc @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#include +#include +#include +#include +#include "gtest/gtest.h" + +#include "adapters/simdjson_adapter.h" +#include + +using handler_t = fishstore::environment::UringIoHandler; + +#define CLASS CheckpointTest_Uring +#include "checkpoint_test.h" +#undef CLASS + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/ingest_uring_test.cc b/test/ingest_uring_test.cc new file mode 100644 index 0000000..430061b --- /dev/null +++ b/test/ingest_uring_test.cc @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#include +#include +#include +#include +#include "gtest/gtest.h" + +#include "adapters/simdjson_adapter.h" +#include + +using handler_t = fishstore::environment::UringIoHandler; + +#define CLASS IngestTest_Uring +#include "ingest_test.h" +#undef CLASS + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/third_party/hopscotch-map b/third_party/hopscotch-map index 7ef9cc4..8483747 160000 --- a/third_party/hopscotch-map +++ b/third_party/hopscotch-map @@ -1 +1 @@ -Subproject commit 7ef9cc4aca326cbe725553ca330d74076f936ff2 +Subproject commit 848374746a50b3ebebe656611d554cb134e9aeef From 6c844ac67d66ab209020d0de7e7e4458649f3c9a Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 03:39:00 -0500 Subject: [PATCH 02/25] update yml for azure pipelines. --- azure-pipelines.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d9e5bd6..e75fe11 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -33,7 +33,7 @@ jobs: - job: 'cppLinux' pool: - vmImage: ubuntu-16.04 + vmImage: ubuntu-20.04 displayName: 'C++ (Linux)' steps: From d01cdce0c8cf2b644e94d4733c9accaa9a213943 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 03:49:05 -0500 Subject: [PATCH 03/25] Create cmake.yml First github action yml. --- .github/workflows/cmake.yml | 53 +++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 .github/workflows/cmake.yml diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml new file mode 100644 index 0000000..feab7e9 --- /dev/null +++ b/.github/workflows/cmake.yml @@ -0,0 +1,53 @@ +name: CMake + +on: [push] + +env: + # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) + BUILD_TYPE: Release + +jobs: + build: + # The CMake configure and build commands are platform agnostic and should work equally + # well on Windows or Mac. You can convert this to a matrix build if you need + # cross-platform coverage. + # See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix + runs-on: ubuntu-20.04 + + steps: + - name: Checkout reposistory + uses: actions/checkout@master + + - name: Checkout submodules + run: git submodule update --init --recursive + + - name: Install dependencies + run: sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev liburing-dev + + - name: Create Build Environment + # Some projects don't allow in-source building, so create a separate build directory + # We'll use this as our working directory for all subsequent commands + run: cmake -E make_directory ${{runner.workspace}}/build + + - name: Configure CMake + # Use a bash shell so we can use the same syntax for environment variable + # access regardless of the host operating system + shell: bash + working-directory: ${{runner.workspace}}/build + # Note the current convention is to use the -S and -B options here to specify source + # and build directories, but this is only available with CMake 3.13 and higher. + # The CMake binaries on the Github Actions machines are (as of this writing) 3.12 + run: cmake $GITHUB_WORKSPACE -DCMAKE_BUILD_TYPE=$BUILD_TYPE + + - name: Build + working-directory: ${{runner.workspace}}/build + shell: bash + # Execute the build. You can specify a specific target with "--target " + run: cmake --build . --config $BUILD_TYPE + + - name: Test + working-directory: ${{runner.workspace}}/build + shell: bash + # Execute tests defined by the CMake configuration. + # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail + run: ctest -j 1 --interactive-debug-mode 0 --output-on-failure -R .*_test From fc48a063949a8ec52966c843e70f5ea5490261d8 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 03:59:17 -0500 Subject: [PATCH 04/25] Update cmake.yml --- .github/workflows/cmake.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index feab7e9..225617b 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -22,7 +22,14 @@ jobs: run: git submodule update --init --recursive - name: Install dependencies - run: sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev liburing-dev + run: sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev + + - name: Install liburing + run: | + git clone https://github.com/axboe/liburing.git + cd liburing + ./colnfigure + sudo make install - name: Create Build Environment # Some projects don't allow in-source building, so create a separate build directory From 81823dbd629703fcefb47a0be212bde278b494dc Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 04:00:35 -0500 Subject: [PATCH 05/25] Update cmake.yml --- .github/workflows/cmake.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 225617b..cd61414 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -28,7 +28,7 @@ jobs: run: | git clone https://github.com/axboe/liburing.git cd liburing - ./colnfigure + ./configure sudo make install - name: Create Build Environment From 1e2e1ae806a5cc33e5a77634931a944cf2c601b2 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 05:22:55 -0500 Subject: [PATCH 06/25] change to vec interface. --- src/environment/file_linux.cc | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index 6a6bd62..32356bf 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "file_linux.h" namespace fishstore { @@ -273,10 +274,15 @@ Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); assert(sqe != 0); + struct iovec vec[1]; + vec[0].iov_base = buffer; + vec[0].iov_len = length; if (operationType == FileOperationType::Read) { - io_uring_prep_read(sqe, fd_, buffer, length, offset); + io_uring_prep_readv(sqe, fd_, vec, 1, offset); + //io_uring_prep_read(sqe, fd_, buffer, length, offset); } else { - io_uring_prep_write(sqe, fd_, buffer, length, offset); + io_uring_prep_writev(sqe, fd_, vec, 1, offset); + //io_uring_prep_write(sqe, fd_, buffer, length, offset); } io_uring_sqe_set_data(sqe, io_context.get()); From 4c1d946f70a46db1777e7209811fc1d5e0736101 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 05:44:28 -0500 Subject: [PATCH 07/25] retry when have EINTR or EAGAIN. --- src/environment/file_linux.cc | 29 ++++++++++++++++++++++++----- src/environment/file_linux.h | 14 ++++++++++++-- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index 32356bf..57957ce 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -207,6 +207,23 @@ bool UringIoHandler::TryComplete() { Status return_status; size_t byte_transferred; if (io_res < 0) { + if (io_res == -EINTR || io_res == -EAGAIN) { + //Retry... + sq_lock_.Acquire(); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); + assert(sqe != 0); + if (context->is_read_) { + io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_); + } else { + io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_); + } + io_uring_sqe_set_data(sqe, context); + + int retry_res = io_uring_submit(ring_); + assert(retry_res == 1); + sq_lock_.Release(); + return false; + } return_status = Status::IOError; byte_transferred = 0; } else { @@ -215,6 +232,7 @@ bool UringIoHandler::TryComplete() { } context->callback(context->caller_context, return_status, byte_transferred); lss_allocator.Free(context); + //lss_allocator.Free(context->caller_context); return true; } else { cq_lock_.Release(); @@ -268,16 +286,17 @@ Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu IAsyncContext* caller_context_copy; RETURN_NOT_OK(context.DeepCopy(caller_context_copy)); - new(io_context.get()) UringIoHandler::IoCallbackContext(caller_context_copy, callback); + struct iovec vec[1]; + vec[0].iov_base = buffer; + vec[0].iov_len = length; + bool is_read = operationType == FileOperationType::Read; + new(io_context.get()) UringIoHandler::IoCallbackContext(is_read, fd_, vec[0], offset, caller_context_copy, callback); sq_lock_->Acquire(); struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); assert(sqe != 0); - struct iovec vec[1]; - vec[0].iov_base = buffer; - vec[0].iov_len = length; - if (operationType == FileOperationType::Read) { + if (is_read) { io_uring_prep_readv(sqe, fd_, vec, 1, offset); //io_uring_prep_read(sqe, fd_, buffer, length, offset); } else { diff --git a/src/environment/file_linux.h b/src/environment/file_linux.h index 243cdf5..d6854b9 100644 --- a/src/environment/file_linux.h +++ b/src/environment/file_linux.h @@ -317,10 +317,20 @@ class UringIoHandler { static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2); */ struct IoCallbackContext { - IoCallbackContext(IAsyncContext* context_, AsyncIOCallback callback_) - : caller_context{ context_ } + IoCallbackContext(bool is_read, int fd, struct iovec vec, size_t offset, IAsyncContext* context_, AsyncIOCallback callback_) + : is_read_(is_read) + , fd_(fd) + , vec_(vec) + , offset_(offset) + , caller_context{ context_ } , callback{ callback_ } {} + bool is_read_; + + int fd_; + struct iovec vec_; + size_t offset_; + /// Caller callback context. IAsyncContext* caller_context; From 4e14c5a9cbef6b6279bf47b0a40751851d89b345 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 04:05:44 -0700 Subject: [PATCH 08/25] Seems fixed it. But very bad fix.... --- src/environment/file_linux.cc | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index 57957ce..c8c7901 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -202,37 +202,30 @@ bool UringIoHandler::TryComplete() { if(res == 0 && cqe) { int io_res = cqe->res; auto *context = reinterpret_cast(io_uring_cqe_get_data(cqe)); - cq_lock_.Release(); io_uring_cqe_seen(ring_, cqe); + cq_lock_.Release(); Status return_status; size_t byte_transferred; if (io_res < 0) { - if (io_res == -EINTR || io_res == -EAGAIN) { - //Retry... - sq_lock_.Acquire(); - struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); - assert(sqe != 0); - if (context->is_read_) { - io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_); - } else { - io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_); - } - io_uring_sqe_set_data(sqe, context); - - int retry_res = io_uring_submit(ring_); - assert(retry_res == 1); - sq_lock_.Release(); - return false; + sq_lock_.Acquire(); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); + assert(sqe != 0); + if (context->is_read_) { + io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_); + } else { + io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_); } - return_status = Status::IOError; - byte_transferred = 0; + io_uring_sqe_set_data(sqe, context); + int retry_res = io_uring_submit(ring_); + assert(retry_res == 1); + sq_lock_.Release(); + return false; } else { return_status = Status::Ok; byte_transferred = io_res; } context->callback(context->caller_context, return_status, byte_transferred); lss_allocator.Free(context); - //lss_allocator.Free(context->caller_context); return true; } else { cq_lock_.Release(); From 096d8e75726010479aba153ee4200972092febb9 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 04:13:04 -0700 Subject: [PATCH 09/25] Try getting rid of retry.... --- src/environment/file_linux.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index c8c7901..6f73c52 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -207,6 +207,7 @@ bool UringIoHandler::TryComplete() { Status return_status; size_t byte_transferred; if (io_res < 0) { + /* sq_lock_.Acquire(); struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); assert(sqe != 0); @@ -220,6 +221,9 @@ bool UringIoHandler::TryComplete() { assert(retry_res == 1); sq_lock_.Release(); return false; + */ + return_status = Status::IOError; + byte_transferred = 0; } else { return_status = Status::Ok; byte_transferred = io_res; From b1d3b9d8ea5c1caf01d2248e2e6164fadb798001 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 04:30:43 -0700 Subject: [PATCH 10/25] Revert "Try getting rid of retry...." This reverts commit 096d8e75726010479aba153ee4200972092febb9. --- src/environment/file_linux.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index 6f73c52..c8c7901 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -207,7 +207,6 @@ bool UringIoHandler::TryComplete() { Status return_status; size_t byte_transferred; if (io_res < 0) { - /* sq_lock_.Acquire(); struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); assert(sqe != 0); @@ -221,9 +220,6 @@ bool UringIoHandler::TryComplete() { assert(retry_res == 1); sq_lock_.Release(); return false; - */ - return_status = Status::IOError; - byte_transferred = 0; } else { return_status = Status::Ok; byte_transferred = io_res; From cd36b330328e34d66b67af9711663af8c1ec890f Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 04:34:35 -0700 Subject: [PATCH 11/25] get rid of one copy. --- src/environment/file_linux.cc | 10 ++++------ src/environment/file_linux.h | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index c8c7901..b59f846 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -207,6 +207,7 @@ bool UringIoHandler::TryComplete() { Status return_status; size_t byte_transferred; if (io_res < 0) { + // Retry if it is failed..... sq_lock_.Acquire(); struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); assert(sqe != 0); @@ -279,21 +280,18 @@ Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu IAsyncContext* caller_context_copy; RETURN_NOT_OK(context.DeepCopy(caller_context_copy)); - struct iovec vec[1]; - vec[0].iov_base = buffer; - vec[0].iov_len = length; bool is_read = operationType == FileOperationType::Read; - new(io_context.get()) UringIoHandler::IoCallbackContext(is_read, fd_, vec[0], offset, caller_context_copy, callback); + new(io_context.get()) UringIoHandler::IoCallbackContext(is_read, fd_, buffer, length, offset, caller_context_copy, callback); sq_lock_->Acquire(); struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); assert(sqe != 0); if (is_read) { - io_uring_prep_readv(sqe, fd_, vec, 1, offset); + io_uring_prep_readv(sqe, fd_, &io_context->vec_, 1, offset); //io_uring_prep_read(sqe, fd_, buffer, length, offset); } else { - io_uring_prep_writev(sqe, fd_, vec, 1, offset); + io_uring_prep_writev(sqe, fd_, &io_context->vec_, 1, offset); //io_uring_prep_write(sqe, fd_, buffer, length, offset); } io_uring_sqe_set_data(sqe, io_context.get()); diff --git a/src/environment/file_linux.h b/src/environment/file_linux.h index d6854b9..4869f8c 100644 --- a/src/environment/file_linux.h +++ b/src/environment/file_linux.h @@ -317,10 +317,10 @@ class UringIoHandler { static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2); */ struct IoCallbackContext { - IoCallbackContext(bool is_read, int fd, struct iovec vec, size_t offset, IAsyncContext* context_, AsyncIOCallback callback_) + IoCallbackContext(bool is_read, int fd, uint8_t* buffer, size_t length, size_t offset, IAsyncContext* context_, AsyncIOCallback callback_) : is_read_(is_read) , fd_(fd) - , vec_(vec) + , vec_{buffer, length} , offset_(offset) , caller_context{ context_ } , callback{ callback_ } {} From 6c28ad27a841643c3c261f83007bfdfabc486d03 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 04:46:53 -0700 Subject: [PATCH 12/25] Change yml to main liburing repo and a release tag. --- .github/workflows/cmake.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index cd61414..3a65397 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -26,8 +26,9 @@ jobs: - name: Install liburing run: | - git clone https://github.com/axboe/liburing.git + git clone https://git.kernel.dk/liburing cd liburing + git checkout liburing-0.7 ./configure sudo make install From 1a66ab4f82810cf644aedff635e34924543c4ff0 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:01:51 -0500 Subject: [PATCH 13/25] Create msbuild.yml --- .github/workflows/msbuild.yml | 61 +++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 .github/workflows/msbuild.yml diff --git a/.github/workflows/msbuild.yml b/.github/workflows/msbuild.yml new file mode 100644 index 0000000..41d2ec0 --- /dev/null +++ b/.github/workflows/msbuild.yml @@ -0,0 +1,61 @@ +name: MSBuild + +on: [push] + +env: + # Path to the solution file relative to the root of the project. + SOLUTION_FILE_PATH: build/ + + # Configuration type to build. + # You can convert this to a build matrix if you need coverage of multiple configuration types. + # https://docs.github.com/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix + BUILD_CONFIGURATION: Release + +jobs: + build: + runs-on: windows-latest + + steps: + - name: Checkout reposistory + uses: actions/checkout@master + + - name: Checkout submodules + run: git submodule update --init --recursive + + - name: Add msbuild to PATH + uses: microsoft/setup-msbuild@v1.0.2 + with: + vs-version: '[16.4,16.5)' + + - name: install-cmake + uses: Symbitic/install-cmake@v0.1.0 + with: + platform: win + + - name: Create Build Environment + # Some projects don't allow in-source building, so create a separate build directory + # We'll use this as our working directory for all subsequent commands + run: cmake -E make_directory ${{runner.workspace}}/build + + - name: Configure CMake + # Use a bash shell so we can use the same syntax for environment variable + # access regardless of the host operating system + shell: bash + working-directory: ${{runner.workspace}}/build + # Note the current convention is to use the -S and -B options here to specify source + # and build directories, but this is only available with CMake 3.13 and higher. + # The CMake binaries on the Github Actions machines are (as of this writing) 3.12 + run: cmake $GITHUB_WORKSPACE -G "Visual Studio 16 2019" + + - name: Build + working-directory: ${{env.GITHUB_WORKSPACE}}/build + # Add additional options to the MSBuild command line here (like platform or verbosity level). + # See https://docs.microsoft.com/visualstudio/msbuild/msbuild-command-line-reference + run: msbuild /m /p:Configuration=${{env.BUILD_CONFIGURATION}} ${{env.SOLUTION_FILE_PATH}} + + - name: Test + working-directory: ${{runner.workspace}}/build + shell: bash + # Execute tests defined by the CMake configuration. + # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail + run: ctest -j 1 --interactive-debug-mode 0 --output-on-failure -C ${{env.BUILD_CONFIGURATION}} -R .*_test From c73ec5bc5e5136313f661002915cae0c2ef74f0c Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:05:12 -0500 Subject: [PATCH 14/25] Update msbuild.yml --- .github/workflows/msbuild.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/msbuild.yml b/.github/workflows/msbuild.yml index 41d2ec0..45f8be8 100644 --- a/.github/workflows/msbuild.yml +++ b/.github/workflows/msbuild.yml @@ -21,16 +21,16 @@ jobs: - name: Checkout submodules run: git submodule update --init --recursive - - - name: Add msbuild to PATH - uses: microsoft/setup-msbuild@v1.0.2 - with: - vs-version: '[16.4,16.5)' - + - name: install-cmake uses: Symbitic/install-cmake@v0.1.0 with: platform: win + + - name: Add msbuild to PATH + uses: microsoft/setup-msbuild@v1.0.2 + #with: + # vs-version: '[16.4,16.5)' - name: Create Build Environment # Some projects don't allow in-source building, so create a separate build directory From 45def2e148e0307be6cc119d905a58671565fd85 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:10:35 -0500 Subject: [PATCH 15/25] Update msbuild.yml --- .github/workflows/msbuild.yml | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/workflows/msbuild.yml b/.github/workflows/msbuild.yml index 45f8be8..d7dc563 100644 --- a/.github/workflows/msbuild.yml +++ b/.github/workflows/msbuild.yml @@ -21,17 +21,15 @@ jobs: - name: Checkout submodules run: git submodule update --init --recursive - - - name: install-cmake - uses: Symbitic/install-cmake@v0.1.0 - with: - platform: win - + - name: Add msbuild to PATH uses: microsoft/setup-msbuild@v1.0.2 #with: # vs-version: '[16.4,16.5)' - + + - name: Get latest CMake and ninja + uses: lukka/get-cmake@latest + - name: Create Build Environment # Some projects don't allow in-source building, so create a separate build directory # We'll use this as our working directory for all subsequent commands From 6d88f601ae168758cd41a197b74a3473c5cb6e08 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:13:39 -0500 Subject: [PATCH 16/25] Update msbuild.yml --- .github/workflows/msbuild.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/msbuild.yml b/.github/workflows/msbuild.yml index d7dc563..1aa43c7 100644 --- a/.github/workflows/msbuild.yml +++ b/.github/workflows/msbuild.yml @@ -4,7 +4,7 @@ on: [push] env: # Path to the solution file relative to the root of the project. - SOLUTION_FILE_PATH: build/ + SOLUTION_FILE_PATH: . # Configuration type to build. # You can convert this to a build matrix if you need coverage of multiple configuration types. From 586ec91c9302a09dd1efd6f484b617307b2a7ca1 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:16:50 -0500 Subject: [PATCH 17/25] Update msbuild.yml --- .github/workflows/msbuild.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/msbuild.yml b/.github/workflows/msbuild.yml index 1aa43c7..8911684 100644 --- a/.github/workflows/msbuild.yml +++ b/.github/workflows/msbuild.yml @@ -49,7 +49,7 @@ jobs: working-directory: ${{env.GITHUB_WORKSPACE}}/build # Add additional options to the MSBuild command line here (like platform or verbosity level). # See https://docs.microsoft.com/visualstudio/msbuild/msbuild-command-line-reference - run: msbuild /m /p:Configuration=${{env.BUILD_CONFIGURATION}} ${{env.SOLUTION_FILE_PATH}} + run: msbuild /m /p:Configuration=${{env.BUILD_CONFIGURATION}} FishStore.sln - name: Test working-directory: ${{runner.workspace}}/build From be98d511a60829efc3e2370626075a17c7e4d236 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:19:54 -0500 Subject: [PATCH 18/25] Update msbuild.yml --- .github/workflows/msbuild.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/msbuild.yml b/.github/workflows/msbuild.yml index 8911684..20fe700 100644 --- a/.github/workflows/msbuild.yml +++ b/.github/workflows/msbuild.yml @@ -46,7 +46,7 @@ jobs: run: cmake $GITHUB_WORKSPACE -G "Visual Studio 16 2019" - name: Build - working-directory: ${{env.GITHUB_WORKSPACE}}/build + working-directory: ${{runner.workspace}}/build # Add additional options to the MSBuild command line here (like platform or verbosity level). # See https://docs.microsoft.com/visualstudio/msbuild/msbuild-command-line-reference run: msbuild /m /p:Configuration=${{env.BUILD_CONFIGURATION}} FishStore.sln From 88a8b44d7830762ae7167dfc65cf0a254df99b43 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Mon, 7 Dec 2020 07:26:25 -0500 Subject: [PATCH 19/25] remove experimental for filesystem. --- .../checkpoint_recovery.cc | 4 ++-- examples/github_query-dir/github_query.cc | 4 ++-- examples/online_demo-dir/online_demo.cc | 4 ++-- .../online_demo_disk-dir/online_demo_disk.cc | 4 ++-- examples/twitter_query-dir/twitter_query.cc | 4 ++-- src/core/fishstore.h | 8 ++++---- src/device/file_system_disk.h | 18 +++++++++--------- src/environment/file_linux.cc | 17 ----------------- test/checkpoint_queue_test.cc | 2 +- test/checkpoint_test.h | 4 ++-- test/checkpoint_threadpool_test.cc | 2 +- test/checkpoint_uring_test.cc | 2 +- test/ingest_queue_test.cc | 2 +- test/ingest_test.h | 12 ++++++------ test/ingest_threadpool_test.cc | 2 +- test/ingest_uring_test.cc | 2 +- test/register_test.cc | 10 +++++----- 17 files changed, 42 insertions(+), 59 deletions(-) diff --git a/examples/checkpoint_recovery-dir/checkpoint_recovery.cc b/examples/checkpoint_recovery-dir/checkpoint_recovery.cc index 4c23d87..a25d37f 100644 --- a/examples/checkpoint_recovery-dir/checkpoint_recovery.cc +++ b/examples/checkpoint_recovery-dir/checkpoint_recovery.cc @@ -3,7 +3,7 @@ #include #include -#include +#include #include #include @@ -209,7 +209,7 @@ int main(int argc, char* argv[]) { "Finish loading %u batches (%zu records) of json into the memory....\n", json_batch_cnt, record_cnt); - std::experimental::filesystem::create_directory(argv[5]); + std::filesystem::create_directory(argv[5]); size_t store_size = 1LL << atoi(argv[4]); size_t hash_table_size = 1LL << 24; { diff --git a/examples/github_query-dir/github_query.cc b/examples/github_query-dir/github_query.cc index ae8033b..8fbe0ed 100644 --- a/examples/github_query-dir/github_query.cc +++ b/examples/github_query-dir/github_query.cc @@ -2,7 +2,7 @@ // Licensed under the MIT license. #include -#include +#include #include #include "adapters/simdjson_adapter.h" @@ -198,7 +198,7 @@ int main(int argc, char* argv[]) { printf("Finish loading %u batches (%zu records) of json into the memory....\n", json_batch_cnt, record_cnt); - std::experimental::filesystem::create_directory(argv[5]); + std::filesystem::create_directory(argv[5]); size_t store_size = 1LL << atoi(argv[4]); store_t store{ (1L << 24), store_size, argv[5] }; diff --git a/examples/online_demo-dir/online_demo.cc b/examples/online_demo-dir/online_demo.cc index a3998aa..421ea9c 100644 --- a/examples/online_demo-dir/online_demo.cc +++ b/examples/online_demo-dir/online_demo.cc @@ -2,7 +2,7 @@ // Licensed under the MIT license. #include -#include +#include #include #define _NULL_DISK @@ -207,7 +207,7 @@ int main(int argc, char* argv[]) { printf("Finish loading %u batches (%zu records) of json into the memory....\n", json_batch_cnt, record_cnt); - std::experimental::filesystem::create_directory(argv[4]); + std::filesystem::create_directory(argv[4]); size_t store_size = 1LL << atoi(argv[3]); store_t store{ (1L << 24), store_size, argv[4] }; diff --git a/examples/online_demo_disk-dir/online_demo_disk.cc b/examples/online_demo_disk-dir/online_demo_disk.cc index ba4f801..5e24d37 100644 --- a/examples/online_demo_disk-dir/online_demo_disk.cc +++ b/examples/online_demo_disk-dir/online_demo_disk.cc @@ -2,7 +2,7 @@ // Licensed under the MIT license. #include -#include +#include #include #include "adapters/simdjson_adapter.h" @@ -204,7 +204,7 @@ int main(int argc, char* argv[]) { printf("Finish loading %u batches (%zu records) of json into the memory....\n", json_batch_cnt, record_cnt); - std::experimental::filesystem::create_directory(argv[4]); + std::filesystem::create_directory(argv[4]); size_t store_size = 1LL << atoi(argv[3]); store_t store{ (1L << 24), store_size, argv[4] }; diff --git a/examples/twitter_query-dir/twitter_query.cc b/examples/twitter_query-dir/twitter_query.cc index 56f4956..3d02c6c 100644 --- a/examples/twitter_query-dir/twitter_query.cc +++ b/examples/twitter_query-dir/twitter_query.cc @@ -2,7 +2,7 @@ // Licensed under the MIT license. #include -#include +#include #include #include "adapters/simdjson_adapter.h" @@ -198,7 +198,7 @@ int main(int argc, char* argv[]) { printf("Finish loading %u batches (%zu records) of json into the memory....\n", json_batch_cnt, record_cnt); - std::experimental::filesystem::create_directory(argv[5]); + std::filesystem::create_directory(argv[5]); size_t store_size = 1LL << atoi(argv[4]); store_t store{ (1L << 24), store_size, argv[5] }; diff --git a/src/core/fishstore.h b/src/core/fishstore.h index 5984c80..40cd81a 100644 --- a/src/core/fishstore.h +++ b/src/core/fishstore.h @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include @@ -130,7 +130,7 @@ struct ParserContext { }; struct LibraryHandle { - std::experimental::filesystem::path path; + std::filesystem::path path; #ifdef _WIN32 HMODULE handle; #else @@ -3570,7 +3570,7 @@ Status FishStore::Recover(const Guid& index_token, const Guid& hybrid_log_ std::string path; naming_file >> path; LibraryHandle lib; - lib.path = std::experimental::filesystem::absolute(path); + lib.path = std::filesystem::absolute(path); #ifdef _WIN32 lib.handle = LoadLibrary(lib.path.string().c_str()); #else @@ -3763,7 +3763,7 @@ template size_t FishStore::LoadPSFLibrary(const std::string& lib_path) { std::lock_guard lk(mutex); LibraryHandle lib; - lib.path = std::experimental::filesystem::absolute(lib_path); + lib.path = std::filesystem::absolute(lib_path); #ifdef _WIN32 lib.handle = LoadLibrary(lib.path.string().c_str()); #else diff --git a/src/device/file_system_disk.h b/src/device/file_system_disk.h index 1f8f6b1..d817c1d 100644 --- a/src/device/file_system_disk.h +++ b/src/device/file_system_disk.h @@ -4,7 +4,7 @@ #pragma once #include -#include +#include #include #include @@ -491,24 +491,24 @@ class FileSystemDisk { void CreateIndexCheckpointDirectory(const Guid& token) { std::string index_dir = index_checkpoint_path(token); - std::experimental::filesystem::path path{ index_dir }; + std::filesystem::path path{ index_dir }; try { - std::experimental::filesystem::remove_all(path); - } catch(std::experimental::filesystem::filesystem_error&) { + std::filesystem::remove_all(path); + } catch(std::filesystem::filesystem_error&) { // Ignore; throws when path doesn't exist yet. } - std::experimental::filesystem::create_directories(path); + std::filesystem::create_directories(path); } void CreateCprCheckpointDirectory(const Guid& token) { std::string cpr_dir = cpr_checkpoint_path(token); - std::experimental::filesystem::path path{ cpr_dir }; + std::filesystem::path path{ cpr_dir }; try { - std::experimental::filesystem::remove_all(path); - } catch(std::experimental::filesystem::filesystem_error&) { + std::filesystem::remove_all(path); + } catch(std::filesystem::filesystem_error&) { // Ignore; throws when path doesn't exist yet. } - std::experimental::filesystem::create_directories(path); + std::filesystem::create_directories(path); } file_t NewFile(const std::string& relative_path) { diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc index 57957ce..07de852 100644 --- a/src/environment/file_linux.cc +++ b/src/environment/file_linux.cc @@ -207,23 +207,6 @@ bool UringIoHandler::TryComplete() { Status return_status; size_t byte_transferred; if (io_res < 0) { - if (io_res == -EINTR || io_res == -EAGAIN) { - //Retry... - sq_lock_.Acquire(); - struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); - assert(sqe != 0); - if (context->is_read_) { - io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_); - } else { - io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_); - } - io_uring_sqe_set_data(sqe, context); - - int retry_res = io_uring_submit(ring_); - assert(retry_res == 1); - sq_lock_.Release(); - return false; - } return_status = Status::IOError; byte_transferred = 0; } else { diff --git a/test/checkpoint_queue_test.cc b/test/checkpoint_queue_test.cc index b102d99..1ed88fa 100644 --- a/test/checkpoint_queue_test.cc +++ b/test/checkpoint_queue_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" diff --git a/test/checkpoint_test.h b/test/checkpoint_test.h index cfe9384..2f2b869 100644 --- a/test/checkpoint_test.h +++ b/test/checkpoint_test.h @@ -158,8 +158,8 @@ class JsonFullScanContext : public IAsyncContext { }; TEST(CLASS, Checkpoint_Concurrent) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); + std::filesystem::remove_all("test"); + std::filesystem::create_directories("test"); std::vector guids(n_threads); { diff --git a/test/checkpoint_threadpool_test.cc b/test/checkpoint_threadpool_test.cc index 413f55a..7d6af4c 100644 --- a/test/checkpoint_threadpool_test.cc +++ b/test/checkpoint_threadpool_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" diff --git a/test/checkpoint_uring_test.cc b/test/checkpoint_uring_test.cc index 20aa823..034b1e9 100644 --- a/test/checkpoint_uring_test.cc +++ b/test/checkpoint_uring_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" diff --git a/test/ingest_queue_test.cc b/test/ingest_queue_test.cc index d4b0a5e..9f8ed4b 100644 --- a/test/ingest_queue_test.cc +++ b/test/ingest_queue_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" diff --git a/test/ingest_test.h b/test/ingest_test.h index 55d8a3d..902fc27 100644 --- a/test/ingest_test.h +++ b/test/ingest_test.h @@ -157,8 +157,8 @@ class JsonFullScanContext : public IAsyncContext { }; TEST(CLASS, Ingest_Serial) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); + std::filesystem::remove_all("test"); + std::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); auto id_proj = store.MakeProjection("/id"); @@ -212,8 +212,8 @@ TEST(CLASS, Ingest_Serial) { } TEST(CLASS, Ingest_Concurrent) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); + std::filesystem::remove_all("test"); + std::filesystem::create_directories("test"); { store_t store{ 8192, 201326592, "test" }; store.StartSession(); @@ -282,8 +282,8 @@ TEST(CLASS, Ingest_Concurrent) { } TEST(CLASS, FullScan) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); + std::filesystem::remove_all("test"); + std::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); auto id_proj = store.MakeProjection("/id"); diff --git a/test/ingest_threadpool_test.cc b/test/ingest_threadpool_test.cc index 427a8a2..9fc26dd 100644 --- a/test/ingest_threadpool_test.cc +++ b/test/ingest_threadpool_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" diff --git a/test/ingest_uring_test.cc b/test/ingest_uring_test.cc index 430061b..dfc9842 100644 --- a/test/ingest_uring_test.cc +++ b/test/ingest_uring_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" diff --git a/test/register_test.cc b/test/register_test.cc index 3e8af94..f81bb10 100644 --- a/test/register_test.cc +++ b/test/register_test.cc @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "gtest/gtest.h" #include "adapters/simdjson_adapter.h" @@ -171,8 +171,8 @@ class JsonFullScanContext : public IAsyncContext { }; TEST(Registration, Register_Concurrent) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); + std::filesystem::remove_all("test"); + std::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); auto school_id_proj = store.MakeProjection("/school/id"); @@ -239,8 +239,8 @@ TEST(Registration, Register_Concurrent) { } TEST(Registration, Deregister_Concurrent) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); + std::filesystem::remove_all("test"); + std::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); auto school_id_proj = store.MakeProjection("/school/id"); From f5b42fc25397a22593fb84b248a2496488cf9158 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Tue, 15 Dec 2020 11:42:54 -0500 Subject: [PATCH 20/25] rename workloads to proper name. --- .github/workflows/{cmake.yml => linux.yml} | 0 .github/workflows/{msbuild.yml => windows.yml} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename .github/workflows/{cmake.yml => linux.yml} (100%) rename .github/workflows/{msbuild.yml => windows.yml} (100%) diff --git a/.github/workflows/cmake.yml b/.github/workflows/linux.yml similarity index 100% rename from .github/workflows/cmake.yml rename to .github/workflows/linux.yml diff --git a/.github/workflows/msbuild.yml b/.github/workflows/windows.yml similarity index 100% rename from .github/workflows/msbuild.yml rename to .github/workflows/windows.yml From a8eaa5c3bde0edceb8d5916c6b23616c3dfc1a35 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Thu, 25 Mar 2021 11:09:58 -0400 Subject: [PATCH 21/25] Adapt simdjson v0.9.1 --- examples/github_query-dir/github_query.cc | 2 +- src/adapters/common_utils.h | 2 +- src/adapters/simdjson_adapter.h | 133 ++++++++++++---------- third_party/simdjson | 2 +- 4 files changed, 76 insertions(+), 63 deletions(-) diff --git a/examples/github_query-dir/github_query.cc b/examples/github_query-dir/github_query.cc index 8fbe0ed..8ebd86a 100644 --- a/examples/github_query-dir/github_query.cc +++ b/examples/github_query-dir/github_query.cc @@ -213,7 +213,7 @@ int main(int argc, char* argv[]) { auto predicate1_id = store.MakeInlinePSF({ "/type", "/payload/action" }, lib_id, "opened_issue"); auto predicate2_id = - store.MakeInlinePSF({ "/type", "/payload/pull_request/head.repo/language" }, lib_id, "cpp_pr"); + store.MakeInlinePSF({ "/type", "/payload/pull_request/head/repo/language" }, lib_id, "cpp_pr"); std::vector parser_actions; parser_actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); diff --git a/src/adapters/common_utils.h b/src/adapters/common_utils.h index 20d6799..07b6424 100644 --- a/src/adapters/common_utils.h +++ b/src/adapters/common_utils.h @@ -20,7 +20,7 @@ class StringRef { return size; } -private: +public: const char* ptr; size_t size; }; diff --git a/src/adapters/simdjson_adapter.h b/src/adapters/simdjson_adapter.h index 23651c3..37b1f94 100644 --- a/src/adapters/simdjson_adapter.h +++ b/src/adapters/simdjson_adapter.h @@ -8,77 +8,88 @@ #include #include -#ifdef _MSC_VER -#define NOMINMAX -#endif -#include -#include -#include "adapters/common_utils.h" +#include +#include -using namespace simdjson; +const size_t DEFAULT_BATCH_SIZE = 1000000; namespace fishstore { namespace adapter { + class SIMDJsonField { public: - SIMDJsonField(int64_t id_, const ParsedJson::Iterator& it_) - : field_id(id_), iter(it_) {} + SIMDJsonField(int64_t id_, const simdjson::dom::element e) + : field_id(id_), ele(e) {} inline int64_t FieldId() const { return field_id; } inline NullableBool GetAsBool() const { - switch (iter.get_type()) { - case 't': - return NullableBool(true); - case 'f': - return NullableBool(false); - default: + if (ele.is_bool()) { + return NullableBool(ele.get_bool().value()); + } + else { return NullableBool(); } } inline NullableInt GetAsInt() const { - if (iter.is_integer()) { - return NullableInt(static_cast(iter.get_integer())); - } else return NullableInt(); + if (ele.is_int64()) { + return NullableInt(ele.get_int64().value()); + } + else { + return NullableInt(); + } } inline NullableLong GetAsLong() const { - if (iter.is_integer()) { - return NullableLong(iter.get_integer()); - } else return NullableLong(); + if (ele.is_int64()) { + return NullableLong(ele.get_int64().value()); + } + else { + return NullableLong(); + } } inline NullableFloat GetAsFloat() const { - if (iter.is_double()) { - return NullableFloat(static_cast(iter.get_double())); - } else return NullableFloat(); + if (ele.is_double()) { + return NullableFloat(ele.get_double().value()); + } + else { + return NullableFloat(); + } } inline NullableDouble GetAsDouble() const { - if (iter.is_double()) { - return NullableDouble(iter.get_double()); - } else return NullableDouble(); + if (ele.is_double()) { + return NullableDouble(ele.get_double().value()); + } + else { + return NullableDouble(); + } } inline NullableString GetAsString() const { - if (iter.is_string()) { - return NullableString(std::string(iter.get_string(), iter.get_string_length())); - } else return NullableString(); + if (ele.is_string()) { + auto tmp = ele.get_string().value(); + return NullableString(std::string(tmp.data(), tmp.size())); + } + else return NullableString(); } inline NullableStringRef GetAsStringRef() const { - if (iter.is_string()) { - return NullableStringRef(StringRef(iter.get_string(), iter.get_string_length())); - } else return NullableStringRef(); + if (ele.is_string()) { + auto tmp = ele.get_string().value(); + return NullableStringRef({ tmp.data(), tmp.size() }); + } + else return NullableStringRef(); } private: int64_t field_id; - ParsedJson::Iterator iter; + const simdjson::dom::element ele; }; class SIMDJsonRecord { @@ -100,6 +111,12 @@ class SIMDJsonRecord { return original; } + inline void clear() { + original.ptr = NULL; + original.size = 0; + fields.clear(); + } + public: StringRef original; std::vector fields; @@ -107,45 +124,41 @@ class SIMDJsonRecord { class SIMDJsonParser { public: - SIMDJsonParser(const std::vector& field_names, const size_t alloc_bytes = 1LL << 25) - : fields(field_names) { - auto success = pj.allocate_capacity(alloc_bytes); - assert(success); - has_next = false; - } + SIMDJsonParser(const std::vector& field_names) + : fields(field_names), parser_(), stream(), buffer_(NULL), len_(0), record_() {} inline void Load(const char* buffer, size_t length) { - record.original = StringRef(buffer, length); - record.fields.clear(); - auto ok = json_parse(buffer, length, pj); - if (ok != 0 || !pj.is_valid()) { - printf("Parsing failed...\n"); - has_next = false; - } else { - has_next = true; - } + //XX: buffer is not padded, may have issue + buffer_ = buffer; + len_ = length; + parser_.parse_many(buffer, length, DEFAULT_BATCH_SIZE).get(stream); + it = stream.begin(); } inline bool HasNext() { - return has_next; + return it != stream.end(); } inline const SIMDJsonRecord& NextRecord() { - ParsedJson::Iterator it(pj); - for (auto field_id = 0; field_id < fields.size(); ++field_id) { - if (it.move_to(fields[field_id])) { - record.fields.emplace_back(SIMDJsonField{field_id, it}); - } + record_.clear(); + record_.original.ptr = buffer_ + it.current_index(); + auto last_index = it.current_index(); + for (auto& field : fields) { + record_.fields.emplace_back(SIMDJsonField(record_.fields.size(), (*it).at_pointer(field).value())); } - has_next = false; - return record; + ++it; + record_.original.size = it != stream.end() ? it.current_index() - last_index : len_ - last_index; + return record_; } private: + const char* buffer_; + size_t len_; std::vector fields; - ParsedJson pj; - SIMDJsonRecord record; - bool has_next; + simdjson::dom::parser parser_; + simdjson::dom::document_stream stream; + simdjson::dom::document_stream::iterator it; + SIMDJsonRecord record_; }; class SIMDJsonAdapter { diff --git a/third_party/simdjson b/third_party/simdjson index ee66fb1..8a3b2f2 160000 --- a/third_party/simdjson +++ b/third_party/simdjson @@ -1 +1 @@ -Subproject commit ee66fb1c602e17563606c6f6eecc225dac5455cc +Subproject commit 8a3b2f20e47b2eb28b7085d388422de94bdae634 From 46fa6f7e1f7cb3420a832f4f3d24415653b490b4 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Thu, 25 Mar 2021 11:33:01 -0400 Subject: [PATCH 22/25] update. --- src/adapters/simdjson_adapter.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/adapters/simdjson_adapter.h b/src/adapters/simdjson_adapter.h index 37b1f94..27752c8 100644 --- a/src/adapters/simdjson_adapter.h +++ b/src/adapters/simdjson_adapter.h @@ -129,7 +129,10 @@ class SIMDJsonParser { inline void Load(const char* buffer, size_t length) { //XX: buffer is not padded, may have issue - buffer_ = buffer; + simdjson::padded_string ps(buffer, length); + //buffer_ = buffer; + //len_ = length; + buffer_ = ps.data(); len_ = length; parser_.parse_many(buffer, length, DEFAULT_BATCH_SIZE).get(stream); it = stream.begin(); From 3c8b316225959f84414bdc3698d287ef2d1e6197 Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Thu, 25 Mar 2021 15:49:23 -0400 Subject: [PATCH 23/25] Update simdjson_adapter.h --- src/adapters/simdjson_adapter.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/adapters/simdjson_adapter.h b/src/adapters/simdjson_adapter.h index 27752c8..ede545a 100644 --- a/src/adapters/simdjson_adapter.h +++ b/src/adapters/simdjson_adapter.h @@ -130,11 +130,9 @@ class SIMDJsonParser { inline void Load(const char* buffer, size_t length) { //XX: buffer is not padded, may have issue simdjson::padded_string ps(buffer, length); - //buffer_ = buffer; - //len_ = length; - buffer_ = ps.data(); + buffer_ = buffer; len_ = length; - parser_.parse_many(buffer, length, DEFAULT_BATCH_SIZE).get(stream); + auto res = parser_.parse_many(ps, DEFAULT_BATCH_SIZE).get(stream); it = stream.begin(); } From 4b66b8cbd7bb81bf01ba9fc0cc320d0d0b45929c Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Thu, 25 Mar 2021 16:17:00 -0400 Subject: [PATCH 24/25] Fix a timing issue in the unit test. --- test/checkpoint_test.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/checkpoint_test.h b/test/checkpoint_test.h index 2f2b869..04e46e2 100644 --- a/test/checkpoint_test.h +++ b/test/checkpoint_test.h @@ -8,7 +8,7 @@ using adapter_t = fishstore::adapter::SIMDJsonAdapter; using disk_t = fishstore::device::FileSystemDisk; using store_t = FishStore; -const size_t n_records = 1500000; +const size_t n_records = 5000000; const size_t n_threads = 4; const char* pattern = "{\"id\": \"%zu\", \"name\": \"name%zu\", \"gender\": \"%s\", \"school\": {\"id\": \"%zu\", \"name\": \"school%zu\"}}"; @@ -207,10 +207,10 @@ TEST(CLASS, Checkpoint_Concurrent) { }, i); } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); store.CheckpointIndex(checkpoint_callback, index_token); store.CompleteAction(true); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); store.CheckpointHybridLog(hybrid_log_persistence_callback, log_token); store.CompleteAction(true); From aff4c64e5580c093e233f76b802864cc3397521f Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Thu, 25 Mar 2021 16:35:38 -0400 Subject: [PATCH 25/25] tweak the number a bit more. --- test/checkpoint_test.h | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/checkpoint_test.h b/test/checkpoint_test.h index 04e46e2..02bfcbb 100644 --- a/test/checkpoint_test.h +++ b/test/checkpoint_test.h @@ -8,7 +8,7 @@ using adapter_t = fishstore::adapter::SIMDJsonAdapter; using disk_t = fishstore::device::FileSystemDisk; using store_t = FishStore; -const size_t n_records = 5000000; +const size_t n_records = 3000000; const size_t n_threads = 4; const char* pattern = "{\"id\": \"%zu\", \"name\": \"name%zu\", \"gender\": \"%s\", \"school\": {\"id\": \"%zu\", \"name\": \"school%zu\"}}"; @@ -207,12 +207,15 @@ TEST(CLASS, Checkpoint_Concurrent) { }, i); } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); store.CheckpointIndex(checkpoint_callback, index_token); store.CompleteAction(true); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); store.CheckpointHybridLog(hybrid_log_persistence_callback, log_token); store.CompleteAction(true); + for (auto& guid: guids) { + printf("%s\n", guid.ToString().c_str()); + } for (auto& thd : thds) { thd.join(); @@ -237,6 +240,9 @@ TEST(CLASS, Checkpoint_Concurrent) { uint32_t version; std::vector recovered_session_ids; new_store.Recover(index_token, log_token, version, recovered_session_ids); + for (auto& guid: recovered_session_ids) { + printf("%s\n", guid.ToString().c_str()); + } new_store.StartSession(); std::vector> sessions(n_threads);