From 5ad15d1de5ee2c15944b64b19df9e6f1c5e45c4b Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 19 Dec 2024 02:59:55 +0000 Subject: [PATCH 1/9] Perf: load data systems on rank 0 --- deepmd/pt/utils/dataloader.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 67e5195f6d..7398c8d659 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -2,7 +2,7 @@ import logging import os import time -from multiprocessing.dummy import ( +from multiprocessing import ( Pool, ) from queue import ( @@ -88,25 +88,25 @@ def __init__( systems = [os.path.join(systems, item) for item in file.keys()] self.systems: list[DeepmdDataSetForLoader] = [] - if len(systems) >= 100: - log.info(f"Constructing DataLoaders from {len(systems)} systems") def construct_dataset(system): + if len(systems) >= 100: + log.info(f"Constructing DataLoaders from {len(systems)} systems") return DeepmdDataSetForLoader( system=system, type_map=type_map, ) - with Pool( - os.cpu_count() - // ( - int(os.environ["LOCAL_WORLD_SIZE"]) - if dist.is_available() and dist.is_initialized() - else 1 - ) - ) as pool: - self.systems = pool.map(construct_dataset, systems) - + global_rank = dist.get_rank() if dist.is_initialized() else 0 + if global_rank == 0: + with Pool(os.cpu_count()) as pool: + self.systems = pool.map(construct_dataset, systems) + if dist.is_initialized(): + dist.broadcast_object_list(self.systems) + else: + self.systems = [None] * len(systems) # type: ignore + dist.broadcast_object_list(self.systems) + assert self.systems[-1] is not None self.sampler_list: list[DistributedSampler] = [] self.index = [] self.total_batch = 0 From 96c9f03a53b4f20aabe61d835da86c01af69e5c9 Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 19 Dec 2024 03:06:56 +0000 Subject: [PATCH 2/9] refactor --- deepmd/pt/utils/dataloader.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 7398c8d659..032fa34b73 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -101,10 +101,9 @@ def construct_dataset(system): if global_rank == 0: with Pool(os.cpu_count()) as pool: self.systems = pool.map(construct_dataset, systems) - if dist.is_initialized(): - dist.broadcast_object_list(self.systems) else: self.systems = [None] * len(systems) # type: ignore + if dist.is_initialized(): dist.broadcast_object_list(self.systems) assert self.systems[-1] is not None self.sampler_list: list[DistributedSampler] = [] From e67ca0056e926af3d64d29c67abbf5ca458fe0e1 Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 19 Dec 2024 03:08:36 +0000 Subject: [PATCH 3/9] update logging --- deepmd/pt/utils/dataloader.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 032fa34b73..7a3cfdfc2f 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -87,18 +87,16 @@ def __init__( with h5py.File(systems) as file: systems = [os.path.join(systems, item) for item in file.keys()] - self.systems: list[DeepmdDataSetForLoader] = [] - def construct_dataset(system): - if len(systems) >= 100: - log.info(f"Constructing DataLoaders from {len(systems)} systems") return DeepmdDataSetForLoader( system=system, type_map=type_map, ) - + self.systems: list[DeepmdDataSetForLoader] = [] global_rank = dist.get_rank() if dist.is_initialized() else 0 if global_rank == 0: + if len(systems) >= 100: + log.info(f"Constructing DataLoaders from {len(systems)} systems") with Pool(os.cpu_count()) as pool: self.systems = pool.map(construct_dataset, systems) else: From 7bf77cbccb2c441368334a587ca3e300083493fb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 03:08:54 +0000 Subject: [PATCH 4/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deepmd/pt/utils/dataloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 032fa34b73..0b439a84e9 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -102,7 +102,7 @@ def construct_dataset(system): with Pool(os.cpu_count()) as pool: self.systems = pool.map(construct_dataset, systems) else: - self.systems = [None] * len(systems) # type: ignore + self.systems = [None] * len(systems) # type: ignore if dist.is_initialized(): dist.broadcast_object_list(self.systems) assert self.systems[-1] is not None From 4954f06e0204e87d220c6b79ef1038b5d7dcea32 Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 19 Dec 2024 03:09:27 +0000 Subject: [PATCH 5/9] revert changes on mp --- deepmd/pt/utils/dataloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 7a3cfdfc2f..8070f413e0 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -2,7 +2,7 @@ import logging import os import time -from multiprocessing import ( +from multiprocessing.dummy import ( Pool, ) from queue import ( From 555c3f97893844a57a3fea51fee15d546544e0d9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 03:12:16 +0000 Subject: [PATCH 6/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deepmd/pt/utils/dataloader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index a46df3fe4a..961a8ed9c4 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -92,6 +92,7 @@ def construct_dataset(system): system=system, type_map=type_map, ) + self.systems: list[DeepmdDataSetForLoader] = [] global_rank = dist.get_rank() if dist.is_initialized() else 0 if global_rank == 0: From 1dcc5d5711343ba9b6695f5c86f4812f8fcf7e7a Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 19 Dec 2024 04:08:17 +0000 Subject: [PATCH 7/9] always output logging info --- deepmd/pt/utils/dataloader.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 961a8ed9c4..802183832e 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -96,8 +96,7 @@ def construct_dataset(system): self.systems: list[DeepmdDataSetForLoader] = [] global_rank = dist.get_rank() if dist.is_initialized() else 0 if global_rank == 0: - if len(systems) >= 100: - log.info(f"Constructing DataLoaders from {len(systems)} systems") + log.info(f"Constructing DataLoaders from {len(systems)} systems") with Pool(os.cpu_count()) as pool: self.systems = pool.map(construct_dataset, systems) else: From feb590284e6269d6bf82c7d8ce3fbff672d0cc5e Mon Sep 17 00:00:00 2001 From: Chun Cai Date: Wed, 25 Dec 2024 10:24:53 +0800 Subject: [PATCH 8/9] using env.NUM_WORKERS for pool size Signed-off-by: Chun Cai --- deepmd/pt/utils/dataloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 802183832e..dbeb86079d 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -97,7 +97,7 @@ def construct_dataset(system): global_rank = dist.get_rank() if dist.is_initialized() else 0 if global_rank == 0: log.info(f"Constructing DataLoaders from {len(systems)} systems") - with Pool(os.cpu_count()) as pool: + with Pool(env.NUM_WORKERS) as pool: self.systems = pool.map(construct_dataset, systems) else: self.systems = [None] * len(systems) # type: ignore From 4c137c34e014989115d534e9d0d1e09030a4587a Mon Sep 17 00:00:00 2001 From: Chun Cai Date: Thu, 26 Dec 2024 10:17:28 +0800 Subject: [PATCH 9/9] fix the case for NUM_WORKER=0 Co-authored-by: Jinzhe Zeng Signed-off-by: Chun Cai --- deepmd/pt/utils/dataloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index dbeb86079d..048fd912e4 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -97,7 +97,7 @@ def construct_dataset(system): global_rank = dist.get_rank() if dist.is_initialized() else 0 if global_rank == 0: log.info(f"Constructing DataLoaders from {len(systems)} systems") - with Pool(env.NUM_WORKERS) as pool: + with Pool(max(1, env.NUM_WORKERS)) as pool: self.systems = pool.map(construct_dataset, systems) else: self.systems = [None] * len(systems) # type: ignore