From 9c690b44089b8a53d03a641eb0177da3c0943603 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Mon, 13 Apr 2020 23:03:40 -0400 Subject: [PATCH 1/9] feat: add node parameter classes and access descriptors --- pyworkflow/pyworkflow/parameters.py | 130 ++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 pyworkflow/pyworkflow/parameters.py diff --git a/pyworkflow/pyworkflow/parameters.py b/pyworkflow/pyworkflow/parameters.py new file mode 100644 index 0000000..380cdb3 --- /dev/null +++ b/pyworkflow/pyworkflow/parameters.py @@ -0,0 +1,130 @@ +import os + + +class Options: + """ + Descriptor for accessing node parameters as Parameter instances + + Clones the values in the class variable `OPTIONS` and sets their values + with the values in in the instance variable `option_values`. + """ + + def __get__(self, obj, objtype): + # return class variable OPTIONS if invoked from class + if obj is None: + return getattr(objtype, "OPTIONS", dict()) + # otherwise clone class's options and set values from instance + options = dict() + for k, v in obj.OPTIONS.items(): + options[k] = v.clone() + for k, v in getattr(obj, "option_values", dict()).items(): + if k in options: + options[k].set_value(v) + return options + + +class OptionTypes: + """ + Descriptor for accessing parameter names, types, and descriptions. + + This will never reference instance parameter values, only turn the + class OPTIONS into a dict. + """ + + def __get__(self, obj, objtype): + # handle both instance- and class-callers + item = obj or objtype + if getattr(item, "OPTIONS", None) is None: + return dict() + return {k: v.to_json() for k, v in item.OPTIONS.items()} + + +class Parameter: + type = None + + def __init__(self, label="", default=None, docstring=None): + self._label = label + self._value = None + self._default = default + self._docstring = docstring + + def clone(self): + return self.__class__(self.label, self.default, self.docstring) + + def get_value(self): + return self._value or self.default + + def set_value(self, value): + self._value = value + + @property + def label(self): + return self._label + + @property + def default(self): + return self._default + + @property + def docstring(self): + return self._docstring + + def validate(self): + raise NotImplementedError() + + def to_json(self): + return { + "type": self.type, + "label": self.label, + "value": self.get_value(), + "docstring": self.docstring + } + + +class FileParameter(Parameter): + type = "file" + + def validate(self): + value = self.get_value() + if (value is None) or (not os.path.exists(value)): + raise ParameterValidationError(self) + + +class StringParameter(Parameter): + type = "string" + + def validate(self): + value = self.get_value() + if not isinstance(value, str): + raise ParameterValidationError(self) + + +class IntegerParameter(Parameter): + type = "int" + + def validate(self): + value = self.get_value() + if not isinstance(value, int): + raise ParameterValidationError(self) + + +class BooleanParameter(Parameter): + type = "boolean" + + def validate(self): + value = self.get_value() + if not isinstance(value, bool): + raise ParameterValidationError(self) + + +class ParameterValidationError(Exception): + + def __init__(self, parameter): + self.parameter = parameter + + def __str__(self): + param = self.parameter + value = param.get_value() + value_type = type(value).__name__ + param_type = type(param).__name__ + return f"Invalid value '{value}' (type '{value_type}') for {param_type}" From ec1ad707bb0cc710980394173d13fba28237ce09 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Mon, 13 Apr 2020 23:04:33 -0400 Subject: [PATCH 2/9] refactor: simplify node inheritance and use parameter classes --- pyworkflow/pyworkflow/node.py | 166 +++++++++------------------------- vp/vp/views.py | 4 +- 2 files changed, 44 insertions(+), 126 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index ca75cb3..1ac04b0 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,11 +1,16 @@ import pandas as pd +from .parameters import * + class Node: """Node object """ - def __init__(self, node_info, options=None): + options = Options() + option_types = OptionTypes() + + def __init__(self, node_info): self.name = node_info.get('name') self.node_id = node_info.get('node_id') self.node_type = node_info.get('node_type') @@ -14,12 +19,9 @@ def __init__(self, node_info, options=None): self.is_global = node_info.get('is_global') is True - # Execution options are passed up from children - self.options = options or dict() - - # User-override takes precedence + self.option_values = dict() if node_info.get("options"): - self.options.update(node_info["options"]) + self.option_values.update(node_info["options"]) def execute(self, predecessor_data, flow_vars): pass @@ -35,12 +37,6 @@ class FlowNode(Node): """FlowNode object """ display_name = "Flow Control" - DEFAULT_OPTIONS = { - - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**FlowNode.DEFAULT_OPTIONS, **options}) class StringNode(FlowNode): @@ -53,27 +49,13 @@ class StringNode(FlowNode): num_out = 1 color = 'purple' - DEFAULT_OPTIONS = { - 'default_value': None, - 'var_name': 'my_var', - } - - OPTION_TYPES = { - 'default_value': { - "type": "string", - "name": "Default Value", - "desc": "Value this Node will pass as a flow variable" - }, - 'var_name': { - "type": "string", - "name": "Variable Name", - "desc": "Name of the variable to use in another Node" - } + OPTIONS = { + "default_value": StringParameter("Default Value", + docstring="Value this node will pass as a flow variable"), + "var_name": StringParameter("Variable Name", default="my_var", + docstring="Name of the variable to use in another Node") } - def __init__(self, node_info): - super().__init__(node_info) - class IONode(Node): """IONodes deal with file-handling in/out of the Workflow. @@ -85,13 +67,6 @@ class IONode(Node): color = 'black' display_name = "I/O" - DEFAULT_OPTIONS = { - # 'file': None, - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**IONode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): pass @@ -112,49 +87,27 @@ class ReadCsvNode(IONode): num_in = 0 num_out = 1 - DEFAULT_OPTIONS = { - 'filepath_or_buffer': None, - 'sep': ',', - 'header': 'infer', - 'index_col': None + OPTIONS = { + "file": FileParameter("File", docstring="CSV File"), + "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), + # user-specified headers are probably integers, but haven't figured out + # arguments with multiple possible types + "header": StringParameter("Header Row", default="infer", + docstring="Row number containing column names (0-indexed)"), } - OPTION_TYPES = { - 'filepath_or_buffer': { - "type": "file", - "name": "File", - "desc": "CSV File" - }, - 'sep': { - "type": "string", - "name": "Delimiter", - "desc": "column delimiter, default ','" - }, - 'header': { - "type": "string", - "name": "Column Name Row", - "desc": "Row number with column names (0-indexed) or 'infer'" - }, - 'index_col': { - "type": "string", - "name": "Index Column Name", - "desc": "Column to use as index (facilitates joining)" - } - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): + print(self.option_values) + print(self.options) try: - # TODO: FileStorage implemented in Django to store in /tmp - # Better filename/path handling should be implemented. NodeUtils.replace_flow_vars(self.options, flow_vars) - opts = self.options - df = pd.read_csv(opts["filepath_or_buffer"], sep=opts["sep"], - header=opts["header"]) + fname = self.options["file"].get_value() + sep = self.options["sep"].get_value() + hdr = self.options["header"].get_value() + df = pd.read_csv(fname, sep=sep, header=hdr) return df.to_json() except Exception as e: + raise e raise NodeException('read csv', str(e)) def __str__(self): @@ -175,34 +128,12 @@ class WriteCsvNode(IONode): num_out = 0 download_result = True - DEFAULT_OPTIONS = { - 'path_or_buf': None, - 'sep': ',', - 'index': True, + OPTIONS = { + "file": StringParameter("Filename", docstring="CSV file to write"), + "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), + "index": BooleanParameter("Write Index", default=True, docstring="Write index as column?"), } - OPTION_TYPES = { - "path_or_buf": { - "type": "string", - "name": "Filename", - "desc": "Filename to write" - }, - "sep": { - "type": "string", - "name": "Delimiter", - "desc": "column delimiter, default ','" - }, - "index": { - "type": "boolean", - "name": "Index", - "desc": "Write index column or not" - } - - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: # Write CSV needs exactly 1 input DataFrame @@ -212,8 +143,10 @@ def execute(self, predecessor_data, flow_vars): df = pd.DataFrame.from_dict(predecessor_data[0]) # Write to CSV and save - opts = self.options - df.to_csv(opts["path_or_buf"], sep=opts["sep"], index=opts["index"]) + fname = self.options["file"].get_value() + sep = self.options["sep"].get_value() + index = self.options["index"].get_value() + df.to_csv(fname, sep=sep, index=index) return df.to_json() except Exception as e: raise NodeException('write csv', str(e)) @@ -230,11 +163,6 @@ class ManipulationNode(Node): display_name = "Manipulation" color = 'goldenrod' - DEFAULT_OPTIONS = {} - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**ManipulationNode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): pass @@ -308,9 +236,6 @@ class PivotNode(ManipulationNode): } } - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) @@ -326,21 +251,10 @@ class JoinNode(ManipulationNode): num_in = 2 num_out = 1 - DEFAULT_OPTIONS = { - "on": None, + OPTIONS = { + "on": StringParameter("Join Column", docstring="Name of column to join on") } - OPTION_TYPES = { - "on": { - "type": "string", - "name": "Join Column", - "desc": "Name of column to join on" - } - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): # Join cannot accept more than 2 input DataFrames # TODO: Add more error-checking if 1, or no, DataFrames passed through @@ -348,7 +262,8 @@ def execute(self, predecessor_data, flow_vars): NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) first_df = pd.DataFrame.from_dict(predecessor_data[0]) second_df = pd.DataFrame.from_dict(predecessor_data[1]) - combined_df = pd.merge(first_df, second_df, on=self.options["on"]) + combined_df = pd.merge(first_df, second_df, + on=self.options["on"].get_value()) return combined_df.to_json() except Exception as e: raise NodeException('join', str(e)) @@ -384,6 +299,9 @@ def validate_predecessor_data(predecessor_data_len, num_in, node_key): @staticmethod def replace_flow_vars(node_options, flow_vars): + # TODO: this will no longer work with the Node.options descriptor, + # which uses Node.option_values to populate the Parameter + # class values upon access for var in flow_vars: node_options[var['var_name']] = var['default_value'] diff --git a/vp/vp/views.py b/vp/vp/views.py index e1d8632..898c095 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -55,8 +55,8 @@ def retrieve_nodes_for_user(request): 'num_out': child.num_out, 'color': child.color or parent.color, 'doc': child.__doc__, - 'options': {**parent.DEFAULT_OPTIONS, **child.DEFAULT_OPTIONS}, - 'option_types': getattr(child, "OPTION_TYPES", dict()), + 'options': {k: v.get_value() for k, v in child.options.items()}, + 'option_types': child.option_types, 'download_result': getattr(child, "download_result", False) } From ae7adad5319d24695ed2ee9e56706ac15c335f44 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Mon, 13 Apr 2020 23:07:25 -0400 Subject: [PATCH 3/9] fix: serialize `Node.option_values` as `options` in networkx graph --- pyworkflow/pyworkflow/workflow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index deb951c..77c3d6d 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -93,7 +93,10 @@ def update_or_add_node(self, node: Node): # attributes to add to graph node_dict = node.__dict__ for key in node_dict.keys(): - graph.nodes[node.node_id][key] = node_dict[key] + out_key = key + if key == "option_values": + out_key = "options" + graph.nodes[node.node_id][out_key] = node_dict[key] return From 4ab033c5a538a17ed07e6b1bbd8748455f96d295 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Mon, 13 Apr 2020 23:09:08 -0400 Subject: [PATCH 4/9] fix: refer to new WriteCsvNode file option name --- front-end/src/API.js | 2 +- pyworkflow/pyworkflow/workflow.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/front-end/src/API.js b/front-end/src/API.js index 8df7b4b..f7aea84 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -189,7 +189,7 @@ export async function downloadDataFile(node) { contentType = resp.headers.get("content-type"); if (contentType.startsWith("text")) { resp.text().then(data => { - downloadFile(data, contentType, node.config["path_or_buf"]); + downloadFile(data, contentType, node.config["file"]); }) } }).catch(err => console.log(err)); diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 77c3d6d..64014b8 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -261,7 +261,7 @@ def download_file(self, node_id): try: # TODO: Change to generic "file" option to allow for more than WriteCsv - to_open = Workflow.path(self, node.options['path_or_buf']) + to_open = Workflow.path(self, node.options['file'].get_value()) return open(to_open) except KeyError: raise WorkflowException('download_file', '%s does not have an associated file' % node_id) From 86c4b650d08175baef6f575ea5b8f0f38446bf8e Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Mon, 13 Apr 2020 23:10:53 -0400 Subject: [PATCH 5/9] fix: check parameter `label` rather than `name` when prefixing files --- vp/node/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vp/node/views.py b/vp/node/views.py index 91ea9e2..e61255b 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -260,7 +260,7 @@ def create_node(request): json_data = json.loads(request.body) # for options with type 'file', replace value with FileStorage path for field, info in json_data.get("option_types", dict()).items(): - if info["type"] == "file" or info["name"] == "Filename": + if info["type"] == "file" or info["label"] == "Filename": opt_value = json_data["options"][field] if opt_value is not None: json_data["options"][field] = Workflow.path(request.pyworkflow, opt_value) From b29cda68d94c68197db9675773017f62414bd623 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Mon, 13 Apr 2020 23:12:27 -0400 Subject: [PATCH 6/9] refactor: make `retrieve_node_data` an instance method --- pyworkflow/pyworkflow/workflow.py | 7 +++---- vp/node/views.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 64014b8..e43fbb1 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -215,7 +215,7 @@ def execute(self, node_id): if node_to_retrieve.node_type == 'FlowNode': flow_vars.append(node_to_retrieve.options) else: - preceding_data.append(Workflow.retrieve_node_data(node_to_retrieve)) + preceding_data.append(self.retrieve_node_data(node_to_retrieve)) except WorkflowException: # TODO: Should this append None, skip reading, or raise exception to view? @@ -292,8 +292,7 @@ def store_node_data(workflow, node_id, data): except Exception as e: return None - @staticmethod - def retrieve_node_data(node_to_retrieve): + def retrieve_node_data(self, node_to_retrieve): """Retrieve Node data Reads a saved DataFrame, referenced by the Node's 'data' attribute. @@ -309,7 +308,7 @@ def retrieve_node_data(node_to_retrieve): problem parsing the file. """ try: - with open(node_to_retrieve.data) as f: + with open(Workflow.path(self, node_to_retrieve.data)) as f: return json.load(f) except OSError as e: raise WorkflowException('retrieve node data', str(e)) diff --git a/vp/node/views.py b/vp/node/views.py index e61255b..cc292eb 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -247,7 +247,7 @@ def execute_node(request, node_id): def retrieve_data(request, node_id): try: node_to_retrieve = request.pyworkflow.get_node(node_id) - data = Workflow.retrieve_node_data(node_to_retrieve) + data = request.pyworkflow.retrieve_node_data(node_to_retrieve) return JsonResponse(data, safe=False, status=200) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) From 7be321bc4bfaaa7c017d2eb5029aa7a8a815a1b6 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Tue, 14 Apr 2020 17:58:50 -0400 Subject: [PATCH 7/9] fix(ui): use new node attrs in node config form --- front-end/src/components/CustomNode/NodeConfig.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/front-end/src/components/CustomNode/NodeConfig.js b/front-end/src/components/CustomNode/NodeConfig.js index de521d2..2a28445 100644 --- a/front-end/src/components/CustomNode/NodeConfig.js +++ b/front-end/src/components/CustomNode/NodeConfig.js @@ -85,8 +85,8 @@ function OptionInput(props) { } return ( - {props.name} -
{props.desc}
+ {props.label} +
{props.docstring}
{ inputComp }
) From 8fb44d51e81c108940df262ebf1230678dfaecc1 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Tue, 14 Apr 2020 18:06:55 -0400 Subject: [PATCH 8/9] fix: remove stray debug lines --- pyworkflow/pyworkflow/node.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 1ac04b0..bd6c1b4 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -97,8 +97,6 @@ class ReadCsvNode(IONode): } def execute(self, predecessor_data, flow_vars): - print(self.option_values) - print(self.options) try: NodeUtils.replace_flow_vars(self.options, flow_vars) fname = self.options["file"].get_value() @@ -107,7 +105,6 @@ def execute(self, predecessor_data, flow_vars): df = pd.read_csv(fname, sep=sep, header=hdr) return df.to_json() except Exception as e: - raise e raise NodeException('read csv', str(e)) def __str__(self): From 3e0ca55185984bc8c71be2120d768e45fc2df1c2 Mon Sep 17 00:00:00 2001 From: Samir Reddigari Date: Tue, 14 Apr 2020 18:07:29 -0400 Subject: [PATCH 9/9] fix: use instance method for path derivation --- pyworkflow/pyworkflow/workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 2944a17..5cd40d5 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -252,7 +252,7 @@ def download_file(self, node_id): try: # TODO: Change to generic "file" option to allow for more than WriteCsv - to_open = Workflow.path(self, node.options['file'].get_value()) + to_open = self.path(node.options['file'].get_value()) return open(to_open) except KeyError: raise WorkflowException('download_file', '%s does not have an associated file' % node_id) @@ -299,7 +299,7 @@ def retrieve_node_data(self, node_to_retrieve): problem parsing the file. """ try: - with open(Workflow.path(self, node_to_retrieve.data)) as f: + with open(self.path(node_to_retrieve.data)) as f: return json.load(f) except OSError as e: raise WorkflowException('retrieve node data', str(e))