Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyworkflow/pyworkflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .workflow import Workflow, WorkflowException
from .node import Node, IONode, ManipulationNode, ReadCsvNode, WriteCsvNode, JoinNode, NodeException
from .node_factory import node_factory
from .node import *
from .node_factory import node_factory, create_node
67 changes: 59 additions & 8 deletions pyworkflow/pyworkflow/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pandas as pd


class Node:
"""Node object

Expand All @@ -11,14 +12,16 @@ def __init__(self, node_info, options=None):
self.node_key = node_info.get('node_key')
self.data = node_info.get('data')

self.is_global = True if node_info.get('is_global') else False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor suggestion: self.is_global = node_info.get('is_global') is True (or make it a property)

Do we have a mechanism to designate a flow variable as global? If it's given as an option to be configured by the user, then this line will need to check self.options, right?


# Execution options are passed up from children
self.options = options or dict()

# User-override takes precedence
if node_info.get("options"):
self.options.update(node_info["options"])

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
pass

def validate(self):
Expand All @@ -28,6 +31,48 @@ def __str__(self):
return "Test"


class FlowNode(Node):
"""FlowNode object
"""
DEFAULT_OPTIONS = {

}

def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**FlowNode.DEFAULT_OPTIONS, **options})


class StringNode(FlowNode):
"""StringNode object

Allows for Strings to replace 'string' fields in Nodes
"""
name = "String Input"
num_in = 1
num_out = 1
color = 'purple'

DEFAULT_OPTIONS = {
'default_value': None,
'var_name': 'my_var',
}

OPTION_TYPES = {
'default_value': {
"type": "string",
"name": "Default Value",
"desc": "Value this Node will pass as a flow variable"
},
'var_name': {
"type": "string",
"name": "Variable Name",
"desc": "Name of the variable to use in another Node"
}
}

def __init__(self, node_info):
super().__init__(node_info)

class IONode(Node):
"""IONodes deal with file-handling in/out of the Workflow.

Expand All @@ -44,7 +89,7 @@ class IONode(Node):
def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**IONode.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
pass

def validate(self):
Expand Down Expand Up @@ -92,11 +137,11 @@ class ReadCsvNode(IONode):
def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
try:
# TODO: FileStorage implemented in Django to store in /tmp
# Better filename/path handling should be implemented.

NodeUtils.replace_flow_vars(self.options, flow_vars)
df = pd.read_csv(**self.options)
return df.to_json()
except Exception as e:
Expand Down Expand Up @@ -127,7 +172,7 @@ class WriteCsvNode(IONode):
def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
try:
# Write CSV needs exactly 1 input DataFrame
NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key)
Expand Down Expand Up @@ -157,7 +202,7 @@ class ManipulationNode(Node):
def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**ManipulationNode.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
pass

def validate(self):
Expand Down Expand Up @@ -185,7 +230,7 @@ class PivotNode(ManipulationNode):
def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
try:
NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key)
input_df = pd.DataFrame.from_dict(predecessor_data[0])
Expand All @@ -205,7 +250,7 @@ class JoinNode(ManipulationNode):
def __init__(self, node_info, options=dict()):
super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options})

def execute(self, predecessor_data):
def execute(self, predecessor_data, flow_vars):
# Join cannot accept more than 2 input DataFrames
# TODO: Add more error-checking if 1, or no, DataFrames passed through
try:
Expand Down Expand Up @@ -245,3 +290,9 @@ def validate_predecessor_data(predecessor_data_len, num_in, node_key):
exception_txt % (node_key, num_in, predecessor_data_len)
)

@staticmethod
def replace_flow_vars(node_options, flow_vars):
for var in flow_vars:
node_options[var['var_name']] = var['default_value']

return
18 changes: 18 additions & 0 deletions pyworkflow/pyworkflow/node_factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from .node import *
import json


def create_node(payload):
json_data = json.loads(payload)

try:
return node_factory(json_data)
except OSError as e:
raise NodeException('create_node', 'Problem parsing JSON')

def node_factory(node_info):
# Create a new Node with info
# TODO: should perform error-checking or add default values if missing
Expand All @@ -11,12 +20,21 @@ def node_factory(node_info):
new_node = io_node(node_key, node_info)
elif node_type == 'ManipulationNode':
new_node = manipulation_node(node_key, node_info)
elif node_type == 'FlowNode':
new_node = flow_node(node_key, node_info)
else:
new_node = None

return new_node


def flow_node(node_key, node_info):
if node_key == 'StringNode':
return StringNode(node_info)
else:
return None


def io_node(node_key, node_info):
if node_key == 'ReadCsvNode':
return ReadCsvNode(node_info)
Expand Down
Loading