From 88db872df019657d7954b4b20faf66386b8f06de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 11 Jun 2024 12:58:48 +0200 Subject: [PATCH 1/6] Main implementation and test --- include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp | 3 +- src/IO/ADIOS/ADIOS2Auxiliary.cpp | 4 + src/IO/ADIOS/ADIOS2File.cpp | 122 ++++++++++++------- src/IO/ADIOS/ADIOS2IOHandler.cpp | 1 + test/SerialIOTest.cpp | 38 ++++++ 5 files changed, 124 insertions(+), 44 deletions(-) diff --git a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp index 0b279f38cb..dfc9939153 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp @@ -53,7 +53,8 @@ namespace adios_defs Buffer, Buffer_Override, Disk, - Disk_Override + Disk_Override, + NewStep }; using FlushTarget = adios_defs::FlushTarget; diff --git a/src/IO/ADIOS/ADIOS2Auxiliary.cpp b/src/IO/ADIOS/ADIOS2Auxiliary.cpp index 366be2b893..7183313703 100644 --- a/src/IO/ADIOS/ADIOS2Auxiliary.cpp +++ b/src/IO/ADIOS/ADIOS2Auxiliary.cpp @@ -48,6 +48,10 @@ FlushTarget flushTargetFromString(std::string const &str) { return FlushTarget::Disk_Override; } + else if (str == "new_step") + { + return FlushTarget::NewStep; + } else { throw error::BackendConfigSchema( diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index 447f358f48..5d8b5a99bf 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -25,6 +25,8 @@ #include "openPMD/auxiliary/Environment.hpp" #include "openPMD/auxiliary/StringManip.hpp" +#include + #if openPMD_USE_VERIFY #define VERIFY(CONDITION, TEXT) \ { \ @@ -1028,60 +1030,94 @@ void ADIOS2File::flush_impl( void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts) { - auto decideFlushAPICall = - [this, flushTarget = flushParams.flushTarget](adios2::Engine &engine) { + auto decideFlushAPICall = [this, flushTarget = flushParams.flushTarget]( + adios2::Engine &engine) { #if ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \ ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \ 2701001223 - bool performDataWrite{}; - switch (flushTarget) + enum class CleanedFlushTarget + { + Buffer, + Disk, + Step + }; + + CleanedFlushTarget target{}; + switch (flushTarget) + { + case FlushTarget::Disk: + case FlushTarget::Disk_Override: + if (m_engineType == "bp5" || + /* this second check should be sufficient, but we leave the + first check in as a safeguard against renamings in + ADIOS2. Also do a lowerCase transform since the docstring + of `Engine::Type()` claims that the return value is in + lowercase, but for BP5 this does not seem true. */ + auxiliary::lowerCase(engine.Type()) == "bp5writer") { - case FlushTarget::Disk: - case FlushTarget::Disk_Override: - performDataWrite = true; - break; - case FlushTarget::Buffer: - case FlushTarget::Buffer_Override: - performDataWrite = false; - break; + target = CleanedFlushTarget::Disk; } - performDataWrite = performDataWrite && - (m_engineType == "bp5" || - /* this second check should be sufficient, but we leave the - first check in as a safeguard against renamings in ADIOS2. - Also do a lowerCase transform since the docstring of - `Engine::Type()` claims that the return value is in - lowercase, but for BP5 this does not seem true. */ - auxiliary::lowerCase(engine.Type()) == "bp5writer"); - - if (performDataWrite) + else { - /* - * Deliberately don't write buffered attributes now since - * readers won't be able to see them before EndStep anyway, - * so there's no use. In fact, writing them now is harmful - * because they can't be overwritten after this anymore in the - * current step. - * Draining the uniquePtrPuts now is good however, since we - * should use this chance to free the memory. - */ - for (auto &entry : m_uniquePtrPuts) - { - entry.run(*this); - } - engine.PerformDataWrite(); - m_uniquePtrPuts.clear(); + target = CleanedFlushTarget::Buffer; } - else + break; + case FlushTarget::Buffer: + case FlushTarget::Buffer_Override: + target = CleanedFlushTarget::Buffer; + break; + case FlushTarget::NewStep: + target = CleanedFlushTarget::Step; + break; + } + + switch (target) + { + case CleanedFlushTarget::Disk: + /* + * Draining the uniquePtrPuts now to use this chance to free the + * memory. + */ + for (auto &entry : m_uniquePtrPuts) { - engine.PerformPuts(); + entry.run(*this); } -#else - (void)this; - (void)flushTarget; + engine.PerformDataWrite(); + m_uniquePtrPuts.clear(); + m_updateSpans.clear(); + break; + case CleanedFlushTarget::Buffer: engine.PerformPuts(); + break; + case CleanedFlushTarget::Step: + if (streamStatus != StreamStatus::DuringStep) + { + throw error::OperationUnsupportedInBackend( + "ADIOS2", + "Trying to flush to a new step while no step is active"); + } + /* + * Draining the uniquePtrPuts now to use this chance to free the + * memory. + */ + for (auto &entry : m_uniquePtrPuts) + { + entry.run(*this); + } + engine.EndStep(); + engine.BeginStep(); + // ++m_currentStep; // think we should keep this as the logical step + m_uniquePtrPuts.clear(); + uncommittedAttributes.clear(); + m_updateSpans.clear(); + break; + } +#else + (void)this; + (void)flushTarget; + engine.PerformPuts(); #endif - }; + }; flush_impl( flushParams, diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 72adfded17..6a5f6e925b 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -459,6 +459,7 @@ overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val) { case FlushTarget::Buffer: case FlushTarget::Disk: + case FlushTarget::NewStep: return true; case FlushTarget::Buffer_Override: case FlushTarget::Disk_Override: diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index e5e0879981..cc2513832c 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -2,6 +2,7 @@ #include "openPMD/ChunkInfo_internal.hpp" #include "openPMD/Datatype.hpp" #include "openPMD/IO/Access.hpp" +#include #if openPMD_USE_INVASIVE_TESTS #define OPENPMD_private public: #define OPENPMD_protected public: @@ -4437,6 +4438,43 @@ BufferChunkSize = 2147483646 # 2^31 - 2 } #endif +TEST_CASE("adios2_flush_via_step") +{ + for (auto const suffix : {"bp4", "bp5"}) + { + Series write( + "../samples/adios2_flush_via_step/simData_%T." + + std::string(suffix), + Access::CREATE); + std::vector data(10); + for (Iteration::IterationIndex_t i = 0; i < 5; ++i) + { + Iteration it = write.writeIterations()[i]; + auto E_x = it.meshes["E"]["x"]; + E_x.resetDataset({Datatype::FLOAT, {10, 10}}); + for (Extent::value_type j = 0; j < 10; ++j) + { + std::iota(data.begin(), data.end(), i * 100 + j * 10); + E_x.storeChunk(data, {j, 0}, {1, 10}); + write.flush( + R"(adios2.engine.preferred_flush_target = "new_step")"); + } + it.close(); + } + + for (Iteration::IterationIndex_t i = 0; i < 5; ++i) + { + std::string filename = "../samples/adios2_flush_via_step/simData_" + + std::to_string(i) + "." + suffix; + adios2::ADIOS adios; + auto IO = adios.DeclareIO("IO"); + auto engine = IO.Open(filename, adios2::Mode::Read); + REQUIRE(engine.Steps() == (suffix == std::string("bp4") ? 10 : 11)); + engine.Close(); + } + } +} + TEST_CASE("adios2_engines_and_file_endings") { size_t filenameCounter = 0; From 0cb92c0410bf09045cba3b807cc5ac0316eefd57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 11 Jun 2024 18:31:59 +0200 Subject: [PATCH 2/6] Test reading --- include/openPMD/IO/ADIOS/macros.hpp | 5 +++ test/SerialIOTest.cpp | 56 +++++++++++++++-------------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/include/openPMD/IO/ADIOS/macros.hpp b/include/openPMD/IO/ADIOS/macros.hpp index 4e5f223e44..8618573713 100644 --- a/include/openPMD/IO/ADIOS/macros.hpp +++ b/include/openPMD/IO/ADIOS/macros.hpp @@ -22,6 +22,11 @@ #define openPMD_HAS_ADIOS_2_10 \ (ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 210) +#define openPMD_HAS_ADIOS_2_10_1 \ + (ADIOS2_VERSION_MAJOR * 1000 + ADIOS2_VERSION_MINOR * 10 + \ + ADIOS2_VERSION_PATCH >= \ + 2101) + #if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_10 // ADIOS2 v2.10 no longer defines this #define openPMD_HAVE_ADIOS2_BP5 1 diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index cc2513832c..a69cf5d830 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -4440,39 +4440,43 @@ BufferChunkSize = 2147483646 # 2^31 - 2 TEST_CASE("adios2_flush_via_step") { - for (auto const suffix : {"bp4", "bp5"}) + Series write( + "../samples/adios2_flush_via_step/simData_%T.bp5", + Access::CREATE, + R"(adios2.engine.parameters.FlattenSteps = "on")"); + std::vector data(10); + for (Iteration::IterationIndex_t i = 0; i < 5; ++i) { - Series write( - "../samples/adios2_flush_via_step/simData_%T." + - std::string(suffix), - Access::CREATE); - std::vector data(10); - for (Iteration::IterationIndex_t i = 0; i < 5; ++i) + Iteration it = write.writeIterations()[i]; + auto E_x = it.meshes["E"]["x"]; + E_x.resetDataset({Datatype::FLOAT, {10, 10}}); + for (Extent::value_type j = 0; j < 10; ++j) { - Iteration it = write.writeIterations()[i]; - auto E_x = it.meshes["E"]["x"]; - E_x.resetDataset({Datatype::FLOAT, {10, 10}}); - for (Extent::value_type j = 0; j < 10; ++j) - { - std::iota(data.begin(), data.end(), i * 100 + j * 10); - E_x.storeChunk(data, {j, 0}, {1, 10}); - write.flush( - R"(adios2.engine.preferred_flush_target = "new_step")"); - } - it.close(); + std::iota(data.begin(), data.end(), i * 100 + j * 10); + E_x.storeChunk(data, {j, 0}, {1, 10}); + write.flush(R"(adios2.engine.preferred_flush_target = "new_step")"); } + it.close(); + } - for (Iteration::IterationIndex_t i = 0; i < 5; ++i) +#if openPMD_HAS_ADIOS_2_10_1 + for (auto access : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}) + { + Series read( + "../samples/adios2_flush_via_step/simData_%T.bp5", + Access::READ_RANDOM_ACCESS); + std::vector load_data(100); + data.resize(100); + for (auto iteration : read.readIterations()) { - std::string filename = "../samples/adios2_flush_via_step/simData_" + - std::to_string(i) + "." + suffix; - adios2::ADIOS adios; - auto IO = adios.DeclareIO("IO"); - auto engine = IO.Open(filename, adios2::Mode::Read); - REQUIRE(engine.Steps() == (suffix == std::string("bp4") ? 10 : 11)); - engine.Close(); + std::iota(data.begin(), data.end(), iteration.iterationIndex * 100); + iteration.meshes["E"]["x"].loadChunkRaw( + load_data.data(), {0, 0}, {10, 10}); + iteration.close(); + REQUIRE(load_data == data); } } +#endif } TEST_CASE("adios2_engines_and_file_endings") From 0de9f9d2bd1843607515e1bdc87bb9bcae18f0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 12 Jun 2024 11:31:58 +0200 Subject: [PATCH 3/6] Test and fixes for READ_LINEAR mode --- include/openPMD/IO/AbstractIOHandler.hpp | 25 +++++++++++++++++++----- src/Series.cpp | 7 +++++++ test/SerialIOTest.cpp | 4 +--- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index 7b89b619cc..1288a87b21 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -195,13 +195,28 @@ class AbstractIOHandler { /* * In file-based iteration encoding, the APPEND mode is handled entirely - * by the frontend, the backend should just treat it as CREATE mode + * by the frontend, the backend should just treat it as CREATE mode. + * Similar for READ_LINEAR which should be treated as READ_RANDOM_ACCESS + * in the backend. */ - if (encoding == IterationEncoding::fileBased && - m_backendAccess == Access::APPEND) + if (encoding == IterationEncoding::fileBased) { - // do we really want to have those as const members..? - *const_cast(&m_backendAccess) = Access::CREATE; + switch (m_backendAccess) + { + + case Access::READ_LINEAR: + // do we really want to have those as const members..? + *const_cast(&m_backendAccess) = + Access::READ_RANDOM_ACCESS; + break; + case Access::APPEND: + *const_cast(&m_backendAccess) = Access::CREATE; + break; + case Access::READ_RANDOM_ACCESS: + case Access::READ_WRITE: + case Access::CREATE: + break; + } } m_encoding = encoding; diff --git a/src/Series.cpp b/src/Series.cpp index fbd42d6eb4..82b162e540 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -1550,6 +1550,13 @@ void Series::readFileBased() Parameter fOpen; Parameter aRead; + // Tell the backend that we are parsing file-based iteration encoding. + // This especially means that READ_RANDOM_ACCESS will be used instead of + // READ_LINEAR, as READ_LINEAR is implemented in the frontend for file-based + // encoding. Don't set the iteration encoding in the frontend yet, will be + // set after reading the iteration encoding attribute from the opened file. + IOHandler()->setIterationEncoding(IterationEncoding::fileBased); + if (!auxiliary::directory_exists(IOHandler()->directory)) throw error::ReadError( error::AffectedObject::File, diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index a69cf5d830..d913ad5510 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -4462,9 +4462,7 @@ TEST_CASE("adios2_flush_via_step") #if openPMD_HAS_ADIOS_2_10_1 for (auto access : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}) { - Series read( - "../samples/adios2_flush_via_step/simData_%T.bp5", - Access::READ_RANDOM_ACCESS); + Series read("../samples/adios2_flush_via_step/simData_%T.%E", access); std::vector load_data(100); data.resize(100); for (auto iteration : read.readIterations()) From 148671b1221dfae74157a6914acd516b78b07be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 12 Jun 2024 12:05:31 +0200 Subject: [PATCH 4/6] Cleanup and documentation --- docs/source/details/backendconfig.rst | 3 ++- include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp | 3 ++- src/IO/ADIOS/ADIOS2Auxiliary.cpp | 6 +++++- src/IO/ADIOS/ADIOS2File.cpp | 2 ++ src/IO/ADIOS/ADIOS2IOHandler.cpp | 1 + 5 files changed, 12 insertions(+), 3 deletions(-) diff --git a/docs/source/details/backendconfig.rst b/docs/source/details/backendconfig.rst index 47c6e2d63b..fae114a01c 100644 --- a/docs/source/details/backendconfig.rst +++ b/docs/source/details/backendconfig.rst @@ -129,10 +129,11 @@ Explanation of the single keys: * If ``"disk"``, data will be moved to disk on every flush. * If ``"buffer"``, then only upon ending an IO step or closing an engine. + * If ``new_step``, then a new step will be created. This should be used in combination with the ADIOS2 option ``adios2.engine.parameters.FlattenSteps = "on"``. This behavior can be overridden on a per-flush basis by specifying this JSON/TOML key as an optional parameter to the ``Series::flush()`` or ``Attributable::seriesFlush()`` methods. - Additionally, specifying ``"disk_override"`` or ``"buffer_override"`` will take precedence over options specified without the ``_override`` suffix, allowing to invert the normal precedence order. + Additionally, specifying ``"disk_override"``, ``"buffer_override"`` or ``"new_step_override"`` will take precedence over options specified without the ``_override`` suffix, allowing to invert the normal precedence order. This way, a data producing code can hardcode the preferred flush target per ``flush()`` call, but users can e.g. still entirely deactivate flushing to disk in the ``Series`` constructor by specifying ``preferred_flush_target = buffer_override``. This is useful when applying the asynchronous IO capabilities of the BP5 engine. * ``adios2.dataset.operators``: This key contains a list of ADIOS2 `operators `_, used to enable compression or dataset transformations. diff --git a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp index dfc9939153..ff14f2d69a 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp @@ -54,7 +54,8 @@ namespace adios_defs Buffer_Override, Disk, Disk_Override, - NewStep + NewStep, + NewStep_Override }; using FlushTarget = adios_defs::FlushTarget; diff --git a/src/IO/ADIOS/ADIOS2Auxiliary.cpp b/src/IO/ADIOS/ADIOS2Auxiliary.cpp index 7183313703..95029c009e 100644 --- a/src/IO/ADIOS/ADIOS2Auxiliary.cpp +++ b/src/IO/ADIOS/ADIOS2Auxiliary.cpp @@ -52,11 +52,15 @@ FlushTarget flushTargetFromString(std::string const &str) { return FlushTarget::NewStep; } + else if (str == "new_step_override") + { + return FlushTarget::NewStep_Override; + } else { throw error::BackendConfigSchema( {"adios2", "engine", adios_defaults::str_flushtarget}, - "Flush target must be either 'disk' or 'buffer', but " + "Flush target must be either 'disk', 'buffer' or 'new_step', but " "was " + str + "."); } diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index 5d8b5a99bf..815b163053 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -22,6 +22,7 @@ #include "openPMD/IO/ADIOS/ADIOS2File.hpp" #include "openPMD/Error.hpp" #include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp" +#include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/auxiliary/Environment.hpp" #include "openPMD/auxiliary/StringManip.hpp" @@ -1067,6 +1068,7 @@ void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts) target = CleanedFlushTarget::Buffer; break; case FlushTarget::NewStep: + case FlushTarget::NewStep_Override: target = CleanedFlushTarget::Step; break; } diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 6a5f6e925b..f1d94f4e38 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -463,6 +463,7 @@ overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val) return true; case FlushTarget::Buffer_Override: case FlushTarget::Disk_Override: + case FlushTarget::NewStep_Override: return false; } return true; From 50c409ac592c6ba65471f749ec2d3aa21b6dc8cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 12 Jun 2024 13:30:36 +0200 Subject: [PATCH 5/6] Update test/SerialIOTest.cpp --- test/SerialIOTest.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index d913ad5510..5021609924 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -2,7 +2,6 @@ #include "openPMD/ChunkInfo_internal.hpp" #include "openPMD/Datatype.hpp" #include "openPMD/IO/Access.hpp" -#include #if openPMD_USE_INVASIVE_TESTS #define OPENPMD_private public: #define OPENPMD_protected public: From 247b01e4dc2f16ac488834adb0cefa198d3115a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 12 Jun 2024 14:12:42 +0200 Subject: [PATCH 6/6] Run test only when BP5 exists --- test/SerialIOTest.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 5021609924..feaab30788 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -4437,6 +4437,7 @@ BufferChunkSize = 2147483646 # 2^31 - 2 } #endif +#if openPMD_HAVE_ADIOS2_BP5 TEST_CASE("adios2_flush_via_step") { Series write( @@ -4475,6 +4476,7 @@ TEST_CASE("adios2_flush_via_step") } #endif } +#endif TEST_CASE("adios2_engines_and_file_endings") {