From 681429e66ea990e53630bb275fb37c4e45aadd09 Mon Sep 17 00:00:00 2001 From: Sasha Meister <117230141+ssh-meister@users.noreply.github.com> Date: Fri, 4 Jul 2025 10:10:30 +0000 Subject: [PATCH 1/5] Group file management processors Signed-off-by: Sasha Meister <117230141+ssh-meister@users.noreply.github.com> --- docs/src/sdp/api.rst | 19 +- sdp/processors/__init__.py | 12 +- sdp/processors/manage_files/convert_audio.py | 170 ++++++++++++++++++ sdp/processors/manage_files/extract.py | 118 ++++++++++++ sdp/processors/manage_files/remove.py | 86 +++++++++ .../modify_manifest/data_to_data.py | 149 --------------- 6 files changed, 398 insertions(+), 156 deletions(-) create mode 100644 sdp/processors/manage_files/convert_audio.py create mode 100644 sdp/processors/manage_files/extract.py create mode 100644 sdp/processors/manage_files/remove.py diff --git a/docs/src/sdp/api.rst b/docs/src/sdp/api.rst index bfa2bc62..57f8594d 100644 --- a/docs/src/sdp/api.rst +++ b/docs/src/sdp/api.rst @@ -254,13 +254,25 @@ ASR-based processors Data modifications '''''''''''''''''' +.. autodata:: sdp.processors.InsIfASRInsertion + :annotation: + +.. autodata:: sdp.processors.SubIfASRSubstitution + :annotation: + +Files management +'''''''''''''''' + .. autodata:: sdp.processors.SoxConvert :annotation: -.. autodata:: sdp.processors.InsIfASRInsertion +.. autodata:: sdp.processors.FfmpegConvert :annotation: -.. autodata:: sdp.processors.SubIfASRSubstitution +.. autodata:: sdp.processors.ExtractTar + :annotation: + +.. autodata:: sdp.processors.RemoveFiles :annotation: Data filtering @@ -355,9 +367,6 @@ Miscellaneous .. autodata:: sdp.processors.GetAudioDuration :annotation: -.. autodata:: sdp.processors.FfmpegConvert - :annotation: - .. 
autodata:: sdp.processors.CreateInitialManifestByExt :annotation: diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index c3ff70b6..87535e86 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -92,7 +92,6 @@ CopyManifestData, CountNumWords, ExtractFromBrackets, - FfmpegConvert, GetAudioDuration, GetWER, InsIfASRInsertion, @@ -101,7 +100,6 @@ MakeSentence, ReadDocxLines, ReadTxtLines, - SoxConvert, SplitLineBySentence, SubIfASRSubstitution, SubMakeLowercase, @@ -128,6 +126,16 @@ from sdp.processors.modify_manifest.make_letters_uppercase_after_period import ( MakeLettersUppercaseAfterPeriod, ) +from sdp.processors.manage_files.convert_audio import ( + FfmpegConvert, + SoxConvert, +) +from sdp.processors.manage_files.extract import ( + ExtractTar, +) +from sdp.processors.manage_files.remove import ( + RemoveFiles, +) from sdp.processors.nemo.asr_inference import ASRInference from sdp.processors.nemo.estimate_bandwidth import EstimateBandwidth from sdp.processors.nemo.pc_inference import PCInference diff --git a/sdp/processors/manage_files/convert_audio.py b/sdp/processors/manage_files/convert_audio.py new file mode 100644 index 00000000..3fbd3870 --- /dev/null +++ b/sdp/processors/manage_files/convert_audio.py @@ -0,0 +1,170 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +from typing import Optional +from sox import Transformer + +from sdp.logging import logger +from sdp.processors.base_processor import BaseParallelProcessor, DataEntry + +from sdp.utils.common import ffmpeg_convert + + +class FfmpegConvert(BaseParallelProcessor): + """ + Processor for converting video or audio files to audio using FFmpeg and updating the dataset with the path to the resampled audio. + If ``id_key`` is not None, the output file path will be ``<converted_audio_dir>/<id_key>.wav``. + If ``id_key`` is None, the output file path will be ``<converted_audio_dir>/<input file name without extension>.wav``. + + .. note:: ``id_key`` can be used to create subdirectories inside ``converted_audio_dir`` (by using forward slashes ``/``). + e.g. if ``id_key`` takes the form ``dir_name1/dir_name2/filename``, the output file path will be + + ``<converted_audio_dir>/dir_name1/dir_name2/filename.wav``. + + Args: + converted_audio_dir (str): The directory to store the resampled audio files. + input_file_key (str): The field in the dataset representing the path to the input video or audio files. + output_file_key (str): The field in the dataset representing the path to the resampled audio files with ``output_format``. If ``id_key`` is None, the output file path will be ``<converted_audio_dir>/<input file name without extension>.wav``. + id_key (str): (Optional) The field in the dataset representing the unique ID or identifier for each entry. If ``id_key`` is not None, the output file path will be ``<converted_audio_dir>/<id_key>.wav``. Defaults to None. + output_format (str): (Optional) Format of the output audio files. Defaults to `wav`. + target_samplerate (int): (Optional) The target sampling rate for the resampled audio. Defaults to 16000. + target_nchannels (int): (Optional) The target number of channels for the resampled audio. Defaults to 1. + **kwargs: Additional keyword arguments to be passed to the base class `BaseParallelProcessor`. 
+ + """ + + def __init__( + self, + converted_audio_dir: str, + input_file_key: str, + output_file_key: str, + id_key: str = None, + output_format: str = "wav", + base_dir: str = None, + target_samplerate: int = 16000, + target_nchannels: int = 1, + **kwargs, + ): + super().__init__(**kwargs) + self.converted_audio_dir = converted_audio_dir + self.input_file_key = input_file_key + self.output_file_key = output_file_key + self.output_format = output_format + self.id_key = id_key + self.base_dir = base_dir + self.target_samplerate = target_samplerate + self.target_nchannels = target_nchannels + + def prepare(self): + assert self.output_format == "wav", "Currently only wav format is supported" + os.makedirs(self.converted_audio_dir, exist_ok=True) + + def process_dataset_entry(self, data_entry): + input_file = data_entry[self.input_file_key] + if self.id_key: + key = data_entry[self.id_key] + os.makedirs(os.path.join(self.converted_audio_dir, *key.split("/")[:-1]), exist_ok=True) + else: + key = os.path.splitext(input_file)[0].split("/")[-1] + + if self.base_dir: + new_dir = os.path.dirname(os.path.relpath(input_file, self.base_dir)) + os.makedirs(os.path.join(self.converted_audio_dir, new_dir), exist_ok=True) + + key = os.path.join(new_dir, key) + + audio_file = os.path.join(self.converted_audio_dir, key) + "." + self.output_format + + if not os.path.isfile(audio_file): + ffmpeg_convert(input_file, audio_file, self.target_samplerate, self.target_nchannels) + + data_entry[self.output_file_key] = audio_file + return [DataEntry(data=data_entry)] + + +class SoxConvert(BaseParallelProcessor): + """Processor for Sox to convert audio files to specified format. + + Args: + output_manifest_file (str): Path to the output manifest file. + input_audio_file_key (str): Key in the manifest file that contains the path to the input audio file. + output_audio_file_key (str): Key in the manifest file that contains the path to the output audio file. 
+ converted_audio_dir (str): Path to the directory where the converted audio files will be stored. + output_format (str): Format of the output audio file. + rate (int): Sample rate of the output audio file. + channels (int): Number of channels of the output audio file. + workspace_dir (str, Optional): Path to the workspace directory. Defaults to None. + """ + + def __init__( + self, + converted_audio_dir: str, + input_audio_file_key: str = "audio_filepath", + output_audio_file_key: str = "audio_filepath", + output_format: str = "wav", + rate: int = 16000, + channels: int = 1, + workspace_dir: Optional[str] = None, + **kwargs, + ): + # Extract workspace_dir from kwargs to avoid passing it to BaseProcessor + if "workspace_dir" in kwargs: + workspace_dir = kwargs.pop("workspace_dir") + + super().__init__(**kwargs) + self.input_audio_file_key = input_audio_file_key + self.output_audio_file_key = output_audio_file_key + self.converted_audio_dir = converted_audio_dir + self.output_format = output_format + self.workspace_dir = workspace_dir + + # Store the new parameters for later use: + self.rate = rate + self.channels = channels + + def prepare(self): + # Debug print for workspace_dir + logger.info(f"SoxConvert workspace_dir: {self.workspace_dir}") + os.makedirs(self.converted_audio_dir, exist_ok=True) + + def process_dataset_entry(self, data_entry): + audio_path = data_entry[self.input_audio_file_key] + + # If workspace_dir is provided, join it with audio_path to get absolute path + if self.workspace_dir is not None: + full_audio_path = os.path.join(self.workspace_dir, audio_path) + else: + full_audio_path = audio_path + + # Debug print first file path + if not hasattr(self, '_debug_printed'): + logger.info(f"First audio_path from manifest: {audio_path}") + logger.info(f"First full_audio_path: {full_audio_path}") + logger.info(f"Path exists: {os.path.exists(full_audio_path)}") + self._debug_printed = True + + key = os.path.splitext(audio_path)[0].split("/")[-1] + 
converted_file = os.path.join(self.converted_audio_dir, key) + f".{self.output_format}" + + if not os.path.isfile(converted_file): + transformer = Transformer() + + transformer.rate(self.rate) + transformer.channels(self.channels) + + transformer.build(full_audio_path, converted_file) + + data_entry[self.output_audio_file_key] = converted_file + return [DataEntry(data=data_entry)] \ No newline at end of file diff --git a/sdp/processors/manage_files/extract.py b/sdp/processors/manage_files/extract.py new file mode 100644 index 00000000..4c6126f6 --- /dev/null +++ b/sdp/processors/manage_files/extract.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import tarfile +import os +from pathlib import Path + +from sdp.logging import logger +from sdp.processors.base_processor import DataEntry, BaseParallelProcessor + + +class ExtractTar(BaseParallelProcessor): + """ + A processor that extracts `.tar` archives for each entry in a dataset. + + This processor reads a filepath to a tar archive from a specific field in the dataset entry, + extracts the contents into a specified directory, and optionally appends the extracted file paths + or the extraction directory to the entry under a new field. + + Args: + field_to_tar_filepath (str): The field in the input entry that contains the path to the `.tar` file. + extraction_dir (str): The base directory where extracted files should be placed. 
+ remove_source_tar (bool): If True, deletes the original `.tar` file after successful extraction. + skip_invalid_filepaths (bool): If True, logs and skips invalid paths instead of raising exceptions. + filepath_prefix_field (str): Optional field in the entry used as a subdirectory prefix under `extraction_dir`. + output_filepath_field (str): Field name where the output (path or list of paths) will be stored. + get_extracted_filepaths (bool): If True, collects and returns a list of all extracted file paths. + + Returns: + A manifest where each entry is updated with the path to the extracted files or directory. + """ + + def __init__( + self, + field_to_tar_filepath: str, + extraction_dir: str, + remove_source_tar: bool = False, + skip_invalid_filepaths: bool = False, + filepath_prefix_field: str = None, + output_filepath_field: str = 'extracted', + get_extracted_filepaths: bool = False, + **kwargs + ): + super().__init__(**kwargs) + self.field_to_tar_filepath = field_to_tar_filepath + self.extraction_dir = extraction_dir + self.remove_source_tar = remove_source_tar + self.skip_invalid_filepaths = skip_invalid_filepaths + self.filepath_prefix_field = filepath_prefix_field + self.output_filepath_field = output_filepath_field + self.get_extracted_filepaths = get_extracted_filepaths + + def process_dataset_entry(self, data_entry): + # Read the tar file path from the specified field + tar_filepath = data_entry[self.field_to_tar_filepath] + + # Handle missing or invalid tar file path + if not isinstance(tar_filepath, str) or not os.path.exists(tar_filepath): + if self.skip_invalid_filepaths: + logger.info(f"Invalid filepath {tar_filepath}. 
Skipping..") + output_filepath = None + else: + raise ValueError(f"Invalid filepath {tar_filepath}.") + else: + # Determine output path using optional prefix and tar filename + output_filepath_prefix = ( + data_entry[self.filepath_prefix_field] + if self.filepath_prefix_field and data_entry.get(self.filepath_prefix_field) + else '' + ) + output_filepath = os.path.join( + self.extraction_dir, + output_filepath_prefix, + os.path.basename(tar_filepath).split('.')[0] + ) + os.makedirs(output_filepath, exist_ok=True) + + # Extract tar archive into target directory + try: + with tarfile.open(tar_filepath, 'r') as tar: + tar.extractall(path=output_filepath) + except Exception as e: + if self.skip_invalid_filepaths: + logger.info(f"Error extracting {tar_filepath}: {e}. Skipping..") + output_filepath = None + else: + raise ValueError(f"Error extracting {tar_filepath}: {e}") + + # Gather list of all extracted files if requested + extracted_filepaths = [] + if output_filepath is not None and self.get_extracted_filepaths: + extraction_folder_path = Path(output_filepath) + extracted_filepaths = [ + str(file) for file in extraction_folder_path.rglob("*") if file.is_file() + ] + + # Optionally remove the original tar archive after extraction + if self.remove_source_tar: + os.remove(tar_filepath) + + # Write the extraction result into the entry (either path or file list) + if self.get_extracted_filepaths: + data_entry[self.output_filepath_field] = extracted_filepaths + else: + data_entry[self.output_filepath_field] = output_filepath + + return [DataEntry(data=data_entry)] \ No newline at end of file diff --git a/sdp/processors/manage_files/remove.py b/sdp/processors/manage_files/remove.py new file mode 100644 index 00000000..5ff9f0ca --- /dev/null +++ b/sdp/processors/manage_files/remove.py @@ -0,0 +1,86 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil + +from sdp.logging import logger +from sdp.processors.base_processor import DataEntry, BaseParallelProcessor + + +class RemoveFiles(BaseParallelProcessor): + """ + A processor that removes files or directories from the filesystem based on a filepath + specified in the input data entry. + + This processor is typically used for cleanup tasks after processing files. + + Args: + filepath_field (str): The key in the data entry that holds the path to the file or directory to remove. + + drop_filepath_field (bool): Whether to remove the filepath field from the resulting data entry. Defaults to True. + + **kwargs: Additional arguments passed to the BaseParallelProcessor. + + Returns: + A manifest where each entry is the same as the input, optionally without the filepath field, + and with the file or directory at the specified path removed from disk. + + Example entry before processing:: + + { + "id": "abc123", + "path_to_remove": "/tmp/some_file.wav" + } + + Example entry after processing (if `drop_filepath_field=True`):: + + { + "id": "abc123" + } + """ + + def __init__(self, + filepath_field: str, + drop_filepath_field: bool = True, + **kwargs): + + super().__init__(**kwargs) + self.filepath_field = filepath_field + self.drop_filepath_field = drop_filepath_field + + def process_dataset_entry(self, data_entry): + """ + Remove the file or directory specified in the given field of the data entry. 
+ + Args: + data_entry (dict): A single input sample from the dataset manifest. + + Returns: + List[DataEntry]: A single-element list containing the updated entry. + """ + filepath = data_entry[self.filepath_field] + + # Remove the target path from the filesystem + if os.path.isdir(filepath): + shutil.rmtree(filepath) # Recursively delete directory + else: + os.remove(filepath) # Delete a single file + + # Optionally remove the filepath field from the data entry + if self.drop_filepath_field: + data_entry.pop(self.filepath_field) + + # Wrap and return the modified entry + return [DataEntry(data=data_entry)] \ No newline at end of file diff --git a/sdp/processors/modify_manifest/data_to_data.py b/sdp/processors/modify_manifest/data_to_data.py index 16e1de6d..907802cb 100644 --- a/sdp/processors/modify_manifest/data_to_data.py +++ b/sdp/processors/modify_manifest/data_to_data.py @@ -20,7 +20,6 @@ import soundfile import torchaudio from docx import Document -from sox import Transformer from tqdm import tqdm import json @@ -76,78 +75,6 @@ def process_dataset_entry(self, data_entry): return [DataEntry(data=data_entry)] -class FfmpegConvert(BaseParallelProcessor): - """ - Processor for converting video or audio files to audio using FFmpeg and updating the dataset with the path to the resampled audio. - If ``id_key`` is not None, the output file path will be ``/.wav``. - If ``id_key`` is None, the output file path will be ``/.wav``. - - .. note:: ``id_key`` can be used to create subdirectories inside ``resampled_audio_dir`` (by using forward slashes ``/``). - e.g. if ``id_key`` takes the form ``dir_name1/dir_name2/filename``, the output file path will be - - ``/dir_name1/dirname2/filename.wav``. - - Args: - converted_audio_dir (str): The directory to store the resampled audio files. - input_file_key (str): The field in the dataset representing the path to the input video or audio files. 
- output_file_key (str): The field in the dataset representing the path to the resampled audio files with ``output_format``. If ``id_key`` is None, the output file path will be ``/.wav``. - id_key (str): (Optional) The field in the dataset representing the unique ID or identifier for each entry. If ``id_key`` is not None, the output file path will be ``/.wav``. Defaults to None. - output_format (str): (Optional) Format of the output audio files. Defaults to `wav`. - target_samplerate (int): (Optional) The target sampling rate for the resampled audio. Defaults to 16000. - target_nchannels (int): (Optional) The target number of channels for the resampled audio. Defaults to 1. - **kwargs: Additional keyword arguments to be passed to the base class `BaseParallelProcessor`. - - """ - - def __init__( - self, - converted_audio_dir: str, - input_file_key: str, - output_file_key: str, - id_key: str = None, - output_format: str = "wav", - base_dir: str = None, - target_samplerate: int = 16000, - target_nchannels: int = 1, - **kwargs, - ): - super().__init__(**kwargs) - self.converted_audio_dir = converted_audio_dir - self.input_file_key = input_file_key - self.output_file_key = output_file_key - self.output_format = output_format - self.id_key = id_key - self.base_dir = base_dir - self.target_samplerate = target_samplerate - self.target_nchannels = target_nchannels - - def prepare(self): - assert self.output_format == "wav", "Currently only wav format is supported" - os.makedirs(self.converted_audio_dir, exist_ok=True) - - def process_dataset_entry(self, data_entry): - input_file = data_entry[self.input_file_key] - if self.id_key: - key = data_entry[self.id_key] - os.makedirs(os.path.join(self.converted_audio_dir, *key.split("/")[:-1]), exist_ok=True) - else: - key = os.path.splitext(input_file)[0].split("/")[-1] - - if self.base_dir: - new_dir = os.path.dirname(os.path.relpath(input_file, self.base_dir)) - os.makedirs(os.path.join(self.converted_audio_dir, new_dir), 
exist_ok=True) - - key = os.path.join(new_dir, key) - - audio_file = os.path.join(self.converted_audio_dir, key) + "." + self.output_format - - if not os.path.isfile(audio_file): - ffmpeg_convert(input_file, audio_file, self.target_samplerate, self.target_nchannels) - - data_entry[self.output_file_key] = audio_file - return [DataEntry(data=data_entry)] - - class ReadTxtLines(BaseParallelProcessor): """ The text file specified in source_filepath will be read, and each line in it will be added as a line in the output manifest, @@ -183,82 +110,6 @@ def process_dataset_entry(self, data_entry): return data_list -class SoxConvert(BaseParallelProcessor): - """Processor for Sox to convert audio files to specified format. - - Args: - output_manifest_file (str): Path to the output manifest file. - input_audio_file_key (str): Key in the manifest file that contains the path to the input audio file. - output_audio_file_key (str): Key in the manifest file that contains the path to the output audio file. - converted_audio_dir (str): Path to the directory where the converted audio files will be stored. - output_format (str): Format of the output audio file. - rate (int): Sample rate of the output audio file. - channels (int): Number of channels of the output audio file. - workspace_dir (str, Optional): Path to the workspace directory. Defaults to None. 
- """ - - def __init__( - self, - converted_audio_dir: str, - input_audio_file_key: str = "audio_filepath", - output_audio_file_key: str = "audio_filepath", - output_format: str = "wav", - rate: int = 16000, - channels: int = 1, - workspace_dir: Optional[str] = None, - **kwargs, - ): - # Extract workspace_dir from kwargs to avoid passing it to BaseProcessor - if "workspace_dir" in kwargs: - workspace_dir = kwargs.pop("workspace_dir") - - super().__init__(**kwargs) - self.input_audio_file_key = input_audio_file_key - self.output_audio_file_key = output_audio_file_key - self.converted_audio_dir = converted_audio_dir - self.output_format = output_format - self.workspace_dir = workspace_dir - - # Store the new parameters for later use: - self.rate = rate - self.channels = channels - - def prepare(self): - # Debug print for workspace_dir - logger.info(f"SoxConvert workspace_dir: {self.workspace_dir}") - os.makedirs(self.converted_audio_dir, exist_ok=True) - - def process_dataset_entry(self, data_entry): - audio_path = data_entry[self.input_audio_file_key] - - # If workspace_dir is provided, join it with audio_path to get absolute path - if self.workspace_dir is not None: - full_audio_path = os.path.join(self.workspace_dir, audio_path) - else: - full_audio_path = audio_path - - # Debug print first file path - if not hasattr(self, '_debug_printed'): - logger.info(f"First audio_path from manifest: {audio_path}") - logger.info(f"First full_audio_path: {full_audio_path}") - logger.info(f"Path exists: {os.path.exists(full_audio_path)}") - self._debug_printed = True - - key = os.path.splitext(audio_path)[0].split("/")[-1] - converted_file = os.path.join(self.converted_audio_dir, key) + f".{self.output_format}" - - if not os.path.isfile(converted_file): - transformer = Transformer() - - transformer.rate(self.rate) - transformer.channels(self.channels) - - transformer.build(full_audio_path, converted_file) - - data_entry[self.output_audio_file_key] = converted_file - return 
[DataEntry(data=data_entry)] - - class CountNumWords(BaseParallelProcessor): """ Processor for counting the number of words in the text_key field saving the number in num_words_key. From f8b506ea7f79cb1e0c9d58df25b9c8cca4217ba2 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 21 Jul 2025 01:04:09 -0700 Subject: [PATCH 2/5] =?UTF-8?q?Changes=20addressing=20the=20reviewer?= =?UTF-8?q?=E2=80=99s=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: root --- sdp/processors/base_processor.py | 1 - sdp/processors/manage_files/__init__.py | 13 +++++++ sdp/processors/manage_files/remove.py | 51 ++++++++++++++++++++++++- 3 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 sdp/processors/manage_files/__init__.py diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index 6fc22ee8..a4257e53 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -19,7 +19,6 @@ import time from abc import ABC, abstractmethod from dataclasses import dataclass -from itertools import chain from typing import Any, Dict, List, Optional, Union from tqdm import tqdm diff --git a/sdp/processors/manage_files/__init__.py b/sdp/processors/manage_files/__init__.py new file mode 100644 index 00000000..341a77c5 --- /dev/null +++ b/sdp/processors/manage_files/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/sdp/processors/manage_files/remove.py b/sdp/processors/manage_files/remove.py index 5ff9f0ca..c982ef80 100644 --- a/sdp/processors/manage_files/remove.py +++ b/sdp/processors/manage_files/remove.py @@ -14,6 +14,12 @@ import os import shutil +from pathlib import Path +from collections import Counter +from tqdm import tqdm +import itertools +from tqdm.contrib.concurrent import process_map + from sdp.logging import logger from sdp.processors.base_processor import DataEntry, BaseParallelProcessor @@ -30,6 +36,8 @@ class RemoveFiles(BaseParallelProcessor): filepath_field (str): The key in the data entry that holds the path to the file or directory to remove. drop_filepath_field (bool): Whether to remove the filepath field from the resulting data entry. Defaults to True. + + recursive (bool): Whether to recursively remove files from directories. Defaults to False. **kwargs: Additional arguments passed to the BaseParallelProcessor. @@ -54,11 +62,49 @@ class RemoveFiles(BaseParallelProcessor): def __init__(self, filepath_field: str, drop_filepath_field: bool = True, + recursive: bool = False, **kwargs): super().__init__(**kwargs) self.filepath_field = filepath_field self.drop_filepath_field = drop_filepath_field + self.recursive = recursive + + def _count_files(self, data_entry): + """ + Count the number of files to be removed. + """ + filepath = Path(data_entry[self.filepath_field]) + if filepath.is_dir(): + if self.recursive: + file_counter = Counter(f.suffix for f in filepath.iterdir() if f.is_file()) + else: + raise IsADirectoryError(f"Directory {filepath} is not empty and recursive is False") + else: + file_counter = Counter({filepath.suffix : 1}) + return file_counter + + def prepare(self): + """ + Prepare the processor by counting the number of files to be removed. 
+ """ + file_counter = Counter() + for manifest_chunk in self._chunk_manifest(): + chunk_counts = itertools.chain( + process_map( + self._count_files, + manifest_chunk, + max_workers=self.max_workers, + chunksize=self.chunksize, + desc="Counting files to remove", + ) + ) + for entry_file_counter in chunk_counts: + file_counter += entry_file_counter + + print(f"Total files to remove: {sum(file_counter.values())}") + for extension, count in file_counter.items(): + logger.info(f"{extension}\t\t{count}") def process_dataset_entry(self, data_entry): """ @@ -74,7 +120,10 @@ def process_dataset_entry(self, data_entry): # Remove the target path from the filesystem if os.path.isdir(filepath): - shutil.rmtree(filepath) # Recursively delete directory + if self.recursive: + shutil.rmtree(filepath) # Recursively delete directory + else: + raise IsADirectoryError(f"Directory {filepath} is not empty and recursive is False") else: os.remove(filepath) # Delete a single file From 8cb916e0ddd27e62a910533188c3b78042a78318 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Mon, 21 Jul 2025 01:29:10 -0700 Subject: [PATCH 3/5] Fix docs build issue Signed-off-by: Sasha Meister --- docs/src/sdp/api.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/src/sdp/api.rst b/docs/src/sdp/api.rst index e088a9a3..4eecb933 100644 --- a/docs/src/sdp/api.rst +++ b/docs/src/sdp/api.rst @@ -99,22 +99,22 @@ UzbekVoice Earnings21/22 ''''''''''''' -.. autodata:: sdp.processors.datasets.earnings21.CreateInitialAudioAndManifest +.. autodata:: sdp.processors.datasets.earnings.CreateInitialAudioAndManifest :annotation: -.. autodata:: sdp.processors.datasets.earnings21.CreateFullAudioManifestEarnings21 +.. autodata:: sdp.processors.datasets.earnings.CreateFullAudioManifestEarnings21 :annotation: -.. autodata:: sdp.processors.datasets.earnings21.SpeakerSegmentedManifest +.. autodata:: sdp.processors.datasets.earnings.SpeakerSegmentedManifest :annotation: -.. 
autodata:: sdp.processors.datasets.earnings21.CreateSentenceSegmentedManifest +.. autodata:: sdp.processors.datasets.earnings.CreateSentenceSegmentedManifest :annotation: -.. autodata:: sdp.processors.datasets.earnings21.NeMoForcedAligner +.. autodata:: sdp.processors.datasets.earnings.NeMoForcedAligner :annotation: -.. autodata:: sdp.processors.datasets.earnings21.ApplyEarnings21Normalizations +.. autodata:: sdp.processors.datasets.earnings.ApplyEarnings21Normalizations :annotation: From 3860ba5ded751d3f4f43b6acfea773a1c201efd9 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Mon, 21 Jul 2025 01:44:50 -0700 Subject: [PATCH 4/5] Earnings21/22 added to docs Signed-off-by: Sasha Meister --- docs/src/sdp/existing_configs.rst | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/docs/src/sdp/existing_configs.rst b/docs/src/sdp/existing_configs.rst index d0a3e64e..f2a56deb 100644 --- a/docs/src/sdp/existing_configs.rst +++ b/docs/src/sdp/existing_configs.rst @@ -424,4 +424,18 @@ NemoRunIPL :hidden: config-docs/ipl/config - config-docs/ipl/nemo_run_config \ No newline at end of file + config-docs/ipl/nemo_run_config + +Earnings21/22 +~~~~~~~~~~~~~ + +**Supported configs**. + +* **IPL**: + `config `__ | + :doc:`documentation ` + +.. toctree:: + :hidden: + + config-docs/english/earnings/config \ No newline at end of file From 4ae1adc8813d673f5d07be89c3e06af5248b6c59 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Mon, 21 Jul 2025 01:46:02 -0700 Subject: [PATCH 5/5] Fix doc header Signed-off-by: Sasha Meister --- docs/src/sdp/existing_configs.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/sdp/existing_configs.rst b/docs/src/sdp/existing_configs.rst index f2a56deb..37922108 100644 --- a/docs/src/sdp/existing_configs.rst +++ b/docs/src/sdp/existing_configs.rst @@ -431,7 +431,7 @@ Earnings21/22 **Supported configs**. -* **IPL**: +* **English**: `config `__ | :doc:`documentation `