diff --git a/docs/source/dev/design.rst b/docs/source/dev/design.rst index e01816b612..ce43777407 100644 --- a/docs/source/dev/design.rst +++ b/docs/source/dev/design.rst @@ -23,7 +23,7 @@ Therefore, enabling users to handle hierarchical, self-describing file formats w .. literalinclude:: IOTask.hpp :language: cpp - :lines: 44-62 + :lines: 48-78 Every task is designed to be a fully self-contained description of one such atomic operation. By describing a required minimal step of work (without any side-effect), these operations are the foundation of the unified handling mechanism across suitable file formats. The actual low-level exchange of data is implemented in ``IOHandlers``, one per file format (possibly two if handlingi MPI-parallel work is possible and requires different behaviour). diff --git a/docs/source/usage/streaming.rst b/docs/source/usage/streaming.rst index c72c1121db..d70b929389 100644 --- a/docs/source/usage/streaming.rst +++ b/docs/source/usage/streaming.rst @@ -22,6 +22,7 @@ C++ ^^^ The reading end of the streaming API is activated through use of ``Series::readIterations()`` instead of accessing the field ``Series::iterations`` directly. +Use of ``Access::READ_LINEAR`` mode is recommended. The returned object of type ``ReadIterations`` can be used in a C++11 range-based for loop to iterate over objects of type ``IndexedIteration``. This class extends the ``Iteration`` class with a field ``IndexedIteration::iterationIndex``, denoting this iteration's index. @@ -40,6 +41,7 @@ Python ^^^^^^ The reading end of the streaming API is activated through use of ``Series.read_iterations()`` instead of accessing the field ``Series.iterations`` directly. +Use of ``Access.read_linear`` mode is recommended. The returned object of type ``ReadIterations`` can be used in a Python range-based for loop to iterate over objects of type ``IndexedIteration``. 
This class extends the ``Iteration`` class with a field ``IndexedIteration.iteration_index``, denoting this iteration's index. diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst index ac5398b4cf..64194629ba 100644 --- a/docs/source/usage/workflow.rst +++ b/docs/source/usage/workflow.rst @@ -7,14 +7,41 @@ The openPMD-api distinguishes between a number of different access modes: * **Create mode**: Used for creating a new Series from scratch. Any file possibly existing in the specified location will be overwritten. -* **Read-only mode**: Used for reading from an existing Series. +* Two distinct read modes: **Read-random-access mode** and **Read-linear mode**. + (Specification of **Read-only mode** is equivalent to read-random-access mode.) + Both modes are used for reading from an existing Series. No modifications will be made. + + Differences between both modes: + + * When intending to use ``Series::readIterations()`` (i.e. step-by-step reading of iterations, e.g. in streaming), then **linear read mode** is preferred and always supported. + Data is parsed inside ``Series::readIterations()``, no data is available right after opening the Series. + Global attributes are available directly after calling ``Series::readIterations()``, Iterations and all their corresponding data become available by use of the returned Iterator, e.g. in a foreach loop. + * Otherwise (i.e. for random-access workflows), **random-access read mode** is required, but works only in backends that support random access. + Data is parsed and available right after opening the Series. + + In both modes, parsing of iterations can be deferred with the JSON/TOML option ``defer_iteration_parsing``. + + Detailed rules: + + 1. In backends that have no notion of IO steps (all except ADIOS2), *random-access read mode* can always be used. + 2. In backends that can be accessed either in random-access or step-by-step, the chosen access mode decides which approach is used. 
+ Examples are the BP4 and BP5 engines of ADIOS2. + 3. In streaming backends, random-access is not possible. + When using such a backend, the access mode will be coerced automatically to *linear read mode*. + Use of Series::readIterations() is mandatory for access. + 4. Reading a variable-based Series is only fully supported with *linear access mode*. + If using *random-access read mode*, the dataset will be considered to only have one single step. + If the dataset only has one single step, this is guaranteed to work as expected. + Otherwise, it is undefined which step's data is returned. + * **Read/Write mode**: Creates a new Series if not existing, otherwise opens an existing Series for reading and writing. New datasets and iterations will be inserted as needed. Not fully supported by all backends: * ADIOS1: Automatically coerced to *Create* mode if the file does not exist yet and to *Read-only* mode if it exists. * ADIOS2: Automatically coerced to *Create* mode if the file does not exist yet and to *Read-only* mode if it exists. + Since this happens on a per-file level, this mode allows to read from existing iterations and write to new iterations at the same time in file-based iteration encoding. * **Append mode**: Restricted mode for appending new iterations to an existing Series that is supported by all backends at least in file-based iteration encoding, and by all but ADIOS1 in other encodings. The API is equivalent to that of the *Create* mode, meaning that no reading is supported whatsoever. 
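Note on the docs/source/usage/workflow.rst hunk above: the "Detailed rules" it adds can be read as a small decision procedure. The sketch below is only an illustration of that reading, not code from openPMD-api. `ReadMode`, `BackendKind`, `effective_read_mode` and `fully_supported` are hypothetical names introduced here, and the three-way backend classification is an assumption drawn from rules 1 to 3.

```python
from enum import Enum, auto


class ReadMode(Enum):
    """Hypothetical stand-in for the two read modes described in the docs."""
    RANDOM_ACCESS = auto()  # Access::READ_ONLY / Access::READ_RANDOM_ACCESS
    LINEAR = auto()         # Access::READ_LINEAR


class BackendKind(Enum):
    """Hypothetical backend classification following rules 1 to 3."""
    NO_IO_STEPS = auto()         # rule 1: no notion of IO steps
    RANDOM_OR_STEPWISE = auto()  # rule 2: e.g. the BP4 and BP5 engines of ADIOS2
    STREAMING = auto()           # rule 3: random access is not possible


def effective_read_mode(requested: ReadMode, backend: BackendKind) -> ReadMode:
    """Return the read mode that is actually used for a requested mode."""
    if backend is BackendKind.STREAMING:
        # Rule 3: the access mode is coerced automatically to linear read mode.
        return ReadMode.LINEAR
    # Rules 1 and 2: the chosen access mode decides.
    return requested


def fully_supported(mode: ReadMode, variable_based: bool) -> bool:
    """Rule 4: a variable-based Series is only fully supported in linear mode.

    Random access still works for a variable-based Series with a single
    step. This sketch does not model the number of steps and conservatively
    reports False for that combination.
    """
    return mode is ReadMode.LINEAR or not variable_based
```

Under these assumptions, requesting random access on a streaming backend resolves to linear mode, which matches the switch to `Access::READ_LINEAR` in the streaming examples that follow.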
diff --git a/examples/10_streaming_read.cpp b/examples/10_streaming_read.cpp index e271dd4393..a7e503a055 100644 --- a/examples/10_streaming_read.cpp +++ b/examples/10_streaming_read.cpp @@ -19,7 +19,7 @@ int main() return 0; } - Series series = Series("electrons.sst", Access::READ_ONLY); + Series series = Series("electrons.sst", Access::READ_LINEAR); for (IndexedIteration iteration : series.readIterations()) { diff --git a/examples/10_streaming_read.py b/examples/10_streaming_read.py index d6f7a6e67b..4cd29c46b2 100755 --- a/examples/10_streaming_read.py +++ b/examples/10_streaming_read.py @@ -17,7 +17,7 @@ print("SST engine not available in ADIOS2.") sys.exit(0) - series = io.Series("simData.sst", io.Access_Type.read_only, + series = io.Series("simData.sst", io.Access_Type.read_linear, json.dumps(config)) # Read all available iterations and print electron position data. diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 7c0dd1a44e..309648b782 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -164,7 +164,7 @@ class ADIOS2IOHandlerImpl void extendDataset( Writable *, Parameter const &) override; - void openFile(Writable *, Parameter const &) override; + void openFile(Writable *, Parameter &) override; void closeFile(Writable *, Parameter const &) override; @@ -213,6 +213,10 @@ class ADIOS2IOHandlerImpl void availableChunks( Writable *, Parameter &) override; + + void + deregister(Writable *, Parameter const &) override; + /** * @brief The ADIOS2 access type to chose for Engines opened * within this instance. @@ -248,7 +252,10 @@ class ADIOS2IOHandlerImpl */ std::string m_userSpecifiedExtension; - ADIOS2Schema::schema_t m_schema = ADIOS2Schema::schema_0000_00_00; + /* + * Empty option: No schema has been explicitly selected, use default. 
+ */ + std::optional m_schema; enum class UseSpan : char { @@ -267,7 +274,11 @@ class ADIOS2IOHandlerImpl inline SupportedSchema schema() const { - switch (m_schema) + if (!m_schema.has_value()) + { + return SupportedSchema::s_0000_00_00; + } + switch (m_schema.value()) { case ADIOS2Schema::schema_0000_00_00: return SupportedSchema::s_0000_00_00; @@ -276,7 +287,7 @@ class ADIOS2IOHandlerImpl default: throw std::runtime_error( "[ADIOS2] Encountered unsupported schema version: " + - std::to_string(m_schema)); + std::to_string(m_schema.value())); } } @@ -331,11 +342,11 @@ class ADIOS2IOHandlerImpl * @return first parameter: the operators, second parameters: whether * operators have been configured */ - std::optional > + std::optional> getOperators(json::TracingJSON config); // use m_config - std::optional > getOperators(); + std::optional> getOperators(); std::string fileSuffix(bool verbose = true) const; @@ -361,7 +372,7 @@ class ADIOS2IOHandlerImpl */ std::unordered_map< InvalidatableFile, - std::unique_ptr > + std::unique_ptr> m_fileData; std::map m_operators; @@ -455,8 +466,8 @@ namespace detail template inline constexpr bool IsUnsupportedComplex_v = - std::is_same_v > || - std::is_same_v > >; + std::is_same_v> || + std::is_same_v>>; struct DatasetReader { @@ -581,7 +592,8 @@ namespace detail Parameter ¶ms, adios2::IO &IO, adios2::Engine &engine, - std::string const &varName); + std::string const &varName, + bool allSteps); template static void call(Params &&...); @@ -630,7 +642,7 @@ namespace detail }; template <> - struct AttributeTypes > + struct AttributeTypes> { static void createAttribute( adios2::IO &, @@ -663,13 +675,13 @@ namespace detail }; template <> - struct AttributeTypes > > + struct AttributeTypes>> { static void createAttribute( adios2::IO &, adios2::Engine &, detail::BufferedAttributeWrite &, - const std::vector > &) + const std::vector> &) { throw std::runtime_error( "[ADIOS2] Internal error: no support for long double complex " @@ -687,7 +699,7 
@@ namespace detail } static bool attributeUnchanged( - adios2::IO &, std::string, std::vector >) + adios2::IO &, std::string, std::vector>) { throw std::runtime_error( "[ADIOS2] Internal error: no support for long double complex " @@ -696,7 +708,7 @@ namespace detail }; template - struct AttributeTypes > + struct AttributeTypes> { static void createAttribute( adios2::IO &IO, @@ -734,7 +746,7 @@ namespace detail }; template <> - struct AttributeTypes > + struct AttributeTypes> { static void createAttribute( adios2::IO &IO, @@ -772,7 +784,7 @@ namespace detail }; template - struct AttributeTypes > + struct AttributeTypes> { static void createAttribute( adios2::IO &IO, @@ -986,14 +998,14 @@ namespace detail * Hence, next to the actual file name, also store the name for the * IO. */ - std::string const m_IOName; + std::string m_IOName; adios2::ADIOS &m_ADIOS; adios2::IO m_IO; /** * The default queue for deferred actions. * Drained upon BufferedActions::flush(). */ - std::vector > m_buffer; + std::vector> m_buffer; /** * Buffer for attributes to be written in the new (variable-based) * attribute layout. @@ -1017,7 +1029,7 @@ namespace detail * We must store them somewhere until the next PerformPuts/Gets, EndStep * or Close in ADIOS2 to avoid use after free conditions. */ - std::vector > m_alreadyEnqueued; + std::vector> m_alreadyEnqueued; adios2::Mode m_mode; /** * The base pointer of an ADIOS2 span might change after reallocations. @@ -1027,7 +1039,7 @@ namespace detail * retrieval of the updated base pointer. * This map is cleared upon flush points. 
*/ - std::map > m_updateSpans; + std::map> m_updateSpans; PreloadAdiosAttributes preloadAttributes; /* @@ -1049,6 +1061,10 @@ namespace detail */ bool optimizeAttributesStreaming = false; + using ParsePreference = + Parameter::ParsePreference; + ParsePreference parsePreference = ParsePreference::UpFront; + using AttributeMap_t = std::map; BufferedActions(ADIOS2IOHandlerImpl &impl, InvalidatableFile file); @@ -1257,13 +1273,6 @@ namespace detail */ std::string m_engineType; - /** - * See documentation for StreamStatus::Parsing. - * Will be set true under the circumstance described there in order to - * indicate that the first step should only be opened after parsing. - */ - bool delayOpeningTheFirstStep = false; - /* * ADIOS2 does not give direct access to its internal attribute and * variable maps, but will instead give access to copies of them. @@ -1279,6 +1288,11 @@ namespace detail std::optional m_availableAttributes; std::optional m_availableVariables; + /* + * Cannot write attributes right after opening the engine + * https://github.com/ornladios/ADIOS2/issues/3433 + */ + bool initializedDefaults = false; /* * finalize() will set this true to avoid running twice. 
*/ @@ -1289,7 +1303,11 @@ namespace detail return m_impl->schema(); } + void create_IO(); + void configure_IO(ADIOS2IOHandlerImpl &impl); + void configure_IO_Read(std::optional userSpecifiedUsesteps); + void configure_IO_Write(std::optional userSpecifiedUsesteps); using AttributeLayout = ADIOS2IOHandlerImpl::AttributeLayout; inline AttributeLayout attributeLayout() const diff --git a/include/openPMD/IO/ADIOS/CommonADIOS1IOHandler.hpp b/include/openPMD/IO/ADIOS/CommonADIOS1IOHandler.hpp index 9f43b9ae18..9fbe0e0e87 100644 --- a/include/openPMD/IO/ADIOS/CommonADIOS1IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/CommonADIOS1IOHandler.hpp @@ -59,7 +59,7 @@ class CommonADIOS1IOHandlerImpl : public AbstractIOHandlerImpl Writable *, Parameter const &) override; void extendDataset( Writable *, Parameter const &) override; - void openFile(Writable *, Parameter const &) override; + void openFile(Writable *, Parameter &) override; void closeFile(Writable *, Parameter const &) override; void availableChunks( @@ -84,6 +84,8 @@ class CommonADIOS1IOHandlerImpl : public AbstractIOHandlerImpl void listDatasets(Writable *, Parameter &) override; void listAttributes(Writable *, Parameter &) override; + void + deregister(Writable *, Parameter const &) override; void close(int64_t); void close(ADIOS_FILE *); diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 79e9b35739..925c5f83a0 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -202,6 +202,12 @@ class AbstractIOHandlerImpl deref_dynamic_cast >( i.parameter.get())); break; + case O::DEREGISTER: + deregister( + i.writable, + deref_dynamic_cast >( + i.parameter.get())); + break; } } catch (...) @@ -354,8 +360,7 @@ class AbstractIOHandlerImpl * root group "/" of the hierarchy in the opened file. The Writable should * be marked written when the operation completes successfully. 
*/ - virtual void - openFile(Writable *, Parameter const &) = 0; + virtual void openFile(Writable *, Parameter &) = 0; /** Open all contained groups in a path, possibly recursively. * * The operation should overwrite existing file positions, even when the @@ -567,6 +572,16 @@ class AbstractIOHandlerImpl void keepSynchronous(Writable *, Parameter param); + /** Notify the backend that the Writable has been / will be deallocated. + * + * The backend should remove all references to this Writable from internal + * data structures. Subtle bugs might be possible if not doing this, since + * new objects might be allocated to the now-freed address. + * The Writable pointer must not be dereferenced. + */ + virtual void + deregister(Writable *, Parameter const ¶m) = 0; + AbstractIOHandler *m_handler; }; // AbstractIOHandlerImpl } // namespace openPMD diff --git a/include/openPMD/IO/Access.hpp b/include/openPMD/IO/Access.hpp index 2b5d37f260..eddd9f40e9 100644 --- a/include/openPMD/IO/Access.hpp +++ b/include/openPMD/IO/Access.hpp @@ -20,18 +20,111 @@ */ #pragma once +#include + namespace openPMD { /** File access mode to use during IO. */ enum class Access { - READ_ONLY, //!< open series as read-only, fails if series is not found - READ_WRITE, //!< open existing series as writable + /** + * Open Series as read-only, fails if Series is not found. + * When to use READ_ONLY or READ_LINEAR: + * + * * When intending to use Series::readIterations() + * (i.e. step-by-step reading of iterations, e.g. in streaming), + * then Access::READ_LINEAR is preferred and always supported. + * Data is parsed inside Series::readIterations(), no data is available + * right after opening the Series. + * * Otherwise (i.e. for random-access workflows), Access::READ_ONLY + * is required, but works only in backends that support random access. + * Data is parsed and available right after opening the Series. 
+ * + * In both modes, parsing of iterations can be deferred with the JSON/TOML + * option `defer_iteration_parsing`. + * + * Detailed rules: + * + * 1. In backends that have no notion of IO steps (all except ADIOS2), + * Access::READ_ONLY can always be used. + * 2. In backends that can be accessed either in random-access or + * step-by-step, the chosen access mode decides which approach is used. + * Examples are the BP4 and BP5 engines of ADIOS2. + * 3. In streaming backends, random-access is not possible. + * When using such a backend, the access mode will be coerced + * automatically to Access::READ_LINEAR. Use of Series::readIterations() + * is mandatory for access. + * 4. Reading a variable-based Series is only fully supported with + * Access::READ_LINEAR. + * If using Access::READ_ONLY, the dataset will be considered to only + * have one single step. + * If the dataset only has one single step, this is guaranteed to work + * as expected. Otherwise, it is undefined which step's data is returned. + */ + READ_ONLY, + READ_RANDOM_ACCESS = READ_ONLY, //!< more explicit alias for READ_ONLY + /* + * Open Series as read-only, fails if Series is not found. + * This access mode requires use of Series::readIterations(). + * Global attributes are available directly after calling + * Series::readIterations(), Iterations and all their corresponding data + * become available by use of the returned Iterator, e.g. in a foreach loop. + * See Access::READ_ONLY for when to use this. + */ + READ_LINEAR, + /** + * Open existing Series as writable. + * Read mode corresponds with Access::READ_RANDOM_ACCESS. 
+ */ + READ_WRITE, CREATE, //!< create new series and truncate existing (files) APPEND //!< write new iterations to an existing series without reading }; // Access +namespace access +{ + inline bool readOnly(Access access) + { + switch (access) + { + case Access::READ_LINEAR: + case Access::READ_ONLY: + return true; + case Access::READ_WRITE: + case Access::CREATE: + case Access::APPEND: + return false; + } + throw std::runtime_error("Unreachable!"); + } + + inline bool write(Access access) + { + return !readOnly(access); + } + + inline bool writeOnly(Access access) + { + switch (access) + { + case Access::READ_LINEAR: + case Access::READ_ONLY: + case Access::READ_WRITE: + return false; + case Access::CREATE: + case Access::APPEND: + return true; + } + throw std::runtime_error("Unreachable!"); + } + + inline bool read(Access access) + { + return !writeOnly(access); + } +} // namespace access + // deprecated name (used prior to 0.12.0) // note: "using old [[deprecated(msg)]] = new;" is still badly supported, thus // using typedef diff --git a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp index 7502e36e3f..d8d564c281 100644 --- a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp +++ b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp @@ -52,7 +52,7 @@ class HDF5IOHandlerImpl : public AbstractIOHandlerImpl Writable *, Parameter const &) override; void availableChunks( Writable *, Parameter &) override; - void openFile(Writable *, Parameter const &) override; + void openFile(Writable *, Parameter &) override; void closeFile(Writable *, Parameter const &) override; void openPath(Writable *, Parameter const &) override; @@ -75,6 +75,8 @@ class HDF5IOHandlerImpl : public AbstractIOHandlerImpl void listDatasets(Writable *, Parameter &) override; void listAttributes(Writable *, Parameter &) override; + void + deregister(Writable *, Parameter const &) override; std::unordered_map m_fileNames; std::unordered_map m_fileNamesWithID; diff 
--git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 88c7d0380b..cf2cf520e5 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -27,6 +27,7 @@ #include "openPMD/auxiliary/Export.hpp" #include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/Attribute.hpp" +#include "openPMD/backend/ParsePreference.hpp" #include #include @@ -44,21 +45,37 @@ Writable *getWritable(Attributable *); /** Type of IO operation between logical and persistent data. */ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){ - CREATE_FILE, CHECK_FILE, OPEN_FILE, CLOSE_FILE, + CREATE_FILE, + CHECK_FILE, + OPEN_FILE, + CLOSE_FILE, DELETE_FILE, - CREATE_PATH, CLOSE_PATH, OPEN_PATH, DELETE_PATH, + CREATE_PATH, + CLOSE_PATH, + OPEN_PATH, + DELETE_PATH, LIST_PATHS, - CREATE_DATASET, EXTEND_DATASET, OPEN_DATASET, DELETE_DATASET, - WRITE_DATASET, READ_DATASET, LIST_DATASETS, GET_BUFFER_VIEW, + CREATE_DATASET, + EXTEND_DATASET, + OPEN_DATASET, + DELETE_DATASET, + WRITE_DATASET, + READ_DATASET, + LIST_DATASETS, + GET_BUFFER_VIEW, - DELETE_ATT, WRITE_ATT, READ_ATT, LIST_ATTS, + DELETE_ATT, + WRITE_ATT, + READ_ATT, + LIST_ATTS, ADVANCE, AVAILABLE_CHUNKS, //!< Query chunks that can be loaded in a dataset - KEEP_SYNCHRONOUS //!< Keep two items in the object model synchronous with - //!< each other + KEEP_SYNCHRONOUS, //!< Keep two items in the object model synchronous with + //!< each other + DEREGISTER //!< Inform the backend that an object has been deleted. }; // note: if you change the enum members here, please update // docs/source/dev/design.rst @@ -151,7 +168,10 @@ struct OPENPMDAPI_EXPORT Parameter { Parameter() = default; Parameter(Parameter const &p) - : AbstractParameter(), name(p.name), encoding(p.encoding) + : AbstractParameter() + , name(p.name) + , encoding(p.encoding) + , out_parsePreference(p.out_parsePreference) {} std::unique_ptr clone() const override @@ -167,6 +187,9 @@ struct OPENPMDAPI_EXPORT Parameter * variableBased encoding. 
*/ IterationEncoding encoding = IterationEncoding::groupBased; + using ParsePreference = internal::ParsePreference; + std::shared_ptr out_parsePreference = + std::make_shared(ParsePreference::UpFront); }; template <> @@ -673,6 +696,20 @@ struct OPENPMDAPI_EXPORT Parameter Writable *otherWritable; }; +template <> +struct OPENPMDAPI_EXPORT Parameter + : public AbstractParameter +{ + Parameter() = default; + Parameter(Parameter const &) : AbstractParameter() + {} + + std::unique_ptr clone() const override + { + return std::make_unique>(*this); + } +}; + /** @brief Self-contained description of a single IO operation. * * Contained are diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp index 738891f33e..4c68004bc7 100644 --- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp @@ -174,7 +174,7 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl void availableChunks( Writable *, Parameter &) override; - void openFile(Writable *, Parameter const &) override; + void openFile(Writable *, Parameter &) override; void closeFile(Writable *, Parameter const &) override; @@ -212,6 +212,9 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl void listAttributes(Writable *, Parameter &) override; + void + deregister(Writable *, Parameter const &) override; + std::future flush(); private: diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index 179ade29fe..3c7fffc545 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include namespace openPMD @@ -338,8 +339,11 @@ class Iteration : public Attributable * Useful in group-based iteration encoding where the Iteration will only * be known after opening the step. 
*/ - static BeginStepStatus - beginStep(std::optional thisObject, Series &series, bool reread); + static BeginStepStatus beginStep( + std::optional thisObject, + Series &series, + bool reread, + std::set const &ignoreIterations = {}); /** * @brief End an IO step on the IO file (or file-like object) diff --git a/include/openPMD/ReadIterations.hpp b/include/openPMD/ReadIterations.hpp index 7d6266e4f0..c6a1e4fc36 100644 --- a/include/openPMD/ReadIterations.hpp +++ b/include/openPMD/ReadIterations.hpp @@ -22,10 +22,12 @@ #include "openPMD/Iteration.hpp" #include "openPMD/Series.hpp" +#include "openPMD/backend/ParsePreference.hpp" #include #include #include +#include namespace openPMD { @@ -55,15 +57,34 @@ class SeriesIterator using maybe_series_t = std::optional; - maybe_series_t m_series; - std::deque m_iterationsInCurrentStep; - uint64_t m_currentIteration{}; + struct SharedData + { + SharedData() = default; + SharedData(SharedData const &) = delete; + SharedData(SharedData &&) = delete; + SharedData &operator=(SharedData const &) = delete; + SharedData &operator=(SharedData &&) = delete; + + maybe_series_t series; + std::deque iterationsInCurrentStep; + uint64_t currentIteration{}; + std::optional parsePreference; + /* + * Necessary because in the old ADIOS2 schema, old iterations' metadata + * will leak into new steps, making the frontend think that the groups + * are still there and the iterations can be parsed again. + */ + std::set ignoreIterations; + }; + + std::shared_ptr m_data; public: //! construct the end() iterator explicit SeriesIterator(); - SeriesIterator(Series); + SeriesIterator( + Series, std::optional parsePreference); SeriesIterator &operator++(); @@ -78,7 +99,8 @@ class SeriesIterator private: inline bool setCurrentIteration() { - if (m_iterationsInCurrentStep.empty()) + auto &data = *m_data; + if (data.iterationsInCurrentStep.empty()) { std::cerr << "[ReadIterations] Encountered a step without " "iterations. Closing the Series." 
@@ -86,19 +108,20 @@ class SeriesIterator *this = end(); return false; } - m_currentIteration = *m_iterationsInCurrentStep.begin(); + data.currentIteration = *data.iterationsInCurrentStep.begin(); return true; } inline std::optional peekCurrentIteration() { - if (m_iterationsInCurrentStep.empty()) + auto &data = *m_data; + if (data.iterationsInCurrentStep.empty()) { return std::nullopt; } else { - return {*m_iterationsInCurrentStep.begin()}; + return {*data.iterationsInCurrentStep.begin()}; } } @@ -119,6 +142,8 @@ class SeriesIterator std::optional loopBody(); void deactivateDeadIteration(iteration_index_t); + + void initSeriesInLinearReadMode(); }; /** @@ -146,8 +171,13 @@ class ReadIterations using iterator_t = SeriesIterator; Series m_series; + std::optional alreadyOpened; + std::optional m_parsePreference; - ReadIterations(Series); + ReadIterations( + Series, + Access, + std::optional parsePreference); public: iterator_t begin(); diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 30b618db40..7b85986992 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -30,6 +30,7 @@ #include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/Attributable.hpp" #include "openPMD/backend/Container.hpp" +#include "openPMD/backend/ParsePreference.hpp" #include "openPMD/config.hpp" #include "openPMD/version.hpp" @@ -159,6 +160,14 @@ namespace internal */ bool m_lastFlushSuccessful = false; + /** + * Remember the preference that the backend specified for parsing. + * Not used in file-based iteration encoding, empty then. + * In linear read mode, parsing only starts after calling + * Series::readIterations(), empty before that point. + */ + std::optional m_parsePreference; + void close(); }; // SeriesData @@ -618,8 +627,10 @@ OPENPMD_private * ReadIterations since those methods should be aware when the current step * is broken). 
*/ - std::optional > - readGorVBased(bool do_always_throw_errors, bool init); + std::optional > readGorVBased( + bool do_always_throw_errors, + bool init, + std::set const &ignoreIterations = {}); void readBase(); std::string iterationFilename(IterationIndex_t i); diff --git a/include/openPMD/backend/ParsePreference.hpp b/include/openPMD/backend/ParsePreference.hpp new file mode 100644 index 0000000000..cb68d78f99 --- /dev/null +++ b/include/openPMD/backend/ParsePreference.hpp @@ -0,0 +1,31 @@ +/* Copyright 2023 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . 
+ */ + +#pragma once + +namespace openPMD::internal +{ +enum class ParsePreference : char +{ + UpFront, //m_handler->m_backendAccess != Access::READ_ONLY) + if (access::write(m_handler->m_backendAccess)) { for (auto &group : m_attributeWrites) for (auto &att : group.second) @@ -102,49 +102,55 @@ std::future ADIOS1IOHandlerImpl::flush() case O::CREATE_FILE: createFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CHECK_FILE: checkFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CREATE_PATH: createPath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::OPEN_PATH: openPath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CREATE_DATASET: createDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::WRITE_ATT: writeAttribute( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::OPEN_FILE: openFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::KEEP_SYNCHRONOUS: keepSynchronous( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( + i.parameter.get())); + break; + case O::DEREGISTER: + deregister( + i.writable, + deref_dynamic_cast>( i.parameter.get())); break; default: @@ -183,19 +189,19 @@ std::future ADIOS1IOHandlerImpl::flush() case O::EXTEND_DATASET: extendDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CLOSE_PATH: closePath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::OPEN_DATASET: openDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CLOSE_FILE: @@ -207,79 +213,79 @@ std::future ADIOS1IOHandlerImpl::flush() case O::DELETE_FILE: deleteFile( i.writable, - deref_dynamic_cast 
>( + deref_dynamic_cast>( i.parameter.get())); break; case O::DELETE_PATH: deletePath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::DELETE_DATASET: deleteDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::DELETE_ATT: deleteAttribute( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::WRITE_DATASET: writeDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::READ_DATASET: readDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::GET_BUFFER_VIEW: getBufferView( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::READ_ATT: readAttribute( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::LIST_PATHS: listPaths( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::LIST_DATASETS: listDatasets( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::LIST_ATTS: listAttributes( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::ADVANCE: advance( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::AVAILABLE_CHUNKS: availableChunks( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; default: @@ -366,6 +372,7 @@ void ADIOS1IOHandler::enqueue(IOTask const &i) case Operation::OPEN_FILE: case Operation::WRITE_ATT: case Operation::KEEP_SYNCHRONOUS: + case Operation::DEREGISTER: m_setup.push(i); return; default: diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index d768ec5b6b..ff6db0a196 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -64,6 +64,8 @@ namespace openPMD #if 
openPMD_HAVE_ADIOS2 +#define HAS_ADIOS_2_8 (ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 208) + #if openPMD_HAVE_MPI ADIOS2IOHandlerImpl::ADIOS2IOHandlerImpl( @@ -146,7 +148,11 @@ void ADIOS2IOHandlerImpl::init(json::TracingJSON cfg) [](unsigned char c) { return std::tolower(c); }); // environment-variable based configuration - m_schema = auxiliary::getEnvNum("OPENPMD2_ADIOS2_SCHEMA", m_schema); + if (int schemaViaEnv = auxiliary::getEnvNum("OPENPMD2_ADIOS2_SCHEMA", -1); + schemaViaEnv != -1) + { + m_schema = schemaViaEnv; + } if (cfg.json().contains("adios2")) { @@ -471,7 +477,7 @@ void ADIOS2IOHandlerImpl::createFile( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[ADIOS2] Creating a file in read-only mode is not possible."); if (!writable->written) @@ -596,7 +602,7 @@ void ADIOS2IOHandlerImpl::createPath( void ADIOS2IOHandlerImpl::createDataset( Writable *writable, const Parameter &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) { throw std::runtime_error( "[ADIOS2] Creating a dataset in a file opened as read " @@ -608,7 +614,7 @@ void ADIOS2IOHandlerImpl::createDataset( std::string name = auxiliary::removeSlashes(parameters.name); auto const file = - refreshFileFromParent(writable, /* preferParentFile = */ false); + refreshFileFromParent(writable, /* preferParentFile = */ true); auto filePos = setAndGetFilePosition(writable, name); filePos->gd = ADIOS2FilePosition::GD::DATASET; auto const varName = nameOfVariable(writable); @@ -679,7 +685,7 @@ void ADIOS2IOHandlerImpl::extendDataset( Writable *writable, const Parameter &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[ADIOS2] Cannot extend datasets in read-only mode."); setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /*
preferParentFile = */ false); @@ -691,7 +697,7 @@ void ADIOS2IOHandlerImpl::extendDataset( } void ADIOS2IOHandlerImpl::openFile( - Writable *writable, const Parameter &parameters) + Writable *writable, Parameter &parameters) { if (!auxiliary::directory_exists(m_handler->directory)) { @@ -714,7 +720,8 @@ void ADIOS2IOHandlerImpl::openFile( m_iterationEncoding = parameters.encoding; // enforce opening the file // lazy opening is deathly in parallel situations - getFileData(file, IfFileNotOpen::OpenImplicitly); + auto &fileData = getFileData(file, IfFileNotOpen::OpenImplicitly); + *parameters.out_parsePreference = fileData.parsePreference; } void ADIOS2IOHandlerImpl::closeFile( @@ -741,6 +748,8 @@ void ADIOS2IOHandlerImpl::closeFile( /* flushUnconditionally = */ false); m_fileData.erase(it); } + m_dirty.erase(fileIterator->second); + m_files.erase(fileIterator); } } @@ -770,7 +779,7 @@ void ADIOS2IOHandlerImpl::openDataset( writable->abstractFilePosition.reset(); auto pos = setAndGetFilePosition(writable, name); pos->gd = ADIOS2FilePosition::GD::DATASET; - auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); + auto file = refreshFileFromParent(writable, /* preferParentFile = */ true); auto varName = nameOfVariable(writable); *parameters.dtype = detail::fromADIOS2Type(getFileData(file, IfFileNotOpen::ThrowError) @@ -810,7 +819,7 @@ void ADIOS2IOHandlerImpl::writeDataset( Writable *writable, const Parameter &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[ADIOS2] Cannot write data in read-only mode."); setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); @@ -839,7 +848,7 @@ void ADIOS2IOHandlerImpl::writeAttribute( break; case AttributeLayout::ByAdiosVariables: { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[ADIOS2] Cannot write attribute in read-only
mode."); auto pos = setAndGetFilePosition(writable); auto file = @@ -987,6 +996,8 @@ void ADIOS2IOHandlerImpl::getBufferView( break; } + ba.requireActiveStep(); + if (parameters.update) { detail::I_UpdateSpan &updater = @@ -1262,7 +1273,7 @@ void ADIOS2IOHandlerImpl::listAttributes( void ADIOS2IOHandlerImpl::advance( Writable *writable, Parameter &parameters) { - auto file = m_files[writable]; + auto file = m_files.at(writable); auto &ba = getFileData(file, IfFileNotOpen::ThrowError); *parameters.status = ba.advance(parameters.mode, /* calledExplicitly = */ true); @@ -1274,7 +1285,7 @@ void ADIOS2IOHandlerImpl::closePath( VERIFY_ALWAYS( writable->written, "[ADIOS2] Cannot close a path that has not been written yet."); - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) { // nothing to do return; @@ -1308,8 +1319,21 @@ void ADIOS2IOHandlerImpl::availableChunks( std::string varName = nameOfVariable(writable); auto engine = ba.getEngine(); // make sure that data are present auto datatype = detail::fromADIOS2Type(ba.m_IO.VariableType(varName)); + bool allSteps = m_handler->m_frontendAccess != Access::READ_LINEAR && + ba.streamStatus == detail::BufferedActions::StreamStatus::NoStream; switchAdios2VariableType( - datatype, parameters, ba.m_IO, engine, varName); + datatype, + parameters, + ba.m_IO, + engine, + varName, + /* allSteps = */ allSteps); +} + +void ADIOS2IOHandlerImpl::deregister( + Writable *writable, Parameter const &) +{ + m_files.erase(writable); } adios2::Mode ADIOS2IOHandlerImpl::adios2AccessMode(std::string const &fullPath) @@ -1318,13 +1342,25 @@ adios2::Mode ADIOS2IOHandlerImpl::adios2AccessMode(std::string const &fullPath) { case Access::CREATE: return adios2::Mode::Write; +#if HAS_ADIOS_2_8 + case Access::READ_LINEAR: + return adios2::Mode::Read; + case Access::READ_ONLY: + return adios2::Mode::ReadRandomAccess; +#else + case Access::READ_LINEAR: case Access::READ_ONLY: return adios2::Mode::Read;
+#endif case Access::READ_WRITE: if (auxiliary::directory_exists(fullPath) || auxiliary::file_exists(fullPath)) { +#if HAS_ADIOS_2_8 + return adios2::Mode::ReadRandomAccess; +#else return adios2::Mode::Read; +#endif } else { @@ -1713,7 +1749,7 @@ namespace detail const Parameter &parameters) { VERIFY_ALWAYS( - impl->m_handler->m_backendAccess != Access::READ_ONLY, + access::write(impl->m_handler->m_backendAccess), "[ADIOS2] Cannot write attribute in read-only mode."); auto pos = impl->setAndGetFilePosition(writable); auto file = impl->refreshFileFromParent( @@ -1885,7 +1921,7 @@ namespace detail adios2::Engine &engine) { VERIFY_ALWAYS( - impl->m_handler->m_backendAccess != Access::READ_ONLY, + access::write(impl->m_handler->m_backendAccess), "[ADIOS2] Cannot write data in read-only mode."); auto ptr = std::static_pointer_cast(bp.param.data).get(); @@ -1954,26 +1990,48 @@ namespace detail Parameter &params, adios2::IO &IO, adios2::Engine &engine, - std::string const &varName) + std::string const &varName, + bool allSteps) { auto var = IO.InquireVariable(varName); - auto blocksInfo = engine.BlocksInfo(var, engine.CurrentStep()); auto &table = *params.chunks; - table.reserve(blocksInfo.size()); - for (auto const &info : blocksInfo) + auto addBlocksInfo = [&table](auto const &blocksInfo_) { + for (auto const &info : blocksInfo_) + { + Offset offset; + Extent extent; + auto size = info.Start.size(); + offset.reserve(size); + extent.reserve(size); + for (unsigned i = 0; i < size; ++i) + { + offset.push_back(info.Start[i]); + extent.push_back(info.Count[i]); + } + table.emplace_back( + std::move(offset), std::move(extent), info.WriterID); + } + }; + if (allSteps) { - Offset offset; - Extent extent; - auto size = info.Start.size(); - offset.reserve(size); - extent.reserve(size); - for (unsigned i = 0; i < size; ++i) + auto allBlocks = var.AllStepsBlocksInfo(); + table.reserve(std::accumulate( + allBlocks.begin(), + allBlocks.end(), + size_t(0), + [](size_t acc, auto const &block)
{ + return acc + block.size(); + })); + for (auto const &blocksInfo : allBlocks) { - offset.push_back(info.Start[i]); - extent.push_back(info.Count[i]); + addBlocksInfo(blocksInfo); } - table.emplace_back( - std::move(offset), std::move(extent), info.WriterID); + } + else + { + auto blocksInfo = engine.BlocksInfo(var, engine.CurrentStep()); + table.reserve(blocksInfo.size()); + addBlocksInfo(blocksInfo); } } @@ -2359,7 +2417,6 @@ namespace detail BufferedActions::BufferedActions( ADIOS2IOHandlerImpl &impl, InvalidatableFile file) : m_file(impl.fullPath(std::move(file))) - , m_IOName(std::to_string(impl.nameCounter++)) , m_ADIOS(impl.m_ADIOS) , m_impl(&impl) , m_engineType(impl.m_engineType) @@ -2367,8 +2424,8 @@ namespace detail // Declaring these members in the constructor body to avoid // initialization order hazards. Need the IO_ prefix since in some // situation there seems to be trouble with number-only IO names - m_IO = impl.m_ADIOS.DeclareIO("IO_" + m_IOName); m_mode = impl.adios2AccessMode(m_file); + create_IO(); if (!m_IO) { throw std::runtime_error( @@ -2382,6 +2439,12 @@ namespace detail } } + void BufferedActions::create_IO() + { + m_IOName = std::to_string(m_impl->nameCounter++); + m_IO = m_impl->m_ADIOS.DeclareIO("IO_" + m_IOName); + } + BufferedActions::~BufferedActions() { finalize(); @@ -2428,99 +2491,298 @@ namespace detail finalized = true; } - void BufferedActions::configure_IO(ADIOS2IOHandlerImpl &impl) + namespace { - (void)impl; - static std::set streamingEngines = { - "sst", - "insitumpi", - "inline", - "staging", - "nullcore", - "ssc", - "filestream", - "bp5"}; - // diskStreamingEngines is a subset of streamingEngines - static std::set diskStreamingEngines{"bp5", "filestream"}; - static std::set fileEngines = { - "bp4", "bp3", "hdf5", "file"}; + constexpr char const *alwaysSupportsUpfrontParsing[] = {"bp3", "hdf5"}; + constexpr char const *supportsUpfrontParsingInRandomAccessMode[] = { + "bp4", "bp5", "file", "filestream"}; + constexpr char 
const *nonPersistentEngines[] = { + "sst", "insitumpi", "inline", "staging", "nullcore", "ssc"}; - // step/variable-based iteration encoding requires the new schema - if (m_impl->m_iterationEncoding == IterationEncoding::variableBased) + bool supportedEngine(std::string const &engineType) { - m_impl->m_schema = ADIOS2Schema::schema_2021_02_09; + auto is_in_list = [&engineType](auto &list) { + for (auto const &e : list) + { + if (engineType == e) + { + return true; + } + } + return false; + }; + return is_in_list(alwaysSupportsUpfrontParsing) || + is_in_list(supportsUpfrontParsingInRandomAccessMode) || + is_in_list(nonPersistentEngines); } - // set engine type - bool isStreaming = false; + bool + supportsUpfrontParsing(Access access, std::string const &engineType) { - m_IO.SetEngine(m_engineType); - auto it = streamingEngines.find(m_engineType); - if (it != streamingEngines.end()) - { - isStreaming = true; - optimizeAttributesStreaming = - // Optimizing attributes in streaming mode is not needed in - // the variable-based ADIOS2 schema - schema() == SupportedSchema::s_0000_00_00 && - // Also, it should only be done when truly streaming, not - // when using a disk-based engine that behaves like a - // streaming engine (otherwise attributes might vanish) - diskStreamingEngines.find(m_engineType) == - diskStreamingEngines.end(); - streamStatus = StreamStatus::OutsideOfStep; + for (auto const &e : alwaysSupportsUpfrontParsing) + { + if (e == engineType) + { + return true; + } } - else + if (access != Access::READ_LINEAR) { - it = fileEngines.find(m_engineType); - if (it != fileEngines.end()) + for (auto const &e : supportsUpfrontParsingInRandomAccessMode) { - switch (m_mode) + if (e == engineType) { - case adios2::Mode::Read: - /* - * File engines, read mode: - * Use of steps is dictated by what is detected in the - * file being read. 
- */ - streamStatus = StreamStatus::Undecided; - delayOpeningTheFirstStep = true; - break; - case adios2::Mode::Write: - case adios2::Mode::Append: - /* - * File engines, write mode: - * Default for old layout is no steps. - * Default for new layout is to use steps. - */ - switch (schema()) - { - case SupportedSchema::s_0000_00_00: - streamStatus = StreamStatus::NoStream; - break; - case SupportedSchema::s_2021_02_09: - streamStatus = StreamStatus::OutsideOfStep; - break; - } - break; - default: - throw std::runtime_error("Unreachable!"); + return true; } - optimizeAttributesStreaming = false; } - else + } + return false; + } + + enum class PerstepParsing + { + Supported, + Unsupported, + Required + }; + + PerstepParsing + supportsPerstepParsing(Access access, std::string const &engineType) + { + // required in all streaming engines + for (auto const &e : nonPersistentEngines) + { + if (engineType == e) { - throw std::runtime_error( - "[ADIOS2IOHandler] Unknown engine type. Please choose " - "one out of [sst, staging, bp4, bp3, hdf5, file, " - "filestream, null]"); - // not listing unsupported engines + return PerstepParsing::Required; } } + // supported in file engines in READ_LINEAR mode + if (access != Access::READ_RANDOM_ACCESS) + { + return PerstepParsing::Supported; + } + + return PerstepParsing::Unsupported; + } + + bool nonpersistentEngine(std::string const &engineType) + { + for (auto &e : nonPersistentEngines) + { + if (e == engineType) + { + return true; + } + } + return false; + } + + bool + useStepsInWriting(SupportedSchema schema, std::string const &engineType) + { + if (engineType == "bp5") + { + /* + * BP5 does not require steps when reading, but it requires + * them when writing. 
+ */ + return true; + } + switch (supportsPerstepParsing(Access::CREATE, engineType)) + { + case PerstepParsing::Required: + return true; + case PerstepParsing::Supported: + switch (schema) + { + case SupportedSchema::s_0000_00_00: + return false; + case SupportedSchema::s_2021_02_09: + return true; + } + break; + case PerstepParsing::Unsupported: + return false; + } + return false; // unreachable + } + } // namespace + + void BufferedActions::configure_IO_Read( + std::optional userSpecifiedUsesteps) + { + if (userSpecifiedUsesteps.has_value() && + m_impl->m_handler->m_backendAccess != Access::READ_WRITE) + { + std::cerr << "Explicitly specified `adios2.usesteps` in Read mode. " + "Usage of steps will be determined by what is found " + "in the file being read." + << std::endl; + } + + bool upfrontParsing = supportsUpfrontParsing( + m_impl->m_handler->m_backendAccess, m_engineType); + PerstepParsing perstepParsing = supportsPerstepParsing( + m_impl->m_handler->m_backendAccess, m_engineType); + + switch (m_impl->m_handler->m_backendAccess) + { + case Access::READ_LINEAR: + switch (perstepParsing) + { + case PerstepParsing::Supported: + case PerstepParsing::Required: + // all is fine, we can go forward with READ_LINEAR mode + /* + * We don't know yet if per-step parsing will be fine since the + * engine is not opened yet. + * In non-persistent (streaming) engines, per-step parsing is + * always fine and always required. + */ + streamStatus = nonpersistentEngine(m_engineType) + ? StreamStatus::OutsideOfStep + : StreamStatus::Undecided; + parsePreference = ParsePreference::PerStep; + m_IO.SetParameter("StreamReader", "On"); + break; + case PerstepParsing::Unsupported: + streamStatus = StreamStatus::NoStream; + parsePreference = ParsePreference::UpFront; + /* + * Note that in BP4 with linear access mode, we set the + * StreamReader option, disabling upfrontParsing capability. 
+ * So, this branch is only taken by niche engines, such as + * BP3 or HDF5, or by BP5 with old ADIOS2 schema and normal read + * mode. Need to fall back to random access parsing. + */ +#if HAS_ADIOS_2_8 + m_mode = adios2::Mode::ReadRandomAccess; +#endif + break; + } + break; + case Access::READ_ONLY: + case Access::READ_WRITE: + /* + * Prefer up-front parsing, but try to fallback to per-step parsing + * if possible. + */ + if (upfrontParsing == nonpersistentEngine(m_engineType)) + { + throw error::Internal( + "Internal control flow error: With access types " + "READ_ONLY/READ_WRITE, support for upfront parsing is " + "equivalent to the chosen engine being file-based."); + } + if (upfrontParsing) + { + streamStatus = StreamStatus::NoStream; + parsePreference = ParsePreference::UpFront; + } + else + { + /* + * Scenario: A step-only workflow was used (i.e. a streaming + * engine), but Access::READ_ONLY was specified. + * Fall back to streaming read mode. + */ + m_mode = adios2::Mode::Read; + parsePreference = ParsePreference::PerStep; + streamStatus = StreamStatus::OutsideOfStep; + } + break; + default: + VERIFY_ALWAYS( + access::writeOnly(m_impl->m_handler->m_backendAccess), + "Internal control flow error: Must set parse preference for " + "any read mode."); + } + } + + void BufferedActions::configure_IO_Write( + std::optional userSpecifiedUsesteps) + { + optimizeAttributesStreaming = + // Optimizing attributes in streaming mode is not needed in + // the variable-based ADIOS2 schema + schema() == SupportedSchema::s_0000_00_00 && + // Also, it should only be done when truly streaming, not + // when using a disk-based engine that behaves like a + // streaming engine (otherwise attributes might vanish) + nonpersistentEngine(m_engineType); + + bool useSteps = useStepsInWriting(schema(), m_engineType); + if (userSpecifiedUsesteps.has_value()) + { + useSteps = userSpecifiedUsesteps.value(); + if (!useSteps && nonpersistentEngine(m_engineType)) + { + throw 
error::WrongAPIUsage( + "Cannot switch off IO steps for non-persistent stream " + "engines in ADIOS2."); + } + } + + streamStatus = + useSteps ? StreamStatus::OutsideOfStep : StreamStatus::NoStream; + } + + void BufferedActions::configure_IO(ADIOS2IOHandlerImpl &impl) + { + // step/variable-based iteration encoding requires the new schema + // but new schema is available only in ADIOS2 >= v2.8 + // use old schema to support at least one single iteration otherwise + if (!m_impl->m_schema.has_value()) + { + switch (m_impl->m_iterationEncoding) + { + case IterationEncoding::variableBased: + m_impl->m_schema = ADIOS2Schema::schema_2021_02_09; + break; + case IterationEncoding::groupBased: + case IterationEncoding::fileBased: + m_impl->m_schema = ADIOS2Schema::schema_0000_00_00; + break; + } + } + + // set engine type + { + m_IO.SetEngine(m_engineType); + } + + if (!supportedEngine(m_engineType)) + { + std::stringstream sstream; + sstream + << "User-selected ADIOS2 engine '" << m_engineType + << "' is not recognized by the openPMD-api. Select one of: '"; + bool first_entry = true; + auto add_entries = [&first_entry, &sstream](auto &list) { + for (auto const &e : list) + { + if (first_entry) + { + sstream << e; + first_entry = false; + } + else + { + sstream << ", " << e; + } + } + }; + add_entries(alwaysSupportsUpfrontParsing); + add_entries(supportsUpfrontParsingInRandomAccessMode); + add_entries(nonPersistentEngines); + sstream << "'." 
<< std::endl; + throw error::WrongAPIUsage(sstream.str()); } // set engine parameters std::set alreadyConfigured; + std::optional userSpecifiedUsesteps; auto engineConfig = impl.config(ADIOS2Defaults::str_engine); if (!engineConfig.json().is_null()) { @@ -2552,14 +2814,8 @@ namespace detail if (!_useAdiosSteps.json().is_null() && m_mode != adios2::Mode::Read) { - bool tmp = _useAdiosSteps.json(); - if (isStreaming && !bool(tmp)) - { - throw std::runtime_error( - "Cannot switch off steps for streaming engines."); - } - streamStatus = bool(tmp) ? StreamStatus::OutsideOfStep - : StreamStatus::NoStream; + userSpecifiedUsesteps = + std::make_optional(_useAdiosSteps.json().get()); } if (engineConfig.json().contains(ADIOS2Defaults::str_flushtarget)) @@ -2598,6 +2854,36 @@ namespace detail } } } + + switch (m_impl->m_handler->m_backendAccess) + { + case Access::READ_LINEAR: + case Access::READ_ONLY: + configure_IO_Read(userSpecifiedUsesteps); + break; + case Access::READ_WRITE: + if ( +#if HAS_ADIOS_2_8 + m_mode == adios2::Mode::Read || + m_mode == adios2::Mode::ReadRandomAccess +#else + m_mode == adios2::Mode::Read +#endif + ) + { + configure_IO_Read(userSpecifiedUsesteps); + } + else + { + configure_IO_Write(userSpecifiedUsesteps); + } + break; + case Access::APPEND: + case Access::CREATE: + configure_IO_Write(userSpecifiedUsesteps); + break; + } + auto notYetConfigured = [&alreadyConfigured](std::string const &param) { auto it = alreadyConfigured.find( auxiliary::lowerCase(std::string(param))); @@ -2729,48 +3015,80 @@ namespace detail // usesSteps attribute only written upon ::advance() // this makes sure that the attribute is only put in case // the streaming API was used.
- m_IO.DefineAttribute( - ADIOS2Defaults::str_adios2Schema, m_impl->m_schema); m_engine = std::make_optional( adios2::Engine(m_IO.Open(m_file, tempMode))); break; } +#if HAS_ADIOS_2_8 + case adios2::Mode::ReadRandomAccess: +#endif case adios2::Mode::Read: { m_engine = std::make_optional( adios2::Engine(m_IO.Open(m_file, m_mode))); - // decide attribute layout - // in streaming mode, this needs to be done after opening - // a step - // in file-based mode, we do it before - auto layoutVersion = [IO{m_IO}]() mutable { - auto attr = IO.InquireAttribute( + /* + * First round: decide attribute layout. + * This MUST occur before the `switch(streamStatus)` construct + * since the streamStatus might be changed after taking a look + * at the used schema. + */ + bool openedANewStep = false; + { + if (!supportsUpfrontParsing( + m_impl->m_handler->m_backendAccess, m_engineType)) + { + /* + * In BP5 with Linear read mode, we now need to + * tentatively open the first IO step. + * Otherwise we don't see the schema attribute. + * This branch is also taken by Streaming engines. + */ + if (m_engine->BeginStep() != adios2::StepStatus::OK) + { + throw std::runtime_error( + "[ADIOS2] Unexpected step status when " + "opening file/stream."); + } + openedANewStep = true; + } + auto attr = m_IO.InquireAttribute( ADIOS2Defaults::str_adios2Schema); if (!attr) { - return ADIOS2Schema::schema_0000_00_00; + m_impl->m_schema = ADIOS2Schema::schema_0000_00_00; } else { - return attr.Data()[0]; + m_impl->m_schema = attr.Data()[0]; } }; - // decide streaming mode + + /* + * Second round: Decide the streamStatus. + */ switch (streamStatus) { case StreamStatus::Undecided: { - m_impl->m_schema = layoutVersion(); auto attr = m_IO.InquireAttribute( ADIOS2Defaults::str_usesstepsAttribute); if (attr && attr.Data()[0] == 1) { - if (delayOpeningTheFirstStep) + if (parsePreference == ParsePreference::UpFront) { + if (openedANewStep) + { + throw error::Internal( + "Logic error in ADIOS2 backend! 
No need to " + "indiscriminately open a step before doing " + "anything in an engine that supports " + "up-front parsing."); + } streamStatus = StreamStatus::Parsing; } else { - if (m_engine.value().BeginStep() != - adios2::StepStatus::OK) + if (!openedANewStep && + m_engine.value().BeginStep() != + adios2::StepStatus::OK) { throw std::runtime_error( "[ADIOS2] Unexpected step status when " @@ -2781,23 +3099,37 @@ namespace detail } else { + /* + * If openedANewStep is true, then the file consists + * of one large step, we just leave it open. + */ streamStatus = StreamStatus::NoStream; } break; } + case StreamStatus::NoStream: + // using random-access mode + case StreamStatus::DuringStep: + // IO step might have sneakily been opened + // by setLayoutVersion(), because otherwise we don't see + // the schema attribute + break; case StreamStatus::OutsideOfStep: - if (m_engine.value().BeginStep() != adios2::StepStatus::OK) + if (openedANewStep) + { + streamStatus = StreamStatus::DuringStep; + } + else { - throw std::runtime_error( - "[ADIOS2] Unexpected step status when " - "opening file/stream."); + throw error::Internal( + "Control flow error: Step should have been opened " + "before this point."); } - m_impl->m_schema = layoutVersion(); - streamStatus = StreamStatus::DuringStep; break; default: throw std::runtime_error("[ADIOS2] Control flow error!"); } + if (attributeLayout() == AttributeLayout::ByAdiosVariables) { preloadAttributes.preloadAttributes(m_IO, m_engine.value()); @@ -2944,6 +3276,13 @@ namespace detail ba->run(*this); } + if (!initializedDefaults) + { + m_IO.DefineAttribute( + ADIOS2Defaults::str_adios2Schema, m_impl->m_schema.value()); + initializedDefaults = true; + } + if (writeAttributes) { for (auto &pair : m_attributeWrites) @@ -2952,7 +3291,12 @@ namespace detail } } +#if HAS_ADIOS_2_8 + if (this->m_mode == adios2::Mode::Read || + this->m_mode == adios2::Mode::ReadRandomAccess) +#else if (this->m_mode == adios2::Mode::Read) +#endif { level = 
FlushLevel::UserFlush; } @@ -3052,6 +3396,9 @@ namespace detail decideFlushAPICall(eng); break; case adios2::Mode::Read: +#if HAS_ADIOS_2_8 + case adios2::Mode::ReadRandomAccess: +#endif eng.PerformGets(); break; default: @@ -3074,8 +3421,14 @@ namespace detail // sic! no else if (streamStatus == StreamStatus::NoStream) { - m_IO.DefineAttribute( - ADIOS2Defaults::str_usesstepsAttribute, 0); + if ((m_mode == adios2::Mode::Write || + m_mode == adios2::Mode::Append) && + !m_IO.InquireAttribute( + ADIOS2Defaults::str_usesstepsAttribute)) + { + m_IO.DefineAttribute( + ADIOS2Defaults::str_usesstepsAttribute, 0); + } flush( ADIOS2FlushParams{FlushLevel::UserFlush}, /* writeAttributes = */ false); @@ -3090,7 +3443,10 @@ namespace detail * The usessteps tag should only be set when the Series is *logically* * using steps. */ - if (calledExplicitly) + if (calledExplicitly && + (m_mode == adios2::Mode::Write || m_mode == adios2::Mode::Append) && + !m_IO.InquireAttribute( + ADIOS2Defaults::str_usesstepsAttribute)) { m_IO.DefineAttribute( ADIOS2Defaults::str_usesstepsAttribute, 1); diff --git a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp index f4c0d7ec45..6b1630e854 100644 --- a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp @@ -412,7 +412,7 @@ template void CommonADIOS1IOHandlerImpl::createFile( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[ADIOS1] Creating a file in read-only mode is not possible."); @@ -470,7 +470,7 @@ template void CommonADIOS1IOHandlerImpl::createPath( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[ADIOS1] Creating a path in a file opened as read only is not " "possible."); @@ -534,7 +534,7 @@ template void
CommonADIOS1IOHandlerImpl::createDataset( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[ADIOS1] Creating a dataset in a file opened as read only is not " "possible."); @@ -543,9 +543,7 @@ void CommonADIOS1IOHandlerImpl::createDataset( { /* ADIOS variable definitions require the file to be (re-)opened to take * effect/not cause errors */ - auto res = m_filePaths.find(writable); - if (res == m_filePaths.end()) - res = m_filePaths.find(writable->parent); + auto res = m_filePaths.find(writable->parent); int64_t group = m_groups[res->second]; @@ -664,7 +662,7 @@ void CommonADIOS1IOHandlerImpl::extendDataset( template void CommonADIOS1IOHandlerImpl::openFile( - Writable *writable, Parameter const &parameters) + Writable *writable, Parameter &parameters) { if (!auxiliary::directory_exists(m_handler->directory)) error::throwReadError( @@ -739,7 +737,7 @@ void CommonADIOS1IOHandlerImpl::closeFile( if (myGroup != m_groups.end()) { auto attributeWrites = m_attributeWrites.find(myGroup->second); - if (this->m_handler->m_backendAccess != Access::READ_ONLY && + if (access::write(this->m_handler->m_backendAccess) && attributeWrites != m_attributeWrites.end()) { for (auto &att : attributeWrites->second) @@ -849,9 +847,7 @@ void CommonADIOS1IOHandlerImpl::openDataset( Writable *writable, Parameter &parameters) { ADIOS_FILE *f; - auto res = m_filePaths.find(writable); - if (res == m_filePaths.end()) - res = m_filePaths.find(writable->parent); + auto res = m_filePaths.find(writable->parent); f = m_openReadFileHandles.at(res->second); /* Sanitize name */ @@ -1017,7 +1013,7 @@ template void CommonADIOS1IOHandlerImpl::deleteFile( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[ADIOS1] Deleting a file opened as read only is not
possible."); @@ -1103,7 +1099,7 @@ template void CommonADIOS1IOHandlerImpl::writeDataset( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[ADIOS1] Writing into a dataset in a file opened as read-only is " "not possible."); @@ -1149,7 +1145,7 @@ void CommonADIOS1IOHandlerImpl::writeAttribute( // cannot do this return; } - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[ADIOS1] Writing an attribute in a file opened as read only is " "not possible."); @@ -1999,6 +1995,13 @@ void CommonADIOS1IOHandlerImpl::listAttributes( } } +template +void CommonADIOS1IOHandlerImpl::deregister( + Writable *writable, Parameter const &) +{ + m_filePaths.erase(writable); +} + template void CommonADIOS1IOHandlerImpl::initJson(json::TracingJSON config) { diff --git a/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp b/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp index fe628e48aa..396a9106d8 100644 --- a/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp @@ -123,49 +123,55 @@ std::future ParallelADIOS1IOHandlerImpl::flush() case O::CREATE_FILE: createFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CHECK_FILE: checkFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CREATE_PATH: createPath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::OPEN_PATH: openPath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CREATE_DATASET: createDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::WRITE_ATT: writeAttribute( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::OPEN_FILE:
openFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::KEEP_SYNCHRONOUS: keepSynchronous( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( + i.parameter.get())); + break; + case O::DEREGISTER: + deregister( + i.writable, + deref_dynamic_cast>( i.parameter.get())); break; default: @@ -202,19 +208,19 @@ std::future ParallelADIOS1IOHandlerImpl::flush() case O::EXTEND_DATASET: extendDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CLOSE_PATH: closePath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::OPEN_DATASET: openDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::CLOSE_FILE: @@ -226,79 +232,79 @@ std::future ParallelADIOS1IOHandlerImpl::flush() case O::DELETE_FILE: deleteFile( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::DELETE_PATH: deletePath( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::DELETE_DATASET: deleteDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::DELETE_ATT: deleteAttribute( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::WRITE_DATASET: writeDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::READ_DATASET: readDataset( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::GET_BUFFER_VIEW: getBufferView( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::READ_ATT: readAttribute( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::LIST_PATHS: listPaths( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::LIST_DATASETS: 
listDatasets( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::LIST_ATTS: listAttributes( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::ADVANCE: advance( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; case O::AVAILABLE_CHUNKS: availableChunks( i.writable, - deref_dynamic_cast >( + deref_dynamic_cast>( i.parameter.get())); break; default: @@ -384,6 +390,7 @@ void ParallelADIOS1IOHandler::enqueue(IOTask const &i) case Operation::OPEN_FILE: case Operation::WRITE_ATT: case Operation::KEEP_SYNCHRONOUS: + case Operation::DEREGISTER: m_setup.push(i); return; default: diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index 1516571069..499b2e6e69 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -220,7 +220,7 @@ HDF5IOHandlerImpl::~HDF5IOHandlerImpl() void HDF5IOHandlerImpl::createFile( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Creating a file in read-only mode is not possible."); @@ -258,6 +258,7 @@ void HDF5IOHandlerImpl::createFile( flags = H5F_ACC_EXCL; break; case Access::READ_ONLY: + case Access::READ_LINEAR: // condition has been checked above throw std::runtime_error( "[HDF5] Control flow error in createFile backend access mode."); @@ -322,7 +323,7 @@ void HDF5IOHandlerImpl::checkFile( void HDF5IOHandlerImpl::createPath( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Creating a path in a file opened as read only is not " "possible."); @@ -413,7 +414,7 @@ void HDF5IOHandlerImpl::createPath( void HDF5IOHandlerImpl::createDataset( Writable *writable, Parameter const &parameters) { - if
(m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Creating a dataset in a file opened as read only is not " "possible."); @@ -652,7 +653,7 @@ void HDF5IOHandlerImpl::createDataset( void HDF5IOHandlerImpl::extendDataset( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Extending a dataset in a file opened as read only is not " "possible."); @@ -771,7 +772,7 @@ void HDF5IOHandlerImpl::availableChunks( } void HDF5IOHandlerImpl::openFile( - Writable *writable, Parameter const &parameters) + Writable *writable, Parameter &parameters) { if (!auxiliary::directory_exists(m_handler->directory)) throw error::ReadError( @@ -796,19 +797,16 @@ void HDF5IOHandlerImpl::openFile( unsigned flags; Access at = m_handler->m_backendAccess; - if (at == Access::READ_ONLY) + if (access::readOnly(at)) flags = H5F_ACC_RDONLY; /* * Within the HDF5 backend, APPEND and READ_WRITE mode are * equivalent, but the openPMD frontend exposes no reading * functionality in APPEND mode.
*/ - else if ( - at == Access::READ_WRITE || at == Access::CREATE || - at == Access::APPEND) - flags = H5F_ACC_RDWR; else - throw std::runtime_error("[HDF5] Unknown file Access"); + flags = H5F_ACC_RDWR; + hid_t file_id; file_id = H5Fopen(name.c_str(), flags, m_fileAccessProperty); if (file_id < 0) @@ -1066,7 +1064,7 @@ void HDF5IOHandlerImpl::openDataset( void HDF5IOHandlerImpl::deleteFile( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Deleting a file opened as read only is not possible."); @@ -1100,7 +1098,7 @@ void HDF5IOHandlerImpl::deleteFile( void HDF5IOHandlerImpl::deletePath( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Deleting a path in a file opened as read only is not " "possible."); @@ -1152,7 +1150,7 @@ void HDF5IOHandlerImpl::deletePath( void HDF5IOHandlerImpl::deleteDataset( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Deleting a path in a file opened as read only is not " "possible."); @@ -1204,7 +1202,7 @@ void HDF5IOHandlerImpl::deleteDataset( void HDF5IOHandlerImpl::deleteAttribute( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Deleting an attribute in a file opened as read only is not " "possible."); @@ -1239,7 +1237,7 @@ void HDF5IOHandlerImpl::deleteAttribute( void HDF5IOHandlerImpl::writeDataset( Writable *writable, Parameter const &parameters) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5]
Writing into a dataset in a file opened as read only is " "not possible."); @@ -1366,7 +1364,7 @@ void HDF5IOHandlerImpl::writeAttribute( // cannot do this return; } - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) throw std::runtime_error( "[HDF5] Writing an attribute in a file opened as read only is not " "possible."); @@ -2486,6 +2484,12 @@ void HDF5IOHandlerImpl::listAttributes( "listing"); } +void HDF5IOHandlerImpl::deregister( + Writable *writable, Parameter const &) +{ + m_fileNames.erase(writable); +} + std::optional HDF5IOHandlerImpl::getFile(Writable *writable) { diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 8f25b56584..13f20c193d 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -91,7 +91,7 @@ void JSONIOHandlerImpl::createFile( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Creating a file in read-only mode is not possible."); if (!writable->written) @@ -199,7 +199,7 @@ void JSONIOHandlerImpl::createPath( void JSONIOHandlerImpl::createDataset( Writable *writable, Parameter const &parameter) { - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) { throw std::runtime_error( "[JSON] Creating a dataset in a file opened as read only is not " @@ -267,7 +267,7 @@ void JSONIOHandlerImpl::extendDataset( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Cannot extend a dataset in read-only mode.") setAndGetFilePosition(writable); refreshFileFromParent(writable); @@ -526,7 +526,7 @@ void JSONIOHandlerImpl::availableChunks( } void JSONIOHandlerImpl::openFile( - Writable *writable, Parameter const &parameter) + Writable *writable, Parameter &parameter) {
if (!auxiliary::directory_exists(m_handler->directory)) { @@ -558,6 +558,7 @@ void JSONIOHandlerImpl::closeFile( if (fileIterator != m_files.end()) { putJsonContents(fileIterator->second); + m_dirty.erase(fileIterator->second); // do not invalidate the file // it still exists, it is just not open m_files.erase(fileIterator); @@ -615,7 +616,7 @@ void JSONIOHandlerImpl::deleteFile( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Cannot delete files in read-only mode") if (!writable->written) @@ -646,7 +647,7 @@ void JSONIOHandlerImpl::deletePath( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Cannot delete paths in read-only mode") if (!writable->written) @@ -724,7 +725,7 @@ void JSONIOHandlerImpl::deleteDataset( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Cannot delete datasets in read-only mode") if (!writable->written) @@ -766,7 +767,7 @@ void JSONIOHandlerImpl::deleteAttribute( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Cannot delete attributes in read-only mode") if (!writable->written) { @@ -783,7 +784,7 @@ void JSONIOHandlerImpl::writeDataset( Writable *writable, Parameter const &parameters) { VERIFY_ALWAYS( - m_handler->m_backendAccess != Access::READ_ONLY, + access::write(m_handler->m_backendAccess), "[JSON] Cannot write data in read-only mode."); auto pos = setAndGetFilePosition(writable); @@ -806,7 +807,7 @@ void JSONIOHandlerImpl::writeAttribute( // cannot do this return; } - if (m_handler->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(m_handler->m_backendAccess)) { throw
std::runtime_error( "[JSON] Creating a dataset in a file opened as read only is not " @@ -940,6 +941,12 @@ void JSONIOHandlerImpl::listAttributes( } } +void JSONIOHandlerImpl::deregister( + Writable *writable, Parameter const &) +{ + m_files.erase(writable); +} + std::shared_ptr JSONIOHandlerImpl::getFilehandle(File fileName, Access access) { @@ -948,11 +955,8 @@ JSONIOHandlerImpl::getFilehandle(File fileName, Access access) "[JSON] Tried opening a file that has been overwritten or deleted.") auto path = fullPath(std::move(fileName)); auto fs = std::make_shared(); - switch (access) + if (access::write(access)) { - case Access::CREATE: - case Access::READ_WRITE: - case Access::APPEND: /* * Always truncate when writing, we alway write entire JSON * datasets, never partial ones. @@ -961,10 +965,10 @@ JSONIOHandlerImpl::getFilehandle(File fileName, Access access) * functionality in APPEND mode. */ fs->open(path, std::ios_base::out | std::ios_base::trunc); - break; - case Access::READ_ONLY: + } + else + { fs->open(path, std::ios_base::in); - break; } VERIFY(fs->good(), "[JSON] Failed opening a file '" + path + "'"); return fs; diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 54d23589f7..26ab93940e 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -306,18 +306,15 @@ void Iteration::flushVariableBased( void Iteration::flush(internal::FlushParams const &flushParams) { - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: { for (auto &m : meshes) m.second.flush(m.first, flushParams); for (auto &species : particles) species.second.flush(species.first, flushParams); - break; } - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + else + { /* Find the root point [Series] of this file, * meshesPath and particlesPath are stored there */ Series s = retrieveSeries(); @@ -355,8 +352,6 @@ void Iteration::flush(internal::FlushParams const &flushParams) } 
flushAttributes(flushParams); - break; - } } } @@ -686,8 +681,10 @@ auto Iteration::beginStep(bool reread) -> BeginStepStatus } auto Iteration::beginStep( - std::optional thisObject, Series &series, bool reread) - -> BeginStepStatus + std::optional thisObject, + Series &series, + bool reread, + std::set const &ignoreIterations) -> BeginStepStatus { BeginStepStatus res; using IE = IterationEncoding; @@ -743,36 +740,26 @@ auto Iteration::beginStep( if (reread && status != AdvanceStatus::RANDOMACCESS && (series.iterationEncoding() == IE::groupBased || series.iterationEncoding() == IE::variableBased) && - (IOHandl->m_frontendAccess == Access::READ_ONLY || - IOHandl->m_frontendAccess == Access::READ_WRITE)) + access::read(series.IOHandler()->m_frontendAccess)) { - switch (IOHandl->m_frontendAccess) + bool previous = series.iterations.written(); + series.iterations.written() = false; + auto oldStatus = IOHandl->m_seriesStatus; + IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing; + try { - case Access::READ_ONLY: - case Access::READ_WRITE: { - bool previous = series.iterations.written(); - series.iterations.written() = false; - auto oldStatus = IOHandl->m_seriesStatus; - IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing; - try - { - res.iterationsInOpenedStep = series.readGorVBased( - /* do_always_throw_errors = */ true, /* init = */ false); - } - catch (...) - { - IOHandl->m_seriesStatus = oldStatus; - throw; - } - IOHandl->m_seriesStatus = oldStatus; - series.iterations.written() = previous; - break; + res.iterationsInOpenedStep = series.readGorVBased( + /* do_always_throw_errors = */ true, + /* init = */ false, + ignoreIterations); } - case Access::CREATE: - case Access::APPEND: - // no re-reading necessary - break; + catch (...) 
+ { + IOHandl->m_seriesStatus = oldStatus; + throw; } + IOHandl->m_seriesStatus = oldStatus; + series.iterations.written() = previous; } res.stepStatus = status; @@ -871,10 +858,8 @@ void Iteration::linkHierarchy(Writable &w) void Iteration::runDeferredParseAccess() { - switch (IOHandler()->m_frontendAccess) + if (access::read(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: - case Access::READ_WRITE: { auto &it = get(); if (!it.m_deferredParseAccess.has_value()) { @@ -906,12 +891,6 @@ void Iteration::runDeferredParseAccess() // reset this thing it.m_deferredParseAccess = std::optional(); IOHandler()->m_seriesStatus = oldStatus; - break; - } - case Access::CREATE: - case Access::APPEND: - // no parsing in those modes - return; } } diff --git a/src/Mesh.cpp b/src/Mesh.cpp index 5b91dd26dc..7454ff005e 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -217,16 +217,13 @@ template Mesh &Mesh::setTimeOffset(float); void Mesh::flush_impl( std::string const &name, internal::FlushParams const &flushParams) { - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: { for (auto &comp : *this) comp.second.flush(comp.first, flushParams); - break; } - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + else + { if (!written()) { if (scalar()) @@ -268,8 +265,6 @@ void Mesh::flush_impl( } } flushAttributes(flushParams); - break; - } } } diff --git a/src/ParticleSpecies.cpp b/src/ParticleSpecies.cpp index 998c6c6317..eb502ae371 100644 --- a/src/ParticleSpecies.cpp +++ b/src/ParticleSpecies.cpp @@ -161,18 +161,15 @@ namespace void ParticleSpecies::flush( std::string const &path, internal::FlushParams const &flushParams) { - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: { for (auto &record : *this) record.second.flush(record.first, flushParams); for (auto &patch : particlePatches) patch.second.flush(patch.first, 
flushParams); - break; } - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + else + { auto it = find("position"); if (it != end()) it->second.setUnitDimension({{UnitDimension::L, 1}}); @@ -191,8 +188,6 @@ void ParticleSpecies::flush( for (auto &patch : particlePatches) patch.second.flush(patch.first, flushParams); } - break; - } } } diff --git a/src/ReadIterations.cpp b/src/ReadIterations.cpp index 679146e896..fdc64c7845 100644 --- a/src/ReadIterations.cpp +++ b/src/ReadIterations.cpp @@ -20,6 +20,7 @@ */ #include "openPMD/ReadIterations.hpp" +#include "openPMD/Error.hpp" #include "openPMD/Series.hpp" @@ -28,11 +29,94 @@ namespace openPMD { -SeriesIterator::SeriesIterator() : m_series() -{} +namespace +{ + bool reread(std::optional parsePreference) + { + if (parsePreference.has_value()) + { + using PP = Parameter::ParsePreference; + + switch (parsePreference.value()) + { + case PP::PerStep: + return true; + case PP::UpFront: + return false; + } + return false; + } + else + { + throw error::Internal( + "Group/Variable-based encoding: Parse preference must be set."); + } + } +} // namespace + +SeriesIterator::SeriesIterator() = default; + +void SeriesIterator::initSeriesInLinearReadMode() +{ + auto &data = *m_data; + auto &series = *data.series; + series.IOHandler()->m_seriesStatus = internal::SeriesStatus::Parsing; + try + { + switch (series.iterationEncoding()) + { + using IE = IterationEncoding; + case IE::fileBased: + series.readFileBased(); + break; + case IE::groupBased: + case IE::variableBased: { + Parameter fOpen; + fOpen.name = series.get().m_name; + fOpen.encoding = series.iterationEncoding(); + series.IOHandler()->enqueue(IOTask(&series, fOpen)); + series.IOHandler()->flush(internal::defaultFlushParams); + using PP = Parameter::ParsePreference; + switch (*fOpen.out_parsePreference) + { + case PP::PerStep: + series.advance(AdvanceMode::BEGINSTEP); + series.readGorVBased( + /* do_always_throw_errors = */ false, /* init = */ true); + 
break; + case PP::UpFront: + series.readGorVBased( + /* do_always_throw_errors = */ false, /* init = */ true); + series.advance(AdvanceMode::BEGINSTEP); + break; + } + data.parsePreference = *fOpen.out_parsePreference; + break; + } + } + } + catch (...) + { + series.IOHandler()->m_seriesStatus = internal::SeriesStatus::Default; + throw; + } + series.IOHandler()->m_seriesStatus = internal::SeriesStatus::Default; +} -SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) +SeriesIterator::SeriesIterator( + Series series_in, std::optional parsePreference) + : m_data{std::make_shared()} { + auto &data = *m_data; + data.parsePreference = std::move(parsePreference); + data.series = std::move(series_in); + auto &series = data.series.value(); + if (series.IOHandler()->m_frontendAccess == Access::READ_LINEAR && + series.iterations.empty()) + { + initSeriesInLinearReadMode(); + } + auto it = series.get().iterations.begin(); if (it == series.get().iterations.end()) { @@ -74,9 +158,9 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) openIteration(series.iterations.begin()->second); status = it->second.beginStep(/* reread = */ true); - for (auto const &pair : m_series.value().iterations) + for (auto const &pair : series.iterations) { - m_iterationsInCurrentStep.push_back(pair.first); + data.iterationsInCurrentStep.push_back(pair.first); } break; case IterationEncoding::groupBased: @@ -88,8 +172,8 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) */ Iteration::BeginStepStatus::AvailableIterations_t availableIterations; - std::tie(status, availableIterations) = - it->second.beginStep(/* reread = */ true); + std::tie(status, availableIterations) = it->second.beginStep( + /* reread = */ reread(data.parsePreference)); /* * In random-access mode, do not use the information read in the * `snapshot` attribute, instead simply go through iterations @@ -99,11 +183,11 @@ SeriesIterator::SeriesIterator(Series 
series) : m_series(std::move(series)) if (availableIterations.has_value() && status != AdvanceStatus::RANDOMACCESS) { - m_iterationsInCurrentStep = availableIterations.value(); - if (!m_iterationsInCurrentStep.empty()) + data.iterationsInCurrentStep = availableIterations.value(); + if (!data.iterationsInCurrentStep.empty()) { - openIteration( - series.iterations.at(m_iterationsInCurrentStep.at(0))); + openIteration(series.iterations.at( + data.iterationsInCurrentStep.at(0))); } } else if (!series.iterations.empty()) @@ -112,13 +196,14 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) * Fallback implementation: Assume that each step corresponds * with an iteration in ascending order. */ - m_iterationsInCurrentStep = {series.iterations.begin()->first}; + data.iterationsInCurrentStep = { + series.iterations.begin()->first}; openIteration(series.iterations.begin()->second); } else { // this is a no-op, but let's keep it explicit - m_iterationsInCurrentStep = {}; + data.iterationsInCurrentStep = {}; } break; @@ -141,20 +226,21 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) std::optional SeriesIterator::nextIterationInStep() { + auto &data = *m_data; using ret_t = std::optional; - if (m_iterationsInCurrentStep.empty()) + if (data.iterationsInCurrentStep.empty()) { return ret_t{}; } - m_iterationsInCurrentStep.pop_front(); - if (m_iterationsInCurrentStep.empty()) + data.iterationsInCurrentStep.pop_front(); + if (data.iterationsInCurrentStep.empty()) { return ret_t{}; } - auto oldIterationIndex = m_currentIteration; - m_currentIteration = *m_iterationsInCurrentStep.begin(); - auto &series = m_series.value(); + auto oldIterationIndex = data.currentIteration; + data.currentIteration = *data.iterationsInCurrentStep.begin(); + auto &series = data.series.value(); switch (series.iterationEncoding()) { @@ -171,11 +257,11 @@ std::optional SeriesIterator::nextIterationInStep() try { - 
series.iterations[m_currentIteration].open(); + series.iterations[data.currentIteration].open(); } catch (error::ReadError const &err) { - std::cerr << "Cannot read iteration '" << m_currentIteration + std::cerr << "Cannot read iteration '" << data.currentIteration << "' and will skip it due to read error:\n" << err.what() << std::endl; return nextIterationInStep(); @@ -189,12 +275,12 @@ std::optional SeriesIterator::nextIterationInStep() /* * Errors in here might appear due to deferred iteration parsing. */ - series.iterations[m_currentIteration].open(); + series.iterations[data.currentIteration].open(); /* * Errors in here might appear due to reparsing after opening a * new step. */ - series.iterations[m_currentIteration].beginStep( + series.iterations[data.currentIteration].beginStep( /* reread = */ true); } catch (error::ReadError const &err) @@ -212,28 +298,32 @@ std::optional SeriesIterator::nextIterationInStep() std::optional SeriesIterator::nextStep(size_t recursion_depth) { + auto &data = *m_data; // since we are in group-based iteration layout, it does not // matter which iteration we begin a step upon AdvanceStatus status{}; Iteration::BeginStepStatus::AvailableIterations_t availableIterations; try { - std::tie(status, availableIterations) = - Iteration::beginStep({}, *m_series, /* reread = */ true); + std::tie(status, availableIterations) = Iteration::beginStep( + {}, + *data.series, + /* reread = */ reread(data.parsePreference), + data.ignoreIterations); } catch (error::ReadError const &err) { std::cerr << "[SeriesIterator] Cannot read iteration due to error " "below, will skip it.\n" << err.what() << std::endl; - m_series->advance(AdvanceMode::ENDSTEP); + data.series->advance(AdvanceMode::ENDSTEP); return nextStep(recursion_depth + 1); } if (availableIterations.has_value() && status != AdvanceStatus::RANDOMACCESS) { - m_iterationsInCurrentStep = availableIterations.value(); + data.iterationsInCurrentStep = availableIterations.value(); } else { @@ 
-241,8 +331,8 @@ std::optional SeriesIterator::nextStep(size_t recursion_depth) * Fallback implementation: Assume that each step corresponds * with an iteration in ascending order. */ - auto &series = m_series.value(); - auto it = series.iterations.find(m_currentIteration); + auto &series = data.series.value(); + auto it = series.iterations.find(data.currentIteration); auto itEnd = series.iterations.end(); if (it == itEnd) { @@ -261,8 +351,7 @@ std::optional SeriesIterator::nextStep(size_t recursion_depth) * will skip such iterations and hope to find something in a * later IO step. No need to finish right now. */ - m_iterationsInCurrentStep = {}; - m_series->advance(AdvanceMode::ENDSTEP); + data.iterationsInCurrentStep = {}; } } else @@ -289,13 +378,12 @@ std::optional SeriesIterator::nextStep(size_t recursion_depth) * hope to find something in a later IO step. No need to * finish right now. */ - m_iterationsInCurrentStep = {}; - m_series->advance(AdvanceMode::ENDSTEP); + data.iterationsInCurrentStep = {}; } } else { - m_iterationsInCurrentStep = {it->first}; + data.iterationsInCurrentStep = {it->first}; } } } @@ -311,15 +399,16 @@ std::optional SeriesIterator::nextStep(size_t recursion_depth) std::optional SeriesIterator::loopBody() { - Series &series = m_series.value(); + auto &data = *m_data; + Series &series = data.series.value(); auto &iterations = series.iterations; /* * Might not be present because parsing might have failed in previous step */ - if (iterations.contains(m_currentIteration)) + if (iterations.contains(data.currentIteration)) { - auto &currentIteration = iterations[m_currentIteration]; + auto &currentIteration = iterations[data.currentIteration]; if (!currentIteration.closed()) { currentIteration.close(); @@ -327,7 +416,7 @@ std::optional SeriesIterator::loopBody() } auto guardReturn = - [&iterations]( + [&series, &iterations]( auto const &option) -> std::optional { if (!option.has_value() || *option.value() == end()) { @@ -336,33 +425,49 @@ std::optional
SeriesIterator::loopBody() auto currentIterationIndex = option.value()->peekCurrentIteration(); if (!currentIterationIndex.has_value()) { + series.advance(AdvanceMode::ENDSTEP); return std::nullopt; } - auto iteration = iterations.at(currentIterationIndex.value()); - if (iteration.get().m_closed != internal::CloseStatus::ClosedInBackend) + // If we had the iteration already, then it's either not there at all + // (because old iterations are deleted in linear access mode), + // or it's still there but closed in random-access mode + auto index = currentIterationIndex.value(); + + if (iterations.contains(index)) { - try + auto iteration = iterations.at(index); + if (iteration.get().m_closed != + internal::CloseStatus::ClosedInBackend) { - iteration.open(); - option.value()->setCurrentIteration(); - return option; + try + { + iterations.at(index).open(); + option.value()->setCurrentIteration(); + return option; + } + catch (error::ReadError const &err) + { + std::cerr << "Cannot read iteration '" + << currentIterationIndex.value() + << "' and will skip it due to read error:\n" + << err.what() << std::endl; + option.value()->deactivateDeadIteration( + currentIterationIndex.value()); + return std::nullopt; + } } - catch (error::ReadError const &err) + else { - std::cerr << "Cannot read iteration '" - << currentIterationIndex.value() - << "' and will skip it due to read error:\n" - << err.what() << std::endl; - option.value()->deactivateDeadIteration( - currentIterationIndex.value()); - return std::nullopt; + // we had this iteration already, skip it + iteration.endStep(); + return std::nullopt; // empty, go into next iteration } } else { // we had this iteration already, skip it - iteration.endStep(); - return std::nullopt; // empty, go into next iteration + series.advance(AdvanceMode::ENDSTEP); + return std::nullopt; } }; @@ -390,35 +495,38 @@ std::optional SeriesIterator::loopBody() void SeriesIterator::deactivateDeadIteration(iteration_index_t index) { - switch 
(m_series->iterationEncoding()) + auto &data = *m_data; + switch (data.series->iterationEncoding()) { case IterationEncoding::fileBased: { Parameter param; - m_series->IOHandler()->enqueue( - IOTask(&m_series->iterations[index], std::move(param))); - m_series->IOHandler()->flush({FlushLevel::UserFlush}); + data.series->IOHandler()->enqueue( + IOTask(&data.series->iterations[index], std::move(param))); + data.series->IOHandler()->flush({FlushLevel::UserFlush}); } break; case IterationEncoding::variableBased: case IterationEncoding::groupBased: { Parameter param; param.mode = AdvanceMode::ENDSTEP; - m_series->IOHandler()->enqueue( - IOTask(&m_series->iterations[index], std::move(param))); - m_series->IOHandler()->flush({FlushLevel::UserFlush}); + data.series->IOHandler()->enqueue( + IOTask(&data.series->iterations[index], std::move(param))); + data.series->IOHandler()->flush({FlushLevel::UserFlush}); } break; } - m_series->iterations.container().erase(index); + data.series->iterations.container().erase(index); } SeriesIterator &SeriesIterator::operator++() { - if (!m_series.has_value()) + auto &data = *m_data; + if (!data.series.has_value()) { *this = end(); return *this; } + auto oldIterationIndex = data.currentIteration; std::optional res; /* * loopBody() might return an empty option to indicate a skipped iteration. @@ -435,21 +543,45 @@ SeriesIterator &SeriesIterator::operator++() auto resvalue = res.value(); if (*resvalue != end()) { - (**resvalue).setStepStatus(StepStatus::DuringStep); + auto &series = data.series.value(); + auto index = data.currentIteration; + auto &iteration = series.iterations[index]; + iteration.setStepStatus(StepStatus::DuringStep); + + if (series.IOHandler()->m_frontendAccess == Access::READ_LINEAR) + { + /* + * Linear read mode: Any data outside the current iteration is + * inaccessible. Delete the iteration. This has two effects: + * + * 1) Avoid confusion. + * 2) Avoid memory buildup in long-running workflows with many + * iterations. 
+ * + * @todo Also delete data in the backends upon doing this. + */ + auto &container = series.iterations.container(); + container.erase(oldIterationIndex); + data.ignoreIterations.emplace(oldIterationIndex); + } } return *resvalue; } IndexedIteration SeriesIterator::operator*() { + auto &data = *m_data; return IndexedIteration( - m_series.value().iterations[m_currentIteration], m_currentIteration); + data.series.value().iterations[data.currentIteration], + data.currentIteration); } bool SeriesIterator::operator==(SeriesIterator const &other) const { - return this->m_currentIteration == other.m_currentIteration && - this->m_series.has_value() == other.m_series.has_value(); + return (this->m_data.operator bool() && other.m_data.operator bool() && + (this->m_data->currentIteration == + other.m_data->currentIteration)) || + (!this->m_data.operator bool() && !other.m_data.operator bool()); } bool SeriesIterator::operator!=(SeriesIterator const &other) const @@ -462,12 +594,26 @@ SeriesIterator SeriesIterator::end() return SeriesIterator{}; } -ReadIterations::ReadIterations(Series series) : m_series(std::move(series)) -{} +ReadIterations::ReadIterations( + Series series, + Access access, + std::optional parsePreference) + : m_series(std::move(series)), m_parsePreference(std::move(parsePreference)) +{ + if (access == Access::READ_LINEAR) + { + // Open the iterator now already, so that metadata may already be read + alreadyOpened = iterator_t{m_series, m_parsePreference}; + } +} ReadIterations::iterator_t ReadIterations::begin() { - return iterator_t{m_series}; + if (!alreadyOpened.has_value()) + { + alreadyOpened = iterator_t{m_series, m_parsePreference}; + } + return alreadyOpened.value(); } ReadIterations::iterator_t ReadIterations::end() diff --git a/src/Record.cpp b/src/Record.cpp index 485e817e14..dc6949efdb 100644 --- a/src/Record.cpp +++ b/src/Record.cpp @@ -46,16 +46,13 @@ Record &Record::setUnitDimension(std::map const &udim) void Record::flush_impl( 
std::string const &name, internal::FlushParams const &flushParams) { - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: { for (auto &comp : *this) comp.second.flush(comp.first, flushParams); - break; } - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + else + { if (!written()) { if (scalar()) @@ -99,8 +96,6 @@ void Record::flush_impl( } flushAttributes(flushParams); - break; - } } } diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp index b7679ca598..bf46587ee2 100644 --- a/src/RecordComponent.cpp +++ b/src/RecordComponent.cpp @@ -200,18 +200,16 @@ void RecordComponent::flush( rc.m_name = name; return; } - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: while (!rc.m_chunks.empty()) { IOHandler()->enqueue(rc.m_chunks.front()); rc.m_chunks.pop(); } - break; - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + } + else + { /* * This catches when a user forgets to use resetDataset. 
*/ @@ -277,8 +275,6 @@ void RecordComponent::flush( } flushAttributes(flushParams); - break; - } } } diff --git a/src/Series.cpp b/src/Series.cpp index 5b17c24642..6fe3f53d02 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -334,10 +334,17 @@ Series &Series::setIterationFormat(std::string const &i) if (iterationEncoding() == IterationEncoding::groupBased || iterationEncoding() == IterationEncoding::variableBased) - if (basePath() != i && (openPMD() == "1.0.1" || openPMD() == "1.0.0")) + { + if (!containsAttribute("basePath")) + { + setBasePath(i); + } + else if ( + basePath() != i && (openPMD() == "1.0.1" || openPMD() == "1.0.0")) throw std::invalid_argument( "iterationFormat must not differ from basePath " + basePath() + " for group- or variableBased data"); + } setAttribute("iterationFormat", i); return *this; @@ -576,6 +583,12 @@ Given file pattern: ')END" switch (IOHandler()->m_frontendAccess) { + case Access::READ_LINEAR: + // don't parse anything here + // no data accessible before opening the first step + // setIterationEncoding(input->iterationEncoding); + series.m_iterationEncoding = input->iterationEncoding; + break; case Access::READ_ONLY: case Access::READ_WRITE: { /* Allow creation of values in Containers and setting of Attributes @@ -736,6 +749,7 @@ void Series::flushFileBased( switch (IOHandler()->m_frontendAccess) { case Access::READ_ONLY: + case Access::READ_LINEAR: for (auto it = begin; it != end; ++it) { // Phase 1 @@ -845,9 +859,8 @@ void Series::flushGorVBased( bool flushIOHandler) { auto &series = get(); - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: for (auto it = begin; it != end; ++it) { // Phase 1 @@ -880,10 +893,9 @@ void Series::flushGorVBased( IOHandler()->flush(flushParams); } } - break; - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + } + else + { if (!written()) { if (IOHandler()->m_frontendAccess == Access::APPEND) @@ -957,8 
+969,6 @@ void Series::flushGorVBased( { IOHandler()->flush(flushParams); } - break; - } } } @@ -1020,7 +1030,7 @@ void Series::readFileBased() /* Frontend access type might change during Series::read() to allow * parameter modification. Backend access type stays unchanged for the * lifetime of a Series. */ - if (IOHandler()->m_backendAccess == Access::READ_ONLY) + if (access::readOnly(IOHandler()->m_backendAccess)) throw error::ReadError( error::AffectedObject::File, error::Reason::Inaccessible, @@ -1285,7 +1295,10 @@ namespace } } // namespace -auto Series::readGorVBased(bool do_always_throw_errors, bool do_init) +auto Series::readGorVBased( + bool do_always_throw_errors, + bool do_init, + std::set const &ignoreIterations) -> std::optional> { auto &series = get(); @@ -1294,6 +1307,7 @@ auto Series::readGorVBased(bool do_always_throw_errors, bool do_init) fOpen.encoding = iterationEncoding(); IOHandler()->enqueue(IOTask(this, fOpen)); IOHandler()->flush(internal::defaultFlushParams); + series.m_parsePreference = *fOpen.out_parsePreference; if (do_init) { @@ -1311,7 +1325,28 @@ auto Series::readGorVBased(bool do_always_throw_errors, bool do_init) if (encoding == "groupBased") series.m_iterationEncoding = IterationEncoding::groupBased; else if (encoding == "variableBased") + { series.m_iterationEncoding = IterationEncoding::variableBased; + if (IOHandler()->m_frontendAccess == Access::READ_ONLY) + { + std::cerr << R"( +The opened Series uses variable-based encoding, but is being accessed by +READ_ONLY mode which operates in random-access manner. +Random-access is (currently) unsupported by variable-based encoding +and some iterations may not be found by this access mode. 
+Consider using Access::READ_LINEAR and Series::readIterations().)" + << std::endl; + } + else if (IOHandler()->m_frontendAccess == Access::READ_WRITE) + { + throw error::WrongAPIUsage(R"( +The opened Series uses variable-based encoding, but is being accessed by +READ_WRITE mode which does not (yet) support variable-based encoding. +Please choose either Access::READ_LINEAR for reading or Access::APPEND for +creating new iterations. + )"); + } + } else if (encoding == "fileBased") { series.m_iterationEncoding = IterationEncoding::fileBased; @@ -1446,9 +1481,6 @@ auto Series::readGorVBased(bool do_always_throw_errors, bool do_init) return std::nullopt; }; - /* - * @todo in BP5, a BeginStep() might be necessary before this - */ auto currentSteps = currentSnapshot(); switch (iterationEncoding()) @@ -1462,6 +1494,10 @@ auto Series::readGorVBased(bool do_always_throw_errors, bool do_init) for (auto const &it : *pList.paths) { IterationIndex_t index = std::stoull(it); + if (ignoreIterations.find(index) != ignoreIterations.end()) + { + continue; + } if (auto err = internal::withRWAccess( IOHandler()->m_seriesStatus, [&]() { @@ -1491,10 +1527,20 @@ auto Series::readGorVBased(bool do_always_throw_errors, bool do_init) } } case IterationEncoding::variableBased: { - std::deque res = {0}; + std::deque res{}; if (currentSteps.has_value() && !currentSteps.value().empty()) { - res = {currentSteps.value().begin(), currentSteps.value().end()}; + for (auto index : currentSteps.value()) + { + if (ignoreIterations.find(index) == ignoreIterations.end()) + { + res.push_back(index); + } + } + } + else + { + res = {0}; } for (auto it : res) { @@ -1564,7 +1610,22 @@ void Series::readBase() IOHandler()->flush(internal::defaultFlushParams); if (auto val = Attribute(*aRead.resource).getOptional(); val.has_value()) + { + if ( // might have been previously initialized in READ_LINEAR access + // mode + containsAttribute("basePath") && + getAttribute("basePath").get() != val.value()) + { + throw 
error::ReadError( + error::AffectedObject::Attribute, + error::Reason::UnexpectedContent, + {}, + "Value for 'basePath' ('" + val.value() + + "') does not match expected value '" + + getAttribute("basePath").get() + "'."); + } setAttribute("basePath", val.value()); + } else throw error::ReadError( error::AffectedObject::Attribute, @@ -2265,7 +2326,8 @@ ReadIterations Series::readIterations() { // Use private constructor instead of copy constructor to avoid // object slicing - return {this->m_series}; + return { + this->m_series, IOHandler()->m_frontendAccess, get().m_parsePreference}; } WriteIterations Series::writeIterations() diff --git a/src/backend/PatchRecordComponent.cpp b/src/backend/PatchRecordComponent.cpp index 3db0545d40..e1477ef7bd 100644 --- a/src/backend/PatchRecordComponent.cpp +++ b/src/backend/PatchRecordComponent.cpp @@ -84,19 +84,16 @@ void PatchRecordComponent::flush( std::string const &name, internal::FlushParams const &flushParams) { auto &rc = get(); - switch (IOHandler()->m_frontendAccess) + if (access::readOnly(IOHandler()->m_frontendAccess)) { - case Access::READ_ONLY: { while (!rc.m_chunks.empty()) { IOHandler()->enqueue(rc.m_chunks.front()); rc.m_chunks.pop(); } - break; } - case Access::READ_WRITE: - case Access::CREATE: - case Access::APPEND: { + else + { if (!written()) { Parameter dCreate; @@ -114,8 +111,6 @@ void PatchRecordComponent::flush( } flushAttributes(flushParams); - break; - } } } diff --git a/src/backend/Writable.cpp b/src/backend/Writable.cpp index 32d4cd9816..f886e94046 100644 --- a/src/backend/Writable.cpp +++ b/src/backend/Writable.cpp @@ -27,6 +27,21 @@ namespace openPMD Writable::Writable(internal::AttributableData *a) : attributable{a} {} +Writable::~Writable() +{ + if (!IOHandler || !IOHandler->has_value()) + { + return; + } + /* + * Enqueueing a pointer to this object, which is now being deleted. 
+ * The DEREGISTER task must not dereference the pointer, but only use it to + * remove references to this object from internal data structures. + */ + IOHandler->value()->enqueue( + IOTask(this, Parameter())); +} + void Writable::seriesFlush(std::string backendConfig) { seriesFlush({FlushLevel::UserFlush, std::move(backendConfig)}); diff --git a/src/binding/python/Access.cpp b/src/binding/python/Access.cpp index 338f42db25..8fcdcb73c7 100644 --- a/src/binding/python/Access.cpp +++ b/src/binding/python/Access.cpp @@ -29,7 +29,67 @@ using namespace openPMD; void init_Access(py::module &m) { py::enum_(m, "Access") - .value("read_only", Access::READ_ONLY) - .value("read_write", Access::READ_WRITE) - .value("create", Access::CREATE); + .value( + "read_only", + Access::READ_ONLY, + R"(\ +Open Series as read-only, fails if Series is not found. +When to use READ_ONLY or READ_LINEAR: + +* When intending to use Series.read_iterations() +(i.e. step-by-step reading of iterations, e.g. in streaming), +then Access.read_linear is preferred and always supported. +Data is parsed inside Series.read_iterations(), no data is available +right after opening the Series. +* Otherwise (i.e. for random-access workflows), Access.read_only +is required, but works only in backends that support random access. +Data is parsed and available right after opening the Series. + +In both modes, parsing of iterations can be deferred with the JSON/TOML +option `defer_iteration_parsing`. + +Detailed rules: + +1. In backends that have no notion of IO steps (all except ADIOS2), +Access.read_only can always be used. +2. In backends that can be accessed either in random-access or +step-by-step, the chosen access mode decides which approach is used. +Examples are the BP4 and BP5 engines of ADIOS2. +3. In streaming backends, random-access is not possible. +When using such a backend, the access mode will be coerced +automatically to Access.read_linear. Use of Series.read_iterations() +is mandatory for access. 
+4. Reading a variable-based Series is only fully supported with +Access.read_linear. +If using Access.read_only, the dataset will be considered to only +have one single step. +If the dataset only has one single step, this is guaranteed to work +as expected. Otherwise, it is undefined which step's data is returned.)") + .value( + "read_random_access", + Access::READ_RANDOM_ACCESS, + "more explicit alias for read_only") + .value( + "read_write", + Access::READ_WRITE, + "Open existing Series as writable. Read mode corresponds with " + "Access::READ_RANDOM_ACCESS.") + .value( + "create", + Access::CREATE, + "create new series and truncate existing (files)") + .value( + "append", + Access::APPEND, + "write new iterations to an existing series without reading") + .value( + "read_linear", + Access::READ_LINEAR, + R"(\ + Open Series as read-only, fails if Series is not found. +This access mode requires use of Series.read_iterations(). +Global attributes are available directly after calling +Series.read_iterations(), Iterations and all their corresponding data +become available by use of the returned Iterator, e.g. in a foreach loop. 
+See Access.read_only for when to use this.)"); } diff --git a/src/binding/python/openpmd_api/pipe/__main__.py b/src/binding/python/openpmd_api/pipe/__main__.py old mode 100755 new mode 100644 index d7f0590567..436bd233f1 --- a/src/binding/python/openpmd_api/pipe/__main__.py +++ b/src/binding/python/openpmd_api/pipe/__main__.py @@ -204,7 +204,7 @@ def run(self): if not HAVE_MPI or (args.mpi is None and self.comm.size == 1): print("Opening data source") sys.stdout.flush() - inseries = io.Series(self.infile, io.Access.read_only, + inseries = io.Series(self.infile, io.Access.read_linear, self.inconfig) print("Opening data sink") sys.stdout.flush() @@ -215,7 +215,7 @@ def run(self): else: print("Opening data source on rank {}.".format(self.comm.rank)) sys.stdout.flush() - inseries = io.Series(self.infile, io.Access.read_only, self.comm, + inseries = io.Series(self.infile, io.Access.read_linear, self.comm, self.inconfig) print("Opening data sink on rank {}.".format(self.comm.rank)) sys.stdout.flush() diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 44039adb66..a334c4b241 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -9,6 +9,11 @@ #if openPMD_HAVE_MPI #include +#if openPMD_HAVE_ADIOS2 +#include +#define HAS_ADIOS_2_8 (ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 208) +#endif + #include #include #include @@ -1128,9 +1133,13 @@ void adios2_streaming(bool variableBasedLayout) using namespace std::chrono_literals; std::this_thread::sleep_for(1s); + /* + * READ_LINEAR always works in Streaming, but READ_ONLY must stay + * working at least for groupbased iteration encoding + */ Series readSeries( "../samples/adios2_stream.sst", - Access::READ_ONLY, + variableBasedLayout ? Access::READ_LINEAR : Access::READ_ONLY, // inline TOML R"(defer_iteration_parsing = true)"); @@ -1388,31 +1397,73 @@ TEST_CASE("adios2_ssc", "[parallel][adios2]") adios2_ssc(); } +enum class ParseMode +{ + /* + * Conventional workflow. 
Just parse the whole thing and yield iterations + * in rising order. + */ + NoSteps, + /* + * The Series is parsed ahead of time upon opening, but it has steps. + * Parsing ahead of time is the conventional workflow to support + * random-access. + * Reading such a Series with the streaming API is only possible if all + * steps are in ascending order, otherwise the openPMD-api has no way of + * associating IO steps with iteration indices. + * Reading such a Series with the Streaming API will become possible with + * the Linear read mode to be introduced by #1291. + */ + AheadOfTimeWithoutSnapshot, + /* + * In Linear read mode, a Series is not parsed ahead of time, but + * step-by-step, giving the openPMD-api a way to associate IO steps with + * iterations. No snapshot attribute exists, so the fallback mode is chosen: + * Iterations are returned in ascending order. + * If an IO step returns an iteration whose index is lower than the + * last one, it will be skipped. + * This mode of parsing is not available for the BP4 engine with ADIOS2 + * schema 0, since BP4 does not associate attributes with the step in + * which they were created, making it impossible to separate parsing into + * single steps. + */ + LinearWithoutSnapshot, + /* + * Snapshot attribute exists and dictates the iteration index returned by + * an IO step. Duplicate iterations will be skipped. + */ + WithSnapshot +}; + void append_mode( std::string const &extension, bool variableBased, + ParseMode parseMode, std::string jsonConfig = "{}") { std::string filename = (variableBased ? "../samples/append/append_variablebased."
: "../samples/append/append_groupbased.") + extension; + int mpi_size{}, mpi_rank{}; + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); MPI_Barrier(MPI_COMM_WORLD); if (auxiliary::directory_exists("../samples/append")) { auxiliary::remove_directory("../samples/append"); } MPI_Barrier(MPI_COMM_WORLD); - std::vector data(10, 0); - auto writeSomeIterations = [&data]( + std::vector data(10, 999); + auto writeSomeIterations = [&data, mpi_size, mpi_rank]( WriteIterations &&writeIterations, std::vector indices) { for (auto index : indices) { auto it = writeIterations[index]; auto dataset = it.meshes["E"]["x"]; - dataset.resetDataset({Datatype::INT, {10}}); - dataset.storeChunk(data, {0}, {10}); + dataset.resetDataset({Datatype::INT, {unsigned(mpi_size), 10}}); + dataset.storeChunk(data, {unsigned(mpi_rank), 0}, {1, 10}); // test that it works without closing too it.close(); } @@ -1430,6 +1481,7 @@ void append_mode( writeSomeIterations( write.writeIterations(), std::vector{0, 1}); } + MPI_Barrier(MPI_COMM_WORLD); { Series write(filename, Access::APPEND, MPI_COMM_WORLD, jsonConfig); if (variableBased) @@ -1448,9 +1500,10 @@ void append_mode( } writeSomeIterations( - write.writeIterations(), std::vector{2, 3}); + write.writeIterations(), std::vector{3, 2}); write.flush(); } + MPI_Barrier(MPI_COMM_WORLD); { using namespace std::chrono_literals; /* @@ -1476,32 +1529,142 @@ void append_mode( } writeSomeIterations( - write.writeIterations(), std::vector{4, 3}); + write.writeIterations(), std::vector{4, 3, 10}); write.flush(); } + MPI_Barrier(MPI_COMM_WORLD); { - Series read(filename, Access::READ_ONLY, MPI_COMM_WORLD); - if (variableBased || extension == "bp5") + Series write(filename, Access::APPEND, MPI_COMM_WORLD, jsonConfig); + if (variableBased) { + write.setIterationEncoding(IterationEncoding::variableBased); + } + if (write.backend() == "MPI_ADIOS1") + { + REQUIRE_THROWS_AS( + write.flush(), error::OperationUnsupportedInBackend); + // 
destructor will be noisy now + return; + } + + writeSomeIterations( + write.writeIterations(), std::vector{7, 1, 11}); + write.flush(); + } + MPI_Barrier(MPI_COMM_WORLD); + + auto verifyIteration = [mpi_size](auto &&it) { + auto chunk = it.meshes["E"]["x"].template loadChunk( + {0, 0}, {unsigned(mpi_size), 10}); + it.seriesFlush(); + for (size_t i = 0; i < unsigned(mpi_size) * 10; ++i) + { + REQUIRE(chunk.get()[i] == 999); + } + }; + + { + switch (parseMode) + { + case ParseMode::NoSteps: { + Series read(filename, Access::READ_LINEAR, MPI_COMM_WORLD); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + for (auto iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; + } + REQUIRE(counter == 8); + } + break; + case ParseMode::LinearWithoutSnapshot: { + Series read(filename, Access::READ_LINEAR, MPI_COMM_WORLD); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 3, 4, 10, 11}; + for (auto iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; + } + REQUIRE(counter == 6); + } + break; + case ParseMode::WithSnapshot: { // in variable-based encodings, iterations are not parsed ahead of // time but as they go + Series read(filename, Access::READ_LINEAR, MPI_COMM_WORLD); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 3, 2, 4, 10, 7, 11}; + for (auto iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; + } + REQUIRE(counter == 8); + // Cannot do listSeries here because the Series is already drained + REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage); + } + break; + case ParseMode::AheadOfTimeWithoutSnapshot: { + Series read(filename, Access::READ_LINEAR, MPI_COMM_WORLD); unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 
3, 4, 7, 10, 11}; + /* + * This one is a bit tricky: + * The BP4 engine has no way of parsing a Series in the old + * ADIOS2 schema step-by-step, since attributes are not + * associated with the step in which they were created. + * As a result, when readIterations() is called, the whole thing + * is parsed immediately ahead-of-time. + * We can then iterate through the iterations and access metadata, + * but since the IO steps don't correspond with the order of + * iterations returned (there is no way to figure out that order), + * we cannot load data in here. + * BP4 in the old ADIOS2 schema only supports either of the + * following: 1) A Series in which the iterations are present in + * ascending order. 2) Or accessing the Series in READ_ONLY mode. + */ for (auto const &iteration : read.readIterations()) { - REQUIRE(iteration.iterationIndex == counter); + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); ++counter; } - REQUIRE(counter == 5); + REQUIRE(counter == 8); + /* + * Roadmap: for now, reading this should work by ignoring the last + * duplicate iteration. + * After merging https://github.com/openPMD/openPMD-api/pull/949, we + * should see both instances when reading. + * Final goal: Read only the last instance. 
+ */ + REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage); } - else + break; + } + } + MPI_Barrier(MPI_COMM_WORLD); + if (!variableBased) + { + Series read(filename, Access::READ_ONLY, MPI_COMM_WORLD); + REQUIRE(read.iterations.size() == 8); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + for (auto iteration : read.readIterations()) { - REQUIRE(read.iterations.size() == 5); - helper::listSeries(read); + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; } + REQUIRE(counter == 8); } #if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ 208002700 + MPI_Barrier(MPI_COMM_WORLD); // AppendAfterSteps has a bug before that version if (extension == "bp5") { @@ -1533,30 +1696,69 @@ void append_mode( write.writeIterations(), std::vector{4, 5}); write.flush(); } + MPI_Barrier(MPI_COMM_WORLD); + { + Series read(filename, Access::READ_LINEAR, MPI_COMM_WORLD); + switch (parseMode) + { + case ParseMode::LinearWithoutSnapshot: { + uint64_t iterationOrder[] = {0, 1, 3, 4, 10}; + unsigned counter = 0; + for (auto iteration : read.readIterations()) + { + REQUIRE( + iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; + } + REQUIRE(counter == 5); + } + break; + case ParseMode::WithSnapshot: { + // in variable-based encodings, iterations are not parsed ahead + // of time but as they go + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 3, 2, 4, 10, 7, 5}; + for (auto iteration : read.readIterations()) + { + REQUIRE( + iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; + } + REQUIRE(counter == 8); + } + break; + default: + throw std::runtime_error("Test configured wrong."); + break; + } + } + MPI_Barrier(MPI_COMM_WORLD); + if (!variableBased) { Series read(filename, Access::READ_ONLY, MPI_COMM_WORLD); - 
// in variable-based encodings, iterations are not parsed ahead of - // time but as they go + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 5, 7, 10}; unsigned counter = 0; for (auto const &iteration : read.readIterations()) { - REQUIRE(iteration.iterationIndex == counter); + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); ++counter; } - REQUIRE(counter == 6); - helper::listSeries(read); + REQUIRE(counter == 8); + // Cannot do listSeries here because the Series is already + // drained + REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage); } } #endif } -TEST_CASE("append_mode", "[parallel]") +TEST_CASE("append_mode", "[serial]") { for (auto const &t : testedFileExtensions()) { - if (t == "bp" || t == "bp4" || t == "bp5") - { - std::string jsonConfigOld = R"END( + std::string jsonConfigOld = R"END( { "adios2": { @@ -1567,7 +1769,7 @@ TEST_CASE("append_mode", "[parallel]") } } })END"; - std::string jsonConfigNew = R"END( + std::string jsonConfigNew = R"END( { "adios2": { @@ -1578,28 +1780,27 @@ TEST_CASE("append_mode", "[parallel]") } } })END"; + if (t == "bp" || t == "bp4" || t == "bp5") + { /* * Troublesome combination: * 1) ADIOS2 v2.7 * 2) Parallel writer * 3) Append mode - * 4) Writing to a scalar variable * - * 4) is done by schema 2021 which will be phased out, so the tests - * are just deactivated. 
*/ - if (auxiliary::getEnvNum("OPENPMD2_ADIOS2_SCHEMA", 0) != 0) - { - continue; - } - append_mode(t, false, jsonConfigOld); - // append_mode(t, true, jsonConfigOld); - // append_mode(t, false, jsonConfigNew); - // append_mode(t, true, jsonConfigNew); +#if HAS_ADIOS_2_8 + append_mode( + t, false, ParseMode::LinearWithoutSnapshot, jsonConfigOld); + append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew); + // This test config does not make sense + // append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew); +#endif } else { - append_mode(t, false); + append_mode(t, false, ParseMode::NoSteps); } } } @@ -1663,4 +1864,4 @@ TEST_CASE("unavailable_backend", "[core][parallel]") } #endif } -#endif +#endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index b2490c6803..2e20c53676 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -4837,7 +4837,8 @@ this = "should not warn" void bp4_steps( std::string const &file, std::string const &options_write, - std::string const &options_read) + std::string const &options_read, + Access access = Access::READ_ONLY) { { Series writeSeries(file, Access::CREATE, options_write); @@ -4862,7 +4863,7 @@ void bp4_steps( return; } - Series readSeries(file, Access::READ_ONLY, options_read); + Series readSeries(file, access, options_read); size_t last_iteration_index = 0; for (auto iteration : readSeries.readIterations()) @@ -4915,6 +4916,7 @@ TEST_CASE("bp4_steps", "[serial][adios2]") type = "bp4" UseSteps = false )"; + // sing the yes no song bp4_steps("../samples/bp4steps_yes_yes.bp", useSteps, useSteps); bp4_steps("../samples/bp4steps_no_yes.bp", dontUseSteps, useSteps); @@ -4923,6 +4925,17 @@ TEST_CASE("bp4_steps", "[serial][adios2]") bp4_steps("../samples/nullcore.bp", nullcore, ""); bp4_steps("../samples/bp4steps_default.bp", "{}", "{}"); + // bp4_steps( + // 
"../samples/newlayout_bp4steps_yes_yes.bp", + // useSteps, + // useSteps, + // Access::READ_LINEAR); + // bp4_steps( + // "../samples/newlayout_bp4steps_yes_no.bp", + // useSteps, + // dontUseSteps, + // Access::READ_LINEAR); + /* * Do this whole thing once more, but this time use the new attribute * layout. @@ -4957,6 +4970,17 @@ TEST_CASE("bp4_steps", "[serial][adios2]") "../samples/newlayout_bp4steps_no_yes.bp", dontUseSteps, useSteps); bp4_steps( "../samples/newlayout_bp4steps_no_no.bp", dontUseSteps, dontUseSteps); + + bp4_steps( + "../samples/newlayout_bp4steps_yes_yes.bp", + useSteps, + useSteps, + Access::READ_LINEAR); + bp4_steps( + "../samples/newlayout_bp4steps_yes_no.bp", + useSteps, + dontUseSteps, + Access::READ_LINEAR); } #endif @@ -5358,6 +5382,7 @@ void variableBasedSeries(std::string const &file) constexpr Extent::value_type extent = 1000; { Series writeSeries(file, Access::CREATE, selectADIOS2); + writeSeries.setAttribute("some_global", "attribute"); writeSeries.setIterationEncoding(IterationEncoding::variableBased); REQUIRE( writeSeries.iterationEncoding() == @@ -5406,10 +5431,19 @@ void variableBasedSeries(std::string const &file) auto testRead = [&file, &extent, &selectADIOS2]( std::string const &jsonConfig) { + /* + * Need linear read mode to access more than a single iteration in + * variable-based iteration encoding. 
+ */ Series readSeries( - file, Access::READ_ONLY, json::merge(selectADIOS2, jsonConfig)); + file, Access::READ_LINEAR, json::merge(selectADIOS2, jsonConfig)); size_t last_iteration_index = 0; + REQUIRE(!readSeries.containsAttribute("some_global")); + readSeries.readIterations(); + REQUIRE( + readSeries.getAttribute("some_global").get() == + "attribute"); for (auto iteration : readSeries.readIterations()) { if (iteration.iterationIndex > 2) @@ -5810,42 +5844,46 @@ void iterate_nonstreaming_series( } } - Series readSeries( - file, - Access::READ_ONLY, - json::merge(jsonConfig, R"({"defer_iteration_parsing": true})")); - - size_t last_iteration_index = 0; - // conventionally written Series must be readable with streaming-aware API! - for (auto iteration : readSeries.readIterations()) + for (auto access : {Access::READ_LINEAR, Access::READ_ONLY}) { - // ReadIterations takes care of Iteration::open()ing iterations - auto E_x = iteration.meshes["E"]["x"]; - REQUIRE(E_x.getDimensionality() == 2); - REQUIRE(E_x.getExtent()[0] == 2); - REQUIRE(E_x.getExtent()[1] == extent); - auto chunk = E_x.loadChunk({0, 0}, {1, extent}); - auto chunk2 = E_x.loadChunk({1, 0}, {1, extent}); - // we encourage manually closing iterations, but it should not matter - // so let's do the switcharoo for this test - if (last_iteration_index % 2 == 0) - { - readSeries.flush(); - } - else - { - iteration.close(); - } + Series readSeries( + file, + access, + json::merge(jsonConfig, R"({"defer_iteration_parsing": true})")); - int value = variableBasedLayout ? 0 : iteration.iterationIndex; - for (size_t i = 0; i < extent; ++i) + size_t last_iteration_index = 0; + // conventionally written Series must be readable with streaming-aware + // API! 
+ for (auto iteration : readSeries.readIterations()) { - REQUIRE(chunk.get()[i] == value); - REQUIRE(chunk2.get()[i] == int(i)); + // ReadIterations takes care of Iteration::open()ing iterations + auto E_x = iteration.meshes["E"]["x"]; + REQUIRE(E_x.getDimensionality() == 2); + REQUIRE(E_x.getExtent()[0] == 2); + REQUIRE(E_x.getExtent()[1] == extent); + auto chunk = E_x.loadChunk({0, 0}, {1, extent}); + auto chunk2 = E_x.loadChunk({1, 0}, {1, extent}); + // we encourage manually closing iterations, but it should not + // matter so let's do the switcharoo for this test + if (last_iteration_index % 2 == 0) + { + readSeries.flush(); + } + else + { + iteration.close(); + } + + int value = variableBasedLayout ? 0 : iteration.iterationIndex; + for (size_t i = 0; i < extent; ++i) + { + REQUIRE(chunk.get()[i] == value); + REQUIRE(chunk2.get()[i] == int(i)); + } + last_iteration_index = iteration.iterationIndex; } - last_iteration_index = iteration.iterationIndex; + REQUIRE(last_iteration_index == 9); } - REQUIRE(last_iteration_index == 9); } TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]") @@ -5870,13 +5908,13 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]") backend.extension, false, json::merge( - backend.jsonBaseConfig(), "adios2.engine = \"bp5\"")); + backend.jsonBaseConfig(), "adios2.engine.type = \"bp5\"")); iterate_nonstreaming_series( "../samples/iterate_nonstreaming_series_groupbased_bp5." + backend.extension, false, json::merge( - backend.jsonBaseConfig(), "adios2.engine = \"bp5\"")); + backend.jsonBaseConfig(), "adios2.engine.type = \"bp5\"")); } #endif } @@ -6306,7 +6344,12 @@ void chaotic_stream(std::string filename, bool variableBased) series.close(); REQUIRE(!series.operator bool()); - Series read(filename, Access::READ_ONLY); + /* + * Random-access read mode would go by the openPMD group names instead + * of the ADIOS2 steps. + * Hence, the order would be ascending. 
+ */ + Series read(filename, Access::READ_LINEAR); size_t index = 0; for (const auto &iteration : read.readIterations()) { @@ -6335,11 +6378,13 @@ TEST_CASE("chaotic_stream", "[serial]") #ifdef openPMD_USE_INVASIVE_TESTS void unfinished_iteration_test( - std::string const &ext, bool filebased, std::string const &config = "{}") + std::string const &ext, + IterationEncoding encoding, + std::string const &config = "{}") { std::cout << "\n\nTESTING " << ext << "\n\n" << std::endl; std::string file = std::string("../samples/unfinished_iteration") + - (filebased ? "_%T." : ".") + ext; + (encoding == IterationEncoding::fileBased ? "_%T." : ".") + ext; { Series write(file, Access::CREATE, config); auto it0 = write.writeIterations()[0]; @@ -6356,11 +6401,11 @@ void unfinished_iteration_test( auto electron_mass = it10.particles["e"]["mass"][RecordComponent::SCALAR]; } - auto tryReading = [&config, file, filebased]( + auto tryReading = [&config, file, encoding]( + Access access, std::string const &additionalConfig = "{}") { { - Series read( - file, Access::READ_ONLY, json::merge(config, additionalConfig)); + Series read(file, access, json::merge(config, additionalConfig)); std::vector iterations; std::cout << "Going to list iterations in " << file << ":" @@ -6388,10 +6433,10 @@ void unfinished_iteration_test( std::vector{0, 10})); } - if (filebased) + if (encoding == IterationEncoding::fileBased && + access == Access::READ_ONLY) { - Series read( - file, Access::READ_ONLY, json::merge(config, additionalConfig)); + Series read(file, access, json::merge(config, additionalConfig)); if (additionalConfig == "{}") { // Eager parsing, defective iteration has already been removed @@ -6408,38 +6453,54 @@ void unfinished_iteration_test( } }; - tryReading(); - tryReading(R"({"defer_iteration_parsing": true})"); + tryReading(Access::READ_LINEAR); + tryReading(Access::READ_LINEAR, R"({"defer_iteration_parsing": true})"); + if (encoding != IterationEncoding::variableBased) + { + /* + * In 
variable-based iteration encoding, READ_ONLY mode will make + * iteration metadata leak into other iterations, causing iteration 0 + * to fail being parsed. + * (See also the warning that occurs when trying to access a variable- + * based Series in READ_ONLY mode) + */ + tryReading(Access::READ_ONLY); + tryReading(Access::READ_ONLY, R"({"defer_iteration_parsing": true})"); + } } TEST_CASE("unfinished_iteration_test", "[serial]") { #if openPMD_HAVE_ADIOS2 - unfinished_iteration_test("bp", false, R"({"backend": "adios2"})"); + unfinished_iteration_test( + "bp", IterationEncoding::groupBased, R"({"backend": "adios2"})"); unfinished_iteration_test( "bp", - false, + IterationEncoding::variableBased, R"( -{ - "backend": "adios2", - "iteration_encoding": "variable_based", - "adios2": { - "schema": 20210209 - } -} -)"); - unfinished_iteration_test("bp", true, R"({"backend": "adios2"})"); + { + "backend": "adios2", + "iteration_encoding": "variable_based", + "adios2": { + "schema": 20210209 + } + } + )"); + unfinished_iteration_test( + "bp", IterationEncoding::fileBased, R"({"backend": "adios2"})"); #endif #if openPMD_HAVE_ADIOS1 - unfinished_iteration_test("adios1.bp", false, R"({"backend": "adios1"})"); - unfinished_iteration_test("adios1.bp", true, R"({"backend": "adios1"})"); + unfinished_iteration_test( + "adios1.bp", IterationEncoding::groupBased, R"({"backend": "adios1"})"); + unfinished_iteration_test( + "adios1.bp", IterationEncoding::fileBased, R"({"backend": "adios1"})"); #endif #if openPMD_HAVE_HDF5 - unfinished_iteration_test("h5", false); - unfinished_iteration_test("h5", true); + unfinished_iteration_test("h5", IterationEncoding::groupBased); + unfinished_iteration_test("h5", IterationEncoding::fileBased); #endif - unfinished_iteration_test("json", false); - unfinished_iteration_test("json", true); + unfinished_iteration_test("json", IterationEncoding::groupBased); + unfinished_iteration_test("json", IterationEncoding::fileBased); } #endif @@ -6554,14 
+6615,16 @@ enum class ParseMode */ AheadOfTimeWithoutSnapshot, /* - * A Series of the BP5 engine is not parsed ahead of time, but step-by-step, - * giving the openPMD-api a way to associate IO steps with iterations. - * No snapshot attribute exists, so the fallback mode is chosen: + * In Linear read mode, a Series is not parsed ahead of time, but + * step-by-step, giving the openPMD-api a way to associate IO steps with + * iterations. No snapshot attribute exists, so the fallback mode is chosen: * Iterations are returned in ascending order. * If an IO step returns an iteration whose index is lower than the * last one, it will be skipped. - * This mode of parsing will be generalized into the Linear read mode with - * PR #1291. + * This mode of parsing is not available for the BP4 engine with ADIOS2 + * schema 0, since BP4 does not associate attributes with the step in + * which they were created, making it impossible to separate parsing into + * single steps. */ LinearWithoutSnapshot, /* @@ -6581,7 +6644,7 @@ void append_mode( { auxiliary::remove_directory("../samples/append"); } - std::vector data(10, 0); + std::vector data(10, 999); auto writeSomeIterations = [&data]( WriteIterations &&writeIterations, std::vector indices) { @@ -6626,7 +6689,7 @@ void append_mode( } writeSomeIterations( - write.writeIterations(), std::vector{2, 3}); + write.writeIterations(), std::vector{3, 2}); write.flush(); } { @@ -6675,40 +6738,55 @@ void append_mode( write.writeIterations(), std::vector{7, 1, 11}); write.flush(); } + + auto verifyIteration = [](auto &&it) { + auto chunk = it.meshes["E"]["x"].template loadChunk({0}, {10}); + it.seriesFlush(); + for (size_t i = 0; i < 10; ++i) + { + REQUIRE(chunk.get()[i] == 999); + } + }; + { - Series read(filename, Access::READ_ONLY); switch (parseMode) { case ParseMode::NoSteps: { + Series read(filename, Access::READ_LINEAR); unsigned counter = 0; uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; - for (auto const &iteration : 
read.readIterations()) + for (auto iteration : read.readIterations()) { REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); ++counter; } REQUIRE(counter == 8); } break; case ParseMode::LinearWithoutSnapshot: { + Series read(filename, Access::READ_LINEAR); unsigned counter = 0; - uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 11}; - for (auto const &iteration : read.readIterations()) + uint64_t iterationOrder[] = {0, 1, 3, 4, 10, 11}; + for (auto iteration : read.readIterations()) { REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); ++counter; } - REQUIRE(counter == 7); + REQUIRE(counter == 6); } break; case ParseMode::WithSnapshot: { // in variable-based encodings, iterations are not parsed ahead of // time but as they go + Series read(filename, Access::READ_LINEAR); unsigned counter = 0; - uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 11}; - for (auto const &iteration : read.readIterations()) + uint64_t iterationOrder[] = {0, 1, 3, 2, 4, 10, 7, 11}; + for (auto iteration : read.readIterations()) { REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); ++counter; } REQUIRE(counter == 8); @@ -6717,17 +6795,27 @@ void append_mode( } break; case ParseMode::AheadOfTimeWithoutSnapshot: { - REQUIRE(read.iterations.size() == 8); + Series read(filename, Access::READ_LINEAR); unsigned counter = 0; uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; /* - * Use conventional read API since streaming API is not possible - * without Linear read mode. - * (See also comments inside ParseMode enum). + * This one is a bit tricky: + * The BP4 engine has no way of parsing a Series in the old + * ADIOS2 schema step-by-step, since attributes are not + * associated with the step in which they were created. + * As a result, when readIterations() is called, the whole thing + * is parsed immediately ahead-of-time. 
+ * We can then iterate through the iterations and access metadata, + * but since the IO steps don't correspond with the order of + * iterations returned (there is no way to figure out that order), + * we cannot load data in here. + * BP4 in the old ADIOS2 schema only supports either of the + * following: 1) A Series in which the iterations are present in + * ascending order. 2) Or accessing the Series in READ_ONLY mode. */ - for (auto const &iteration : read.iterations) + for (auto const &iteration : read.readIterations()) { - REQUIRE(iteration.first == iterationOrder[counter]); + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); ++counter; } REQUIRE(counter == 8); @@ -6738,11 +6826,25 @@ void append_mode( * should see both instances when reading. * Final goal: Read only the last instance. */ - helper::listSeries(read); + REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage); } break; } } + if (!variableBased) + { + Series read(filename, Access::READ_ONLY); + REQUIRE(read.iterations.size() == 8); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + for (auto iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); + ++counter; + } + REQUIRE(counter == 8); + } // AppendAfterSteps has a bug before that version #if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ @@ -6777,49 +6879,57 @@ void append_mode( write.flush(); } { - Series read(filename, Access::READ_ONLY); + Series read(filename, Access::READ_LINEAR); switch (parseMode) { case ParseMode::LinearWithoutSnapshot: { - uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10}; + uint64_t iterationOrder[] = {0, 1, 3, 4, 10}; unsigned counter = 0; - for (auto const &iteration : read.readIterations()) + for (auto iteration : read.readIterations()) { REQUIRE( iteration.iterationIndex == iterationOrder[counter]); + 
verifyIteration(iteration); ++counter; } - REQUIRE(counter == 6); - // Cannot do listSeries here because the Series is already - // drained - REQUIRE_THROWS_AS( - helper::listSeries(read), error::WrongAPIUsage); + REQUIRE(counter == 5); } break; case ParseMode::WithSnapshot: { // in variable-based encodings, iterations are not parsed ahead // of time but as they go unsigned counter = 0; - uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 5}; - for (auto const &iteration : read.readIterations()) + uint64_t iterationOrder[] = {0, 1, 3, 2, 4, 10, 7, 5}; + for (auto iteration : read.readIterations()) { REQUIRE( iteration.iterationIndex == iterationOrder[counter]); + verifyIteration(iteration); ++counter; } REQUIRE(counter == 8); - // Cannot do listSeries here because the Series is already - // drained - REQUIRE_THROWS_AS( - helper::listSeries(read), error::WrongAPIUsage); } break; - case ParseMode::NoSteps: - case ParseMode::AheadOfTimeWithoutSnapshot: + default: throw std::runtime_error("Test configured wrong."); break; } } + if (!variableBased) + { + Series read(filename, Access::READ_ONLY); + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 5, 7, 10}; + unsigned counter = 0; + for (auto const &iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); + // Cannot do listSeries here because the Series is already + // drained + REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage); + } } #endif } @@ -6850,46 +6960,24 @@ TEST_CASE("append_mode", "[serial]") } } })END"; - if (t == "bp5") - { - append_mode( - "../samples/append/groupbased." + t, - false, - ParseMode::LinearWithoutSnapshot, - jsonConfigOld); - append_mode( - "../samples/append/groupbased_newschema." + t, - false, - ParseMode::WithSnapshot, - jsonConfigNew); - append_mode( - "../samples/append/variablebased." 
+ t, - true, - ParseMode::WithSnapshot, - jsonConfigOld); - append_mode( - "../samples/append/variablebased_newschema." + t, - true, - ParseMode::WithSnapshot, - jsonConfigNew); - } - else if (t == "bp" || t == "bp4") + if (t == "bp" || t == "bp4" || t == "bp5") { append_mode( "../samples/append/append_groupbased." + t, false, - ParseMode::AheadOfTimeWithoutSnapshot, + ParseMode::LinearWithoutSnapshot, jsonConfigOld); append_mode( "../samples/append/append_groupbased." + t, false, ParseMode::WithSnapshot, jsonConfigNew); - append_mode( - "../samples/append/append_variablebased." + t, - true, - ParseMode::WithSnapshot, - jsonConfigOld); + // This test config does not make sense + // append_mode( + // "../samples/append/append_variablebased." + t, + // true, + // ParseMode::WithSnapshot, + // jsonConfigOld); append_mode( "../samples/append/append_variablebased." + t, true,