Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion front-end/src/API.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export async function downloadDataFile(node) {
contentType = resp.headers.get("content-type");
if (contentType.startsWith("text")) {
resp.text().then(data => {
downloadFile(data, contentType, node.config["path_or_buf"]);
downloadFile(data, contentType, node.config["file"]);
})
}
}).catch(err => console.log(err));
Expand Down
4 changes: 2 additions & 2 deletions front-end/src/components/CustomNode/NodeConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ function OptionInput(props) {
}
return (
<Form.Group>
<Form.Label>{props.name}</Form.Label>
<div style={{fontSize: '0.7rem'}}>{props.desc}</div>
<Form.Label>{props.label}</Form.Label>
<div style={{fontSize: '0.7rem'}}>{props.docstring}</div>
{ inputComp }
</Form.Group>
)
Expand Down
163 changes: 39 additions & 124 deletions pyworkflow/pyworkflow/node.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import pandas as pd

from .parameters import *


class Node:
"""Node object

"""
def __init__(self, node_info, options=None):
options = Options()
option_types = OptionTypes()

def __init__(self, node_info):
self.name = node_info.get('name')
self.node_id = node_info.get('node_id')
self.node_type = node_info.get('node_type')
Expand All @@ -14,12 +19,9 @@ def __init__(self, node_info, options=None):

self.is_global = node_info.get('is_global') is True

# Execution options are passed up from children
self.options = options or dict()

# User-override takes precedence
self.option_values = dict()
if node_info.get("options"):
self.options.update(node_info["options"])
self.option_values.update(node_info["options"])

def execute(self, predecessor_data, flow_vars):
pass
Expand All @@ -35,12 +37,6 @@ class FlowNode(Node):
"""FlowNode object
"""
display_name = "Flow Control"
DEFAULT_OPTIONS = {

}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**FlowNode.DEFAULT_OPTIONS, **options})


class StringNode(FlowNode):
Expand All @@ -53,27 +49,13 @@ class StringNode(FlowNode):
num_out = 1
color = 'purple'

DEFAULT_OPTIONS = {
'default_value': None,
'var_name': 'my_var',
OPTIONS = {
"default_value": StringParameter("Default Value",
docstring="Value this node will pass as a flow variable"),
"var_name": StringParameter("Variable Name", default="my_var",
docstring="Name of the variable to use in another Node")
}

OPTION_TYPES = {
'default_value': {
"type": "string",
"name": "Default Value",
"desc": "Value this Node will pass as a flow variable"
},
'var_name': {
"type": "string",
"name": "Variable Name",
"desc": "Name of the variable to use in another Node"
}
}

def __init__(self, node_info):
super().__init__(node_info)


class IONode(Node):
"""IONodes deal with file-handling in/out of the Workflow.
Expand All @@ -85,13 +67,6 @@ class IONode(Node):
color = 'black'
display_name = "I/O"

DEFAULT_OPTIONS = {
# 'file': None,
}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**IONode.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data, flow_vars):
pass

Expand All @@ -112,49 +87,24 @@ class ReadCsvNode(IONode):
num_in = 0
num_out = 1

DEFAULT_OPTIONS = {
'filepath_or_buffer': None,
'sep': ',',
'header': 'infer',
'index_col': None
}

OPTION_TYPES = {
'filepath_or_buffer': {
"type": "file",
"name": "File",
"desc": "CSV File"
},
'sep': {
"type": "string",
"name": "Delimiter",
"desc": "column delimiter, default ','"
},
'header': {
"type": "string",
"name": "Column Name Row",
"desc": "Row number with column names (0-indexed) or 'infer'"
},
'index_col': {
"type": "string",
"name": "Index Column Name",
"desc": "Column to use as index (facilitates joining)"
}
OPTIONS = {
"file": FileParameter("File", docstring="CSV File"),
"sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"),
# user-specified headers are probably integers, but haven't figured out
# arguments with multiple possible types
"header": StringParameter("Header Row", default="infer",
docstring="Row number containing column names (0-indexed)"),
}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data, flow_vars):
try:
# TODO: FileStorage implemented in Django to store in /tmp
# Better filename/path handling should be implemented.
# Read CSV needs exactly 0 input DataFrame
NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key)
NodeUtils.replace_flow_vars(self.options, flow_vars)
opts = self.options
df = pd.read_csv(opts["filepath_or_buffer"], sep=opts["sep"],
header=opts["header"])
fname = self.options["file"].get_value()
sep = self.options["sep"].get_value()
hdr = self.options["header"].get_value()
df = pd.read_csv(fname, sep=sep, header=hdr)
return df.to_json()
except Exception as e:
raise NodeException('read csv', str(e))
Expand All @@ -177,34 +127,12 @@ class WriteCsvNode(IONode):
num_out = 0
download_result = True

DEFAULT_OPTIONS = {
'path_or_buf': None,
'sep': ',',
'index': True,
}

OPTION_TYPES = {
"path_or_buf": {
"type": "string",
"name": "Filename",
"desc": "Filename to write"
},
"sep": {
"type": "string",
"name": "Delimiter",
"desc": "column delimiter, default ','"
},
"index": {
"type": "boolean",
"name": "Index",
"desc": "Write index column or not"
}

OPTIONS = {
"file": StringParameter("Filename", docstring="CSV file to write"),
"sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"),
"index": BooleanParameter("Write Index", default=True, docstring="Write index as column?"),
}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data, flow_vars):
try:
# Write CSV needs exactly 1 input DataFrame
Expand All @@ -214,8 +142,10 @@ def execute(self, predecessor_data, flow_vars):
df = pd.DataFrame.from_dict(predecessor_data[0])

# Write to CSV and save
opts = self.options
df.to_csv(opts["path_or_buf"], sep=opts["sep"], index=opts["index"])
fname = self.options["file"].get_value()
sep = self.options["sep"].get_value()
index = self.options["index"].get_value()
df.to_csv(fname, sep=sep, index=index)
return df.to_json()
except Exception as e:
raise NodeException('write csv', str(e))
Expand All @@ -232,11 +162,6 @@ class ManipulationNode(Node):
display_name = "Manipulation"
color = 'goldenrod'

DEFAULT_OPTIONS = {}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**ManipulationNode.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data, flow_vars):
pass

Expand Down Expand Up @@ -310,9 +235,6 @@ class PivotNode(ManipulationNode):
}
}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data, flow_vars):
try:
NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key)
Expand All @@ -328,29 +250,19 @@ class JoinNode(ManipulationNode):
num_in = 2
num_out = 1

DEFAULT_OPTIONS = {
"on": None,
OPTIONS = {
"on": StringParameter("Join Column", docstring="Name of column to join on")
}

OPTION_TYPES = {
"on": {
"type": "string",
"name": "Join Column",
"desc": "Name of column to join on"
}
}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data, flow_vars):
# Join cannot accept more than 2 input DataFrames
# TODO: Add more error-checking if 1, or no, DataFrames passed through
try:
NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key)
first_df = pd.DataFrame.from_dict(predecessor_data[0])
second_df = pd.DataFrame.from_dict(predecessor_data[1])
combined_df = pd.merge(first_df, second_df, on=self.options["on"])
combined_df = pd.merge(first_df, second_df,
on=self.options["on"].get_value())
return combined_df.to_json()
except Exception as e:
raise NodeException('join', str(e))
Expand Down Expand Up @@ -437,6 +349,9 @@ def validate_predecessor_data(predecessor_data_len, num_in, node_key):

@staticmethod
def replace_flow_vars(node_options, flow_vars):
# TODO: this will no longer work with the Node.options descriptor,
# which uses Node.option_values to populate the Parameter
# class values upon access
for var in flow_vars:
node_options[var['var_name']] = var['default_value']

Expand Down
Loading