diff --git a/CHANGELOG.md b/CHANGELOG.md index 719eb1e17..e75398a52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Full documentation for rocLibrary is available at [https://rocm.docs.amd.com/pro ### Added * `PipelineSerializer` class to implement pipeline serialization functionality in rocAL. * Serialization test to validate pipeline serialization functionality. +* Python support and example to test serialization ### Changes * Adds new public APIs rocalSerialize() and rocalGetSerializedString() for serializing pipelines. diff --git a/rocAL/include/loaders/image/node_image_loader_single_shard.h b/rocAL/include/loaders/image/node_image_loader_single_shard.h index aefe80e5e..091797104 100644 --- a/rocAL/include/loaders/image/node_image_loader_single_shard.h +++ b/rocAL/include/loaders/image/node_image_loader_single_shard.h @@ -41,6 +41,7 @@ class ImageLoaderSingleShardNode : public Node { const std::map feature_key_map = std::map(), unsigned sequence_length = 0, unsigned step = 0, unsigned stride = 0, ExternalSourceFileMode external_file_mode = ExternalSourceFileMode::NONE, const std::string &index_path = ""); std::shared_ptr get_loader_module(); + std::string node_name() const override { return "ImageLoaderSingleShardNode"; } protected: void create_node() override{}; @@ -48,4 +49,4 @@ class ImageLoaderSingleShardNode : public Node { private: std::shared_ptr _loader_module = nullptr; -}; \ No newline at end of file +}; diff --git a/rocAL/include/pipeline/master_graph.h b/rocAL/include/pipeline/master_graph.h index f7fd3844d..95fd41da5 100644 --- a/rocAL/include/pipeline/master_graph.h +++ b/rocAL/include/pipeline/master_graph.h @@ -326,6 +326,10 @@ inline std::shared_ptr MasterGraph::add_node(const s _loader_modules.emplace_back(loader_module); node->set_graph_id(_loaders_count++); _root_nodes.push_back(node); + + // Add each operator to the pipeline operators list + _pipeline_operators.push_back(std::make_shared(node->node_name() + "_" + std::to_string(_op_idx++), "loader", node)); + for (auto &output : outputs) _tensor_map.insert(std::make_pair(output, node)); diff --git a/rocAL/source/loaders/image/node_image_loader_single_shard.cpp b/rocAL/source/loaders/image/node_image_loader_single_shard.cpp index 7c2c44644..7223bc23e 100644 --- a/rocAL/source/loaders/image/node_image_loader_single_shard.cpp +++ b/rocAL/source/loaders/image/node_image_loader_single_shard.cpp @@ -52,6 +52,23 @@ void ImageLoaderSingleShardNode::init(unsigned shard_id, unsigned shard_count, u reader_cfg.set_external_filemode(external_file_mode); reader_cfg.set_index_path(index_path); reader_cfg.set_sharding_info(sharding_info); + + std::array arg_names = { + "shard_id", "shard_count", "cpu_num_threads", "source_path", + "json_path", "storage_type", "decoder_type", "shuffle", "loop", + "load_batch_count", "mem_type", "meta_data_reader", "decoder_keep_orig", + "last_batch_policy", "pad_last_batch_repeated", "stick_to_shard", "shard_size", + "feature_key_map", "sequence_length", "step", "stride", + "external_file_mode", "index_path" + }; + + set_node_arguments(arg_names, std::make_index_sequence{}, shard_id, + shard_count, cpu_num_threads, source_path, json_path, storage_type, + decoder_type, shuffle, loop, load_batch_count, mem_type, meta_data_reader, decoder_keep_original, + sharding_info.last_batch_policy, sharding_info.pad_last_batch_repeated, + sharding_info.stick_to_shard, sharding_info.shard_size, feature_key_map, + sequence_length, step, stride, external_file_mode, index_path); + _loader_module->initialize(reader_cfg, DecoderConfig(decoder_type), mem_type, _batch_size, decoder_keep_original); diff --git a/rocAL/source/pipeline/pipeline_serializer.cpp b/rocAL/source/pipeline/pipeline_serializer.cpp index 11571adcc..a9783029c 100644 --- a/rocAL/source/pipeline/pipeline_serializer.cpp +++ b/rocAL/source/pipeline/pipeline_serializer.cpp @@ -161,7 +161,7 @@ void PipelineSerializer::serialize_pipeop_arguments(const std::vector& serialize_parameter_to_protobuf(param, op_arg); } else if (op_arg.type_name == "enum") { if (op_arg.values.empty()) { - THROW("Enum argument " + op_arg.arg_name + " has no values."); + THROW("Enum argument '" + op_arg.arg_name + "' requires at least one value."); } rocal_proto::EnumType* enum_arg = arg->mutable_enum_value(); enum_arg->set_name(op_arg.sub_type_name); diff --git a/rocAL_pybind/amd/rocal/pipeline.py b/rocAL_pybind/amd/rocal/pipeline.py index 14d0ac74d..0929cbb47 100644 --- a/rocAL_pybind/amd/rocal/pipeline.py +++ b/rocAL_pybind/amd/rocal/pipeline.py @@ -272,7 +272,22 @@ def run(self): return b.getOutputTensors(self._handle) except: raise StopIteration - + + def serialize(self, filename=None): + """ + Serialize the pipeline and store into protobuffers + + Args: + filename (str, optional): Optional output path to write the serialized data to file + + Returns: + bytes: The serialized pipeline as protobuf payload + """ + serialized_str = b.rocalSerialize(self._handle) + if filename: + with open(filename, 'wb') as f: + f.write(serialized_str) + return serialized_str def _discriminate_args(func, **func_kwargs): """!Split args on those applicable to Pipeline constructor and the decorated function.""" diff --git a/rocAL_pybind/rocal_pybind.cpp b/rocAL_pybind/rocal_pybind.cpp index 10a7a6c1f..881ef323f 100644 --- a/rocAL_pybind/rocal_pybind.cpp +++ b/rocAL_pybind/rocal_pybind.cpp @@ -294,6 +294,22 @@ PYBIND11_MODULE(rocal_pybind, m) { m.def("rocalVerify", &rocalVerify); m.def("rocalRun", &rocalRun, py::return_value_policy::reference); m.def("rocalRelease", &rocalRelease, py::return_value_policy::reference); + m.def("rocalSerialize", [](RocalContext context) { + size_t size; + RocalStatus status = rocalSerialize(context, &size); + if (status != ROCAL_OK) { + throw std::runtime_error("Failed to serialize pipeline"); + } + // Allocate size+1 bytes to handle null terminator safely + std::vector buffer(size + 1, '\0'); + status = rocalGetSerializedString(context, buffer.data()); + if (status != ROCAL_OK) { + throw std::runtime_error("Failed to get serialized string"); + } + + // Return only the first 'size' bytes as Python bytes object + return py::bytes(buffer.data(), size); + }, "Returns the serialized pipeline as string"); // rocal_api_types.h py::class_(m, "TimingInfo") .def_readwrite("load_time", &TimingInfo::load_time) diff --git a/tests/python_api/pipeline_serialize_test.py b/tests/python_api/pipeline_serialize_test.py new file mode 100644 index 000000000..d8306f7ee --- /dev/null +++ b/tests/python_api/pipeline_serialize_test.py @@ -0,0 +1,200 @@ +# Copyright (c) 2024 - 2025 Advanced Micro Devices, Inc. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import random +import sys +import os +import cv2 +from amd.rocal.plugin.pytorch import ROCALClassificationIterator +from amd.rocal.pipeline import Pipeline +import amd.rocal.fn as fn +import amd.rocal.types as types + +def save_output_images(img, idx, output_dir, device=True, layout="NCHW"): + """Save output images for verification""" + if device is False: + image = img.cpu().numpy() + else: + image = img.numpy() + if layout == "NCHW": + image = image.transpose([1, 2, 0]) + image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + cv2.imwrite(os.path.join(output_dir, f"serialize_test_{idx}.png"), image) + + +def create_test_pipeline(data_path, rocal_cpu=True, batch_size=2): + """Create a test pipeline with supported augmentations""" + num_threads = 1 + device_id = 0 + random_seed = random.SystemRandom().randint(0, 2**32 - 1) + local_rank = 0 + world_size = 1 + + # Create pipeline + pipeline = Pipeline( + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + seed=random_seed, + rocal_cpu=rocal_cpu + ) + + with pipeline: + # File reader + jpegs, labels = fn.readers.file(file_root=data_path) + + # Image decoder + decode = fn.decoders.image( + jpegs, + output_type=types.RGB, + file_root=data_path, + shard_id=local_rank, + num_shards=world_size, + random_shuffle=True + ) + + # Brightness augmentation + brightness = fn.brightness( + decode, + brightness=0.5, + output_layout=types.NCHW, + output_dtype=types.UINT8 + ) + + pipeline.set_outputs(brightness) + + return pipeline + + +def test_serialization(data_path, rocal_cpu=True, batch_size=2): + """Test pipeline serialization functionality and return serialized string""" + print(f">>> Testing Pipeline Serialization on {'CPU' if rocal_cpu else 'GPU'}") + + # Create output directory + output_dir = "output_folder/serialize_test" + try: + os.makedirs(output_dir, exist_ok=True) + except OSError as error: + print(f"Error creating output directory: {error}") + return None + + try: + # Create and build pipeline + print("Creating test pipeline...") + pipeline = create_test_pipeline(data_path, rocal_cpu, batch_size) + pipeline.build() + + # Test serialization + print("\n=== Testing Pipeline Serialization ===") + + # Test Serialize to string + print("Test 1: Serializing pipeline to string...") + serialized_string = pipeline.serialize() + + if serialized_string is None or len(serialized_string) == 0: + print("ERROR: Failed to serialize pipeline - empty result") + return None + + print(f"Serialized string size: {len(serialized_string)} bytes") + print("Serialization to string: SUCCESS") + + # Display serialized content (first 500 chars for readability) + print("\n=== Serialized Pipeline Content (Preview) ===") + try: + # Try to decode as text for preview + preview_text = serialized_string.decode('utf-8', errors='ignore')[:500] + print(preview_text) + if len(serialized_string) > 500: + print("... (truncated)") + except Exception: + # If binary, show hex representation + print("Binary content (hex preview):") + print(serialized_string[:100].hex()) + if len(serialized_string) > 100: + print("... (truncated)") + print("=== End of Serialized Content Preview ===") + + # Test pipeline execution after serialization + print("\n=== Testing Pipeline Execution After Serialization ===") + + imageIteratorPipeline = ROCALClassificationIterator(pipeline) + print(f"Available images: {pipeline.get_remaining_images()}") + + iteration_count = 0 + for i, batch_data in enumerate(imageIteratorPipeline): + + print(f"\nIteration {iteration_count + 1}:") + images, labels = batch_data + + print(f" Batch shape: {images[0].shape} images") + print(f" Labels: {labels}") + + # Save output images + if len(images) > 0: + save_output_images( + images[0][0], + f"serialization_{iteration_count}", + output_dir, + device=rocal_cpu, + layout="NCHW" + ) + print(f" Saved output image: serialization_{iteration_count}.png") + + iteration_count += 1 + + imageIteratorPipeline.reset() + print("\n=== Serialization Test Completed Successfully ===") + return serialized_string + + except Exception as e: + print(f"ERROR: Exception during serialization test: {str(e)}") + import traceback + traceback.print_exc() + return None + + +def main(): + """Main function to run serialization tests""" + if len(sys.argv) < 2: + print('Usage: python pipeline_serialize_test.py [cpu/gpu] [batch_size]') + sys.exit(1) + + # Parse arguments + data_path = sys.argv[1] + rocal_cpu = (sys.argv[2].lower() == "cpu") if len(sys.argv) > 2 else True + batch_size = int(sys.argv[3]) if len(sys.argv) > 3 else 2 + + # Validate data path + if not os.path.exists(data_path): + print(f"ERROR: Data path does not exist: {data_path}") + sys.exit(1) + + # Run the serialization test + serialized_string = test_serialization(data_path, rocal_cpu, batch_size) + success = serialized_string is not None + + if success: + print("SERIALIZATION TESTS PASSED!") + else: + print("SERIALIZATION TESTS FAILED") + + +if __name__ == '__main__': + main()