diff --git a/front-end/src/API.js b/front-end/src/API.js index 8df7b4b..f7aea84 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -189,7 +189,7 @@ export async function downloadDataFile(node) { contentType = resp.headers.get("content-type"); if (contentType.startsWith("text")) { resp.text().then(data => { - downloadFile(data, contentType, node.config["path_or_buf"]); + downloadFile(data, contentType, node.config["file"]); }) } }).catch(err => console.log(err)); diff --git a/front-end/src/components/CustomNode/NodeConfig.js b/front-end/src/components/CustomNode/NodeConfig.js index de521d2..2a28445 100644 --- a/front-end/src/components/CustomNode/NodeConfig.js +++ b/front-end/src/components/CustomNode/NodeConfig.js @@ -85,8 +85,8 @@ function OptionInput(props) { } return ( - {props.name} -
{props.desc}
+ {props.label} +
{props.docstring}
{ inputComp }
) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index a33acb8..e30d5eb 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,11 +1,16 @@ import pandas as pd +from .parameters import * + class Node: """Node object """ - def __init__(self, node_info, options=None): + options = Options() + option_types = OptionTypes() + + def __init__(self, node_info): self.name = node_info.get('name') self.node_id = node_info.get('node_id') self.node_type = node_info.get('node_type') @@ -14,12 +19,9 @@ def __init__(self, node_info, options=None): self.is_global = node_info.get('is_global') is True - # Execution options are passed up from children - self.options = options or dict() - - # User-override takes precedence + self.option_values = dict() if node_info.get("options"): - self.options.update(node_info["options"]) + self.option_values.update(node_info["options"]) def execute(self, predecessor_data, flow_vars): pass @@ -35,12 +37,6 @@ class FlowNode(Node): """FlowNode object """ display_name = "Flow Control" - DEFAULT_OPTIONS = { - - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**FlowNode.DEFAULT_OPTIONS, **options}) class StringNode(FlowNode): @@ -53,27 +49,13 @@ class StringNode(FlowNode): num_out = 1 color = 'purple' - DEFAULT_OPTIONS = { - 'default_value': None, - 'var_name': 'my_var', + OPTIONS = { + "default_value": StringParameter("Default Value", + docstring="Value this node will pass as a flow variable"), + "var_name": StringParameter("Variable Name", default="my_var", + docstring="Name of the variable to use in another Node") } - OPTION_TYPES = { - 'default_value': { - "type": "string", - "name": "Default Value", - "desc": "Value this Node will pass as a flow variable" - }, - 'var_name': { - "type": "string", - "name": "Variable Name", - "desc": "Name of the variable to use in another Node" - } - } - - def __init__(self, node_info): - super().__init__(node_info) - class 
IONode(Node): """IONodes deal with file-handling in/out of the Workflow. @@ -85,13 +67,6 @@ class IONode(Node): color = 'black' display_name = "I/O" - DEFAULT_OPTIONS = { - # 'file': None, - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**IONode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): pass @@ -112,49 +87,24 @@ class ReadCsvNode(IONode): num_in = 0 num_out = 1 - DEFAULT_OPTIONS = { - 'filepath_or_buffer': None, - 'sep': ',', - 'header': 'infer', - 'index_col': None - } - - OPTION_TYPES = { - 'filepath_or_buffer': { - "type": "file", - "name": "File", - "desc": "CSV File" - }, - 'sep': { - "type": "string", - "name": "Delimiter", - "desc": "column delimiter, default ','" - }, - 'header': { - "type": "string", - "name": "Column Name Row", - "desc": "Row number with column names (0-indexed) or 'infer'" - }, - 'index_col': { - "type": "string", - "name": "Index Column Name", - "desc": "Column to use as index (facilitates joining)" - } + OPTIONS = { + "file": FileParameter("File", docstring="CSV File"), + "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), + # user-specified headers are probably integers, but haven't figured out + # arguments with multiple possible types + "header": StringParameter("Header Row", default="infer", + docstring="Row number containing column names (0-indexed)"), } - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: - # TODO: FileStorage implemented in Django to store in /tmp - # Better filename/path handling should be implemented. 
# Read CSV needs exactly 0 input DataFrame NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) NodeUtils.replace_flow_vars(self.options, flow_vars) - opts = self.options - df = pd.read_csv(opts["filepath_or_buffer"], sep=opts["sep"], - header=opts["header"]) + fname = self.options["file"].get_value() + sep = self.options["sep"].get_value() + hdr = self.options["header"].get_value() + df = pd.read_csv(fname, sep=sep, header=hdr) return df.to_json() except Exception as e: raise NodeException('read csv', str(e)) @@ -177,34 +127,12 @@ class WriteCsvNode(IONode): num_out = 0 download_result = True - DEFAULT_OPTIONS = { - 'path_or_buf': None, - 'sep': ',', - 'index': True, - } - - OPTION_TYPES = { - "path_or_buf": { - "type": "string", - "name": "Filename", - "desc": "Filename to write" - }, - "sep": { - "type": "string", - "name": "Delimiter", - "desc": "column delimiter, default ','" - }, - "index": { - "type": "boolean", - "name": "Index", - "desc": "Write index column or not" - } - + OPTIONS = { + "file": StringParameter("Filename", docstring="CSV file to write"), + "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), + "index": BooleanParameter("Write Index", default=True, docstring="Write index as column?"), } - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: # Write CSV needs exactly 1 input DataFrame @@ -214,8 +142,10 @@ def execute(self, predecessor_data, flow_vars): df = pd.DataFrame.from_dict(predecessor_data[0]) # Write to CSV and save - opts = self.options - df.to_csv(opts["path_or_buf"], sep=opts["sep"], index=opts["index"]) + fname = self.options["file"].get_value() + sep = self.options["sep"].get_value() + index = self.options["index"].get_value() + df.to_csv(fname, sep=sep, index=index) return df.to_json() except Exception as e: raise NodeException('write csv', str(e)) @@ 
-232,11 +162,6 @@ class ManipulationNode(Node): display_name = "Manipulation" color = 'goldenrod' - DEFAULT_OPTIONS = {} - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**ManipulationNode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): pass @@ -310,9 +235,6 @@ class PivotNode(ManipulationNode): } } - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) @@ -328,21 +250,10 @@ class JoinNode(ManipulationNode): num_in = 2 num_out = 1 - DEFAULT_OPTIONS = { - "on": None, + OPTIONS = { + "on": StringParameter("Join Column", docstring="Name of column to join on") } - OPTION_TYPES = { - "on": { - "type": "string", - "name": "Join Column", - "desc": "Name of column to join on" - } - } - - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): # Join cannot accept more than 2 input DataFrames # TODO: Add more error-checking if 1, or no, DataFrames passed through @@ -350,7 +261,8 @@ def execute(self, predecessor_data, flow_vars): NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) first_df = pd.DataFrame.from_dict(predecessor_data[0]) second_df = pd.DataFrame.from_dict(predecessor_data[1]) - combined_df = pd.merge(first_df, second_df, on=self.options["on"]) + combined_df = pd.merge(first_df, second_df, + on=self.options["on"].get_value()) return combined_df.to_json() except Exception as e: raise NodeException('join', str(e)) @@ -437,6 +349,9 @@ def validate_predecessor_data(predecessor_data_len, num_in, node_key): @staticmethod def replace_flow_vars(node_options, flow_vars): + # TODO: this will no longer work with the Node.options descriptor, + # which uses 
Node.option_values to populate the Parameter + class values upon access for var in flow_vars: node_options[var['var_name']] = var['default_value'] diff --git a/pyworkflow/pyworkflow/parameters.py b/pyworkflow/pyworkflow/parameters.py new file mode 100644 index 0000000..380cdb3 --- /dev/null +++ b/pyworkflow/pyworkflow/parameters.py @@ -0,0 +1,130 @@ +import os + + +class Options: + """ + Descriptor for accessing node parameters as Parameter instances + + Clones the values in the class variable `OPTIONS` and sets their values + with the values in the instance variable `option_values`. + """ + + def __get__(self, obj, objtype): + # return class variable OPTIONS if invoked from class + if obj is None: + return getattr(objtype, "OPTIONS", dict()) + # otherwise clone class's options and set values from instance + options = dict() + for k, v in obj.OPTIONS.items(): + options[k] = v.clone() + for k, v in getattr(obj, "option_values", dict()).items(): + if k in options: + options[k].set_value(v) + return options + + +class OptionTypes: + """ + Descriptor for accessing parameter names, types, and descriptions. + + This will never reference instance parameter values, only turn the + class OPTIONS into a dict. 
+ """ + + def __get__(self, obj, objtype): + # handle both instance- and class-callers + item = obj or objtype + if getattr(item, "OPTIONS", None) is None: + return dict() + return {k: v.to_json() for k, v in item.OPTIONS.items()} + + +class Parameter: + type = None + + def __init__(self, label="", default=None, docstring=None): + self._label = label + self._value = None + self._default = default + self._docstring = docstring + + def clone(self): + return self.__class__(self.label, self.default, self.docstring) + + def get_value(self): + return self._value or self.default + + def set_value(self, value): + self._value = value + + @property + def label(self): + return self._label + + @property + def default(self): + return self._default + + @property + def docstring(self): + return self._docstring + + def validate(self): + raise NotImplementedError() + + def to_json(self): + return { + "type": self.type, + "label": self.label, + "value": self.get_value(), + "docstring": self.docstring + } + + +class FileParameter(Parameter): + type = "file" + + def validate(self): + value = self.get_value() + if (value is None) or (not os.path.exists(value)): + raise ParameterValidationError(self) + + +class StringParameter(Parameter): + type = "string" + + def validate(self): + value = self.get_value() + if not isinstance(value, str): + raise ParameterValidationError(self) + + +class IntegerParameter(Parameter): + type = "int" + + def validate(self): + value = self.get_value() + if not isinstance(value, int): + raise ParameterValidationError(self) + + +class BooleanParameter(Parameter): + type = "boolean" + + def validate(self): + value = self.get_value() + if not isinstance(value, bool): + raise ParameterValidationError(self) + + +class ParameterValidationError(Exception): + + def __init__(self, parameter): + self.parameter = parameter + + def __str__(self): + param = self.parameter + value = param.get_value() + value_type = type(value).__name__ + param_type = type(param).__name__ 
+ return f"Invalid value '{value}' (type '{value_type}') for {param_type}" diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index c540b4e..5cd40d5 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -84,7 +84,10 @@ def update_or_add_node(self, node: Node): # attributes to add to graph node_dict = node.__dict__ for key in node_dict.keys(): - graph.nodes[node.node_id][key] = node_dict[key] + out_key = key + if key == "option_values": + out_key = "options" + graph.nodes[node.node_id][out_key] = node_dict[key] return @@ -203,7 +206,7 @@ def execute(self, node_id): if node_to_retrieve.node_type == 'FlowNode': flow_vars.append(node_to_retrieve.options) else: - preceding_data.append(Workflow.retrieve_node_data(node_to_retrieve)) + preceding_data.append(self.retrieve_node_data(node_to_retrieve)) except WorkflowException: # TODO: Should this append None, skip reading, or raise exception to view? @@ -249,7 +252,7 @@ def download_file(self, node_id): try: # TODO: Change to generic "file" option to allow for more than WriteCsv - to_open = self.path(node.options['path_or_buf']) + to_open = self.path(node.options['file'].get_value()) return open(to_open) except KeyError: raise WorkflowException('download_file', '%s does not have an associated file' % node_id) @@ -280,8 +283,7 @@ def store_node_data(workflow, node_id, data): except Exception as e: return None - @staticmethod - def retrieve_node_data(node_to_retrieve): + def retrieve_node_data(self, node_to_retrieve): """Retrieve Node data Reads a saved DataFrame, referenced by the Node's 'data' attribute. @@ -297,7 +299,7 @@ def retrieve_node_data(node_to_retrieve): problem parsing the file. 
""" try: - with open(node_to_retrieve.data) as f: + with open(self.path(node_to_retrieve.data)) as f: return json.load(f) except OSError as e: raise WorkflowException('retrieve node data', str(e)) diff --git a/vp/node/views.py b/vp/node/views.py index f365ad4..4a27150 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -247,7 +247,7 @@ def execute_node(request, node_id): def retrieve_data(request, node_id): try: node_to_retrieve = request.pyworkflow.get_node(node_id) - data = Workflow.retrieve_node_data(node_to_retrieve) + data = request.pyworkflow.retrieve_node_data(node_to_retrieve) return JsonResponse(data, safe=False, status=200) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) @@ -260,7 +260,7 @@ def create_node(request): json_data = json.loads(request.body) # for options with type 'file', replace value with FileStorage path for field, info in json_data.get("option_types", dict()).items(): - if info["type"] == "file" or info["name"] == "Filename": + if info["type"] == "file" or info["label"] == "Filename": opt_value = json_data["options"][field] if opt_value is not None: json_data["options"][field] = request.pyworkflow.path(opt_value) diff --git a/vp/vp/views.py b/vp/vp/views.py index e1d8632..898c095 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -55,8 +55,8 @@ def retrieve_nodes_for_user(request): 'num_out': child.num_out, 'color': child.color or parent.color, 'doc': child.__doc__, - 'options': {**parent.DEFAULT_OPTIONS, **child.DEFAULT_OPTIONS}, - 'option_types': getattr(child, "OPTION_TYPES", dict()), + 'options': {k: v.get_value() for k, v in child.options.items()}, + 'option_types': child.option_types, 'download_result': getattr(child, "download_result", False) }