From 3d78ef1ab5dc2ebfb4ed8500ae162c53d61d34b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=96=E9=B9=A4=20=E7=8E=8B?= Date: Tue, 14 Jun 2022 22:33:30 +0800 Subject: [PATCH 1/9] update auto_ftp --- RLA/auto_ftp.py | 5 +++-- RLA/easy_log/tester.py | 15 ++++++++------- test/test_scripts.py | 16 ++++++++++++++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/RLA/auto_ftp.py b/RLA/auto_ftp.py index 3989681..cee02ca 100644 --- a/RLA/auto_ftp.py +++ b/RLA/auto_ftp.py @@ -197,13 +197,14 @@ def upload_file(self, remote_dir, local_dir, local_file): self.close() def download_file(self, remote_file, local_file): + self.sftp = self.sftpconnect() logger.info("try download {}".format(local_file)) if not os.path.isfile(local_file): logger.info("new file {}".format(local_file)) - self.sftp.get(remote_file) + self.sftp.get(remote_file, local_file) elif self.sftp.stat(remote_file).st_size != os.path.getsize(local_file): logger.info("update file {}".format(local_file)) - self.sftp.get(remote_file) + self.sftp.get(remote_file, local_file) else: logger.info("skip download file {}".format(remote_file)) diff --git a/RLA/easy_log/tester.py b/RLA/easy_log/tester.py index 39dc086..ec79f03 100644 --- a/RLA/easy_log/tester.py +++ b/RLA/easy_log/tester.py @@ -313,16 +313,17 @@ def sync_log_file(self): from RLA.auto_ftp import FTPHandler from RLA.auto_ftp import SFTPHandler try: - try: - ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], - username=self.private_config["REMOTE_SETTING"]["username"], - password=self.private_config["REMOTE_SETTING"]["password"]) - except Exception as e: - logger.warn("sending log file failed. 
{}".format(e)) - logger.warn("try to send log file through sftp") + if 'file_transfer_protocol' not in self.private_config["REMOTE_SETTING"].keys() or self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'sftp': ftp = SFTPHandler(sftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], username=self.private_config["REMOTE_SETTING"]["username"], password=self.private_config["REMOTE_SETTING"]["password"]) + elif self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'ftp': + ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], + username=self.private_config["REMOTE_SETTING"]["username"], + password=self.private_config["REMOTE_SETTING"]["password"]) + else: + raise ValueError("designated file_transfer_protocol {} is not supported".format(self.private_config["REMOTE_SETTING"]['file_transfer_protocol'])) + for root, dirs, files in os.walk(self.log_dir): suffix = root.split("/{}/".format(LOG)) assert len(suffix) == 2, "root should only have one pattern \"/log/\"" diff --git a/test/test_scripts.py b/test/test_scripts.py index 2266fbd..8fb0c57 100644 --- a/test/test_scripts.py +++ b/test/test_scripts.py @@ -1,7 +1,9 @@ from test._base import BaseTest from RLA.easy_log.log_tools import DeleteLogTool, Filter from RLA.easy_log.log_tools import ArchiveLogTool, ViewLogTool - +from RLA.easy_log.tester import exp_manager +from RLA.auto_ftp import SFTPHandler +import os class ScriptTest(BaseTest): @@ -55,4 +57,14 @@ def test_archive(self): def test_view(self): self.remove_and_copy_data() dlt = ViewLogTool(proj_root=self.TARGET_DATA_ROOT, task_table_name=self.TASK_NAME, regex='2022/03/01/21-13*') - dlt.view_log(skip_ask=True) \ No newline at end of file + dlt.view_log(skip_ask=True) + + def test_sync_log(self): + exp_manager.configure(task_name='test', + private_config_path='./test/test_data_root/rla_config.yaml', + log_root='./test/test_data_root/source/') + ftp = 
SFTPHandler(sftp_server=exp_manager.private_config["REMOTE_SETTING"]["ftp_server"], + username=exp_manager.private_config["REMOTE_SETTING"]["username"], + password=exp_manager.private_config["REMOTE_SETTING"]["password"]) + ftp.upload_file(os.getcwd() + '/' + 'test/test_data_root/target/', 'test/test_data_root/source/', 'test.txt') + ftp.download_file(os.getcwd() + '/' + 'test/test_data_root/source/download.txt', 'test/test_data_root/target/download.txt') \ No newline at end of file From 7db61bb40829f421d0b7c2a02b6d8e5849701c0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=96=E9=B9=A4=20=E7=8E=8B?= Date: Tue, 14 Jun 2022 22:44:04 +0800 Subject: [PATCH 2/9] update auto_ftp --- RLA/auto_ftp.py | 5 +++-- RLA/easy_log/tester.py | 15 ++++++++------- test/test_scripts.py | 16 ++++++++++++++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/RLA/auto_ftp.py b/RLA/auto_ftp.py index 3989681..cee02ca 100644 --- a/RLA/auto_ftp.py +++ b/RLA/auto_ftp.py @@ -197,13 +197,14 @@ def upload_file(self, remote_dir, local_dir, local_file): self.close() def download_file(self, remote_file, local_file): + self.sftp = self.sftpconnect() logger.info("try download {}".format(local_file)) if not os.path.isfile(local_file): logger.info("new file {}".format(local_file)) - self.sftp.get(remote_file) + self.sftp.get(remote_file, local_file) elif self.sftp.stat(remote_file).st_size != os.path.getsize(local_file): logger.info("update file {}".format(local_file)) - self.sftp.get(remote_file) + self.sftp.get(remote_file, local_file) else: logger.info("skip download file {}".format(remote_file)) diff --git a/RLA/easy_log/tester.py b/RLA/easy_log/tester.py index 39dc086..ec79f03 100644 --- a/RLA/easy_log/tester.py +++ b/RLA/easy_log/tester.py @@ -313,16 +313,17 @@ def sync_log_file(self): from RLA.auto_ftp import FTPHandler from RLA.auto_ftp import SFTPHandler try: - try: - ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], - 
username=self.private_config["REMOTE_SETTING"]["username"], - password=self.private_config["REMOTE_SETTING"]["password"]) - except Exception as e: - logger.warn("sending log file failed. {}".format(e)) - logger.warn("try to send log file through sftp") + if 'file_transfer_protocol' not in self.private_config["REMOTE_SETTING"].keys() or self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'sftp': ftp = SFTPHandler(sftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], username=self.private_config["REMOTE_SETTING"]["username"], password=self.private_config["REMOTE_SETTING"]["password"]) + elif self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'ftp': + ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], + username=self.private_config["REMOTE_SETTING"]["username"], + password=self.private_config["REMOTE_SETTING"]["password"]) + else: + raise ValueError("designated file_transfer_protocol {} is not supported".format(self.private_config["REMOTE_SETTING"]['file_transfer_protocol'])) + for root, dirs, files in os.walk(self.log_dir): suffix = root.split("/{}/".format(LOG)) assert len(suffix) == 2, "root should only have one pattern \"/log/\"" diff --git a/test/test_scripts.py b/test/test_scripts.py index 2266fbd..42703b0 100644 --- a/test/test_scripts.py +++ b/test/test_scripts.py @@ -1,7 +1,9 @@ from test._base import BaseTest from RLA.easy_log.log_tools import DeleteLogTool, Filter from RLA.easy_log.log_tools import ArchiveLogTool, ViewLogTool - +from RLA.easy_log.tester import exp_manager +from RLA.auto_ftp import SFTPHandler +import os class ScriptTest(BaseTest): @@ -55,4 +57,14 @@ def test_archive(self): def test_view(self): self.remove_and_copy_data() dlt = ViewLogTool(proj_root=self.TARGET_DATA_ROOT, task_table_name=self.TASK_NAME, regex='2022/03/01/21-13*') - dlt.view_log(skip_ask=True) \ No newline at end of file + dlt.view_log(skip_ask=True) + + def test_sync_log(self): + 
exp_manager.configure(task_name='test', + private_config_path='./test/test_data_root/rla_config.yaml', + log_root='./test/test_data_root/source/') + ftp = SFTPHandler(sftp_server=exp_manager.private_config["REMOTE_SETTING"]["ftp_server"], + username=exp_manager.private_config["REMOTE_SETTING"]["username"], + password=exp_manager.private_config["REMOTE_SETTING"]["password"]) + ftp.upload_file(os.getcwd() + '/' + 'test/test_data_root/target/', 'test/test_data_root/source/', 'test.txt') + ftp.download_file(os.getcwd() + '/' + 'test/test_data_root/source/download.txt', 'test/test_data_root/target/download.txt') From e7b02a7bd16b2ba57d071d631ba3a8de3c74faed Mon Sep 17 00:00:00 2001 From: Xiong-Hui Chen Date: Wed, 22 Jun 2022 20:48:03 +0800 Subject: [PATCH 3/9] doc&feat&refactor&test: update serveral docs, feats, tests and refactor code base. 1. add comments to some important functions. 2. add a deprecated_alias wrapper for the requirement of keyword modificatioon. 3. refactor sync_log_file 4. remove blanks on the structured experiment data-item name. 5. 
add some test scripts --- .gitignore | 1 + README.md | 16 +- RLA/auto_ftp.py | 10 ++ RLA/const.py | 6 +- RLA/easy_log/complex_data_recorder.py | 30 +++- RLA/easy_log/tester.py | 217 +++++++++++++++--------- RLA/easy_plot/plot_func.py | 12 +- RLA/utils/utils.py | 34 ++++ example/rla_config.yaml | 31 ++++ example/sb3_ppo_example/rla_config.yaml | 3 +- example/sb_ppo_example/rla_config.yaml | 3 +- example/simplest_code/rla_config.yaml | 5 +- test/remote_data_root/__init__.py | 2 + test/test_proj/__init__.py | 0 test/test_proj/proj/__init__.py | 0 test/test_proj/proj/test_manager.py | 147 ++++++++++++++++ test/test_proj/proj/torch_net.py | 52 ++++++ test/test_scripts.py | 29 ++-- 18 files changed, 487 insertions(+), 111 deletions(-) create mode 100644 example/rla_config.yaml create mode 100644 test/remote_data_root/__init__.py create mode 100644 test/test_proj/__init__.py create mode 100644 test/test_proj/proj/__init__.py create mode 100644 test/test_proj/proj/test_manager.py create mode 100644 test/test_proj/proj/torch_net.py diff --git a/.gitignore b/.gitignore index 28e0ecd..adb641a 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ RLA.egg-info** **/.ipynb_checkpoints/* **/.DS_Store test/target_data_root/* +**/private_config.py diff --git a/README.md b/README.md index 6b715e0..1d22b8d 100644 --- a/README.md +++ b/README.md @@ -226,18 +226,21 @@ In practice, we might conduct our experiments in multiple physical machines for ``` SEND_LOG_FILE: True REMOTE_SETTING: - ftp_server: '' - username: '' - password: '' - remote_data_root: '' + ftp_server: '114.114.114.114' + username: 'agent' + password: '123' + remote_data_root: 'remote_project/data_root/' + file_transfer_protocol: 'sftp' ``` -where we set `SEND_LOG_FILE` to True and `ftp_server`, `username` and `password` are the ip address, username and passward of the master node. `remote_data_root` define the data_root of the database in the main node. For the main node, just keep `SEND_LOG_FILE` to False. 
In our experiment code, we should call the function `RLA.easy_log.tester.exp_manager.sync_log_file` periodically, then the data items we be sent to the `remote_data_root` of the main node. For example, +where `SEND_LOG_FILE` is set to True, `ftp_server`, `username` and `password` are the ip address, username and passward of the master node respectively, and `file_transfer_protocol` is the protocol to send data. `remote_data_root` defines the data_root of the database in the main node. +For the main node, configure the exp_manger by `exp_manager.configure(..., is_master_node=True)`. +In our experiment code, we should call the function `RLA.easy_log.tester.exp_manager.sync_log_file` periodically, for example, ``` for i in range(1000): # your trianing code. exp_manager.sync_log_file() ``` -Since `SEND_LOG_FILE` is set to False in the main node, the `exp_manager.sync_log_file()` will be skipped in the main node. +then the data items we be sent to the `remote_data_root` of the main node. Since `SEND_LOG_FILE` is set to False in the main node, the `exp_manager.sync_log_file()` will be skipped in the main node. PS: 1. You might meet "socket.error: [Errno 111] Connection refused" problem in this process. The solution can be found [here](https://stackoverflow.com/questions/16428401/unable-to-use-ip-address-with-ftplib-python). @@ -251,3 +254,4 @@ PS: - [ ] add comments and documents to the functions. - [ ] add an auto integration script. - [ ] download / upload experiment logs through timestamp. +- [ ] add a document to the plot function. 
diff --git a/RLA/auto_ftp.py b/RLA/auto_ftp.py index cee02ca..446e319 100644 --- a/RLA/auto_ftp.py +++ b/RLA/auto_ftp.py @@ -3,10 +3,20 @@ import shutil import os import traceback +from RLA.const import * from RLA.easy_log import logger import pysftp + +def ftp_factory(name, server, username, password, ignore=None): + if name == FTP_PROTOCOL_NAME.FTP: + return FTPHandler(ftp_server=server, username=username,password=password, ignore=ignore) + elif name == FTP_PROTOCOL_NAME.SFTP: + return SFTPHandler(sftp_server=server, username=username, password=password, ignore=ignore) + else: + raise NotImplementedError + class FTPHandler(object): def __init__(self, ftp_server, username, password, ignore=None): diff --git a/RLA/const.py b/RLA/const.py index 4238e3c..a502cf6 100644 --- a/RLA/const.py +++ b/RLA/const.py @@ -2,4 +2,8 @@ class FRAMEWORK: tensorflow = 'tensorflow' - torch = 'torch' \ No newline at end of file + torch = 'torch' + +class FTP_PROTOCOL_NAME: + FTP = 'ftp' + SFTP = 'sftp' diff --git a/RLA/easy_log/complex_data_recorder.py b/RLA/easy_log/complex_data_recorder.py index 37887f5..c4897ce 100644 --- a/RLA/easy_log/complex_data_recorder.py +++ b/RLA/easy_log/complex_data_recorder.py @@ -7,6 +7,7 @@ import matplotlib.pyplot as plt from RLA.easy_log.tester import exp_manager from RLA.easy_log.time_step import time_step_holder +from typing import Callable # video recorder @@ -27,8 +28,35 @@ def save(cls, name=None, fig=None, cover=False, add_timestamp=True, **kwargs): plt.savefig(save_path, **kwargs) @classmethod - def pretty_plot_wrapper(cls, name, plot_func, cover=False, legend_outside=False, xlabel='', ylabel='', title='', + def pretty_plot_wrapper(cls, name:str, plot_func:Callable, + cover=False, legend_outside=False, xlabel='', ylabel='', title='', add_timestamp=True, *args, **kwargs): + """ + Save the customized plot figure to the RLA database. + + :param name: file name to save. 
+ :type name: str + :param plot_func: the function to plot figures + :type plot_func: function + :param cover: if you would like to cover the original figure with the same name, you can set cover to True + :type cover: bool + :param legend_outside: let legend be outside of the figure. + :type legend_outside: bool + :param xlabel: name of xlabel + :type xlabel: str + :param ylabel: name of xlabel + :type ylabel: str + :param title: title of the plotted figure + :type title: str + :param add_timestamp: add the timestamp (recorded by the timestep holder) to the name. + :type add_timestamp: str + :param args: other parameters to plt.savefig + :type args: + :param kwargs: other parameters to plt.savefig + :type kwargs: + :return: + :rtype: + """ plt.cla() plot_func() lgd = plt.legend(prop={'size': 15}, loc=2 if legend_outside else None, diff --git a/RLA/easy_log/tester.py b/RLA/easy_log/tester.py index ec79f03..2159b4f 100644 --- a/RLA/easy_log/tester.py +++ b/RLA/easy_log/tester.py @@ -24,38 +24,39 @@ import yaml import shutil import argparse -from typing import Optional, Union, Dict, Any +from typing import Dict, List, Tuple, Type, Union, Optional +from RLA.utils.utils import deprecated_alias, load_yaml from RLA.const import DEFAULT_X_NAME, FRAMEWORK import pathspec -def import_hyper_parameters(task_name, record_date): +def import_hyper_parameters(task_table_name, record_date): """ - return the hyper parameters of the experiment in task_name/record_date, which is stored in Tester. + return the hyper parameters of the experiment in task_table_name/record_date, which is stored in Tester. - :param task_name: + :param task_table_name: :param record_date: :return: """ logger.warn("the function is deprecated. 
please check the ExperimentLoader as the new implementation") global tester assert isinstance(tester, Tester) - load_tester = tester.load_tester(record_date, task_name, tester.root) + load_tester = tester.load_tester(record_date, task_table_name, tester.data_root) args = argparse.Namespace(**load_tester.hyper_param) return args -def load_from_record_date(task_name, record_date): +def load_from_record_date(task_table_name, record_date): """ - load the checkpoint of the experiment in task_name/record_date. - :param task_name: + load the checkpoint of the experiment in task_table_name/record_date. + :param task_table_name: :param record_date: :return: """ logger.warn("the function is deprecated. please check the ExperimentLoader as the new implementation") global tester assert isinstance(tester, Tester) - load_tester = tester.load_tester(record_date, task_name, tester.root) + load_tester = tester.load_tester(record_date, task_table_name, tester.data_root) # load checkpoint load_tester.new_saver(var_prefix='', max_to_keep=1) load_iter, load_res = load_tester.load_checkpoint() @@ -64,17 +65,17 @@ def load_from_record_date(task_name, record_date): return load_iter, load_res -def fork_tester_log_files(task_name, record_date): +def fork_tester_log_files(task_table_name, record_date): """ - copy the log files in task_name/record_date to the new experiment. - :param task_name: + copy the log files in task_table_name/record_date to the new experiment. + :param task_table_name: :param record_date: :return: """ logger.warn("the function is deprecated. 
please check the ExperimentLoader as the new implementation") global tester assert isinstance(tester, Tester) - load_tester = tester.load_tester(record_date, task_name, tester.root) + load_tester = tester.load_tester(record_date, task_table_name, tester.data_root) # copy log file tester.log_file_copy(load_tester) # copy attribute @@ -83,7 +84,7 @@ def fork_tester_log_files(task_name, record_date): tester.private_config = load_tester.private_config -class Tester(object): +class Tester(object,): def __init__(self): self.__custom_recorder = {} @@ -107,32 +108,64 @@ def __init__(self): self.saver = None self.dl_framework = None - def configure(self, task_name, private_config_path, log_root=None, data_root=None, ignore_file_path=None, run_file=None): - fs = open(private_config_path, encoding="UTF-8") - try: - self.private_config = yaml.load(fs) - except TypeError: - self.private_config = yaml.safe_load(fs) - + @deprecated_alias(task_name='task_table_name', private_config_path='rla_config', log_root='data_root') + def configure(self, task_table_name: str, rla_config: Union[str, dict], data_root: str, + ignore_file_path: Optional[str] = None, run_file: Optional[str] = None, + is_master_node: bool = False, code_root: Optional[str] = None): + """ + The function to configure your exp_manager, which should be run before your experiments. + :param task_table_name: define a ``table'' to store a collection of experiment data item. + :type task_table_name: str + :param rla_config: Pass the location of rla_config.yaml. It defines all of the running strategies of RLA. + Ref to RLAssistant/example/rla_config.yaml + :type rla_config: str + :param data_root: define the location of the RLA database. + :type data_root: str + :param ignore_file_path: RLA will backup the codebase of each experiment (defined in rla_config.yaml). 
+ If there are some files unnecessary to backup, + you can customize the pattern of files to ignore with the same rules of gitignore (https://git-scm.com/docs/gitignore). + We recommend you to pass the location of .gitignore directly to ignore_file_path. + :type ignore_file_path: str + :param run_file: If you have extra files out of your codebase (e.g., some scripts to run the code), you can pass it to the run_file. + Then we will backup the run_file too. + :type run_file: str + :param is_master_node: In "distributed training & centralized logs" mode (By set SEND_LOG_FILE in rla_config.yaml to True), + you should mark the master node (is_master_node=True) to collect logs of the slave nodes (is_master_node=False). + :type is_master_node: bool + : param code_root: Define the root of your codebase (for backup) explicitly. It will be in the same location as rla_config.yaml by default. + """ + if isinstance(rla_config, str): + self.private_config = load_yaml(rla_config) + elif isinstance(rla_config, dict): + self.private_config = rla_config + else: + raise NotImplementedError self.run_file = run_file self.ignore_file_path = ignore_file_path - self.task_name = task_name - if log_root is not None: - self.data_root = log_root - else: - self.data_root = data_root + self.task_table_name = task_table_name + self.data_root = data_root logger.info("private_config: ") self.dl_framework = self.private_config["DL_FRAMEWORK"] - self.project_root = "/".join(private_config_path.split("/")[:-1]) + self.is_master_node = is_master_node + + if code_root is None: + if isinstance(rla_config, str): + self.project_root = "/".join(rla_config.split("/")[:-1]) + else: + raise NotImplementedError("If you pass the rla_config dict directly, " + "you should define the root of your codebase (for backup) explicitly by pass the code_root.") + else: + self.project_root = code_root for k, v in self.private_config.items(): logger.info("k: {}, v: {}".format(k, v)) + def set_hyper_param(self, **argkw): """ 
This method is to record all of hyper parameters to test object. Place pass your parameters as follow format: - self.set_hyper_param(param_a=a,param_b=b) + self.set_hyper_param(param_a=a,param_b=b) or a dict self.set_hyper_param(**{'param_a'=a,'param_b'=b}) Note: It is invalid to pass a local object to this function. @@ -158,11 +191,11 @@ def log_files_gen(self): info = self.auto_parse_info() info = '&' + info self.info = info - code_dir, _ = self.__create_file_directory(osp.join(self.data_root, CODE, self.task_name), '', is_file=False) - log_dir, _ = self.__create_file_directory(osp.join(self.data_root, LOG, self.task_name), '', is_file=False) - self.pkl_dir, self.pkl_file = self.__create_file_directory(osp.join(self.data_root, ARCHIVE_TESTER, self.task_name), '.pkl') - self.checkpoint_dir, _ = self.__create_file_directory(osp.join(self.data_root, CHECKPOINT, self.task_name), is_file=False) - self.results_dir, _ = self.__create_file_directory(osp.join(self.data_root, OTHER_RESULTS, self.task_name), is_file=False) + code_dir, _ = self.__create_file_directory(osp.join(self.data_root, CODE, self.task_table_name), '', is_file=False) + log_dir, _ = self.__create_file_directory(osp.join(self.data_root, LOG, self.task_table_name), '', is_file=False) + self.pkl_dir, self.pkl_file = self.__create_file_directory(osp.join(self.data_root, ARCHIVE_TESTER, self.task_table_name), '.pkl') + self.checkpoint_dir, _ = self.__create_file_directory(osp.join(self.data_root, CHECKPOINT, self.task_table_name), is_file=False) + self.results_dir, _ = self.__create_file_directory(osp.join(self.data_root, OTHER_RESULTS, self.task_table_name), is_file=False) self.log_dir = log_dir self.code_dir = code_dir @@ -174,11 +207,11 @@ def log_files_gen(self): def update_log_files_location(self, root): self.data_root = root - code_dir, _ = self.__create_file_directory(osp.join(self.data_root, CODE, self.task_name), '', is_file=False) - log_dir, _ = self.__create_file_directory(osp.join(self.data_root, 
LOG, self.task_name), '', is_file=False) - self.pkl_dir, self.pkl_file = self.__create_file_directory(osp.join(self.data_root, ARCHIVE_TESTER, self.task_name), '.pkl') - self.checkpoint_dir, _ = self.__create_file_directory(osp.join(self.data_root, CHECKPOINT, self.task_name), is_file=False) - self.results_dir, _ = self.__create_file_directory(osp.join(self.data_root, OTHER_RESULTS, self.task_name), is_file=False) + code_dir, _ = self.__create_file_directory(osp.join(self.data_root, CODE, self.task_table_name), '', is_file=False) + log_dir, _ = self.__create_file_directory(osp.join(self.data_root, LOG, self.task_table_name), '', is_file=False) + self.pkl_dir, self.pkl_file = self.__create_file_directory(osp.join(self.data_root, ARCHIVE_TESTER, self.task_table_name), '.pkl') + self.checkpoint_dir, _ = self.__create_file_directory(osp.join(self.data_root, CHECKPOINT, self.task_table_name), is_file=False) + self.results_dir, _ = self.__create_file_directory(osp.join(self.data_root, OTHER_RESULTS, self.task_table_name), is_file=False) self.log_dir = log_dir self.code_dir = code_dir self.print_log_dir() @@ -220,9 +253,9 @@ def print_log_dir(self): logger.info("results_dir: {}".format(self.results_dir)) @classmethod - def load_tester(cls, record_date, task_name, log_root): + def load_tester(cls, record_date, task_table_name, log_root): logger.info("load tester") - res_dir, res_file = cls.log_file_finder(record_date, task_name=task_name, + res_dir, res_file = cls.log_file_finder(record_date, task_table_name=task_table_name, file_root=osp.join(log_root, ARCHIVE_TESTER), log_type='files') import dill @@ -291,8 +324,7 @@ def _feed_hyper_params_to_tb(self, metric_dict=None): if isinstance(fmt, logger.TensorBoardOutputFormat): fmt.add_hyper_params_to_tb(self.hyper_param, metric_dict) - - def sync_log_file(self): + def sync_log_file(self, skip_error=False): """ syn_log_file is an automatic synchronization function. 
It will send all log files (e.g., code/**, checkpoint/**, log/**, etc.) to your target server via the FTP protocol. @@ -309,48 +341,74 @@ def sync_log_file(self): logger.warn("sync: start") # ignore_files = self.private_config["IGNORE_RULE"] - if self.private_config["SEND_LOG_FILE"]: - from RLA.auto_ftp import FTPHandler - from RLA.auto_ftp import SFTPHandler + def send_data(ftp_obj): + for root, dirs, files in os.walk(self.log_dir): + suffix = root.split("/{}/".format(LOG)) + assert len(suffix) == 2, "root should only have one pattern \"/log/\"" + remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_data_root") + if remote_data_root is None: + remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_log_root") + logger.warn("the parameter remote_log_root will be renamed to remote_data_root in future versions.") + else: + raise RuntimeError("miss remote_log_root in rla_config") + remote_root = osp.join(remote_data_root, LOG, suffix[1]) + local_root = root + logger.warn("sync {} <- {}".format(remote_root, local_root)) + for file in files: + ftp_obj.upload_file(remote_root, local_root, file) + + if self.private_config["SEND_LOG_FILE"] and not self.is_master_node: + from RLA.auto_ftp import ftp_factory + alternative_protocol = 'ftp' try: - if 'file_transfer_protocol' not in self.private_config["REMOTE_SETTING"].keys() or self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'sftp': - ftp = SFTPHandler(sftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], - username=self.private_config["REMOTE_SETTING"]["username"], - password=self.private_config["REMOTE_SETTING"]["password"]) - elif self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'ftp': - ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], - username=self.private_config["REMOTE_SETTING"]["username"], - password=self.private_config["REMOTE_SETTING"]["password"]) + if 'file_transfer_protocol' not in 
self.private_config["REMOTE_SETTING"].keys(): + self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] = 'ftp' + ftp = ftp_factory(name=self.private_config["REMOTE_SETTING"]['file_transfer_protocol'], + server=self.private_config["REMOTE_SETTING"]["ftp_server"], + username=self.private_config["REMOTE_SETTING"]["username"], + password=self.private_config["REMOTE_SETTING"]["password"]) + if self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] == 'ftp': + alternative_protocol = 'sftp' else: - raise ValueError("designated file_transfer_protocol {} is not supported".format(self.private_config["REMOTE_SETTING"]['file_transfer_protocol'])) - - for root, dirs, files in os.walk(self.log_dir): - suffix = root.split("/{}/".format(LOG)) - assert len(suffix) == 2, "root should only have one pattern \"/log/\"" - remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_data_root") - if remote_data_root is None: - remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_log_root") - logger.warn("the parameter remote_log_root will be renamed to remote_data_root in future versions.") - else: - raise RuntimeError("miss remote_log_root in rla_config") - remote_root = osp.join(remote_data_root, LOG, suffix[1]) - local_root = root - logger.warn("sync {} <- {}".format(remote_root, local_root)) - for file in files: - ftp.upload_file(remote_root, local_root, file) - + alternative_protocol = 'ftp' + # elif self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'ftp': + # ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], + # username=self.private_config["REMOTE_SETTING"]["username"], + # password=self.private_config["REMOTE_SETTING"]["password"]) + # alternative_protocol = 'sftp' + # else: + # raise ValueError("designated file_transfer_protocol {} is not supported".format(self.private_config["REMOTE_SETTING"]['file_transfer_protocol'])) + send_data(ftp_obj=ftp) logger.warn("sync: send success!") except 
Exception as e: - logger.warn("sending log file failed. {}".format(e)) - import traceback - logger.warn(traceback.format_exc()) + try: + logger.warn("failed to send log files through {}: {} ".format(self.private_config["REMOTE_SETTING"]['file_transfer_protocol'], e)) + logger.warn("try another protocol:", alternative_protocol) + ftp = ftp_factory(name=alternative_protocol, + server=self.private_config["REMOTE_SETTING"]["ftp_server"], + username=self.private_config["REMOTE_SETTING"]["username"], + password=self.private_config["REMOTE_SETTING"]["password"]) + send_data(ftp_obj=ftp) + logger.warn("sync: send success!") + except Exception as e: + logger.warn("failed to send log files through {}: {} ".format(alternative_protocol, e)) + + logger.warn("server info ftp_server {}, username {}, password {}, remote_data_root {}".format( + self.private_config["REMOTE_SETTING"]["ftp_server"], + self.private_config["REMOTE_SETTING"]["username"], + self.private_config["REMOTE_SETTING"]["password"], + self.private_config["REMOTE_SETTING"]["remote_data_root"])) + import traceback + logger.warn(traceback.format_exc()) + if not skip_error: + raise RuntimeError("fail to sync") else: - logger.warn("SEND_LOG_FILE in rla_config.yaml is set to False. 
skip the sync process.") + logger.warn("skip the sync process.") @classmethod - def log_file_finder(cls, record_date, task_name='train', file_root='../checkpoint/', log_type='dir'): + def log_file_finder(cls, record_date, task_table_name='train', file_root='../checkpoint/', log_type='dir'): record_date = datetime.datetime.strptime(record_date, '%Y/%m/%d/%H-%M-%S-%f') - prefix = osp.join(file_root, task_name) + prefix = osp.join(file_root, task_table_name) directory = str(record_date.strftime("%Y/%m/%d")) directory = osp.join(prefix, directory) file_found = '' @@ -424,6 +482,8 @@ def __copy_source_code(self, run_file, code_dir): for dir_name in self.private_config["BACKUP_CONFIG"]["backup_code_dir"]: shutil.copytree(osp.join(self.project_root, dir_name), osp.join(code_dir, dir_name), ignore=self.get_ignore_files) + if run_file is not None: + shutil.copy(run_file, code_dir) else: raise NotImplementedError @@ -437,13 +497,13 @@ def __create_file_directory(self, prefix, ext='', is_file=True, record_date=None directory = osp.join(prefix, directory) if is_file: os.makedirs(directory, exist_ok=True) - file_name = '{dir}/{timestep} {ip} {info}{ext}'.format(dir=directory, + file_name = '{dir}/{timestep}_{ip}_{info}{ext}'.format(dir=directory, timestep=self.record_date_to_str(record_date), ip=str(self.ipaddr), info=self.info, ext=ext) else: - directory = '{dir}/{timestep} {ip} {info}{ext}/'.format(dir=directory, + directory = '{dir}/{timestep}_{ip}_{info}{ext}/'.format(dir=directory, timestep=self.record_date_to_str(record_date), ip=str(self.ipaddr), info=self.info, @@ -628,6 +688,7 @@ def sizeof_fmt(num, suffix='B'): summary = self.dict_to_table_text_summary(large_mermory_dict, 'large_memory') self.add_summary_to_logger(summary, 'large_memory') + def dict_to_table_text_summary(self, input_dict, name): import tensorflow as tf with tf.Session(graph=tf.Graph()) as sess: diff --git a/RLA/easy_plot/plot_func.py b/RLA/easy_plot/plot_func.py index 96f29aa..79c6c22 100644 --- 
a/RLA/easy_plot/plot_func.py +++ b/RLA/easy_plot/plot_func.py @@ -108,19 +108,15 @@ def word_replace_back(strings): return eval(strings.replace('--', '/').replace("||", "\'")) -def plot_res_func(prefix_dir, regs, param_keys, - value_keys, - scale_dict=None, - # misc_scale=None, misc_scale_index=None, +def plot_res_func(prefix_dir:str, regs, param_keys, + value_keys, scale_dict=None, replace_legend_keys=None, - legend_rescale=None, save_name=None, resample=int(1e3), smooth_step=1.0, ylabel=None, x_bound=None, y_bound=None, x_start=None, use_buf=False, remove_outlier=False, xlabel=None, key_to_legend_fn=None, - verbose=True, - *args, **kwargs): + verbose=True, *args, **kwargs): dirs = [] if key_to_legend_fn is None: key_to_legend_fn = default_key_to_legend @@ -129,6 +125,8 @@ def plot_res_func(prefix_dir, regs, param_keys, reg_group = {} for regex_str in regs: + if regex_str[0] == '/': + regex_str = regex_str[1:] if verbose: print("check regs {}. log found: ".format(osp.join(prefix_dir, regex_str))) diff --git a/RLA/utils/utils.py b/RLA/utils/utils.py index e69de29..b6a0476 100644 --- a/RLA/utils/utils.py +++ b/RLA/utils/utils.py @@ -0,0 +1,34 @@ +import functools +from typing import Callable +import yaml + +import functools +import warnings +def deprecated_alias(**aliases): + def deco(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + rename_kwargs(f.__name__, kwargs, aliases) + return f(*args, **kwargs) + return wrapper + return deco + +def rename_kwargs(func_name, kwargs, aliases): + for alias, new in aliases.items(): + if alias in kwargs: + if new in kwargs: + raise TypeError('{} received both {} and {}'.format( + func_name, alias, new)) + warnings.warn('{} is deprecated; use {}'.format(alias, new), + DeprecationWarning, + 3) + kwargs[new] = kwargs.pop(alias) + + +def load_yaml(path): + fs = open(path, encoding="UTF-8") + try: + private_config = yaml.load(fs) + except TypeError: + private_config = yaml.safe_load(fs) + return private_config \ No newline 
at end of file diff --git a/example/rla_config.yaml b/example/rla_config.yaml new file mode 100644 index 0000000..2622457 --- /dev/null +++ b/example/rla_config.yaml @@ -0,0 +1,31 @@ +PROJECT_TYPE: + # lib: backup the project in YOUR_PROJECT_ROOT/build/lib. + # It suit to the situation when you run the code by building a package. (e.g., "python setup.py install") + # source: backup the project in YOUR_PROJECT_ROOT/{backup_code_dir}. + # It suit to the situation when you run your code directly. + # and all log files in easy_log. + backup_code_by: 'source' + +# When the following path is set to relative path, the current working directory is the path which rla_config.yaml in. +BACKUP_CONFIG: + backup_code_dir: + - 'project' + lib_dir: './build/lib/' +# option: 'stdout', 'log', 'tensorboard', 'csv' +LOG_USED: + - 'stdout' + - 'log' + - 'tensorboard' + - 'csv' + +# select a DL framework: "tensorflow" or "torch". +DL_FRAMEWORK: 'tensorflow' + +SEND_LOG_FILE: False +REMOTE_SETTING: + ftp_server: '' + username: '' + password: '' + remote_data_root: '' + # option: ftp or sftp + file_transfer_protocol: 'sftp' diff --git a/example/sb3_ppo_example/rla_config.yaml b/example/sb3_ppo_example/rla_config.yaml index 2a8e9fc..81ac50f 100644 --- a/example/sb3_ppo_example/rla_config.yaml +++ b/example/sb3_ppo_example/rla_config.yaml @@ -27,4 +27,5 @@ REMOTE_SETTING: username: '' password: '' remote_data_root: '' - + # option: ftp or sftp + file_transfer_protocol: 'sftp' diff --git a/example/sb_ppo_example/rla_config.yaml b/example/sb_ppo_example/rla_config.yaml index a8224ed..8c9e13e 100644 --- a/example/sb_ppo_example/rla_config.yaml +++ b/example/sb_ppo_example/rla_config.yaml @@ -27,4 +27,5 @@ REMOTE_SETTING: username: '' password: '' remote_data_root: '' - + # option: ftp or sftp + file_transfer_protocol: 'sftp' diff --git a/example/simplest_code/rla_config.yaml b/example/simplest_code/rla_config.yaml index 65714b3..2622457 100644 --- a/example/simplest_code/rla_config.yaml +++ 
b/example/simplest_code/rla_config.yaml @@ -11,7 +11,7 @@ BACKUP_CONFIG: backup_code_dir: - 'project' lib_dir: './build/lib/' - +# option: 'stdout', 'log', 'tensorboard', 'csv' LOG_USED: - 'stdout' - 'log' @@ -27,4 +27,5 @@ REMOTE_SETTING: username: '' password: '' remote_data_root: '' - + # option: ftp or sftp + file_transfer_protocol: 'sftp' diff --git a/test/remote_data_root/__init__.py b/test/remote_data_root/__init__.py new file mode 100644 index 0000000..87f241c --- /dev/null +++ b/test/remote_data_root/__init__.py @@ -0,0 +1,2 @@ +# Created by xionghuichen at 2022/6/22 +# Email: chenxh@lamda.nju.edu.cn diff --git a/test/test_proj/__init__.py b/test/test_proj/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_proj/proj/__init__.py b/test/test_proj/proj/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_proj/proj/test_manager.py b/test/test_proj/proj/test_manager.py new file mode 100644 index 0000000..d5bbf68 --- /dev/null +++ b/test/test_proj/proj/test_manager.py @@ -0,0 +1,147 @@ +from test._base import BaseTest +from RLA.easy_log.tester import exp_manager +from RLA.easy_log import logger +from RLA.easy_log.complex_data_recorder import MatplotlibRecorder as mpr +import numpy as np +from RLA.utils.utils import load_yaml +import os +import yaml +def target_func(x): + return np.tanh(np.mean(x, axis=-1, keepdims=True)) + +class ManagerTest(BaseTest): + + def _load_rla_config(self): + return load_yaml(f'../../../example/rla_config.yaml') + + def _init_proj(self, config_yaml, **kwargs): + task_name = 'test_manger_demo_task' + rla_data_root = '../../test_data_root' + config_yaml['BACKUP_CONFIG']['backup_code_dir'] = ['proj'] + exp_manager.configure(task_name, private_config_path=config_yaml, data_root=rla_data_root, + code_root='../', **kwargs) + exp_manager.log_files_gen() + exp_manager.print_args() + + def test_log_tf(self): + kwargs = { + 'input_size': 16, + 'learning_rate': 0.0001, + } + 
exp_manager.set_hyper_param(**kwargs) + exp_manager.add_record_param(['input_size']) + yaml = self._load_rla_config() + self._init_proj(yaml) + import tensorflow as tf + import numpy as np + X_ph = tf.placeholder(dtype=tf.float32, shape=[None, kwargs["input_size"]], name='x') + y_ph = tf.placeholder(dtype=tf.float32, shape=[None, 1], name='x') + l = X_ph + # build a neural network + for _ in range(3): + l = tf.nn.tanh(tf.layers.dense(l, 64, kernel_initializer=tf.keras.initializers.glorot_normal)) + + out = tf.layers.dense(l, 1, kernel_initializer=tf.keras.initializers.glorot_normal) + loss = tf.reduce_mean(np.square(out - y_ph)) + opt = tf.train.AdamOptimizer(learning_rate=kwargs["learning_rate"]).minimize(loss) + sess = tf.Session().__enter__() + sess.run(tf.variables_initializer(tf.global_variables())) + + exp_manager.new_saver(var_prefix='', max_to_keep=1) + # synthetic target function. + + for i in range(0, 100): + exp_manager.time_step_holder.set_time(i) + x_input = np.random.normal(0, 3, [64, kwargs["input_size"]]) + y = target_func(x_input) + loss_out, y_pred = sess.run([loss, out, opt], feed_dict={X_ph: x_input, y_ph: y})[:-1] + # moving averaged + logger.ma_record_tabular("perf/mse", loss_out, 10) + logger.record_tabular("y_out", np.mean(y)) + if i % 20 == 0: + exp_manager.save_checkpoint() + if i % 10 == 0: + def plot_func(): + import matplotlib.pyplot as plt + testX = np.repeat(np.expand_dims(np.arange(-10, 10, 0.1), axis=-1), repeats=kwargs["input_size"], axis=-1) + testY = target_func(testX) + predY = sess.run(out, feed_dict={X_ph: testX}) + plt.plot(testX.mean(axis=-1), predY.mean(axis=-1), label='pred') + plt.plot(testX.mean(axis=-1), testY.mean(axis=-1), label='real') + mpr.pretty_plot_wrapper('react_func', plot_func, xlabel='x', ylabel='y', title='react test') + logger.dump_tabular() + + + def test_load_checkpoint_tf(self): + pass + + def test_log_torch(self): + kwargs = { + 'input_size': 16, + 'learning_rate': 0.0001, + } + 
exp_manager.set_hyper_param(**kwargs) + exp_manager.add_record_param(['input_size']) + yaml = self._load_rla_config() + yaml['DL_FRAMEWORK'] = 'torch' + self._init_proj(yaml) + from torch_net import MLP, to_tensor + from torch import nn + from torch.nn import functional as F + import torch as th + mlp = MLP(feature_dim=kwargs['input_size'], net_arch=[64, 64, 64], activation_fn=nn.Tanh) + exp_manager.new_saver(var_prefix='', max_to_keep=1) + optimizer = th.optim.Adam(mlp.parameters(), lr=kwargs['learning_rate']) + for i in range(0, 100): + exp_manager.time_step_holder.set_time(i) + x_input = np.random.normal(0, 3, [64, kwargs["input_size"]]) + x_input = x_input.astype(np.float32) + y = target_func(x_input) + mse_loss = F.mse_loss(mlp(to_tensor(x_input)), to_tensor(y)) + optimizer.zero_grad() + mse_loss.backward() + optimizer.step() + logger.ma_record_tabular("perf/mse", np.mean(mse_loss.detach().numpy()), 10) + logger.record_tabular("y_out", np.mean(y)) + if i % 10 == 0: + def plot_func(): + import matplotlib.pyplot as plt + testX = np.repeat(np.expand_dims(np.arange(-10, 10, 0.1), axis=-1), repeats=kwargs["input_size"], axis=-1) + testX = testX.astype(np.float32) + testY = target_func(testX) + predY = mlp(to_tensor(testX)).detach() + plt.plot(testX.mean(axis=-1), predY.mean(axis=-1), label='pred') + plt.plot(testX.mean(axis=-1), testY.mean(axis=-1), label='real') + mpr.pretty_plot_wrapper('react_func', plot_func, xlabel='x', ylabel='y', title='react test') + if i % 20 == 0: + exp_manager.save_checkpoint(model_dict={'mlp': mlp.state_dict(), 'opt': optimizer.state_dict(), 'epoch': i}) + pass + logger.dump_tabular() + + def test_load_checkpoint_torch(self): + pass + + def test_sent_to_master(self): + kwargs = { + 'input_size': 16, + 'learning_rate': 0.0001, + } + exp_manager.set_hyper_param(**kwargs) + exp_manager.add_record_param(['input_size']) + yaml = self._load_rla_config() + import private_config + yaml['DL_FRAMEWORK'] = 'torch' + yaml['SEND_LOG_FILE'] = True + 
yaml['REMOTE_SETTING']['ftp_server'] = '127.0.0.1' + yaml['REMOTE_SETTING']['file_transfer_protocol'] = 'ftp' + yaml['REMOTE_SETTING']['username'] = private_config.username + yaml['REMOTE_SETTING']['password'] = private_config.passward + yaml['REMOTE_SETTING']['remote_data_root'] = private_config.remote_root + + self._init_proj(yaml, is_master_node=False) + for i in range(0, 100): + exp_manager.time_step_holder.set_time(i) + logger.record_tabular("i", i) + logger.dump_tabular() + if i % 10 == 0: + exp_manager.sync_log_file() diff --git a/test/test_proj/proj/torch_net.py b/test/test_proj/proj/torch_net.py new file mode 100644 index 0000000..b9f0232 --- /dev/null +++ b/test/test_proj/proj/torch_net.py @@ -0,0 +1,52 @@ +# Created by xionghuichen at 2022/6/22 +# Email: chenxh@lamda.nju.edu.cn + +import torch as th +from torch import nn +from typing import Dict, List, Tuple, Type, Union + + + +def get_device(device: Union[th.device, str] = "auto") -> th.device: + """ + Retrieve PyTorch device. + It checks that the requested device is available first. + For now, it supports only cpu and cuda. + By default, it tries to use the gpu. 
+ + :param device: One for 'auto', 'cuda', 'cpu' + :return: + """ + # Cuda by default + if device == "auto": + device = "cuda" + # Force conversion to th.device + device = th.device(device) + + # Cuda not available + if device.type == th.device("cuda").type and not th.cuda.is_available(): + return th.device("cpu") + return device + +def to_tensor(x, device="auto"): + return th.as_tensor(x).to(get_device('auto')) + +class MLP(nn.Module): + def __init__(self, feature_dim: int, net_arch: List[int], activation_fn: Type[nn.Module], + device: Union[th.device, str] = "auto"): + super(MLP, self).__init__() + device = get_device(device) + net = [] + last_layer_dim_shared = feature_dim + for layer in net_arch: + net.append(nn.Linear(last_layer_dim_shared, layer)) # add linear of size layer + net.append(activation_fn()) + last_layer_dim_shared = layer + net.append(nn.Linear(last_layer_dim_shared, 1)) + self.net = nn.Sequential(*net).to(device) + + + def forward(self, features: th.Tensor) -> th.Tensor: + return self.net(features) + + diff --git a/test/test_scripts.py b/test/test_scripts.py index 42703b0..8670e0d 100644 --- a/test/test_scripts.py +++ b/test/test_scripts.py @@ -2,7 +2,7 @@ from RLA.easy_log.log_tools import DeleteLogTool, Filter from RLA.easy_log.log_tools import ArchiveLogTool, ViewLogTool from RLA.easy_log.tester import exp_manager -from RLA.auto_ftp import SFTPHandler + import os class ScriptTest(BaseTest): @@ -20,7 +20,7 @@ def test_delete_reg(self) -> None: log_found = dlt.delete_related_log(skip_ask=True) assert log_found == 0 - def test_delete_reg_small_ts(self): + def test_delete_reg_small_ts(self) -> None: """ test delete log filtered by regex and threshold of time-step. """ @@ -42,7 +42,7 @@ def test_delete_reg_small_ts(self): log_found = dlt.delete_small_timestep_log(skip_ask=True) assert log_found == 0 - def test_archive(self): + def test_archive(self) -> None: self.remove_and_copy_data() # archive experiments. 
dlt = ArchiveLogTool(proj_root=self.TARGET_DATA_ROOT, task_table_name=self.TASK_NAME, regex='2022/03/01/21-13*') @@ -54,17 +54,18 @@ def test_archive(self): log_found = dlt.delete_related_log(skip_ask=True) assert log_found == 10 - def test_view(self): + def test_view(self) -> None: self.remove_and_copy_data() dlt = ViewLogTool(proj_root=self.TARGET_DATA_ROOT, task_table_name=self.TASK_NAME, regex='2022/03/01/21-13*') dlt.view_log(skip_ask=True) - - def test_sync_log(self): - exp_manager.configure(task_name='test', - private_config_path='./test/test_data_root/rla_config.yaml', - log_root='./test/test_data_root/source/') - ftp = SFTPHandler(sftp_server=exp_manager.private_config["REMOTE_SETTING"]["ftp_server"], - username=exp_manager.private_config["REMOTE_SETTING"]["username"], - password=exp_manager.private_config["REMOTE_SETTING"]["password"]) - ftp.upload_file(os.getcwd() + '/' + 'test/test_data_root/target/', 'test/test_data_root/source/', 'test.txt') - ftp.download_file(os.getcwd() + '/' + 'test/test_data_root/source/download.txt', 'test/test_data_root/target/download.txt') + # + # def test_sync_log(self) -> None: + # from RLA.auto_ftp import SFTPHandler + # exp_manager.configure(task_name='test', + # private_config_path='./test/test_data_root/rla_config.yaml', + # log_root='./test/test_data_root/source/') + # ftp = SFTPHandler(sftp_server=exp_manager.private_config["REMOTE_SETTING"]["ftp_server"], + # username=exp_manager.private_config["REMOTE_SETTING"]["username"], + # password=exp_manager.private_config["REMOTE_SETTING"]["password"]) + # ftp.upload_file(os.getcwd() + '/' + 'test/test_data_root/target/', 'test/test_data_root/source/', 'test.txt') + # ftp.download_file(os.getcwd() + '/' + 'test/test_data_root/source/download.txt', 'test/test_data_root/target/download.txt') From 0e2775f9f385ba79cd0b5cdd60945f44cbc0962a Mon Sep 17 00:00:00 2001 From: Xiong-Hui Chen Date: Wed, 22 Jun 2022 20:56:44 +0800 Subject: [PATCH 4/9] rm useless code --- 
RLA/easy_log/tester.py | 13 +++---------- test/test_proj/proj/test_manager.py | 4 ++-- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/RLA/easy_log/tester.py b/RLA/easy_log/tester.py index 2159b4f..6bf30f2 100644 --- a/RLA/easy_log/tester.py +++ b/RLA/easy_log/tester.py @@ -265,7 +265,6 @@ def load_tester(cls, record_date, task_table_name, log_root): load_tester.update_log_files_location(root=log_root) return load_tester - def add_record_param(self, keys): for k in keys: if '.' in k: @@ -336,7 +335,8 @@ def sync_log_file(self, skip_error=False): password: password of target server remote_porject_dir: log root of target server, e.g., "/Project/SRG/SRG/var_gan_imitation/" - :return: + :param skip_error: if skip_error==True, we will skip the error of sync. + :type skip_error: bool """ logger.warn("sync: start") @@ -371,13 +371,6 @@ def send_data(ftp_obj): alternative_protocol = 'sftp' else: alternative_protocol = 'ftp' - # elif self.private_config["REMOTE_SETTING"]['file_transfer_protocol'] is 'ftp': - # ftp = FTPHandler(ftp_server=self.private_config["REMOTE_SETTING"]["ftp_server"], - # username=self.private_config["REMOTE_SETTING"]["username"], - # password=self.private_config["REMOTE_SETTING"]["password"]) - # alternative_protocol = 'sftp' - # else: - # raise ValueError("designated file_transfer_protocol {} is not supported".format(self.private_config["REMOTE_SETTING"]['file_transfer_protocol'])) send_data(ftp_obj=ftp) logger.warn("sync: send success!") except Exception as e: @@ -391,7 +384,7 @@ def send_data(ftp_obj): send_data(ftp_obj=ftp) logger.warn("sync: send success!") except Exception as e: - logger.warn("failed to send log files through {}: {} ".format(alternative_protocol, e)) + logger.warn("fail to send log files through {}: {} ".format(alternative_protocol, e)) logger.warn("server info ftp_server {}, username {}, password {}, remote_data_root {}".format( self.private_config["REMOTE_SETTING"]["ftp_server"], diff --git 
a/test/test_proj/proj/test_manager.py b/test/test_proj/proj/test_manager.py index d5bbf68..d2a00ec 100644 --- a/test/test_proj/proj/test_manager.py +++ b/test/test_proj/proj/test_manager.py @@ -4,8 +4,8 @@ from RLA.easy_log.complex_data_recorder import MatplotlibRecorder as mpr import numpy as np from RLA.utils.utils import load_yaml -import os -import yaml + + def target_func(x): return np.tanh(np.mean(x, axis=-1, keepdims=True)) From 5ee52a4f0384317227eeff072ea16e935943c9cf Mon Sep 17 00:00:00 2001 From: Xiong-Hui Chen Date: Thu, 23 Jun 2022 00:10:28 +0800 Subject: [PATCH 5/9] refactor(test): refactor path --- test/test_proj/proj/test_manager.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/test_proj/proj/test_manager.py b/test/test_proj/proj/test_manager.py index d2a00ec..7b883ca 100644 --- a/test/test_proj/proj/test_manager.py +++ b/test/test_proj/proj/test_manager.py @@ -4,22 +4,28 @@ from RLA.easy_log.complex_data_recorder import MatplotlibRecorder as mpr import numpy as np from RLA.utils.utils import load_yaml +import os def target_func(x): return np.tanh(np.mean(x, axis=-1, keepdims=True)) + +RLA_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +DATABASE_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +CODE_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + class ManagerTest(BaseTest): def _load_rla_config(self): - return load_yaml(f'../../../example/rla_config.yaml') + return load_yaml(os.path.join(RLA_REPO_ROOT, 'example/rla_config.yaml')) def _init_proj(self, config_yaml, **kwargs): task_name = 'test_manger_demo_task' - rla_data_root = '../../test_data_root' + rla_data_root = os.path.join(DATABASE_ROOT, 'test_data_root') config_yaml['BACKUP_CONFIG']['backup_code_dir'] = ['proj'] exp_manager.configure(task_name, private_config_path=config_yaml, data_root=rla_data_root, - code_root='../', **kwargs) + 
code_root=CODE_ROOT, **kwargs) exp_manager.log_files_gen() exp_manager.print_args() From f260a95337a943d7bae00285012966e4afc3c9f2 Mon Sep 17 00:00:00 2001 From: Xiong-Hui Chen Date: Thu, 23 Jun 2022 00:33:12 +0800 Subject: [PATCH 6/9] docs: update docs --- test/test_proj/proj/test_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/test_proj/proj/test_manager.py b/test/test_proj/proj/test_manager.py index 7b883ca..9470d29 100644 --- a/test/test_proj/proj/test_manager.py +++ b/test/test_proj/proj/test_manager.py @@ -39,7 +39,6 @@ def test_log_tf(self): yaml = self._load_rla_config() self._init_proj(yaml) import tensorflow as tf - import numpy as np X_ph = tf.placeholder(dtype=tf.float32, shape=[None, kwargs["input_size"]], name='x') y_ph = tf.placeholder(dtype=tf.float32, shape=[None, 1], name='x') l = X_ph @@ -48,7 +47,7 @@ def test_log_tf(self): l = tf.nn.tanh(tf.layers.dense(l, 64, kernel_initializer=tf.keras.initializers.glorot_normal)) out = tf.layers.dense(l, 1, kernel_initializer=tf.keras.initializers.glorot_normal) - loss = tf.reduce_mean(np.square(out - y_ph)) + loss = tf.reduce_mean(tf.square(out - y_ph)) opt = tf.train.AdamOptimizer(learning_rate=kwargs["learning_rate"]).minimize(loss) sess = tf.Session().__enter__() sess.run(tf.variables_initializer(tf.global_variables())) From 0529ed5900d7f86920d17bd34df08fd2c04d0d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=96=E9=B9=A4=20=E7=8E=8B?= Date: Thu, 23 Jun 2022 00:41:45 +0800 Subject: [PATCH 7/9] test manager test_sent_to_master --- RLA/auto_ftp.py | 2 +- example/rla_config.yaml | 2 +- example/simplest_code/project/main.py | 2 +- test/test_proj/proj/test_manager.py | 16 ++++++++-------- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/RLA/auto_ftp.py b/RLA/auto_ftp.py index 446e319..f23bf9c 100644 --- a/RLA/auto_ftp.py +++ b/RLA/auto_ftp.py @@ -203,7 +203,7 @@ def upload_file(self, remote_dir, local_dir, local_file): raise expection logger.warn('create dir 
succeed {}'.format(remote_dir)) self.sftp.cwd(remote_dir) - self.sftp.put(local_dir + local_file) + self.sftp.put(os.path.join(local_dir, local_file)) self.close() def download_file(self, remote_file, local_file): diff --git a/example/rla_config.yaml b/example/rla_config.yaml index 2622457..f9ca9f9 100644 --- a/example/rla_config.yaml +++ b/example/rla_config.yaml @@ -26,6 +26,6 @@ REMOTE_SETTING: ftp_server: '' username: '' password: '' - remote_data_root: '' + remote_log_root: '' # option: ftp or sftp file_transfer_protocol: 'sftp' diff --git a/example/simplest_code/project/main.py b/example/simplest_code/project/main.py index 335f7e4..8ff68e8 100644 --- a/example/simplest_code/project/main.py +++ b/example/simplest_code/project/main.py @@ -57,7 +57,7 @@ def set_global_seeds(seed): l = tf.nn.tanh(tf.layers.dense(l, 64, kernel_initializer=tf.keras.initializers.glorot_normal)) out = tf.layers.dense(l, 1, kernel_initializer=tf.keras.initializers.glorot_normal) -loss = tf.reduce_mean(np.square(out - y_ph)) +loss = tf.reduce_mean(tf.square(out - y_ph)) opt = tf.train.AdamOptimizer(learning_rate=kwargs["learning_rate"]).minimize(loss) sess = tf.Session().__enter__() diff --git a/test/test_proj/proj/test_manager.py b/test/test_proj/proj/test_manager.py index 7b883ca..f930eb0 100644 --- a/test/test_proj/proj/test_manager.py +++ b/test/test_proj/proj/test_manager.py @@ -48,7 +48,7 @@ def test_log_tf(self): l = tf.nn.tanh(tf.layers.dense(l, 64, kernel_initializer=tf.keras.initializers.glorot_normal)) out = tf.layers.dense(l, 1, kernel_initializer=tf.keras.initializers.glorot_normal) - loss = tf.reduce_mean(np.square(out - y_ph)) + loss = tf.reduce_mean(tf.square(out - y_ph)) opt = tf.train.AdamOptimizer(learning_rate=kwargs["learning_rate"]).minimize(loss) sess = tf.Session().__enter__() sess.run(tf.variables_initializer(tf.global_variables())) @@ -91,7 +91,7 @@ def test_log_torch(self): yaml = self._load_rla_config() yaml['DL_FRAMEWORK'] = 'torch' self._init_proj(yaml) - 
from torch_net import MLP, to_tensor + from test.test_proj.proj.torch_net import MLP, to_tensor from torch import nn from torch.nn import functional as F import torch as th @@ -107,7 +107,7 @@ def test_log_torch(self): optimizer.zero_grad() mse_loss.backward() optimizer.step() - logger.ma_record_tabular("perf/mse", np.mean(mse_loss.detach().numpy()), 10) + logger.ma_record_tabular("perf/mse", np.mean(mse_loss.detach().cpu().numpy()), 10) logger.record_tabular("y_out", np.mean(y)) if i % 10 == 0: def plot_func(): @@ -115,7 +115,7 @@ def plot_func(): testX = np.repeat(np.expand_dims(np.arange(-10, 10, 0.1), axis=-1), repeats=kwargs["input_size"], axis=-1) testX = testX.astype(np.float32) testY = target_func(testX) - predY = mlp(to_tensor(testX)).detach() + predY = mlp(to_tensor(testX)).detach().cpu().numpy() plt.plot(testX.mean(axis=-1), predY.mean(axis=-1), label='pred') plt.plot(testX.mean(axis=-1), testY.mean(axis=-1), label='real') mpr.pretty_plot_wrapper('react_func', plot_func, xlabel='x', ylabel='y', title='react test') @@ -135,14 +135,14 @@ def test_sent_to_master(self): exp_manager.set_hyper_param(**kwargs) exp_manager.add_record_param(['input_size']) yaml = self._load_rla_config() - import private_config + from test.test_proj.proj import private_config yaml['DL_FRAMEWORK'] = 'torch' yaml['SEND_LOG_FILE'] = True yaml['REMOTE_SETTING']['ftp_server'] = '127.0.0.1' - yaml['REMOTE_SETTING']['file_transfer_protocol'] = 'ftp' + yaml['REMOTE_SETTING']['file_transfer_protocol'] = 'sftp' yaml['REMOTE_SETTING']['username'] = private_config.username - yaml['REMOTE_SETTING']['password'] = private_config.passward - yaml['REMOTE_SETTING']['remote_data_root'] = private_config.remote_root + yaml['REMOTE_SETTING']['password'] = private_config.password + yaml['REMOTE_SETTING']['remote_log_root'] = private_config.remote_root self._init_proj(yaml, is_master_node=False) for i in range(0, 100): From c947a801e5e077c2dded714cedcbad0e55089dda Mon Sep 17 00:00:00 2001 From: Xiong-Hui 
Chen Date: Thu, 23 Jun 2022 00:45:17 +0800 Subject: [PATCH 8/9] docs: add comments --- README.md | 2 +- example/simplest_code/project/main.py | 2 +- test/test_scripts.py | 24 ++++++++++-------------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 1d22b8d..d78131c 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Currently, we store the data items in standard file systems and manage the relat - The directory "code" is a backup of code for experiment reproducibility. - The directory "checkpoint" save weights of neural networks. - We have a table named "demo_task", which is the root directory of log/archive_tester/checkpoint/code/results. -The "index" of experiments in named in the formulation of `${%Y}/${%m}/${%d}/${%H-%M-%S-%f} ${ip address} ${tracked hyper-parameters}`. +The "index" of experiments is named in the format of `${%Y}/${%m}/${%d}/${%H-%M-%S-%f}_${ip address}_${tracked hyper-parameters}`. ### Tools to Manage the Database diff --git a/example/simplest_code/project/main.py b/example/simplest_code/project/main.py index 335f7e4..8ff68e8 100644 --- a/example/simplest_code/project/main.py +++ b/example/simplest_code/project/main.py @@ -57,7 +57,7 @@ def set_global_seeds(seed): l = tf.nn.tanh(tf.layers.dense(l, 64, kernel_initializer=tf.keras.initializers.glorot_normal)) out = tf.layers.dense(l, 1, kernel_initializer=tf.keras.initializers.glorot_normal) -loss = tf.reduce_mean(np.square(out - y_ph)) +loss = tf.reduce_mean(tf.square(out - y_ph)) opt = tf.train.AdamOptimizer(learning_rate=kwargs["learning_rate"]).minimize(loss) sess = tf.Session().__enter__() diff --git a/test/test_scripts.py b/test/test_scripts.py index 8670e0d..5b23387 100644 --- a/test/test_scripts.py +++ b/test/test_scripts.py @@ -9,7 +9,7 @@ class ScriptTest(BaseTest): def test_delete_reg(self) -> None: """ - test delete log filtered by regex. + test delete log filtered by regex. 
See rla_scripts/delete_expt.py """ self.remove_and_copy_data() filter = Filter() @@ -22,7 +22,7 @@ def test_delete_reg(self) -> None: def test_delete_reg_small_ts(self) -> None: """ - test delete log filtered by regex and threshold of time-step. + test delete log filtered by regex and threshold of time-step. See rla_scripts/delete_expt.py """ self.remove_and_copy_data() filter = Filter() @@ -43,6 +43,9 @@ def test_delete_reg_small_ts(self) -> None: assert log_found == 0 def test_archive(self) -> None: + """ + archive experiment log. See rla_scripts/archive_expt.py + """ self.remove_and_copy_data() # archive experiments. dlt = ArchiveLogTool(proj_root=self.TARGET_DATA_ROOT, task_table_name=self.TASK_NAME, regex='2022/03/01/21-13*') @@ -55,17 +58,10 @@ def test_archive(self) -> None: assert log_found == 10 def test_view(self) -> None: + """ + view experiment log.See rla_scripts/view_expt.py + + """ self.remove_and_copy_data() dlt = ViewLogTool(proj_root=self.TARGET_DATA_ROOT, task_table_name=self.TASK_NAME, regex='2022/03/01/21-13*') - dlt.view_log(skip_ask=True) - # - # def test_sync_log(self) -> None: - # from RLA.auto_ftp import SFTPHandler - # exp_manager.configure(task_name='test', - # private_config_path='./test/test_data_root/rla_config.yaml', - # log_root='./test/test_data_root/source/') - # ftp = SFTPHandler(sftp_server=exp_manager.private_config["REMOTE_SETTING"]["ftp_server"], - # username=exp_manager.private_config["REMOTE_SETTING"]["username"], - # password=exp_manager.private_config["REMOTE_SETTING"]["password"]) - # ftp.upload_file(os.getcwd() + '/' + 'test/test_data_root/target/', 'test/test_data_root/source/', 'test.txt') - # ftp.download_file(os.getcwd() + '/' + 'test/test_data_root/source/download.txt', 'test/test_data_root/target/download.txt') + dlt.view_log(skip_ask=True) \ No newline at end of file From 2a3b26cf84ce3bf963e41f12a399f33bbe0e6d00 Mon Sep 17 00:00:00 2001 From: Xiong-Hui Chen Date: Thu, 23 Jun 2022 00:55:02 +0800 Subject: [PATCH 
9/9] fix: remote_data_root alias --- README.md | 2 +- RLA/easy_log/tester.py | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index d78131c..e6e3154 100644 --- a/README.md +++ b/README.md @@ -243,7 +243,7 @@ for i in range(1000): then the data items we be sent to the `remote_data_root` of the main node. Since `SEND_LOG_FILE` is set to False in the main node, the `exp_manager.sync_log_file()` will be skipped in the main node. PS: -1. You might meet "socket.error: [Errno 111] Connection refused" problem in this process. The solution can be found [here](https://stackoverflow.com/questions/16428401/unable-to-use-ip-address-with-ftplib-python). +1. You might meet "socket.error: [Errno 111] Connection refused" problem in this process. The solution can be found [here](https://stackoverflow.com/a/70784201/6055868). 2. An alternative way is building your own NFS for your physical machines and locate data_root to the NFS. diff --git a/RLA/easy_log/tester.py b/RLA/easy_log/tester.py index 6bf30f2..6ec2720 100644 --- a/RLA/easy_log/tester.py +++ b/RLA/easy_log/tester.py @@ -340,17 +340,17 @@ def sync_log_file(self, skip_error=False): """ logger.warn("sync: start") - # ignore_files = self.private_config["IGNORE_RULE"] + remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_data_root") + if remote_data_root is None: + remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_log_root") + if remote_data_root is None: + raise RuntimeError("miss remote_data_root (or the deprecated remote_log_root) in rla_config") + logger.warn("the parameter remote_log_root will be renamed to remote_data_root in future versions.") + def send_data(ftp_obj): for root, dirs, files in os.walk(self.log_dir): suffix = root.split("/{}/".format(LOG)) assert len(suffix) == 2, "root should only have one pattern \"/log/\"" - remote_data_root = self.private_config["REMOTE_SETTING"].get("remote_data_root") - if remote_data_root is None: - remote_data_root = 
self.private_config["REMOTE_SETTING"].get("remote_log_root") - logger.warn("the parameter remote_log_root will be renamed to remote_data_root in future versions.") - else: - raise RuntimeError("miss remote_log_root in rla_config") remote_root = osp.join(remote_data_root, LOG, suffix[1]) local_root = root logger.warn("sync {} <- {}".format(remote_root, local_root)) @@ -389,8 +389,7 @@ def send_data(ftp_obj): logger.warn("server info ftp_server {}, username {}, password {}, remote_data_root {}".format( self.private_config["REMOTE_SETTING"]["ftp_server"], self.private_config["REMOTE_SETTING"]["username"], - self.private_config["REMOTE_SETTING"]["password"], - self.private_config["REMOTE_SETTING"]["remote_data_root"])) + self.private_config["REMOTE_SETTING"]["password"], remote_data_root)) import traceback logger.warn(traceback.format_exc()) if not skip_error: