diff --git a/pyworkflow/pyworkflow/__init__.py b/pyworkflow/pyworkflow/__init__.py index e4eb1d2..6674170 100644 --- a/pyworkflow/pyworkflow/__init__.py +++ b/pyworkflow/pyworkflow/__init__.py @@ -1,3 +1,3 @@ from .workflow import Workflow, WorkflowException -from .node import Node, IONode, ManipulationNode, ReadCsvNode, WriteCsvNode, JoinNode, NodeException -from .node_factory import node_factory +from .node import * +from .node_factory import node_factory, create_node diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index b5827e9..02d36f6 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,5 +1,6 @@ import pandas as pd + class Node: """Node object @@ -11,6 +12,8 @@ def __init__(self, node_info, options=None): self.node_key = node_info.get('node_key') self.data = node_info.get('data') + self.is_global = True if node_info.get('is_global') else False + # Execution options are passed up from children self.options = options or dict() @@ -18,7 +21,7 @@ def __init__(self, node_info, options=None): if node_info.get("options"): self.options.update(node_info["options"]) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): pass def validate(self): @@ -28,6 +31,48 @@ def __str__(self): return "Test" +class FlowNode(Node): + """FlowNode object + """ + DEFAULT_OPTIONS = { + + } + + def __init__(self, node_info, options=dict()): + super().__init__(node_info, {**FlowNode.DEFAULT_OPTIONS, **options}) + + +class StringNode(FlowNode): + """StringNode object + + Allows for Strings to replace 'string' fields in Nodes + """ + name = "String Input" + num_in = 1 + num_out = 1 + color = 'purple' + + DEFAULT_OPTIONS = { + 'default_value': None, + 'var_name': 'my_var', + } + + OPTION_TYPES = { + 'default_value': { + "type": "string", + "name": "Default Value", + "desc": "Value this Node will pass as a flow variable" + }, + 'var_name': { + "type": "string", + "name": "Variable Name", + "desc": "Name of the variable to use in another Node" + } + } + + def __init__(self, node_info): + super().__init__(node_info) + class IONode(Node): """IONodes deal with file-handling in/out of the Workflow. @@ -44,7 +89,7 @@ class IONode(Node): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**IONode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): pass def validate(self): @@ -92,11 +137,11 @@ class ReadCsvNode(IONode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): try: # TODO: FileStorage implemented in Django to store in /tmp # Better filename/path handling should be implemented. - + NodeUtils.replace_flow_vars(self.options, flow_vars) df = pd.read_csv(**self.options) return df.to_json() except Exception as e: @@ -127,7 +172,7 @@ class WriteCsvNode(IONode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): try: # Write CSV needs exactly 1 input DataFrame NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) @@ -157,7 +202,7 @@ class ManipulationNode(Node): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**ManipulationNode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): pass def validate(self): @@ -185,7 +230,7 @@ class PivotNode(ManipulationNode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): try: NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) input_df = pd.DataFrame.from_dict(predecessor_data[0]) @@ -205,7 +250,7 @@ class JoinNode(ManipulationNode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): # Join cannot accept more than 2 input DataFrames # TODO: Add more error-checking if 1, or no, DataFrames passed through try: @@ -245,3 +290,9 @@ def validate_predecessor_data(predecessor_data_len, num_in, node_key): exception_txt % (node_key, num_in, predecessor_data_len) ) + @staticmethod + def replace_flow_vars(node_options, flow_vars): + for var in flow_vars: + node_options[var['var_name']] = var['default_value'] + + return diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index 3b43887..f05be97 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -1,6 +1,15 @@ from .node import * +import json +def create_node(payload): + json_data = json.loads(payload) + + try: + return node_factory(json_data) + except OSError as e: + raise NodeException('create_node', 'Problem parsing JSON') + def node_factory(node_info): # Create a new Node with info # TODO: should perform error-checking or add default values if missing @@ -11,12 +20,21 @@ def node_factory(node_info): new_node = io_node(node_key, node_info) elif node_type == 'ManipulationNode': new_node = manipulation_node(node_key, node_info) + elif node_type == 'FlowNode': + new_node = flow_node(node_key, node_info) else: new_node = None return new_node +def flow_node(node_key, node_info): + if node_key == 'StringNode': + return StringNode(node_info) + else: + return None + + def io_node(node_key, node_info): if node_key == 'ReadCsvNode': return ReadCsvNode(node_info) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 3c7284c..2d8e31d 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -19,11 +19,12 @@ class Workflow: file_path: Location of a workflow file """ - def __init__(self, graph=nx.DiGraph(), file_path=None, workflow_name='a-name'): + def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph()): #TODO: need to discuss a way to generating the workflow name. For now passing a default name. self._graph = graph self._file_path = file_path - self._workflow_name = workflow_name + self._name = name + self._flow_vars = flow_vars @property def graph(self): @@ -44,6 +45,10 @@ def file_path(self, file_path: str): else: raise WorkflowException('set_file_path', 'File ' + file_path + ' is not JSON.') + @property + def flow_vars(self): + return self._flow_vars + def get_node(self, node_id): """Retrieves Node from workflow, if exists @@ -56,6 +61,18 @@ def get_node(self, node_id): node_info = self.graph.nodes[node_id] return node_factory(node_info) + def get_flow_var(self, node_id): + """Retrieves a global flow variable from workflow, if exists + + Return: + FlowNode object, if one exists. Otherwise, None. + """ + if self._flow_vars.has_node(node_id) is not True: + return None + + node_info = self.flow_vars.nodes[node_id] + return node_factory(node_info) + def update_or_add_node(self, node: Node): """ Update or add a Node object to the graph. @@ -65,20 +82,31 @@ def update_or_add_node(self, node: Node): TODO: * validate() always returns True; this should perform actual validation """ - if node.validate(): - # If Node not in graph yet, add it - if self._graph.has_node(node.node_id) is False: - self._graph.add_node(node.node_id) + # Do not add/update if invalid + if node.validate() is False: + raise WorkflowException('update_or_add_node', 'Node is invalid') - # Iterate through all Node attributes to add to graph - node_dict = node.__dict__ - for key in node_dict.keys(): - self._graph.nodes[node.node_id][key] = node_dict[key] + # Select the correct graph to modify + graph = self.flow_vars if node.is_global else self.graph + + if graph.has_node(node.node_id) is False: + graph.add_node(node.node_id) + + # NetworkX cannot store mutable data, so iterate through all Node + # attributes to add to graph + node_dict = node.__dict__ + for key in node_dict.keys(): + graph.nodes[node.node_id][key] = node_dict[key] return - def get_workflow_name(self): - return self._workflow_name + @property + def name(self): + return self._name + + @name.setter + def name(self, name: str): + self._name = name def add_edge(self, node_from: Node, node_to: Node): """ Add a Node object to the graph. @@ -132,7 +160,10 @@ def remove_node(self, node): WorkflowException: on issue with removing node from graph """ try: - self._graph.remove_node(node.node_id) + # Select the correct graph to modify + graph = self.flow_vars if node.is_global else self.graph + + graph.remove_node(node.node_id) except nx.NetworkXError: raise WorkflowException('remove_node', 'Node does not exist in graph.') @@ -165,20 +196,32 @@ def execute(self, node_id): if node_to_execute is None: raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) - # Read in any data from predecessor nodes + # Read in any data from predecessor/flow nodes + # TODO: This should work for local FlowNodes, but global flow_vars still + # need a way to be assigned/replace Node options preceding_data = list() + flow_vars = list() for predecessor in self.get_node_predecessors(node_id): try: - preceding_data.append(self.retrieve_node_data(self, predecessor)) + node_to_retrieve = self.get_node(predecessor) + + if node_to_retrieve is None: + raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % node_id) + + if node_to_retrieve.node_type == 'FlowNode': + flow_vars.append(node_to_retrieve.options) + else: + preceding_data.append(Workflow.retrieve_node_data(node_to_retrieve)) + except WorkflowException: # TODO: Should this append None, skip reading, or raise exception to view? preceding_data.append(None) # Pass in data to current Node to use in execution - output = node_to_execute.execute(preceding_data) + output = node_to_execute.execute(preceding_data, flow_vars) # Save new execution data to disk - node_to_execute.data = self.store_node_data(self, node_id, output) + node_to_execute.data = Workflow.store_node_data(self, node_id, output) if node_to_execute.data is None: raise WorkflowException('execute', 'There was a problem saving node output.') @@ -207,7 +250,7 @@ def store_node_data(workflow, node_id, data): Returns: """ - file_name = Workflow.generate_file_name(workflow.get_workflow_name(), node_id) + file_name = Workflow.generate_file_name(workflow.name, node_id) try: return fs.save(file_name, ContentFile(data)) @@ -215,15 +258,13 @@ def store_node_data(workflow, node_id, data): return None @staticmethod - def retrieve_node_data(workflow, node_id): + def retrieve_node_data(node_to_retrieve): """Retrieve Node data - Gets the Node specified by 'node_id' and attempts to read a saved - DataFrame if one exists. + Reads a saved DataFrame, referenced by the Node's 'data' attribute. Args: - workflow: The workflow containing the Node. - node_id: The Node containing a DataFrame saved to disk. + node_to_retrieve: The Node containing a DataFrame saved to disk. Returns: Contents of the file (a DataFrame) in a JSON object. @@ -232,11 +273,6 @@ def retrieve_node_data(workflow, node_id): WorkflowException: Node does not exist, file does not exist, or problem parsing the file. """ - node_to_retrieve = workflow.get_node(node_id) - - if node_to_retrieve is None: - raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % node_id) - try: with fs.open(node_to_retrieve.data) as f: return json.load(f) @@ -275,6 +311,9 @@ def generate_file_name(workflow_name, node_id): node_id: the id of the workflow """ #TODO: need to add validation + if workflow_name is None: + workflow_name = "a-name" + return workflow_name + '-' + str(node_id) @classmethod @@ -289,12 +328,19 @@ def from_session(cls, data): """ file_path = data.get('file_path') graph_data = data.get('graph') - workflow_name = data.get('workflow_name') + name = data.get('name') + flow_vars_data = data.get('flow_vars') if graph_data is None: graph = None else: graph = nx.readwrite.json_graph.node_link_graph(graph_data) - return cls(graph, file_path) + + if flow_vars_data is None: + flow_vars = None + else: + flow_vars = nx.readwrite.json_graph.node_link_graph(flow_vars_data) + + return cls(graph, file_path, name, flow_vars) @classmethod def from_file(cls, file_like): @@ -312,16 +358,18 @@ def from_request(cls, json_data): graph = nx.readwrite.json_graph.node_link_graph(json_data) return cls(graph) - def to_graph_json(self): - return nx.readwrite.json_graph.node_link_data(self.graph) + @staticmethod + def to_graph_json(graph): + return nx.readwrite.json_graph.node_link_data(graph) def to_session_dict(self): """Store Workflow information in the Django session. """ out = dict() - out['graph'] = self.to_graph_json() + out['graph'] = Workflow.to_graph_json(self.graph) out['file_path'] = self.file_path - out['workflow_name'] = self._workflow_name + out['name'] = self.name + out['flow_vars'] = Workflow.to_graph_json(self.flow_vars) return out diff --git a/vp/node/urls.py b/vp/node/urls.py index 9882088..11529a6 100644 --- a/vp/node/urls.py +++ b/vp/node/urls.py @@ -4,6 +4,7 @@ urlpatterns = [ path('', views.node, name='node'), path('', views.handle_node, name='handle node'), + path('global/', views.handle_node, name='handle node'), path('/execute', views.execute_node, name='execute node'), path('/retrieve_data', views.retrieve_data, name='retrieve data'), path('edge//', views.handle_edge, name='handle edge') diff --git a/vp/node/views.py b/vp/node/views.py index a033964..2f7dc48 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -4,7 +4,7 @@ from django.core.files.storage import FileSystemStorage from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt -from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory +from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory, create_node from rest_framework.decorators import api_view from drf_yasg.utils import swagger_auto_schema from django.conf import settings @@ -34,7 +34,10 @@ def node(request): 405 - Method not allowed """ # Extract request info for node creation - new_node = create_node(request.body) + try: + new_node = create_node(request.body) + except NodeException as e: + return JsonResponse({e.action: e.reason}, status=400) # If None, create_node failed if new_node is None: @@ -42,17 +45,31 @@ def node(request): 'message': 'Missing required Node information' }, status=400) - # Check node_id is unique in graph - if request.pyworkflow.get_node(new_node.node_id) is not None: + # Check Node is unique in the graph + if new_node.is_global: + node_exists = request.pyworkflow.get_flow_var(new_node.node_id) + else: + node_exists = request.pyworkflow.get_node(new_node.node_id) + + if node_exists: return JsonResponse({ - 'message': 'A node with id %s already exists in the graph.' % new_node.node_id + 'message': 'A %s with id %s already exists in the graph.' % ( + 'flow variable' if new_node.is_global else 'node', + new_node.node_id + ) }, status=400) - # Add Node to graph - request.pyworkflow.update_or_add_node(new_node) + # Add to the graph + try: + request.pyworkflow.update_or_add_node(new_node) + except WorkflowException as e: + return JsonResponse({e.action: e.reason}, status=400) return JsonResponse({ - 'message': 'Added new node to graph with id: %s' % new_node.node_id + 'message': 'Added new %s to graph with id: %s' % ( + 'flow variable' if new_node.is_global else 'node', + new_node.node_id + ) }) @@ -143,8 +160,11 @@ def handle_node(request, node_id): 405 - Method not allowed 500 - Error processing Node change """ - # Check if the graph contains the requested Node - retrieved_node = request.pyworkflow.get_node(node_id) + # Retrieve the global or local Node + if request.path_info.startswith('/node/global/'): + retrieved_node = request.pyworkflow.get_flow_var(node_id) + else: + retrieved_node = request.pyworkflow.get_node(node_id) if retrieved_node is None: return JsonResponse({ @@ -167,6 +187,14 @@ def handle_node(request, node_id): 'updated_node': str(type(updated_node)), }, status=500) + # Nodes need to be the same type to update + if retrieved_node.is_global != updated_node.is_global: + return JsonResponse({ + 'message': 'Node scopes do not match. Need correct info.', + 'retrieved_node': str(retrieved_node.is_global), + 'updated_node': str(updated_node.is_global), + }, status=500) + request.pyworkflow.update_or_add_node(updated_node) response = JsonResponse(updated_node.__dict__, safe=False) elif request.method == 'DELETE': @@ -223,21 +251,8 @@ def execute_node(request, node_id): @api_view(['GET']) def retrieve_data(request, node_id): try: - data = Workflow.retrieve_node_data(request.pyworkflow, node_id) + node_to_retrieve = request.pyworkflow.get_node(node_id) + data = Workflow.retrieve_node_data(node_to_retrieve) return JsonResponse(data, safe=False, status=200) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) - - -def create_node(payload): - """Pass all request info to Node Factory. - - """ - json_data = json.loads(payload) - - try: - return node_factory(json_data) - except OSError as e: - return JsonResponse({'message': e.strerror}, status=404) - except NodeException as e: - return JsonResponse({e.action: e.reason}, status=400) diff --git a/vp/workflow/urls.py b/vp/workflow/urls.py index 3442503..2d6245e 100644 --- a/vp/workflow/urls.py +++ b/vp/workflow/urls.py @@ -7,6 +7,7 @@ path('save', views.save_workflow, name='save'), path('execute', views.execute_workflow, name='execute workflow'), path('execute//successors', views.get_successors, name='get node successors'), + path('globals', views.global_vars, name="retrieve global variables"), path('retrieve_csv/', views.retrieve_csv, name='retrieve csv'), path('upload', views.upload_file, name='upload file') ] diff --git a/vp/workflow/views.py b/vp/workflow/views.py index f5fe196..b4118fe 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -1,6 +1,5 @@ import json import csv -import inspect from django.http import JsonResponse, HttpResponse from django.conf import settings @@ -31,7 +30,7 @@ def new_workflow(request): request.pyworkflow = Workflow() request.session.update(request.pyworkflow.to_session_dict()) - return JsonResponse(request.pyworkflow.to_graph_json()) + return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) @swagger_auto_schema(method='post', @@ -120,7 +119,8 @@ def save_workflow(request): try: combined_json = json.dumps({ 'react': json.loads(request.body), - 'networkx': request.pyworkflow.to_graph_json(), + 'networkx': Workflow.to_graph_json(request.pyworkflow.graph), + 'flow_vars': Workflow.to_graph_json(request.pyworkflow.flow_vars), 'filename': request.pyworkflow.file_path }) @@ -155,6 +155,21 @@ def execute_workflow(request): return JsonResponse(order, safe=False) + +@swagger_auto_schema(method='get', + operation_summary='Retrieve list of global flow vars.', + operation_description='Retrieves a list of global flow vars.', + responses={ + 200: 'List of global flow variables.', + 404: 'No graph exists.', + 500: 'Error generating the topological sort for the graph.' + }) +@api_view(['GET']) +def global_vars(request): + graph_data = Workflow.to_graph_json(request.pyworkflow.flow_vars) + return JsonResponse(graph_data["nodes"], safe=False) + + @swagger_auto_schema(method='get', operation_summary='Retrieve sorted list of successors from a node.', operation_description='Retrieves a list of successor nodes, sorted in execution order.',