From aea1cae6ef9e6e3db627aba466b90b6a94f14e4f Mon Sep 17 00:00:00 2001
From: Young
Date: Mon, 13 Sep 2021 04:25:07 +0000
Subject: [PATCH 1/5] Supporting shared processor

---
 qlib/data/dataset/handler.py   | 90 +++++++++++++++++++++++++---------
 qlib/data/dataset/processor.py | 17 +++++++
 2 files changed, 83 insertions(+), 24 deletions(-)

diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index b823728fb73..5889715a4ee 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -295,11 +295,14 @@ class DataHandlerLP(DataHandler):
 
     # process type
     PTYPE_I = "independent"
-    # - self._infer will be processed by infer_processors
-    # - self._learn will be processed by learn_processors
+    # - self._infer will be processed by shared_processors + infer_processors
+    # - self._learn will be processed by shared_processors + learn_processors
+
+    # NOTE:
     PTYPE_A = "append"
-    # - self._infer will be processed by infer_processors
-    # - self._learn will be processed by infer_processors + learn_processors
+
+    # - self._infer will be processed by shared_processors + infer_processors
+    # - self._learn will be processed by shared_processors + infer_processors + learn_processors
     #   - (e.g. self._infer processed by learn_processors )
 
     def __init__(
@@ -308,8 +311,9 @@ def __init__(
         instruments=None,
         start_time=None,
         end_time=None,
         data_loader: Union[dict, str, DataLoader] = None,
-        infer_processors=[],
-        learn_processors=[],
+        infer_processors: List = [],
+        learn_processors: List = [],
+        shared_processors: List = [],
         process_type=PTYPE_A,
         drop_raw=False,
         **kwargs,
@@ -360,7 +364,8 @@ def __init__(
         # Setup preprocessor
         self.infer_processors = []  # for lint
         self.learn_processors = []  # for lint
-        for pname in "infer_processors", "learn_processors":
+        self.shared_processors = []  # for lint
+        for pname in "infer_processors", "learn_processors", "shared_processors":
             for proc in locals()[pname]:
                 getattr(self, pname).append(
                     init_instance_by_config(
@@ -375,9 +380,12 @@ def __init__(
         super().__init__(instruments, start_time, end_time, data_loader, **kwargs)
 
     def get_all_processors(self):
-        return self.infer_processors + self.learn_processors
+        return self.shared_processors + self.infer_processors + self.learn_processors
 
     def fit(self):
+        """
+        fit data without processing the data
+        """
         for proc in self.get_all_processors():
             with TimeInspector.logt(f"{proc.__class__.__name__}"):
                 proc.fit(self._data)
@@ -390,30 +398,67 @@ def fit_process_data(self):
         """
         self.process_data(with_fit=True)
 
+    @staticmethod
+    def _run_proc_l(
+        df: pd.DataFrame, proc_l: List[processor_module.Processor], with_fit: bool, check_for_infer: bool
+    ) -> pd.DataFrame:
+        for proc in proc_l:
+            if check_for_infer and not proc.is_for_infer():
+                raise TypeError("Only processors usable for inference can be used in `infer_processors` ")
+            with TimeInspector.logt(f"{proc.__class__.__name__}"):
+                if with_fit:
+                    proc.fit(df)
+                df = proc(df)
+        return df
+
+    def _is_proc_readonly(proc_l: List[processor_module.Processor]):
+        """
+        NOTE: it will return True if `len(proc_l) == 0`
+        """
+        for p in proc_l:
+            if not p.readonly():
+                return False
+        return True
+
     def process_data(self, with_fit: bool = False):
         """
         process_data data. Fun `processor.fit` if necessary
 
+        Notation: (data)  [processor]
+
+        # data processing flow of self.process_type == DataHandlerLP.PTYPE_I
+        (self._data)-[shared_processors]-(_shared_df)-[learn_processors]-(_learn_df)
+                                                   \
+                                                    -[infer_processors]-(_infer_df)
+
+        # data processing flow of self.process_type == DataHandlerLP.PTYPE_A
+        (self._data)-[shared_processors]-(_shared_df)-[infer_processors]-(_infer_df)-[learn_processors]-(_learn_df)
+
         Parameters
         ----------
         with_fit : bool
             The input of the `fit` will be the output of the previous processor
         """
+        # shared data processors
+        # 1) assign
+        _shared_df = self._data
+        if self._is_proc_readonly(self.shared_processors):  # avoid modifying the original data
+            _shared_df = _shared_df.copy()
+        # 2) process
+        _shared_df = self._run_proc_l(_shared_df, self.shared_processors, with_fit=with_fit, check_for_infer=True)
+
         # data for inference
-        _infer_df = self._data
-        if len(self.infer_processors) > 0 and not self.drop_raw:  # avoid modifying the original data
+        # 1) assign
+        _infer_df = _shared_df
+        if self._is_proc_readonly(self.infer_processors):  # avoid modifying the original data
             _infer_df = _infer_df.copy()
+        # 2) process
+        _infer_df = self._run_proc_l(_infer_df, self.infer_processors, with_fit=with_fit, check_for_infer=True)
 
-        for proc in self.infer_processors:
-            if not proc.is_for_infer():
-                raise TypeError("Only processors usable for inference can be used in `infer_processors` ")
-            with TimeInspector.logt(f"{proc.__class__.__name__}"):
-                if with_fit:
-                    proc.fit(_infer_df)
-                _infer_df = proc(_infer_df)
         self._infer = _infer_df
 
         # data for learning
+        # 1) assign
         if self.process_type == DataHandlerLP.PTYPE_I:
             _learn_df = self._data
         elif self.process_type == DataHandlerLP.PTYPE_A:
             # based on `infer_df` and append the processor
             _learn_df = _infer_df
         else:
             raise NotImplementedError(f"This type of input is not supported")
-
-        if len(self.learn_processors) > 0:  # avoid modifying the original data
+        if self._is_proc_readonly(self.learn_processors):  # avoid modifying the original data
             _learn_df = _learn_df.copy()
-        for proc in self.learn_processors:
-            with TimeInspector.logt(f"{proc.__class__.__name__}"):
-                if with_fit:
-                    proc.fit(_learn_df)
-                _learn_df = proc(_learn_df)
+        # 2) process
+        _learn_df = self._is_proc_readonly(self.learn_processors, with_fit=True, check_for_infer=False)
+
         self._learn = _learn_df
 
         if self.drop_raw:
diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py
index fce22ddfcf9..9114ac918ed 100644
--- a/qlib/data/dataset/processor.py
+++ b/qlib/data/dataset/processor.py
@@ -73,6 +73,14 @@ def is_for_infer(self) -> bool:
         """
         return True
 
+    def readonly(self) -> bool:
+        """
+        Does the processor treat the input data as read-only (i.e. does not write the input data) when processing
+
+        Knowing the readonly information is helpful to the Handler to avoid unnecessary copy
+        """
+        return False
+
     def config(self, **kwargs):
         attr_list = {"fit_start_time", "fit_end_time"}
         for k, v in kwargs.items():
@@ -92,6 +100,9 @@ def __init__(self, fields_group=None):
     def __call__(self, df):
         return df.dropna(subset=get_group_columns(df, self.fields_group))
 
+    def readonly(self):
+        return True
+
 
 class DropnaLabel(DropnaProcessor):
     def __init__(self, fields_group="label"):
@@ -113,6 +124,9 @@ def __call__(self, df):
         mask = df.columns.isin(self.col_list)
         return df.loc[:, ~mask]
 
+    def readonly(self):
+        return True
+
 
 class FilterCol(Processor):
     def __init__(self, fields_group="feature", col_list=[]):
@@ -128,6 +142,9 @@ def __call__(self, df):
         mask = df.columns.get_level_values(-1).isin(self.col_list)
         return df.loc[:, mask]
 
+    def readonly(self):
+        return True
+
 
 class TanhProcess(Processor):
     """Use tanh to process noise data"""

From 088540489ede8993269b0e22e37b52bcb8bf4315 Mon Sep 17 00:00:00 2001
From: Young
Date: Mon, 13 Sep 2021 05:05:16 +0000
Subject: [PATCH 2/5] fix readonly reverse bug

---
 qlib/data/dataset/handler.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index 5889715a4ee..4ecdf79d442 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -411,6 +411,7 @@ def _run_proc_l(
                 df = proc(df)
         return df
 
+    @staticmethod
     def _is_proc_readonly(proc_l: List[processor_module.Processor]):
         """
         NOTE: it will return True if `len(proc_l) == 0`
@@ -442,7 +443,7 @@ def process_data(self, with_fit: bool = False):
         # shared data processors
         # 1) assign
         _shared_df = self._data
-        if self._is_proc_readonly(self.shared_processors):  # avoid modifying the original data
+        if not self._is_proc_readonly(self.shared_processors):  # avoid modifying the original data
             _shared_df = _shared_df.copy()
         # 2) process
         _shared_df = self._run_proc_l(_shared_df, self.shared_processors, with_fit=with_fit, check_for_infer=True)
@@ -450,7 +451,7 @@ def process_data(self, with_fit: bool = False):
         # data for inference
         # 1) assign
         _infer_df = _shared_df
-        if self._is_proc_readonly(self.infer_processors):  # avoid modifying the original data
+        if not self._is_proc_readonly(self.infer_processors):  # avoid modifying the original data
             _infer_df = _infer_df.copy()
         # 2) process
         _infer_df = self._run_proc_l(_infer_df, self.infer_processors, with_fit=with_fit, check_for_infer=True)
@@ -466,10 +467,10 @@ def process_data(self, with_fit: bool = False):
             _learn_df = _infer_df
         else:
             raise NotImplementedError(f"This type of input is not supported")
-        if self._is_proc_readonly(self.learn_processors):  # avoid modifying the original data
+        if not self._is_proc_readonly(self.learn_processors):  # avoid modifying the original data
             _learn_df = _learn_df.copy()
         # 2) process
-        _learn_df = self._is_proc_readonly(self.learn_processors, with_fit=True, check_for_infer=False)
+        _learn_df = self._run_proc_l(self.learn_processors, with_fit=True, check_for_infer=False)
 
         self._learn = _learn_df

From ba689128a1ef667cfdbdcacd73b8701828407e55 Mon Sep 17 00:00:00 2001
From: Young
Date: Mon, 13 Sep 2021 05:20:31 +0000
Subject: [PATCH 3/5] remove pytest dependency

---
 tests/storage_tests/test_storage.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py
index aad8d11e489..e2ab28af4ba 100644
--- a/tests/storage_tests/test_storage.py
+++ b/tests/storage_tests/test_storage.py
@@ -5,7 +5,6 @@
 from pathlib import Path
 from collections.abc import Iterable
 
-import pytest
 import numpy as np
 
 from qlib.tests import TestAutoData
@@ -33,13 +32,13 @@ def test_calendar_storage(self):
         print(f"calendar[-1]: {calendar[-1]}")
 
         calendar = CalendarStorage(freq="1min", future=False, provider_uri="not_found")
-        with pytest.raises(ValueError):
+        with self.assertRaises(ValueError):
             print(calendar.data)
 
-        with pytest.raises(ValueError):
+        with self.assertRaises(ValueError):
             print(calendar[:])
 
-        with pytest.raises(ValueError):
+        with self.assertRaises(ValueError):
             print(calendar[0])
 
     def test_instrument_storage(self):
@@ -90,10 +89,10 @@ def test_instrument_storage(self):
         print(f"instrument['SH600000']: {instrument['SH600000']}")
 
         instrument = InstrumentStorage(market="csi300", provider_uri="not_found")
-        with pytest.raises(ValueError):
+        with self.assertRaises(ValueError):
             print(instrument.data)
 
-        with pytest.raises(ValueError):
+        with self.assertRaises(ValueError):
             print(instrument["sSH600000"])
 
     def test_feature_storage(self):
@@ -152,7 +151,7 @@ def test_feature_storage(self):
 
         feature = FeatureStorage(instrument="SH600004", field="close", freq="day", provider_uri=self.provider_uri)
 
-        with pytest.raises(IndexError):
+        with self.assertRaises(IndexError):
             print(feature[0])
         assert isinstance(
             feature[815][1], (float, np.float32)

From 4db4d5738dc8fba8905b3f34b1fbf0cb3a5a9586 Mon Sep 17 00:00:00 2001
From: Young
Date: Mon, 13 Sep 2021 05:22:49 +0000
Subject: [PATCH 4/5] with fit bug

---
 qlib/data/dataset/handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index 4ecdf79d442..e4bff2e673f 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -470,7 +470,7 @@ def process_data(self, with_fit: bool = False):
         if not self._is_proc_readonly(self.learn_processors):  # avoid modifying the original data
             _learn_df = _learn_df.copy()
         # 2) process
-        _learn_df = self._run_proc_l(self.learn_processors, with_fit=True, check_for_infer=False)
+        _learn_df = self._run_proc_l(self.learn_processors, with_fit=with_fit, check_for_infer=False)
 
         self._learn = _learn_df

From 7944982ee9d0a9e8b9267f812c7cda6a97a97afe Mon Sep 17 00:00:00 2001
From: Young
Date: Mon, 13 Sep 2021 06:25:44 +0000
Subject: [PATCH 5/5] fix parameter error

---
 qlib/data/dataset/handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index e4bff2e673f..92e73de0483 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -470,7 +470,7 @@ def process_data(self, with_fit: bool = False):
         if not self._is_proc_readonly(self.learn_processors):  # avoid modifying the original data
             _learn_df = _learn_df.copy()
         # 2) process
-        _learn_df = self._run_proc_l(self.learn_processors, with_fit=with_fit, check_for_infer=False)
+        _learn_df = self._run_proc_l(_learn_df, self.learn_processors, with_fit=with_fit, check_for_infer=False)
 
         self._learn = _learn_df
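
Usage note (illustrative sketch, not part of the patches): the snippet below shows how the new `shared_processors` argument from PATCH 1/5 is expected to be combined with the existing `infer_processors` / `learn_processors`. The instruments, dates, provider path and field expressions are assumed values for the example only; the handler and processor classes are the ones touched by this series.

    import qlib
    from qlib.data.dataset.handler import DataHandlerLP
    from qlib.data.dataset import processor as proc

    # assumes a local qlib data provider has already been prepared at this path
    qlib.init(provider_uri="~/.qlib/qlib_data/cn_data")

    handler = DataHandlerLP(
        instruments="csi300",
        start_time="2015-01-01",
        end_time="2020-12-31",
        data_loader={
            "class": "QlibDataLoader",
            "kwargs": {
                "config": {
                    "feature": ["$close", "$volume", "Ref($close, 1)/$close"],
                    "label": ["Ref($close, -2)/Ref($close, -1) - 1"],
                }
            },
        },
        # run once on the raw data; the result feeds both branches below,
        # so these processors must be usable for inference (is_for_infer() is True)
        shared_processors=[proc.DropnaProcessor(fields_group="feature")],
        # inference branch; with PTYPE_A its output is reused for the learning branch
        infer_processors=[
            proc.CSZScoreNorm(fields_group="feature"),
            proc.Fillna(fields_group="feature"),
        ],
        # may look at the label, so it is kept out of the inference branch
        learn_processors=[proc.DropnaLabel(), proc.CSZScoreNorm(fields_group="label")],
        process_type=DataHandlerLP.PTYPE_A,
    )

    df_learn = handler.fetch(data_key=DataHandlerLP.DK_L)  # processed learning data
    df_infer = handler.fetch(data_key=DataHandlerLP.DK_I)  # processed inference data

With process_type=DataHandlerLP.PTYPE_A the learning data is derived from the already-processed inference data, so `learn_processors` only needs the label-side steps; with PTYPE_I the learning branch is built independently of the inference branch.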