From 84958d66f66557b19d8e314e0ce973e8411343db Mon Sep 17 00:00:00 2001 From: Sasha Meister <117230141+ssh-meister@users.noreply.github.com> Date: Fri, 4 Jul 2025 10:45:11 +0000 Subject: [PATCH 1/3] Add ListToEntries Signed-off-by: Sasha Meister <117230141+ssh-meister@users.noreply.github.com> --- docs/src/sdp/api.rst | 3 + sdp/processors/__init__.py | 1 + .../modify_manifest/data_to_data.py | 154 ++++++++++++++++++ tests/test_data_to_data.py | 46 ++++-- 4 files changed, 194 insertions(+), 10 deletions(-) diff --git a/docs/src/sdp/api.rst b/docs/src/sdp/api.rst index bfa2bc62..32970048 100644 --- a/docs/src/sdp/api.rst +++ b/docs/src/sdp/api.rst @@ -219,6 +219,9 @@ Data modifications .. autodata:: sdp.processors.InverseNormalizeText :annotation: +.. autodata:: sdp.processors.ListToEntries + :annotation: + Data filtering '''''''''''''' diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index c3ff70b6..c1dde33a 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -106,6 +106,7 @@ SubIfASRSubstitution, SubMakeLowercase, SubRegex, + ListToEntries, ) from sdp.processors.modify_manifest.data_to_dropbool import ( DropASRError, diff --git a/sdp/processors/modify_manifest/data_to_data.py b/sdp/processors/modify_manifest/data_to_data.py index 16e1de6d..a8a583d1 100644 --- a/sdp/processors/modify_manifest/data_to_data.py +++ b/sdp/processors/modify_manifest/data_to_data.py @@ -1127,3 +1127,157 @@ def process(self): if self.failed_files: logger.warning(f"Failed to process {len(self.failed_files)} files.") logger.debug(f"Failed files: {self.failed_files}") + + +class ListToEntries(BaseParallelProcessor): + """ + A dataset processor that transforms a single entry containing a list of items into multiple entries, + one for each item in the list. + + This is useful when a manifest field (e.g., "segments") contains a list of sub-entries, and you want + to flatten these into individual records for further processing. + + Args: + field_with_list (str): The name of the field in the input entry that contains a list. + output_field (str, optional): The name of the output field to assign to items in the list + if they are not dictionaries. Required if the list contains primitive types (e.g., strings). + fields_to_save (list[str], optional): A list of field names to preserve from the original entry. + All other fields will be removed. + fields_to_remove (list[str], optional): A list of field names to explicitly remove from the original entry, + in addition to those excluded by `fields_to_save`. + **kwargs: Additional arguments passed to the BaseParallelProcessor. + + Raises: + TypeError: If the specified list field is not of type list. + ValueError: If the list items are not dictionaries and `output_field` is not provided. + + Returns: + A manifest where each entry corresponds to one item in the original list from the input entry. + This effectively transforms a single input entry containing a list of items into multiple standalone + entries, each suitable for further dataset processing. + + .. admonition:: Example 1 (list of dicts) + + .. code-block:: yaml + + - _target_: sdp.processors.ListToEntries + input_manifest_file: ${workspace_dir}/input_manifest.json + output_manifest_file: ${workspace_dir}/output_manifest.json + field_with_list: "segments" + + Input:: + + { + "audio_filepath": "sample.wav", + "segments": [ + {"start": 0.0, "end": 1.5, "text": "Hello"}, + {"start": 1.6, "end": 3.0, "text": "World"} + ] + } + + Output:: + + [ + { + "audio_filepath": "sample.wav", + "start": 0.0, + "end": 1.5, + "text": "Hello" + }, + { + "audio_filepath": "sample.wav", + "start": 1.6, + "end": 3.0, + "text": "World" + } + ] + + .. admonition:: Example 2 (list of primitives) + + .. code-block:: yaml + + - _target_: sdp.processors.ListToEntries + input_manifest_file: ${workspace_dir}/input_manifest.json + output_manifest_file: ${workspace_dir}/output_manifest.json + field_with_list: "text_chunks" + output_field: "text" + + Input:: + + { + "audio_filepath": "sample.wav", + "text_chunks": [ + "Hello", + "World" + ] + } + + Output:: + + [ + { + "audio_filepath": "sample.wav", + "text": "Hello" + }, + { + "audio_filepath": "sample.wav", + "text": "World" + } + ] + + """ + + def __init__(self, + field_with_list: str, + output_field: str = None, + fields_to_save: list[str] = None, + fields_to_remove: list[str] = None, + **kwargs): + super().__init__(**kwargs) + self.field_with_list = field_with_list + self.output_field = output_field + self.fields_to_save = fields_to_save + self.fields_to_remove = fields_to_remove + + def process_dataset_entry(self, data_entry): + _entries = [] + + # Check that the target field is actually a list + if not isinstance(data_entry[self.field_with_list], list): + raise TypeError(f'Values of {self.field_with_list} field should be list type only: {data_entry}') + + # Remove the list field from the entry and get the list of items + items_list = data_entry.pop(self.field_with_list) + + # If items are not dicts, output_field must be specified to store the item + if not isinstance(items_list[0], dict) and not self.output_field: + raise ValueError(f'Type of items in items list `{self.field_with_list}` is not dict ({type(items_list[0])}). In this case `output_field` should be provided.') + + # Determine which fields to remove from the entry before expanding + fields_to_remove = set() + if self.fields_to_save is not None: + for field in data_entry: + if field not in self.fields_to_save: + fields_to_remove.add(field) + + if self.fields_to_remove is not None: + fields_to_remove.update(self.fields_to_remove) + + # Remove specified fields + for field in fields_to_remove: + data_entry.pop(field) + + # Expand the list into multiple entries + for item in items_list: + _entry = data_entry.copy() + + # If item is a dict, merge its keys; otherwise, store it in `output_field` + if isinstance(item, dict): + _entry.update(item) + else: + _entry[self.output_field] = item + + _entry = DataEntry(_entry) + _entries.append(_entry) + + return _entries diff --git a/tests/test_data_to_data.py b/tests/test_data_to_data.py index 5bd75f47..ef9c130f 100644 --- a/tests/test_data_to_data.py +++ b/tests/test_data_to_data.py @@ -19,6 +19,7 @@ SubIfASRSubstitution, SubMakeLowercase, SubRegex, + ListToEntries, ) test_params_list = [] @@ -29,13 +30,13 @@ InsIfASRInsertion, {"insert_words": [" nemo", "nemo ", " nemo "]}, {"text": "i love the toolkit", "pred_text": "i love the nemo toolkit"}, - {"text": "i love the nemo toolkit", "pred_text": "i love the nemo toolkit"}, + [{"text": "i love the nemo toolkit", "pred_text": "i love the nemo toolkit"}], ), ( InsIfASRInsertion, {"insert_words": [" nemo", "nemo ", " nemo "]}, {"text": "i love the toolkit", "pred_text": "i love the new nemo toolkit"}, - {"text": "i love the toolkit", "pred_text": "i love the new nemo toolkit"}, + [{"text": "i love the toolkit", "pred_text": "i love the new nemo toolkit"}], ), ] ) @@ -46,7 +47,7 @@ SubIfASRSubstitution, {"sub_words": {"nmo ": "nemo "}}, {"text": "i love the nmo toolkit", "pred_text": "i love the nemo toolkit"}, - {"text": "i love the nemo toolkit", "pred_text": "i love the nemo toolkit"}, + [{"text": "i love the nemo toolkit", "pred_text": "i love the nemo toolkit"}], ), ] ) @@ -57,7 +58,7 @@ SubIfASRSubstitution, {"sub_words": {"nmo ": "nemo "}}, {"text": "i love the nmo toolkit", "pred_text": "i love the nemo toolkit"}, - {"text": "i love the nemo toolkit", "pred_text": "i love the nemo toolkit"}, + [{"text": "i love the nemo toolkit", "pred_text": "i love the nemo toolkit"}], ), ] ) @@ -68,13 +69,13 @@ SubMakeLowercase, {}, {"text": "Hello Привет 123"}, - {"text": "hello привет 123"}, + [{"text": "hello привет 123"}], ), ( SubMakeLowercase, {"text_key": "text_new"}, {"text_new": "Hello Привет 123"}, - {"text_new": "hello привет 123"}, + [{"text_new": "hello привет 123"}], ), ] ) @@ -85,8 +86,34 @@ SubRegex, {"regex_params_list": [{"pattern": "\s<.*>\s", "repl": " "}]}, {"text": "hello world"}, - {"text": "hello world"}, + [{"text": "hello world"}], + ), + ] +) + +test_params_list.extend( + [ + # Test: list of dictionaries (e.g., segments) + ( + ListToEntries, + {"field_with_list": "segments", "fields_to_remove": ["duration"]}, + {"audio_filepath": "a.wav", "segments": [{"start": 0.0, "end": 1.0, "text": "Hello"}, {"start": 1.1, "end": 2.0, "text": "World"}], "duration": 2.5}, + [{"audio_filepath": "a.wav", "start": 0.0, "end": 1.0, "text": "Hello"}, {"audio_filepath": "a.wav", "start": 1.1, "end": 2.0, "text": "World"}] + ), + # Test: list of primitive values (strings), requires output_field + ( + ListToEntries, + {"field_with_list": "text_chunks", "output_field": "text"}, + {"audio_filepath": "b.wav", "text_chunks": ["Привет", "Мир"], "lang": "ru"}, + [{"audio_filepath": "b.wav", "lang": "ru", "text": "Привет"}, {"audio_filepath": "b.wav", "lang": "ru", "text": "Мир"}] ), + # Test: only keep specified fields (fields_to_save) + ( + ListToEntries, + {"field_with_list": "segments", "fields_to_save": ["audio_filepath"]}, + {"audio_filepath": "c.wav", "segments": [{"start": 0, "text": "A"}, {"start": 1, "text": "B"}], "remove_me": "to_delete"}, + [{"audio_filepath": "c.wav", "start": 0, "text": "A"}, {"audio_filepath": "c.wav", "start": 1, "text": "B"}] + ) ] ) @@ -94,7 +121,6 @@ @pytest.mark.parametrize("test_class,class_kwargs,test_input,expected_output", test_params_list, ids=str) def test_data_to_data(test_class, class_kwargs, test_input, expected_output): processor = test_class(**class_kwargs, output_manifest_file=None) + result = [entry.data for entry in processor.process_dataset_entry(test_input)] - output = processor.process_dataset_entry(test_input)[0].data - - assert output == expected_output + assert result == expected_output \ No newline at end of file From 74085d619307c4364f146f8b7514430bd5ecc288 Mon Sep 17 00:00:00 2001 From: Sasha Meister <117230141+ssh-meister@users.noreply.github.com> Date: Fri, 4 Jul 2025 10:56:19 +0000 Subject: [PATCH 2/3] optional to nitpick_ignore Signed-off-by: Sasha Meister <117230141+ssh-meister@users.noreply.github.com> --- docs/src/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/src/conf.py b/docs/src/conf.py index 975a5dbf..3c1e3b13 100644 --- a/docs/src/conf.py +++ b/docs/src/conf.py @@ -185,6 +185,7 @@ def setup(app): nitpick_ignore = [ ('py:class', 'abc.ABC'), ('py:class', 'sdp.processors.base_processor.DataEntry'), + ('py:class', 'optional'), ] # nitpick_ignore_regex = [('py:class', '*')] From 1db78c904dbb335c891edb48bae997f605855d2a Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Mon, 21 Jul 2025 04:47:27 -0700 Subject: [PATCH 3/3] =?UTF-8?q?Changes=20addressing=20the=20reviewer?= =?UTF-8?q?=E2=80=99s=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sasha Meister --- .../modify_manifest/data_to_data.py | 26 ++----------------- tests/test_data_to_data.py | 11 ++------ 2 files changed, 4 insertions(+), 33 deletions(-) diff --git a/sdp/processors/modify_manifest/data_to_data.py b/sdp/processors/modify_manifest/data_to_data.py index 1eff361e..daec7c89 100644 --- a/sdp/processors/modify_manifest/data_to_data.py +++ b/sdp/processors/modify_manifest/data_to_data.py @@ -1136,10 +1136,6 @@ class ListToEntries(BaseParallelProcessor): field_with_list (str): The name of the field in the input entry that contains a list. output_field (str, optional): The name of the output field to assign to items in the list if they are not dictionaries. Required if the list contains primitive types (e.g., strings). - fields_to_save (list[str], optional): A list of field names to preserve from the original entry. - All other fields will be removed. - fields_to_remove (list[str], optional): A list of field names to explicitly remove from the original entry, - in addition to those excluded by `fields_to_save`. **kwargs: Additional arguments passed to the BaseParallelProcessor. Raises: @@ -1225,15 +1221,11 @@ class ListToEntries(BaseParallelProcessor): def __init__(self, field_with_list: str, output_field: str = None, - fields_to_save: list[str] = None, - fields_to_remove: list[str] = None, **kwargs): super().__init__(**kwargs) self.field_with_list = field_with_list self.output_field = output_field - self.fields_to_save = fields_to_save - self.fields_to_remove = fields_to_remove - + def process_dataset_entry(self, data_entry): _entries = [] @@ -1247,21 +1239,7 @@ def process_dataset_entry(self, data_entry): # If items are not dicts, output_field must be specified to store the item if not isinstance(items_list[0], dict) and not self.output_field: raise ValueError(f'Type of items in items list `{self.field_with_list}` is not dict ({type(items_list[0])}). In this case `output_field` should be provided.') - - # Determine which fields to remove from the entry before expanding - fields_to_remove = set() - if self.fields_to_save is not None: - for field in data_entry: - if field not in self.fields_to_save: - fields_to_remove.add(field) - - if self.fields_to_remove is not None: - fields_to_remove.update(self.fields_to_remove) - - # Remove specified fields - for field in fields_to_remove: - data_entry.pop(field) - + # Expand the list into multiple entries for item in items_list: _entry = data_entry.copy() diff --git a/tests/test_data_to_data.py b/tests/test_data_to_data.py index 2dd407b6..a18e40e8 100644 --- a/tests/test_data_to_data.py +++ b/tests/test_data_to_data.py @@ -97,9 +97,9 @@ # Test: list of dictionaries (e.g., segments) ( ListToEntries, - {"field_with_list": "segments", "fields_to_remove": ["duration"]}, + {"field_with_list": "segments"}, {"audio_filepath": "a.wav", "segments": [{"start": 0.0, "end": 1.0, "text": "Hello"}, {"start": 1.1, "end": 2.0, "text": "World"}], "duration": 2.5}, - [{"audio_filepath": "a.wav", "start": 0.0, "end": 1.0, "text": "Hello"}, {"audio_filepath": "a.wav", "start": 1.1, "end": 2.0, "text": "World"}] + [{"audio_filepath": "a.wav", "duration": 2.5, "start": 0.0, "end": 1.0, "text": "Hello"}, {"audio_filepath": "a.wav", "duration": 2.5, "start": 1.1, "end": 2.0, "text": "World"}] ), # Test: list of primitive values (strings), requires output_field ( @@ -108,13 +108,6 @@ {"audio_filepath": "b.wav", "text_chunks": ["Привет", "Мир"], "lang": "ru"}, [{"audio_filepath": "b.wav", "lang": "ru", "text": "Привет"}, {"audio_filepath": "b.wav", "lang": "ru", "text": "Мир"}] ), - # Test: only keep specified fields (fields_to_save) - ( - ListToEntries, - {"field_with_list": "segments", "fields_to_save": ["audio_filepath"]}, - {"audio_filepath": "c.wav", "segments": [{"start": 0, "text": "A"}, {"start": 1, "text": "B"}], "remove_me": "to_delete"}, - [{"audio_filepath": "c.wav", "start": 0, "text": "A"}, {"audio_filepath": "c.wav", "start": 1, "text": "B"}] - ), ] )