Merged
8 changes: 6 additions & 2 deletions bittensor/_dataset/__init__.py
@@ -93,7 +93,8 @@ def __new__(
                 save_dataset = config.dataset.save_dataset,
                 max_datasets = config.dataset.max_datasets,
                 no_tokenizer = config.dataset.no_tokenizer,
-                num_batches = config.dataset.num_batches
+                num_batches = config.dataset.num_batches,
+                max_directories = config.dataset.max_directories
             )
         else:
             return dataset_impl.GenesisTextDataset(
@@ -105,7 +106,8 @@ def __new__(
                 save_dataset = config.dataset.save_dataset,
                 max_datasets = config.dataset.max_datasets,
                 no_tokenizer = config.dataset.no_tokenizer,
-                num_batches = config.dataset.num_batches
+                num_batches = config.dataset.num_batches,
+                max_directories = config.dataset.max_directories
             )
 
     @classmethod
@@ -138,6 +140,7 @@ def add_args(cls, parser: argparse.ArgumentParser, prefix: str = None ):
             parser.add_argument('--' + prefix_str + 'dataset.no_tokenizer', action='store_true', help='To return non-tokenized text (EXPERIMENTAL, DO NOT USE)',default=False)
             parser.add_argument('--' + prefix_str + 'dataset.num_batches', type=int, help='The number of data to download each time(measured by the number of batches).', default=bittensor.defaults.dataset.num_batches)
             parser.add_argument('--' + prefix_str + 'dataset._mock', action='store_true', help='To turn on dataset mocking for testing purposes.', default=False)
+            parser.add_argument('--' + prefix_str + 'dataset.max_directories', type=int, help='Maximum number of directories to consider when loading text from IPFS', default=bittensor.defaults.dataset.max_directories)
 
         except argparse.ArgumentError:
             # re-parsing arguments.
@@ -165,6 +168,7 @@ def add_defaults(cls, defaults):
         defaults.dataset.save_dataset = os.getenv('BT_DATASET_SAVE_DATASET') if os.getenv('BT_DATASET_SAVE_DATASET') != None else False
         defaults.dataset.max_datasets = os.getenv('BT_DATASET_MAX_DATASETS') if os.getenv('BT_DATASET_MAX_DATASETS') != None else 3
         defaults.dataset.num_batches = os.getenv('BT_DATASET_NUM_BATCHES') if os.getenv('BT_DATASET_NUM_BATCHES') != None else 500
+        defaults.dataset.max_directories = os.getenv('BT_DATASET_MAX_DIRECTORIES') if os.getenv('BT_DATASET_MAX_DIRECTORIES') != None else 250
 
     @classmethod
     def check_config( cls, config: 'bittensor.Config' ):
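For context, a minimal sketch of how the new setting surfaces once these changes land. This assumes the standard bittensor component config flow (bittensor.dataset.config() plus the factory call); the miner script name is hypothetical:

import bittensor

# Built by add_args/add_defaults above; the same value can be passed on the
# command line, e.g.:  python my_miner.py --dataset.max_directories 100
config = bittensor.dataset.config()
config.dataset.max_directories = 100  # default is 250

# Dataset.__new__ forwards the value to GenesisTextDataset (or the mock).
dataset = bittensor.dataset( config = config )

The environment override mirrors the neighbouring defaults: export BT_DATASET_MAX_DIRECTORIES=100 before launch.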
45 changes: 24 additions & 21 deletions bittensor/_dataset/dataset_impl.py
@@ -17,10 +17,12 @@
 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 # DEALINGS IN THE SOFTWARE.
 
+import concurrent.futures
 import json
 import os
 import random
 import time
+from multiprocessing import cpu_count
 from typing import Union
 
 import requests
@@ -36,7 +38,8 @@

 logger = logger.opt(colors=True)
 
-class Dataset():
+
+class Dataset:
     """ Implementation for the dataset class, which handles dataloading from ipfs
     """
     def __init__(self):
@@ -132,7 +135,8 @@ def __init__(
         save_dataset,
         max_datasets,
         no_tokenizer,
-        num_batches
+        num_batches,
+        max_directories
     ):
         super().__init__()
         self.block_size = block_size
@@ -150,6 +154,7 @@ def __init__(
         self.backup_dataset_cap_size = 5e7 # set 50MB limit per folder
         self.IPFS_fails_max = 10
         self.num_batches = num_batches
+        self.max_directories = max_directories
 
         # Retrieve a random slice of the genesis dataset
         self.data = []
@@ -473,25 +478,23 @@ def construct_text_corpus(self, min_data_len = 0):
             i = 0
 
             # --- Dont stop until the corpus size and the minimum data_length was reached.
-            for directory in directories:
-                # --- Get a directory that leads to a datafile.
-                random_datafile_dir = self.get_root_text_hash(directory)
-                if random_datafile_dir == None:
-                    pass
-
-                # --- Get text from the datafile directory
-                text = self.get_text(random_datafile_dir)
-
-                if text != None:
-                    text_list = text.split()
-                    data_corpus.extend(text_list)
-                    total_dataset_size += int(random_datafile_dir['Size'])
-                    total_dataset_len += len(text_list)
-
-                i += 1
-
-                if (total_dataset_len > min_data_len) or self.IPFS_fails > self.IPFS_fails_max:
-                    break
+            n_workers = cpu_count() if self.num_workers == 0 else self.num_workers
+            with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
+                future_map = {}
+                for idx, call_arg in enumerate(directories[:self.max_directories]):
+                    future = executor.submit(self.get_text, call_arg)
+                    future_map[future] = call_arg
+
+                for i, future in enumerate(concurrent.futures.as_completed(future_map)):
+                    text = future.result()
+
+                    if text is not None:
+                        text_list = text.split()
+                        data_corpus.extend(text_list)
+                        total_dataset_len += len(text_list)
+
+                    if (total_dataset_len > min_data_len) or self.IPFS_fails > self.IPFS_fails_max:
+                        break
 
         else:
             logger.error("It appears the directory is empty... Restart your miner to try again.")
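To make the new control flow easier to read outside the diff, here is a minimal standalone sketch of the same fan-out/fan-in pattern; fetch_text and directories are placeholder names, and the early-exit semantics deliberately mirror the change above:

import concurrent.futures
from multiprocessing import cpu_count

def build_corpus(directories, fetch_text, max_directories=250, min_data_len=10000):
    """Fetch text for up to max_directories entries in parallel, then stop
    consuming results once min_data_len whitespace-separated tokens are collected."""
    data_corpus, total_len = [], 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        futures = [executor.submit(fetch_text, d) for d in directories[:max_directories]]
        # as_completed yields futures in completion order, so slow IPFS
        # fetches no longer block faster ones behind them.
        for future in concurrent.futures.as_completed(futures):
            text = future.result()
            if text is not None:
                words = text.split()
                data_corpus.extend(words)
                total_len += len(words)
            if total_len > min_data_len:
                break
    return data_corpus

One property worth noting: breaking out of as_completed does not cancel futures that were already submitted, and leaving the with block still waits for them to finish. On Python 3.9+ a call to executor.shutdown(wait=False, cancel_futures=True) would drop queued work, but the diff keeps the simpler form.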
4 changes: 3 additions & 1 deletion bittensor/_dataset/dataset_mock.py
@@ -38,7 +38,8 @@ def __init__(
         save_dataset,
         max_datasets,
         no_tokenizer,
-        num_batches
+        num_batches,
+        max_directories
     ):
         super().__init__()
         self.block_size = block_size
@@ -52,6 +53,7 @@ def __init__(
         self.max_datasets = max_datasets
         self.__infinite_dataset_iterator = None
         self.no_tokenizer = no_tokenizer
+        self.max_directories = max_directories
 
         # Retrieve a random slice of the genesis dataset
         self.data = []
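Finally, a hedged sketch of how the mock path can be exercised now that its constructor accepts the same parameter. This assumes dataset._mock routes Dataset.__new__ to the mock implementation, as the __init__.py hunks above suggest; the exact call pattern may differ between bittensor versions:

import bittensor

config = bittensor.dataset.config()
config.dataset._mock = True           # select the mock in Dataset.__new__
config.dataset.max_directories = 10   # the mock accepts the new parameter too

mock_dataset = bittensor.dataset( config = config )
batch = next( mock_dataset )          # yields batches without touching IPFS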