From 592d893d64d01b71ed24fe4f44ed50d5d2dd0533 Mon Sep 17 00:00:00 2001
From: Joey Legere
Date: Wed, 12 Oct 2022 23:31:33 -0400
Subject: [PATCH 1/2] use threadpool and futures for dataloader

---
 bittensor/_dataset/dataset_impl.py | 40 +++++++++++++++---------------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/bittensor/_dataset/dataset_impl.py b/bittensor/_dataset/dataset_impl.py
index f104b632bf..56674fc896 100644
--- a/bittensor/_dataset/dataset_impl.py
+++ b/bittensor/_dataset/dataset_impl.py
@@ -17,10 +17,12 @@
 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 # DEALINGS IN THE SOFTWARE.
 
+import concurrent
 import json
 import os
 import random
 import time
+from multiprocessing import cpu_count
 from typing import Union
 
 import requests
@@ -36,7 +38,8 @@
 logger = logger.opt(colors=True)
 
-class Dataset():
+
+class Dataset:
     """ Implementation for the dataset class, which handles dataloading from ipfs
     """
     def __init__(self):
@@ -473,25 +476,22 @@ def construct_text_corpus(self, min_data_len = 0):
                 i = 0
 
                 # --- Dont stop until the corpus size and the minimum data_length was reached.
-                for directory in directories:
-                    # --- Get a directory that leads to a datafile.
-                    random_datafile_dir = self.get_root_text_hash(directory)
-                    if random_datafile_dir == None:
-                        pass
-
-                    # --- Get text from the datafile directory
-                    text = self.get_text(random_datafile_dir)
-
-                    if text != None:
-                        text_list = text.split()
-                        data_corpus.extend(text_list)
-                        total_dataset_size += int(random_datafile_dir['Size'])
-                        total_dataset_len += len(text_list)
-
-                    i += 1
-
-                    if (total_dataset_len > min_data_len) or self.IPFS_fails > self.IPFS_fails_max:
-                        break
+                with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
+                    future_map = {}
+                    for idx, call_arg in enumerate(directories[:500]):
+                        future = executor.submit(self.get_text, call_arg)
+                        future_map[future] = call_arg
+
+                    for i, future in enumerate(concurrent.futures.as_completed(future_map)):
+                        text = future.result()
+
+                        if text is not None:
+                            text_list = text.split()
+                            data_corpus.extend(text_list)
+                            total_dataset_len += len(text_list)
+
+                        if (total_dataset_len > min_data_len) or self.IPFS_fails > self.IPFS_fails_max:
+                            break
             else:
                 logger.error("It appears the directory is empty... Restart your miner to try again.")
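
For reference, below is a minimal runnable sketch of the submit/as_completed pattern this first patch adopts. The names fetch_text, sources, and MIN_WORDS are hypothetical stand-ins for self.get_text, directories, and min_data_len; they are not part of the patch. One property worth noting: breaking out of the as_completed loop does not cancel futures that were already submitted, so the with block still waits for in-flight requests to finish before returning.

    import concurrent.futures
    from multiprocessing import cpu_count

    def fetch_text(source: str) -> str:
        # Hypothetical stand-in for a blocking I/O call such as self.get_text().
        return "some text from " + source

    sources = [f"dir_{n}" for n in range(100)]  # stand-in for `directories`
    MIN_WORDS = 200                             # stand-in for `min_data_len`

    corpus = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        # Submit every request up front; each future resolves independently.
        future_map = {executor.submit(fetch_text, s): s for s in sources}

        # Consume results in completion order rather than submission order,
        # so one slow response does not stall the whole corpus build.
        for future in concurrent.futures.as_completed(future_map):
            text = future.result()
            if text is not None:  # mirrors the patch's guard against failed fetches
                corpus.extend(text.split())
            if len(corpus) > MIN_WORDS:
                # Early exit once enough data is collected; already-submitted
                # futures still run to completion when the with block exits.
                break
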
From a8582a144282851bc80a31ec7ed8441b612528e0 Mon Sep 17 00:00:00 2001
From: Joey Legere
Date: Tue, 18 Oct 2022 14:50:22 -0400
Subject: [PATCH 2/2] add cli arg for max directories

---
 bittensor/_dataset/__init__.py     | 8 ++++++--
 bittensor/_dataset/dataset_impl.py | 9 ++++++---
 bittensor/_dataset/dataset_mock.py | 4 +++-
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/bittensor/_dataset/__init__.py b/bittensor/_dataset/__init__.py
index 53858344df..5f29818eb2 100644
--- a/bittensor/_dataset/__init__.py
+++ b/bittensor/_dataset/__init__.py
@@ -93,7 +93,8 @@ def __new__(
                 save_dataset = config.dataset.save_dataset,
                 max_datasets = config.dataset.max_datasets,
                 no_tokenizer = config.dataset.no_tokenizer,
-                num_batches = config.dataset.num_batches
+                num_batches = config.dataset.num_batches,
+                max_directories = config.dataset.max_directories
             )
         else:
             return dataset_impl.GenesisTextDataset(
@@ -105,7 +106,8 @@ def __new__(
                 save_dataset = config.dataset.save_dataset,
                 max_datasets = config.dataset.max_datasets,
                 no_tokenizer = config.dataset.no_tokenizer,
-                num_batches = config.dataset.num_batches
+                num_batches = config.dataset.num_batches,
+                max_directories = config.dataset.max_directories
             )
 
     @classmethod
@@ -138,6 +140,7 @@ def add_args(cls, parser: argparse.ArgumentParser, prefix: str = None ):
             parser.add_argument('--' + prefix_str + 'dataset.no_tokenizer', action='store_true', help='To return non-tokenized text (EXPERIMENTAL, DO NOT USE)',default=False)
             parser.add_argument('--' + prefix_str + 'dataset.num_batches', type=int, help='The number of data to download each time(measured by the number of batches).', default=bittensor.defaults.dataset.num_batches)
             parser.add_argument('--' + prefix_str + 'dataset._mock', action='store_true', help='To turn on dataset mocking for testing purposes.', default=False)
+            parser.add_argument('--' + prefix_str + 'dataset.max_directories', type=int, help='Maximum number of directories to consider when loading text from IPFS', default=bittensor.defaults.dataset.max_directories)
         except argparse.ArgumentError:
             # re-parsing arguments.
             pass
@@ -165,6 +168,7 @@ def add_defaults(cls, defaults):
         defaults.dataset.save_dataset = os.getenv('BT_DATASET_SAVE_DATASET') if os.getenv('BT_DATASET_SAVE_DATASET') != None else False
         defaults.dataset.max_datasets = os.getenv('BT_DATASET_MAX_DATASETS') if os.getenv('BT_DATASET_MAX_DATASETS') != None else 3
         defaults.dataset.num_batches = os.getenv('BT_DATASET_NUM_BATCHES') if os.getenv('BT_DATASET_NUM_BATCHES') != None else 500
+        defaults.dataset.max_directories = os.getenv('BT_DATASET_MAX_DIRECTORIES') if os.getenv('BT_DATASET_MAX_DIRECTORIES') != None else 250
 
     @classmethod
     def check_config( cls, config: 'bittensor.Config' ):
diff --git a/bittensor/_dataset/dataset_impl.py b/bittensor/_dataset/dataset_impl.py
index 56674fc896..6710b3eebc 100644
--- a/bittensor/_dataset/dataset_impl.py
+++ b/bittensor/_dataset/dataset_impl.py
@@ -135,7 +135,8 @@ def __init__(
         save_dataset,
         max_datasets,
         no_tokenizer,
-        num_batches
+        num_batches,
+        max_directories
     ):
         super().__init__()
         self.block_size = block_size
@@ -153,6 +154,7 @@ def __init__(
         self.backup_dataset_cap_size = 5e7 # set 50MB limit per folder
         self.IPFS_fails_max = 10
         self.num_batches = num_batches
+        self.max_directories = max_directories
 
         # Retrieve a random slice of the genesis dataset
         self.data = []
@@ -476,9 +478,10 @@ def construct_text_corpus(self, min_data_len = 0):
                 i = 0
 
                 # --- Dont stop until the corpus size and the minimum data_length was reached.
-                with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
+                n_workers = cpu_count() if self.num_workers == 0 else self.num_workers
+                with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
                     future_map = {}
-                    for idx, call_arg in enumerate(directories[:500]):
+                    for idx, call_arg in enumerate(directories[:self.max_directories]):
                         future = executor.submit(self.get_text, call_arg)
                         future_map[future] = call_arg
diff --git a/bittensor/_dataset/dataset_mock.py b/bittensor/_dataset/dataset_mock.py
index 1cf2d0cf6d..0c6302a473 100644
--- a/bittensor/_dataset/dataset_mock.py
+++ b/bittensor/_dataset/dataset_mock.py
@@ -38,7 +38,8 @@ def __init__(
         save_dataset,
         max_datasets,
         no_tokenizer,
-        num_batches
+        num_batches,
+        max_directories
     ):
         super().__init__()
         self.block_size = block_size
@@ -52,6 +53,7 @@ def __init__(
         self.max_datasets = max_datasets
         self.__infinite_dataset_iterator = None
         self.no_tokenizer = no_tokenizer
+        self.max_directories = max_directories
 
         # Retrieve a random slice of the genesis dataset
         self.data = []
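
With the second patch applied, the directories[:500] cap hard-coded by the first patch becomes configurable. The flag and environment variable names below come directly from the patch; the miner script name is illustrative only:

    # Via the CLI argument registered in add_args:
    python miner.py --dataset.max_directories 100

    # Via the environment variable read in add_defaults:
    BT_DATASET_MAX_DIRECTORIES=100 python miner.py

Note that the shipped default (250) is half the interim hard-coded cap (500) introduced by the first patch.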