From 0e55c2c442b2ff6969a6e14bf4fc3a202140178f Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 5 Apr 2020 12:07:03 -0400 Subject: [PATCH 1/4] refactor: Move `create_node` method to node factory --- pyworkflow/pyworkflow/__init__.py | 2 +- pyworkflow/pyworkflow/node_factory.py | 9 +++++++++ vp/node/views.py | 21 +++++---------------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/pyworkflow/pyworkflow/__init__.py b/pyworkflow/pyworkflow/__init__.py index e4eb1d2..3507b4b 100644 --- a/pyworkflow/pyworkflow/__init__.py +++ b/pyworkflow/pyworkflow/__init__.py @@ -1,3 +1,3 @@ from .workflow import Workflow, WorkflowException from .node import Node, IONode, ManipulationNode, ReadCsvNode, WriteCsvNode, JoinNode, NodeException -from .node_factory import node_factory +from .node_factory import node_factory, create_node diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index 3b43887..c48c704 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -1,6 +1,15 @@ from .node import * +import json +def create_node(payload): + json_data = json.loads(payload) + + try: + return node_factory(json_data) + except OSError as e: + raise NodeException('create_node', 'Problem parsing JSON') + def node_factory(node_info): # Create a new Node with info # TODO: should perform error-checking or add default values if missing diff --git a/vp/node/views.py b/vp/node/views.py index a033964..130c4bf 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -4,7 +4,7 @@ from django.core.files.storage import FileSystemStorage from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt -from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory +from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory, create_node from rest_framework.decorators import api_view from drf_yasg.utils import swagger_auto_schema from django.conf import settings @@ -34,7 +34,10 @@ def node(request): 405 - Method not allowed """ # Extract request info for node creation - new_node = create_node(request.body) + try: + new_node = create_node(request.body) + except NodeException as e: + return JsonResponse({e.action: e.reason}, status=400) # If None, create_node failed if new_node is None: @@ -227,17 +230,3 @@ def retrieve_data(request, node_id): return JsonResponse(data, safe=False, status=200) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) - - -def create_node(payload): - """Pass all request info to Node Factory. - - """ - json_data = json.loads(payload) - - try: - return node_factory(json_data) - except OSError as e: - return JsonResponse({'message': e.strerror}, status=404) - except NodeException as e: - return JsonResponse({e.action: e.reason}, status=400) From c0376796d33efb8e044124b08776df64309eb819 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 5 Apr 2020 12:21:13 -0400 Subject: [PATCH 2/4] refactor: Change 'workflow_name' to @property getter --- pyworkflow/pyworkflow/workflow.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 3c7284c..c7656e1 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -19,11 +19,11 @@ class Workflow: file_path: Location of a workflow file """ - def __init__(self, graph=nx.DiGraph(), file_path=None, workflow_name='a-name'): + def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name'): #TODO: need to discuss a way to generating the workflow name. For now passing a default name. self._graph = graph self._file_path = file_path - self._workflow_name = workflow_name + self._name = name @property def graph(self): @@ -77,8 +77,13 @@ def update_or_add_node(self, node: Node): return - def get_workflow_name(self): - return self._workflow_name + @property + def name(self): + return self._name + + @name.setter + def name(self, name: str): + self._name = name def add_edge(self, node_from: Node, node_to: Node): """ Add a Node object to the graph. @@ -207,7 +212,7 @@ def store_node_data(workflow, node_id, data): Returns: """ - file_name = Workflow.generate_file_name(workflow.get_workflow_name(), node_id) + file_name = Workflow.generate_file_name(workflow.name, node_id) try: return fs.save(file_name, ContentFile(data)) @@ -275,6 +280,9 @@ def generate_file_name(workflow_name, node_id): node_id: the id of the workflow """ #TODO: need to add validation + if workflow_name is None: + workflow_name = "a-name" + return workflow_name + '-' + str(node_id) @classmethod @@ -289,7 +297,7 @@ def from_session(cls, data): """ file_path = data.get('file_path') graph_data = data.get('graph') - workflow_name = data.get('workflow_name') + name = data.get('name') if graph_data is None: graph = None else: @@ -321,7 +329,7 @@ def to_session_dict(self): out = dict() out['graph'] = self.to_graph_json() out['file_path'] = self.file_path - out['workflow_name'] = self._workflow_name + out['name'] = self.name return out From 46ee33c3b9df3405b1d7bcdf2ae3b24dd7ba9a0a Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 5 Apr 2020 12:27:46 -0400 Subject: [PATCH 3/4] feat: Add FlowNode classes; create new graph for global flow_vars Initial implementation for local/global flow variables. In KNIME, the two key pieces of information are the 'var_name' and the 'default_value'. Global vs. local will behave much the same way, so they are implemented as Nodes that can be added to the main workflow graph (local) or the new global graph (flow_vars). --- pyworkflow/pyworkflow/__init__.py | 2 +- pyworkflow/pyworkflow/node.py | 44 +++++++++++++++++++ pyworkflow/pyworkflow/node_factory.py | 9 ++++ pyworkflow/pyworkflow/workflow.py | 63 +++++++++++++++++++++------ vp/node/urls.py | 1 + vp/node/views.py | 41 +++++++++++++---- vp/workflow/urls.py | 1 + vp/workflow/views.py | 21 +++++++-- 8 files changed, 156 insertions(+), 26 deletions(-) diff --git a/pyworkflow/pyworkflow/__init__.py b/pyworkflow/pyworkflow/__init__.py index 3507b4b..6674170 100644 --- a/pyworkflow/pyworkflow/__init__.py +++ b/pyworkflow/pyworkflow/__init__.py @@ -1,3 +1,3 @@ from .workflow import Workflow, WorkflowException -from .node import Node, IONode, ManipulationNode, ReadCsvNode, WriteCsvNode, JoinNode, NodeException +from .node import * from .node_factory import node_factory, create_node diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 87ff122..e2c0135 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -14,6 +14,8 @@ def __init__(self, node_info, options=None): self.node_key = node_info.get('node_key') self.data = node_info.get('data') + self.is_global = True if node_info.get('is_global') else False + # Execution options are passed up from children self.options = options or dict() @@ -31,6 +33,48 @@ def __str__(self): return "Test" +class FlowNode(Node): + """FlowNode object + """ + DEFAULT_OPTIONS = { + + } + + def __init__(self, node_info, options=dict()): + super().__init__(node_info, {**FlowNode.DEFAULT_OPTIONS, **options}) + + +class StringNode(FlowNode): + """StringNode object + + Allows for Strings to replace 'string' fields in Nodes + """ + name = "String Input" + num_in = 1 + num_out = 1 + color = 'purple' + + DEFAULT_OPTIONS = { + 'default_value': None, + 'var_name': 'my_var', + } + + OPTION_TYPES = { + 'default_value': { + "type": "string", + "name": "Default Value", + "desc": "Value this Node will pass as a flow variable" + }, + 'var_name': { + "type": "string", + "name": "Variable Name", + "desc": "Name of the variable to use in another Node" + } + } + + def __init__(self, node_info): + super().__init__(node_info) + class IONode(Node): """IONodes deal with file-handling in/out of the Workflow. diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index c48c704..f05be97 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -20,12 +20,21 @@ def node_factory(node_info): new_node = io_node(node_key, node_info) elif node_type == 'ManipulationNode': new_node = manipulation_node(node_key, node_info) + elif node_type == 'FlowNode': + new_node = flow_node(node_key, node_info) else: new_node = None return new_node +def flow_node(node_key, node_info): + if node_key == 'StringNode': + return StringNode(node_info) + else: + return None + + def io_node(node_key, node_info): if node_key == 'ReadCsvNode': return ReadCsvNode(node_info) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index c7656e1..95b5718 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -19,11 +19,12 @@ class Workflow: file_path: Location of a workflow file """ - def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name'): + def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph()): #TODO: need to discuss a way to generating the workflow name. For now passing a default name. self._graph = graph self._file_path = file_path self._name = name + self._flow_vars = flow_vars @property def graph(self): @@ -44,6 +45,10 @@ def file_path(self, file_path: str): else: raise WorkflowException('set_file_path', 'File ' + file_path + ' is not JSON.') + @property + def flow_vars(self): + return self._flow_vars + def get_node(self, node_id): """Retrieves Node from workflow, if exists @@ -56,6 +61,18 @@ def get_node(self, node_id): node_info = self.graph.nodes[node_id] return node_factory(node_info) + def get_flow_var(self, node_id): + """Retrieves a global flow variable from workflow, if exists + + Return: + FlowNode object, if one exists. Otherwise, None. + """ + if self._flow_vars.has_node(node_id) is not True: + return None + + node_info = self.flow_vars.nodes[node_id] + return node_factory(node_info) + def update_or_add_node(self, node: Node): """ Update or add a Node object to the graph. @@ -65,15 +82,21 @@ def update_or_add_node(self, node: Node): TODO: * validate() always returns True; this should perform actual validation """ - if node.validate(): - # If Node not in graph yet, add it - if self._graph.has_node(node.node_id) is False: - self._graph.add_node(node.node_id) + # Do not add/update if invalid + if node.validate() is False: + raise WorkflowException('update_or_add_node', 'Node is invalid') + + # Select the correct graph to modify + graph = self.flow_vars if node.is_global else self.graph - # Iterate through all Node attributes to add to graph - node_dict = node.__dict__ - for key in node_dict.keys(): - self._graph.nodes[node.node_id][key] = node_dict[key] + if graph.has_node(node.node_id) is False: + graph.add_node(node.node_id) + + # NetworkX cannot store mutable data, so iterate through all Node + # attributes to add to graph + node_dict = node.__dict__ + for key in node_dict.keys(): + graph.nodes[node.node_id][key] = node_dict[key] return @@ -137,7 +160,10 @@ def remove_node(self, node): WorkflowException: on issue with removing node from graph """ try: - self._graph.remove_node(node.node_id) + # Select the correct graph to modify + graph = self.flow_vars if node.is_global else self.graph + + graph.remove_node(node.node_id) except nx.NetworkXError: raise WorkflowException('remove_node', 'Node does not exist in graph.') @@ -298,11 +324,18 @@ def from_session(cls, data): file_path = data.get('file_path') graph_data = data.get('graph') name = data.get('name') + flow_vars_data = data.get('flow_vars') if graph_data is None: graph = None else: graph = nx.readwrite.json_graph.node_link_graph(graph_data) - return cls(graph, file_path) + + if flow_vars_data is None: + flow_vars = None + else: + flow_vars = nx.readwrite.json_graph.node_link_graph(flow_vars_data) + + return cls(graph, file_path, name, flow_vars) @classmethod def from_file(cls, file_like): @@ -320,16 +353,18 @@ def from_request(cls, json_data): graph = nx.readwrite.json_graph.node_link_graph(json_data) return cls(graph) - def to_graph_json(self): - return nx.readwrite.json_graph.node_link_data(self.graph) + @staticmethod + def to_graph_json(graph): + return nx.readwrite.json_graph.node_link_data(graph) def to_session_dict(self): """Store Workflow information in the Django session. """ out = dict() - out['graph'] = self.to_graph_json() + out['graph'] = Workflow.to_graph_json(self.graph) out['file_path'] = self.file_path out['name'] = self.name + out['flow_vars'] = Workflow.to_graph_json(self.flow_vars) return out diff --git a/vp/node/urls.py b/vp/node/urls.py index 9882088..11529a6 100644 --- a/vp/node/urls.py +++ b/vp/node/urls.py @@ -4,6 +4,7 @@ urlpatterns = [ path('', views.node, name='node'), path('', views.handle_node, name='handle node'), + path('global/', views.handle_node, name='handle node'), path('/execute', views.execute_node, name='execute node'), path('/retrieve_data', views.retrieve_data, name='retrieve data'), path('edge//', views.handle_edge, name='handle edge') diff --git a/vp/node/views.py b/vp/node/views.py index 130c4bf..17d0860 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -45,17 +45,31 @@ def node(request): 'message': 'Missing required Node information' }, status=400) - # Check node_id is unique in graph - if request.pyworkflow.get_node(new_node.node_id) is not None: + # Check Node is unique in the graph + if new_node.is_global: + node_exists = request.pyworkflow.get_flow_var(new_node.node_id) + else: + node_exists = request.pyworkflow.get_node(new_node.node_id) + + if node_exists: return JsonResponse({ - 'message': 'A node with id %s already exists in the graph.' % new_node.node_id + 'message': 'A %s with id %s already exists in the graph.' % ( + 'flow variable' if new_node.is_global else 'node', + new_node.node_id + ) }, status=400) - # Add Node to graph - request.pyworkflow.update_or_add_node(new_node) + # Add to the graph + try: + request.pyworkflow.update_or_add_node(new_node) + except WorkflowException as e: + return JsonResponse({e.action: e.reason}, status=400) return JsonResponse({ - 'message': 'Added new node to graph with id: %s' % new_node.node_id + 'message': 'Added new %s to graph with id: %s' % ( + 'flow variable' if new_node.is_global else 'node', + new_node.node_id + ) }) @@ -146,8 +160,11 @@ def handle_node(request, node_id): 405 - Method not allowed 500 - Error processing Node change """ - # Check if the graph contains the requested Node - retrieved_node = request.pyworkflow.get_node(node_id) + # Retrieve the global or local Node + if request.path_info.startswith('/node/global/'): + retrieved_node = request.pyworkflow.get_flow_var(node_id) + else: + retrieved_node = request.pyworkflow.get_node(node_id) if retrieved_node is None: return JsonResponse({ @@ -170,6 +187,14 @@ def handle_node(request, node_id): 'updated_node': str(type(updated_node)), }, status=500) + # Nodes need to be the same type to update + if retrieved_node.is_global != updated_node.is_global: + return JsonResponse({ + 'message': 'Node scopes do not match. Need correct info.', + 'retrieved_node': str(retrieved_node.is_global), + 'updated_node': str(updated_node.is_global), + }, status=500) + request.pyworkflow.update_or_add_node(updated_node) response = JsonResponse(updated_node.__dict__, safe=False) elif request.method == 'DELETE': diff --git a/vp/workflow/urls.py b/vp/workflow/urls.py index a01c63f..2b943a1 100644 --- a/vp/workflow/urls.py +++ b/vp/workflow/urls.py @@ -7,5 +7,6 @@ path('save', views.save_workflow, name='save'), path('execute', views.execute_workflow, name='execute workflow'), path('execute//successors', views.get_successors, name='get node successors'), + path('globals', views.global_vars, name="retrieve global variables"), path('retrieve_csv/', views.retrieve_csv, name='retrieve csv') ] diff --git a/vp/workflow/views.py b/vp/workflow/views.py index ccc187a..af486ad 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -1,6 +1,5 @@ import json import csv -import inspect from django.http import JsonResponse, HttpResponse from django.conf import settings @@ -31,7 +30,7 @@ def new_workflow(request): request.pyworkflow = Workflow() request.session.update(request.pyworkflow.to_session_dict()) - return JsonResponse(request.pyworkflow.to_graph_json()) + return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) @swagger_auto_schema(method='post', @@ -120,7 +119,8 @@ def save_workflow(request): try: combined_json = json.dumps({ 'react': json.loads(request.body), - 'networkx': request.pyworkflow.to_graph_json(), + 'networkx': Workflow.to_graph_json(request.pyworkflow.graph), + 'flow_vars': Workflow.to_graph_json(request.pyworkflow.flow_vars), 'filename': request.pyworkflow.file_path }) @@ -155,6 +155,21 @@ def execute_workflow(request): return JsonResponse(order, safe=False) + +@swagger_auto_schema(method='get', + operation_summary='Retrieve list of global flow vars.', + operation_description='Retrieves a list of global flow vars.', + responses={ + 200: 'List of global flow variables.', + 404: 'No graph exists.', + 500: 'Error generating the topological sort for the graph.' + }) +@api_view(['GET']) +def global_vars(request): + graph_data = Workflow.to_graph_json(request.pyworkflow.flow_vars) + return JsonResponse(graph_data["nodes"], safe=False) + + @swagger_auto_schema(method='get', operation_summary='Retrieve sorted list of successors from a node.', operation_description='Retrieves a list of successor nodes, sorted in execution order.', From ba88a024cfee735e27dca9aaecc66bc96ae40406 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 5 Apr 2020 14:12:17 -0400 Subject: [PATCH 4/4] feat: Use local FlowNodes to replace Node options Added to separate input for Node execution. Now accepts 'preceding_data' and 'flow_vars' as input to use during execution. --- pyworkflow/pyworkflow/node.py | 23 +++++++++++++-------- pyworkflow/pyworkflow/workflow.py | 33 ++++++++++++++++++------------- vp/node/views.py | 3 ++- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index d50975c..02d36f6 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,5 +1,6 @@ import pandas as pd + class Node: """Node object @@ -20,7 +21,7 @@ def __init__(self, node_info, options=None): if node_info.get("options"): self.options.update(node_info["options"]) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): pass def validate(self): @@ -88,7 +89,7 @@ class IONode(Node): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**IONode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): pass def validate(self): @@ -136,11 +137,11 @@ class ReadCsvNode(IONode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): try: # TODO: FileStorage implemented in Django to store in /tmp # Better filename/path handling should be implemented. - + NodeUtils.replace_flow_vars(self.options, flow_vars) df = pd.read_csv(**self.options) return df.to_json() except Exception as e: @@ -171,7 +172,7 @@ class WriteCsvNode(IONode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): try: # Write CSV needs exactly 1 input DataFrame NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) @@ -201,7 +202,7 @@ class ManipulationNode(Node): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**ManipulationNode.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): pass def validate(self): @@ -229,7 +230,7 @@ class PivotNode(ManipulationNode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): try: NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) input_df = pd.DataFrame.from_dict(predecessor_data[0]) @@ -249,7 +250,7 @@ class JoinNode(ManipulationNode): def __init__(self, node_info, options=dict()): super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data): + def execute(self, predecessor_data, flow_vars): # Join cannot accept more than 2 input DataFrames # TODO: Add more error-checking if 1, or no, DataFrames passed through try: @@ -289,3 +290,9 @@ def validate_predecessor_data(predecessor_data_len, num_in, node_key): exception_txt % (node_key, num_in, predecessor_data_len) ) + @staticmethod + def replace_flow_vars(node_options, flow_vars): + for var in flow_vars: + node_options[var['var_name']] = var['default_value'] + + return diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 95b5718..2d8e31d 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -196,20 +196,32 @@ def execute(self, node_id): if node_to_execute is None: raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) - # Read in any data from predecessor nodes + # Read in any data from predecessor/flow nodes + # TODO: This should work for local FlowNodes, but global flow_vars still + # need a way to be assigned/replace Node options preceding_data = list() + flow_vars = list() for predecessor in self.get_node_predecessors(node_id): try: - preceding_data.append(self.retrieve_node_data(self, predecessor)) + node_to_retrieve = self.get_node(predecessor) + + if node_to_retrieve is None: + raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % node_id) + + if node_to_retrieve.node_type == 'FlowNode': + flow_vars.append(node_to_retrieve.options) + else: + preceding_data.append(Workflow.retrieve_node_data(node_to_retrieve)) + except WorkflowException: # TODO: Should this append None, skip reading, or raise exception to view? preceding_data.append(None) # Pass in data to current Node to use in execution - output = node_to_execute.execute(preceding_data) + output = node_to_execute.execute(preceding_data, flow_vars) # Save new execution data to disk - node_to_execute.data = self.store_node_data(self, node_id, output) + node_to_execute.data = Workflow.store_node_data(self, node_id, output) if node_to_execute.data is None: raise WorkflowException('execute', 'There was a problem saving node output.') @@ -246,15 +258,13 @@ def store_node_data(workflow, node_id, data): return None @staticmethod - def retrieve_node_data(workflow, node_id): + def retrieve_node_data(node_to_retrieve): """Retrieve Node data - Gets the Node specified by 'node_id' and attempts to read a saved - DataFrame if one exists. + Reads a saved DataFrame, referenced by the Node's 'data' attribute. Args: - workflow: The workflow containing the Node. - node_id: The Node containing a DataFrame saved to disk. + node_to_retrieve: The Node containing a DataFrame saved to disk. Returns: Contents of the file (a DataFrame) in a JSON object. @@ -263,11 +273,6 @@ def retrieve_node_data(workflow, node_id): WorkflowException: Node does not exist, file does not exist, or problem parsing the file. """ - node_to_retrieve = workflow.get_node(node_id) - - if node_to_retrieve is None: - raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % node_id) - try: with fs.open(node_to_retrieve.data) as f: return json.load(f) diff --git a/vp/node/views.py b/vp/node/views.py index 17d0860..2f7dc48 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -251,7 +251,8 @@ def execute_node(request, node_id): @api_view(['GET']) def retrieve_data(request, node_id): try: - data = Workflow.retrieve_node_data(request.pyworkflow, node_id) + node_to_retrieve = request.pyworkflow.get_node(node_id) + data = Workflow.retrieve_node_data(node_to_retrieve) return JsonResponse(data, safe=False, status=200) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500)