From 23f1c40d7ad2ce7aa8f5ce53dd2ae84b4a18f052 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 19 Apr 2020 09:36:40 -0400 Subject: [PATCH 01/20] feat: Upload custom node file --- pyworkflow/pyworkflow/workflow.py | 14 ++++++++++++++ vp/workflow/urls.py | 3 ++- vp/workflow/views.py | 27 +++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 7930beb..964b897 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -19,9 +19,14 @@ class Workflow: def __init__(self, name="Untitled", root_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): self._name = name self._root_dir = WorkflowUtils.set_root_dir(root_dir) + self._custom_node_dir = WorkflowUtils.set_custom_nodes_dir() self._graph = graph self._flow_vars = flow_vars + @property + def custom_node_dir(self): + return self._custom_node_dir + @property def graph(self): return self._graph @@ -404,6 +409,15 @@ def execute_workflow(workflow_location): class WorkflowUtils: + @staticmethod + def set_custom_nodes_dir(): + custom_node_dir = os.path.join(os.getcwd(), '../pyworkflow/custom_nodes') + + if not os.path.exists(custom_node_dir): + os.makedirs(custom_node_dir) + + return custom_node_dir + @staticmethod def set_root_dir(root_dir): if root_dir is None: diff --git a/vp/workflow/urls.py b/vp/workflow/urls.py index 26f1ad7..dceaca1 100644 --- a/vp/workflow/urls.py +++ b/vp/workflow/urls.py @@ -10,5 +10,6 @@ path('execute//successors', views.get_successors, name='get node successors'), path('globals', views.global_vars, name="retrieve global variables"), path('upload', views.upload_file, name='upload file'), - path('download', views.download_file, name='download file') + path('download', views.download_file, name='download file'), + path('custom_node', views.add_custom_node, name='add custom node') ] diff --git a/vp/workflow/views.py b/vp/workflow/views.py index a76c28c..9ebd497 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -216,6 +216,33 @@ def get_successors(request, node_id): return JsonResponse(order, safe=False) +@swagger_auto_schema(method='post', + operation_summary='Uploads a custom node to server.', + operation_description='Uploads a custom node to server location.', + responses={ + 200: 'File uploaded', + 404: 'No specified file' + }) +@api_view(['POST']) +def add_custom_node(request): + node_file = request.FILES.get('file') + + if node_file is None: + return JsonResponse("Empty content", status=404) + + to_open = os.path.join(request.pyworkflow.custom_node_dir, node_file.name) + + try: + with open(to_open, 'wb') as f: + f.write(node_file.read()) + except OSError as e: + return JsonResponse({'message': str(e)}, status=500) + + node_file.close() + + return JsonResponse({"filename": to_open}, status=201, safe=False) + + @swagger_auto_schema(method='post', operation_summary='Uploads a file to server.', operation_description='Uploads a new file to server location.', From 3e77c5ac0607d07d86fba86c742f207389b91304 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 19 Apr 2020 09:37:04 -0400 Subject: [PATCH 02/20] feat: Retrieve available custom nodes --- vp/vp/urls.py | 1 + vp/vp/views.py | 56 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/vp/vp/urls.py b/vp/vp/urls.py index 88caaaf..85bbe74 100644 --- a/vp/vp/urls.py +++ b/vp/vp/urls.py @@ -39,6 +39,7 @@ path('admin/', admin.site.urls), path('info/', views.info), path('nodes/', views.retrieve_nodes_for_user), + path('custom_nodes/', views.retrieve_custom_nodes_for_user), path('node/', include('node.urls')), path('workflow/', include('workflow.urls')) ] diff --git a/vp/vp/views.py b/vp/vp/views.py index 898c095..2c55761 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -3,6 +3,9 @@ from drf_yasg.utils import swagger_auto_schema from pyworkflow import Node +import os +import inspect + @swagger_auto_schema(method='get', responses={200:'JSON response with data'}) @api_view(['GET']) @@ -62,4 +65,55 @@ def retrieve_nodes_for_user(request): data[key].append(child_node) - return JsonResponse(data) \ No newline at end of file + return JsonResponse(data) + + +@swagger_auto_schema(method='get', + operation_summary='Retrieve a list of custom Nodes', + operation_description='Retrieves a list of custom Nodes, in JSON.', + responses={ + 200: 'List of installed Nodes, in JSON', + }) +@api_view(['GET']) +def retrieve_custom_nodes_for_user(request): + data = dict() + + # TODO: Workflow loading excluded in middleware for this route + # Should probably have a way to access the 'custom_node` dir dynamically + custom_node_path = os.path.join(os.getcwd(), '../pyworkflow/custom_nodes') + + try: + nodes = os.listdir(custom_node_path) + except OSError as e: + return JsonResponse({"message": str(e)}, status=500) + + for node in nodes: + # Parse file type + node_name, ext = os.path.splitext(node) + + try: + package = __import__('custom_nodes.' + node_name) + module = getattr(package, node_name) + except ModuleNotFoundError as e: + # TODO: This will only catch the first missing package. Can we get more? + data[node_name] = f"Please install missing packages and restart the server. Missing '{e.name}'" + continue + + for name, klass in inspect.getmembers(module): + if inspect.isclass(klass) and klass.__module__.startswith('custom_nodes.'): + custom_node = { + 'name': klass.name, + 'node_key': name, + 'node_type': node_name, + 'num_in': klass.num_in, + 'num_out': klass.num_out, + 'color': klass.color or 'black', + 'doc': klass.__doc__, + 'options': {k: v.get_value() for k, v in klass.options.items()}, + 'option_types': klass.option_types, + 'download_result': getattr(klass, "download_result", False) + } + + data[node_name] = custom_node + + return JsonResponse(data, safe=False) From 8e272686be190d7d46345fcc754892fc3f064cda Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sun, 19 Apr 2020 09:37:22 -0400 Subject: [PATCH 03/20] feat: Add custom node creation to `node_factory` --- pyworkflow/pyworkflow/node_factory.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index e295790..3fde202 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -14,7 +14,7 @@ def node_factory(node_info): elif node_type == 'FlowNode': new_node = flow_node(node_key, node_info) else: - new_node = None + new_node = custom_node(node_type, node_key, node_info) return new_node @@ -46,3 +46,15 @@ def manipulation_node(node_key, node_info): return FilterNode(node_info) else: return None + +def custom_node(filename, node_key, node_info): + try: + package = __import__('custom_nodes.' + filename) + module = getattr(package, filename) + my_class = getattr(module, node_key) + instance = my_class(node_info) + + return instance + except Exception as e: + print(str(e)) + return None From 896d3e6285c18ae0a91dec912ae458bb3f9cbd2a Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Tue, 21 Apr 2020 19:45:27 -0400 Subject: [PATCH 04/20] refactor: Merge node listings to one endpoint Factors out some duplicate code. PR #57 adds a `to_json()` method that might be able to replace the `extract_node_info()` method here. TBD. --- pyworkflow/pyworkflow/node_factory.py | 1 + vp/vp/urls.py | 1 - vp/vp/views.py | 116 ++++++++++++++------------ 3 files changed, 63 insertions(+), 55 deletions(-) diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index 3fde202..578a5aa 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -47,6 +47,7 @@ def manipulation_node(node_key, node_info): else: return None + def custom_node(filename, node_key, node_info): try: package = __import__('custom_nodes.' + filename) diff --git a/vp/vp/urls.py b/vp/vp/urls.py index 85bbe74..88caaaf 100644 --- a/vp/vp/urls.py +++ b/vp/vp/urls.py @@ -39,7 +39,6 @@ path('admin/', admin.site.urls), path('info/', views.info), path('nodes/', views.retrieve_nodes_for_user), - path('custom_nodes/', views.retrieve_custom_nodes_for_user), path('node/', include('node.urls')), path('workflow/', include('workflow.urls')) ] diff --git a/vp/vp/views.py b/vp/vp/views.py index 2c55761..e8e9f58 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -2,9 +2,11 @@ from rest_framework.decorators import api_view from drf_yasg.utils import swagger_auto_schema from pyworkflow import Node +from modulefinder import ModuleFinder import os import inspect +import sys @swagger_auto_schema(method='get', responses={200:'JSON response with data'}) @@ -42,78 +44,84 @@ def retrieve_nodes_for_user(request): """ data = dict() - # Iterate through node 'types' + # Iterate through installed Nodes for parent in Node.__subclasses__(): key = getattr(parent, "display_name", parent.__name__) data[key] = list() # Iterate through node 'keys' for child in parent.__subclasses__(): - # TODO: check attribute-scope is handled correctly - child_node = { - 'name': child.name, - 'node_key': child.__name__, - 'node_type': parent.__name__, - 'num_in': child.num_in, - 'num_out': child.num_out, - 'color': child.color or parent.color, - 'doc': child.__doc__, - 'options': {k: v.get_value() for k, v in child.options.items()}, - 'option_types': child.option_types, - 'download_result': getattr(child, "download_result", False) - } - - data[key].append(child_node) + node = extract_node_info(parent, child) + data[key].append(node) + + # Check for any installed Custom Nodes + # TODO: Workflow loading excluded in middleware for this route + # Should probably have a way to access the 'custom_node` dir dynamically + custom_node_path = os.path.join(os.getcwd(), '../pyworkflow/custom_nodes') + data['CustomNode'] = import_custom_node(custom_node_path) return JsonResponse(data) -@swagger_auto_schema(method='get', - operation_summary='Retrieve a list of custom Nodes', - operation_description='Retrieves a list of custom Nodes, in JSON.', - responses={ - 200: 'List of installed Nodes, in JSON', - }) -@api_view(['GET']) -def retrieve_custom_nodes_for_user(request): - data = dict() +def check_missing_packages(node_path): + finder = ModuleFinder(node_path) + finder.run_script(node_path) + + uninstalled = list() + for missing_package in finder.badmodules.keys(): + if missing_package not in sys.modules: + uninstalled.append(missing_package) + + return uninstalled + + +def extract_node_info(parent, child): + # TODO: check attribute(s) accessing is handled correctly + return { + 'name': child.name, + 'node_key': child.__name__, + 'node_type': str(parent), + 'num_in': child.num_in, + 'num_out': child.num_out, + 'color': child.color or parent.color or 'black', + 'doc': child.__doc__, + 'options': {k: v.get_value() for k, v in child.options.items()}, + 'option_types': child.option_types, + 'download_result': getattr(child, "download_result", False) + } - # TODO: Workflow loading excluded in middleware for this route - # Should probably have a way to access the 'custom_node` dir dynamically - custom_node_path = os.path.join(os.getcwd(), '../pyworkflow/custom_nodes') +def import_custom_node(root_path): + # Get list of files in path try: - nodes = os.listdir(custom_node_path) + files = os.listdir(root_path) except OSError as e: - return JsonResponse({"message": str(e)}, status=500) + return None - for node in nodes: - # Parse file type - node_name, ext = os.path.splitext(node) + data = list() + for file in files: + # Check file is not a dir + node_path = os.path.join(root_path, file) + if not os.path.isfile(node_path): + continue + + node, ext = os.path.splitext(file) try: - package = __import__('custom_nodes.' + node_name) - module = getattr(package, node_name) - except ModuleNotFoundError as e: - # TODO: This will only catch the first missing package. Can we get more? - data[node_name] = f"Please install missing packages and restart the server. Missing '{e.name}'" + package = __import__('custom_nodes.' + node) + module = getattr(package, node) + except ModuleNotFoundError: + data.append({ + "name": node, + "missing_packages": check_missing_packages(node_path) + }) continue for name, klass in inspect.getmembers(module): if inspect.isclass(klass) and klass.__module__.startswith('custom_nodes.'): - custom_node = { - 'name': klass.name, - 'node_key': name, - 'node_type': node_name, - 'num_in': klass.num_in, - 'num_out': klass.num_out, - 'color': klass.color or 'black', - 'doc': klass.__doc__, - 'options': {k: v.get_value() for k, v in klass.options.items()}, - 'option_types': klass.option_types, - 'download_result': getattr(klass, "download_result", False) - } - - data[node_name] = custom_node - - return JsonResponse(data, safe=False) + custom_node = extract_node_info(node, klass) + data.append(custom_node) + + return data + + From 6746f61b21e4d99efd8cf7199fffefb56b1cdec4 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Tue, 21 Apr 2020 19:46:43 -0400 Subject: [PATCH 05/20] feat: Add missing package check on upload `check_missing_packages()` is duplicated from `vp/views.py`. It should find a home. --- vp/workflow/views.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/vp/workflow/views.py b/vp/workflow/views.py index 9ebd497..9b3c998 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -1,5 +1,6 @@ import os import json +import sys from django.http import JsonResponse, HttpResponse from django.conf import settings @@ -7,6 +8,7 @@ from pyworkflow import Workflow, WorkflowException from drf_yasg.utils import swagger_auto_schema +from modulefinder import ModuleFinder @swagger_auto_schema(method='post', operation_summary='Create a new workflow.', @@ -240,7 +242,10 @@ def add_custom_node(request): node_file.close() - return JsonResponse({"filename": to_open}, status=201, safe=False) + return JsonResponse({ + "filename": to_open, + "missing_packages": check_missing_packages(to_open) + }, status=201, safe=False) @swagger_auto_schema(method='post', @@ -302,3 +307,14 @@ def download_file(request): except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) + +def check_missing_packages(node_path): + finder = ModuleFinder(node_path) + finder.run_script(node_path) + + uninstalled = list() + for missing_package in finder.badmodules.keys(): + if missing_package not in sys.modules: + uninstalled.append(missing_package) + + return uninstalled From 5f53d5a9a0e626eb08eb80216d318b8d156b1821 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 09:41:35 -0400 Subject: [PATCH 06/20] refactor: Prep sub-dirs for Node classes --- pyworkflow/pyworkflow/nodes/__init__.py | 3 +++ pyworkflow/pyworkflow/nodes/flow_control/__init__.py | 1 + pyworkflow/pyworkflow/nodes/io/__init__.py | 2 ++ pyworkflow/pyworkflow/nodes/manipulation/__init__.py | 3 +++ 4 files changed, 9 insertions(+) create mode 100644 pyworkflow/pyworkflow/nodes/__init__.py create mode 100644 pyworkflow/pyworkflow/nodes/flow_control/__init__.py create mode 100644 pyworkflow/pyworkflow/nodes/io/__init__.py create mode 100644 pyworkflow/pyworkflow/nodes/manipulation/__init__.py diff --git a/pyworkflow/pyworkflow/nodes/__init__.py b/pyworkflow/pyworkflow/nodes/__init__.py new file mode 100644 index 0000000..31d80d9 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/__init__.py @@ -0,0 +1,3 @@ +from .io import * +from .manipulation import * +from .flow_control import * diff --git a/pyworkflow/pyworkflow/nodes/flow_control/__init__.py b/pyworkflow/pyworkflow/nodes/flow_control/__init__.py new file mode 100644 index 0000000..1fec3b8 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/flow_control/__init__.py @@ -0,0 +1 @@ +from .string_node import StringNode diff --git a/pyworkflow/pyworkflow/nodes/io/__init__.py b/pyworkflow/pyworkflow/nodes/io/__init__.py new file mode 100644 index 0000000..88de457 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/io/__init__.py @@ -0,0 +1,2 @@ +from .read_csv_node import ReadCsvNode +from .write_csv_node import WriteCsvNode diff --git a/pyworkflow/pyworkflow/nodes/manipulation/__init__.py b/pyworkflow/pyworkflow/nodes/manipulation/__init__.py new file mode 100644 index 0000000..976f358 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/__init__.py @@ -0,0 +1,3 @@ +from .filter_node import FilterNode +from .join_node import JoinNode +from .pivot_node import PivotNode From 671c726a40f779c51797d7a5786f0211c460be1c Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 09:47:33 -0400 Subject: [PATCH 07/20] refactor: Move /nodes endpoint to /workflow views --- front-end/src/API.js | 2 +- vp/vp/urls.py | 1 - vp/vp/views.py | 35 ----------------------------------- vp/workflow/urls.py | 3 ++- vp/workflow/views.py | 18 ++++++++++++++++++ 5 files changed, 21 insertions(+), 38 deletions(-) diff --git a/front-end/src/API.js b/front-end/src/API.js index 392b52a..e05b8a9 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -95,7 +95,7 @@ export async function save(diagramData) { * @returns {Promise} - server response (node menu items) */ export async function getNodes() { - return fetchWrapper("/nodes"); + return fetchWrapper("/workflow/nodes"); } diff --git a/vp/vp/urls.py b/vp/vp/urls.py index 88caaaf..41e7d65 100644 --- a/vp/vp/urls.py +++ b/vp/vp/urls.py @@ -38,7 +38,6 @@ path('redoc/', schema_view.with_ui('redoc', cache_timeout=0), name='schema-redoc'), path('admin/', admin.site.urls), path('info/', views.info), - path('nodes/', views.retrieve_nodes_for_user), path('node/', include('node.urls')), path('workflow/', include('workflow.urls')) ] diff --git a/vp/vp/views.py b/vp/vp/views.py index e8e9f58..c7e3697 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -29,41 +29,6 @@ def info(request): @swagger_auto_schema(method='get', - operation_summary='Retrieve a list of installed Nodes', - operation_description='Retrieves a list of installed Nodes, in JSON.', - responses={ - 200: 'List of installed Nodes, in JSON', - }) -@api_view(['GET']) -def retrieve_nodes_for_user(request): - """Assembles list of Nodes accessible to workflows. - - Retrieve a list of classes from the Node module in `pyworkflow`. - List is split into 'types' (e.g., 'IO' and 'Manipulation') and - 'keys', or individual command Nodes (e.g., 'ReadCsv', 'Pivot'). - """ - data = dict() - - # Iterate through installed Nodes - for parent in Node.__subclasses__(): - key = getattr(parent, "display_name", parent.__name__) - data[key] = list() - - # Iterate through node 'keys' - for child in parent.__subclasses__(): - node = extract_node_info(parent, child) - data[key].append(node) - - # Check for any installed Custom Nodes - # TODO: Workflow loading excluded in middleware for this route - # Should probably have a way to access the 'custom_node` dir dynamically - custom_node_path = os.path.join(os.getcwd(), '../pyworkflow/custom_nodes') - data['CustomNode'] = import_custom_node(custom_node_path) - - return JsonResponse(data) - - -def check_missing_packages(node_path): finder = ModuleFinder(node_path) finder.run_script(node_path) diff --git a/vp/workflow/urls.py b/vp/workflow/urls.py index dceaca1..afc0643 100644 --- a/vp/workflow/urls.py +++ b/vp/workflow/urls.py @@ -11,5 +11,6 @@ path('globals', views.global_vars, name="retrieve global variables"), path('upload', views.upload_file, name='upload file'), path('download', views.download_file, name='download file'), - path('custom_node', views.add_custom_node, name='add custom node') + path('custom_node', views.add_custom_node, name='add custom node'), + path('nodes', views.retrieve_nodes_for_user, name='retrieve node list'), ] diff --git a/vp/workflow/views.py b/vp/workflow/views.py index 9b3c998..05cd579 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -270,6 +270,24 @@ def upload_file(request): except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) +@swagger_auto_schema(method='get', + operation_summary='Retrieve a list of installed Nodes', + operation_description='Retrieves a list of installed Nodes, in JSON.', + responses={ + 200: 'List of installed Nodes, in JSON', + }) +@api_view(['GET']) +def retrieve_nodes_for_user(request): + """Assembles list of Nodes accessible to workflows. + + Retrieve a list of classes from the Node module in `pyworkflow`. + List is split into 'types' (e.g., 'IO' and 'Manipulation') and + 'keys', or individual command Nodes (e.g., 'ReadCsv', 'Pivot'). + """ + packaged_nodes = os.path.join(os.getcwd(), '../pyworkflow/pyworkflow/nodes') + data = request.pyworkflow.get_packaged_nodes() + data.move_to_end('Custom Nodes') + return JsonResponse(data, safe=False) @swagger_auto_schema(method='post', operation_summary='Downloads a file from the server', From 930158ca3fb26b996427d2f245423be3b21ef9b3 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 09:51:56 -0400 Subject: [PATCH 08/20] refactor: Move Node import methods to Workflow --- pyworkflow/pyworkflow/node_factory.py | 18 ++--- pyworkflow/pyworkflow/workflow.py | 112 ++++++++++++++++++++++++-- vp/vp/views.py | 67 --------------- 3 files changed, 113 insertions(+), 84 deletions(-) diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index 578a5aa..5810275 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -1,4 +1,6 @@ from .node import * +from .nodes import * +import importlib def node_factory(node_info): @@ -7,14 +9,14 @@ def node_factory(node_info): node_type = node_info.get('node_type') node_key = node_info.get('node_key') - if node_type == 'IONode': + if node_type == 'io': new_node = io_node(node_key, node_info) - elif node_type == 'ManipulationNode': + elif node_type == 'manipulation': new_node = manipulation_node(node_key, node_info) - elif node_type == 'FlowNode': + elif node_type == 'flow_control': new_node = flow_node(node_key, node_info) else: - new_node = custom_node(node_type, node_key, node_info) + new_node = custom_node(node_key, node_info) return new_node @@ -40,18 +42,16 @@ def manipulation_node(node_key, node_info): return JoinNode(node_info) elif node_key == 'PivotNode': return PivotNode(node_info) - elif node_key == 'multi-in': - return ManipulationNode(node_info) elif node_key == 'FilterNode': return FilterNode(node_info) else: return None -def custom_node(filename, node_key, node_info): +def custom_node(node_key, node_info): try: - package = __import__('custom_nodes.' + filename) - module = getattr(package, filename) + filename = node_info.get('filename') + module = importlib.import_module('pyworkflow.nodes.custom_nodes.' + filename) my_class = getattr(module, node_key) instance = my_class(node_info) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 964b897..bcc8746 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -1,6 +1,12 @@ +import inspect +import importlib +import json import os import networkx as nx -import json +import sys + +from collections import OrderedDict +from modulefinder import ModuleFinder from .node import Node from .node_factory import node_factory @@ -12,14 +18,15 @@ class Workflow: Attributes: name: Name of the workflow root_dir: Used for reading/writing files to/from disk + custom_node_dir: Location of custom nodes graph: A NetworkX Directed Graph flow_vars: Global flow variables associated with workflow """ - def __init__(self, name="Untitled", root_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): + def __init__(self, name="Untitled", root_dir=None, custom_node_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): self._name = name self._root_dir = WorkflowUtils.set_root_dir(root_dir) - self._custom_node_dir = WorkflowUtils.set_custom_nodes_dir() + self._custom_node_dir = WorkflowUtils.set_custom_nodes_dir(custom_node_dir) self._graph = graph self._flow_vars = flow_vars @@ -42,6 +49,58 @@ def root_dir(self): def flow_vars(self): return self._flow_vars + def get_packaged_nodes(self, root_path=None, node_type=None): + if root_path is None: + root_path = self.custom_node_dir + print(f"\n\n{root_path}\n-----------------------") + # Get list of files in path + try: + files = os.listdir(root_path) + except OSError as e: + return None + + print(files) + nodes = list() + data = OrderedDict() + + for file in files: + # Check file is not a dir + node_path = os.path.join(root_path, file) + if os.path.isdir(node_path): + # Recurse per `node_type` + alt_node_type = file.replace('_', ' ').title() + + data[alt_node_type] = self.get_packaged_nodes(node_path, file) + continue + + node, ext = os.path.splitext(file) + + if node == '__init__' or ext != '.py': + continue + + try: + module = importlib.import_module('pyworkflow.nodes.' + node_type + '.' + node) + except ModuleNotFoundError as e: + print("MODULE NOT FOUND") + nodes.append({ + "filename": node, + "missing_packages": WorkflowUtils.check_missing_packages(node_path) + }) + continue + + for name, klass in inspect.getmembers(module): + if inspect.isclass(klass) and klass.__module__.startswith('pyworkflow.nodes.' + node_type): + parsed_node = WorkflowUtils.extract_node_info(node_type, klass) + if node_type == 'custom_nodes': + parsed_node['filename'] = node + + nodes.append(parsed_node) + + if root_path == self.custom_node_dir: + return data + else: + return nodes + def get_node(self, node_id): """Retrieves Node from workflow, if exists @@ -410,13 +469,14 @@ def execute_workflow(workflow_location): class WorkflowUtils: @staticmethod - def set_custom_nodes_dir(): - custom_node_dir = os.path.join(os.getcwd(), '../pyworkflow/custom_nodes') + def set_custom_nodes_dir(custom_node_path): + if custom_node_path is None: + custom_node_path = os.path.join(os.getcwd(), '../pyworkflow/pyworkflow/nodes') - if not os.path.exists(custom_node_dir): - os.makedirs(custom_node_dir) + if not os.path.exists(custom_node_path): + os.makedirs(custom_node_path) - return custom_node_dir + return custom_node_path @staticmethod def set_root_dir(root_dir): @@ -428,6 +488,42 @@ def set_root_dir(root_dir): return root_dir + @staticmethod + def check_missing_packages(node_path): + finder = ModuleFinder(node_path) + finder.run_script(node_path) + + print("CHECKING PACKAGES") + print(finder.badmodules) + uninstalled = list() + for missing_package in finder.badmodules.keys(): + if missing_package not in sys.modules: + uninstalled.append(missing_package) + + print(uninstalled) + return uninstalled + + @staticmethod + def extract_node_info(parent, node): + # TODO: check attribute(s) accessing is handled correctly + try: + color = node.color + except AttributeError: + color = 'black' + + return { + 'name': node.name, + 'node_key': node.__name__, + 'node_type': parent, + 'num_in': node.num_in, + 'num_out': node.num_out, + 'color': color, + 'doc': node.__doc__, + 'options': {k: v.get_value() for k, v in node.options.items()}, + 'option_types': node.option_types, + 'download_result': getattr(node, "download_result", False) + } + class WorkflowException(Exception): def __init__(self, action: str, reason: str): diff --git a/vp/vp/views.py b/vp/vp/views.py index c7e3697..ebe950d 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -4,9 +4,6 @@ from pyworkflow import Node from modulefinder import ModuleFinder -import os -import inspect -import sys @swagger_auto_schema(method='get', responses={200:'JSON response with data'}) @@ -26,67 +23,3 @@ def info(request): "about": "super-duper workflows!" } return JsonResponse(data) - - -@swagger_auto_schema(method='get', - finder = ModuleFinder(node_path) - finder.run_script(node_path) - - uninstalled = list() - for missing_package in finder.badmodules.keys(): - if missing_package not in sys.modules: - uninstalled.append(missing_package) - - return uninstalled - - -def extract_node_info(parent, child): - # TODO: check attribute(s) accessing is handled correctly - return { - 'name': child.name, - 'node_key': child.__name__, - 'node_type': str(parent), - 'num_in': child.num_in, - 'num_out': child.num_out, - 'color': child.color or parent.color or 'black', - 'doc': child.__doc__, - 'options': {k: v.get_value() for k, v in child.options.items()}, - 'option_types': child.option_types, - 'download_result': getattr(child, "download_result", False) - } - - -def import_custom_node(root_path): - # Get list of files in path - try: - files = os.listdir(root_path) - except OSError as e: - return None - - data = list() - for file in files: - # Check file is not a dir - node_path = os.path.join(root_path, file) - if not os.path.isfile(node_path): - continue - - node, ext = os.path.splitext(file) - - try: - package = __import__('custom_nodes.' + node) - module = getattr(package, node) - except ModuleNotFoundError: - data.append({ - "name": node, - "missing_packages": check_missing_packages(node_path) - }) - continue - - for name, klass in inspect.getmembers(module): - if inspect.isclass(klass) and klass.__module__.startswith('custom_nodes.'): - custom_node = extract_node_info(node, klass) - data.append(custom_node) - - return data - - From a198a05649c652f8999865d717a86ea24eb154d4 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 10:15:40 -0400 Subject: [PATCH 09/20] refactor: Move Node subclasses to individual files --- pyworkflow/pyworkflow/node.py | 259 ------------------ pyworkflow/pyworkflow/node_factory.py | 3 +- .../pyworkflow/nodes/flow_control/__init__.py | 2 +- .../nodes/flow_control/string_input.py | 25 ++ pyworkflow/pyworkflow/nodes/io/__init__.py | 4 +- pyworkflow/pyworkflow/nodes/io/read_csv.py | 48 ++++ pyworkflow/pyworkflow/nodes/io/write_csv.py | 51 ++++ .../pyworkflow/nodes/manipulation/__init__.py | 6 +- .../pyworkflow/nodes/manipulation/filter.py | 37 +++ .../pyworkflow/nodes/manipulation/join.py | 27 ++ .../pyworkflow/nodes/manipulation/pivot.py | 62 +++++ 11 files changed, 257 insertions(+), 267 deletions(-) create mode 100644 pyworkflow/pyworkflow/nodes/flow_control/string_input.py create mode 100644 pyworkflow/pyworkflow/nodes/io/read_csv.py create mode 100644 pyworkflow/pyworkflow/nodes/io/write_csv.py create mode 100644 pyworkflow/pyworkflow/nodes/manipulation/filter.py create mode 100644 pyworkflow/pyworkflow/nodes/manipulation/join.py create mode 100644 pyworkflow/pyworkflow/nodes/manipulation/pivot.py diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e447899..29cd835 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,5 +1,3 @@ -import pandas as pd - from .parameters import * @@ -118,263 +116,6 @@ def get_replacement_value(self): return self.options['default_value'].get_value() -class StringNode(FlowNode): - """StringNode object - - Allows for Strings to replace 'string' fields in Nodes - """ - name = "String Input" - num_in = 1 - num_out = 1 - color = 'purple' - - OPTIONS = { - "default_value": StringParameter( - "Default Value", - docstring="Value this node will pass as a flow variable" - ), - "var_name": StringParameter( - "Variable Name", - default="my_var", - docstring="Name of the variable to use in another Node" - ) - } - - -class IONode(Node): - """IONodes deal with file-handling in/out of the Workflow. - - Possible types: - Read CSV - Write CSV - """ - color = 'black' - display_name = "I/O" - - def execute(self, predecessor_data, flow_vars): - raise NotImplementedError() - - -class ReadCsvNode(IONode): - """ReadCsvNode - - Reads a CSV file into a pandas DataFrame. - - Raises: - NodeException: any error reading CSV file, converting - to DataFrame. - """ - name = "Read CSV" - num_in = 0 - num_out = 1 - - OPTIONS = { - "file": FileParameter( - "File", - docstring="CSV File" - ), - "sep": StringParameter( - "Delimiter", - default=",", - docstring="Column delimiter" - ), - # user-specified headers are probably integers, but haven't figured out - # arguments with multiple possible types - "header": StringParameter( - "Header Row", - default="infer", - docstring="Row number containing column names (0-indexed)" - ), - } - - def execute(self, predecessor_data, flow_vars): - try: - df = pd.read_csv( - flow_vars["file"].get_value(), - sep=flow_vars["sep"].get_value(), - header=flow_vars["header"].get_value() - ) - return df.to_json() - except Exception as e: - raise NodeException('read csv', str(e)) - - -class WriteCsvNode(IONode): - """WriteCsvNode - - Writes the current DataFrame to a CSV file. - - Raises: - NodeException: any error writing CSV file, converting - from DataFrame. - """ - name = "Write CSV" - num_in = 1 - num_out = 0 - download_result = True - - OPTIONS = { - "file": StringParameter( - "Filename", - docstring="CSV file to write" - ), - "sep": StringParameter( - "Delimiter", - default=",", - docstring="Column delimiter" - ), - "index": BooleanParameter( - "Write Index", - default=True, - docstring="Write index as column?" - ), - } - - def execute(self, predecessor_data, flow_vars): - try: - # Convert JSON data to DataFrame - df = pd.DataFrame.from_dict(predecessor_data[0]) - - # Write to CSV and save - df.to_csv( - flow_vars["file"].get_value(), - sep=flow_vars["sep"].get_value(), - index=flow_vars["index"].get_value() - ) - return df.to_json() - except Exception as e: - raise NodeException('write csv', str(e)) - - -class ManipulationNode(Node): - """ManipulationNodes deal with data manipulation. - - Possible types: - Pivot - Filter - Multi-in - """ - display_name = "Manipulation" - color = 'goldenrod' - - def execute(self, predecessor_data, flow_vars): - raise NotImplementedError() - - -class PivotNode(ManipulationNode): - name = "Pivoting" - num_in = 1 - num_out = 3 - - OPTIONS = { - 'index': StringParameter( - 'Index', - docstring='Column to aggregate (column, grouper, array or list)' - ), - 'values': StringParameter( - 'Values', - docstring='Column name to use to populate new frame\'s values (column, grouper, array or list)' - ), - 'columns': StringParameter( - 'Column Name Row', - docstring='Column(s) to use for populating new frame values. (column, grouper, array or list)' - ), - 'aggfunc': StringParameter( - 'Aggregation function', - default='mean', - docstring='Function used for aggregation (function, list of functions, dict, default numpy.mean)' - ), - 'fill_value': StringParameter( - 'Fill value', - docstring='Value to replace missing values with (scalar)' - ), - 'margins': BooleanParameter( - 'Margins name', - default=False, - docstring='Add all rows/columns' - ), - 'dropna': BooleanParameter( - 'Drop NaN columns', - default=True, - docstring='Ignore columns with all NaN entries' - ), - 'margins_name': StringParameter( - 'Margins name', - default='All', - docstring='Name of the row/column that will contain the totals when margins is True' - ), - 'observed': BooleanParameter( - 'Column Name Row', - default=False, - docstring='Row number with column names (0-indexed) or "infer"' - ) - } - - def execute(self, predecessor_data, flow_vars): - try: - input_df = pd.DataFrame.from_dict(predecessor_data[0]) - output_df = pd.DataFrame.pivot_table(input_df, **self.options) - return output_df.to_json() - except Exception as e: - raise NodeException('pivot', str(e)) - - -class JoinNode(ManipulationNode): - name = "Joiner" - num_in = 2 - num_out = 1 - - OPTIONS = { - "on": StringParameter("Join Column", docstring="Name of column to join on") - } - - def execute(self, predecessor_data, flow_vars): - try: - first_df = pd.DataFrame.from_dict(predecessor_data[0]) - second_df = pd.DataFrame.from_dict(predecessor_data[1]) - combined_df = pd.merge( - first_df, - second_df, - on=flow_vars["on"].get_value() - ) - return combined_df.to_json() - except Exception as e: - raise NodeException('join', str(e)) - - -class FilterNode(ManipulationNode): - name = "Filter" - num_in = 1 - num_out = 1 - - OPTIONS = { - 'items': StringParameter( - 'Items', - docstring='Keep labels from axis which are in items' - ), - 'like': StringParameter( - 'Like', - docstring='Keep labels from axis for which like in label == True.' - ), - 'regex': StringParameter( - 'Regex', - docstring='Keep labels from axis for which re.search(regex, label) == True.' - ), - 'axis': StringParameter( - 'Axis', - docstring='The axis to filter on.' - ) - } - - def execute(self, predecessor_data, flow_vars): - try: - input_df = pd.DataFrame.from_dict(predecessor_data[0]) - output_df = pd.DataFrame.filter(input_df, **self.options) - return output_df.to_json() - except Exception as e: - raise NodeException('filter', str(e)) - - class NodeException(Exception): def __init__(self, action: str, reason: str): self.action = action diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index 5810275..89830c9 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -1,4 +1,3 @@ -from .node import * from .nodes import * import importlib @@ -51,7 +50,7 @@ def manipulation_node(node_key, node_info): def custom_node(node_key, node_info): try: filename = node_info.get('filename') - module = importlib.import_module('pyworkflow.nodes.custom_nodes.' + filename) + module = importlib.import_module(f'pyworkflow.nodes.custom_nodes.{filename}') my_class = getattr(module, node_key) instance = my_class(node_info) diff --git a/pyworkflow/pyworkflow/nodes/flow_control/__init__.py b/pyworkflow/pyworkflow/nodes/flow_control/__init__.py index 1fec3b8..95d569a 100644 --- a/pyworkflow/pyworkflow/nodes/flow_control/__init__.py +++ b/pyworkflow/pyworkflow/nodes/flow_control/__init__.py @@ -1 +1 @@ -from .string_node import StringNode +from .string_input import StringNode diff --git a/pyworkflow/pyworkflow/nodes/flow_control/string_input.py b/pyworkflow/pyworkflow/nodes/flow_control/string_input.py new file mode 100644 index 0000000..eaade18 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/flow_control/string_input.py @@ -0,0 +1,25 @@ +from pyworkflow.node import FlowNode, NodeException +from pyworkflow.parameters import * + + +class StringNode(FlowNode): + """StringNode object + + Allows for Strings to replace 'string' fields in Nodes + """ + name = "String Input" + num_in = 1 + num_out = 1 + color = 'purple' + + OPTIONS = { + "default_value": StringParameter( + "Default Value", + docstring="Value this node will pass as a flow variable" + ), + "var_name": StringParameter( + "Variable Name", + default="my_var", + docstring="Name of the variable to use in another Node" + ) + } diff --git a/pyworkflow/pyworkflow/nodes/io/__init__.py b/pyworkflow/pyworkflow/nodes/io/__init__.py index 88de457..bc516ac 100644 --- a/pyworkflow/pyworkflow/nodes/io/__init__.py +++ b/pyworkflow/pyworkflow/nodes/io/__init__.py @@ -1,2 +1,2 @@ -from .read_csv_node import ReadCsvNode -from .write_csv_node import WriteCsvNode +from .read_csv import ReadCsvNode +from .write_csv import WriteCsvNode diff --git a/pyworkflow/pyworkflow/nodes/io/read_csv.py b/pyworkflow/pyworkflow/nodes/io/read_csv.py new file mode 100644 index 0000000..e1ca9b9 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/io/read_csv.py @@ -0,0 +1,48 @@ +from pyworkflow.node import Node, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class ReadCsvNode(Node): + """ReadCsvNode + + Reads a CSV file into a pandas DataFrame. + + Raises: + NodeException: any error reading CSV file, converting + to DataFrame. + """ + name = "Read CSV" + num_in = 0 + num_out = 1 + + OPTIONS = { + "file": FileParameter( + "File", + docstring="CSV File" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), + # user-specified headers are probably integers, but haven't figured out + # arguments with multiple possible types + "header": StringParameter( + "Header Row", + default="infer", + docstring="Row number containing column names (0-indexed)" + ), + } + + def execute(self, predecessor_data, flow_vars): + try: + df = pd.read_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + header=flow_vars["header"].get_value() + ) + return df.to_json() + except Exception as e: + raise NodeException('read csv', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/io/write_csv.py b/pyworkflow/pyworkflow/nodes/io/write_csv.py new file mode 100644 index 0000000..6b9f415 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/io/write_csv.py @@ -0,0 +1,51 @@ +from pyworkflow.node import Node, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class WriteCsvNode(Node): + """WriteCsvNode + + Writes the current DataFrame to a CSV file. + + Raises: + NodeException: any error writing CSV file, converting + from DataFrame. + """ + name = "Write CSV" + num_in = 1 + num_out = 0 + download_result = True + + OPTIONS = { + "file": StringParameter( + "Filename", + docstring="CSV file to write" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), + "index": BooleanParameter( + "Write Index", + default=True, + docstring="Write index as column?" + ), + } + + def execute(self, predecessor_data, flow_vars): + try: + # Convert JSON data to DataFrame + df = pd.DataFrame.from_dict(predecessor_data[0]) + + # Write to CSV and save + df.to_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + index=flow_vars["index"].get_value() + ) + return df.to_json() + except Exception as e: + raise NodeException('write csv', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/manipulation/__init__.py b/pyworkflow/pyworkflow/nodes/manipulation/__init__.py index 976f358..2da5947 100644 --- a/pyworkflow/pyworkflow/nodes/manipulation/__init__.py +++ b/pyworkflow/pyworkflow/nodes/manipulation/__init__.py @@ -1,3 +1,3 @@ -from .filter_node import FilterNode -from .join_node import JoinNode -from .pivot_node import PivotNode +from .filter import FilterNode +from .join import JoinNode +from .pivot import PivotNode diff --git a/pyworkflow/pyworkflow/nodes/manipulation/filter.py b/pyworkflow/pyworkflow/nodes/manipulation/filter.py new file mode 100644 index 0000000..e144d94 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/filter.py @@ -0,0 +1,37 @@ +from pyworkflow.node import Node, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class FilterNode(Node): + name = "Filter" + num_in = 1 + num_out = 1 + + OPTIONS = { + 'items': StringParameter( + 'Items', + docstring='Keep labels from axis which are in items' + ), + 'like': StringParameter( + 'Like', + docstring='Keep labels from axis for which like in label == True.' + ), + 'regex': StringParameter( + 'Regex', + docstring='Keep labels from axis for which re.search(regex, label) == True.' + ), + 'axis': StringParameter( + 'Axis', + docstring='The axis to filter on.' + ) + } + + def execute(self, predecessor_data, flow_vars): + try: + input_df = pd.DataFrame.from_dict(predecessor_data[0]) + output_df = pd.DataFrame.filter(input_df, **self.options) + return output_df.to_json() + except Exception as e: + raise NodeException('filter', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/manipulation/join.py b/pyworkflow/pyworkflow/nodes/manipulation/join.py new file mode 100644 index 0000000..14c7483 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/join.py @@ -0,0 +1,27 @@ +from pyworkflow.node import Node, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class JoinNode(Node): + name = "Joiner" + num_in = 2 + num_out = 1 + + OPTIONS = { + "on": StringParameter("Join Column", docstring="Name of column to join on") + } + + def execute(self, predecessor_data, flow_vars): + try: + first_df = pd.DataFrame.from_dict(predecessor_data[0]) + second_df = pd.DataFrame.from_dict(predecessor_data[1]) + combined_df = pd.merge( + first_df, + second_df, + on=flow_vars["on"].get_value() + ) + return combined_df.to_json() + except Exception as e: + raise NodeException('join', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/manipulation/pivot.py b/pyworkflow/pyworkflow/nodes/manipulation/pivot.py new file mode 100644 index 0000000..27a4ac8 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/pivot.py @@ -0,0 +1,62 @@ +from pyworkflow.node import Node, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class PivotNode(Node): + name = "Pivoting" + num_in = 1 + num_out = 3 + + OPTIONS = { + 'index': StringParameter( + 'Index', + docstring='Column to aggregate (column, grouper, array or list)' + ), + 'values': StringParameter( + 'Values', + docstring='Column name to use to populate new frame\'s values (column, grouper, array or list)' + ), + 'columns': StringParameter( + 'Column Name Row', + docstring='Column(s) to use for populating new frame values. (column, grouper, array or list)' + ), + 'aggfunc': StringParameter( + 'Aggregation function', + default='mean', + docstring='Function used for aggregation (function, list of functions, dict, default numpy.mean)' + ), + 'fill_value': StringParameter( + 'Fill value', + docstring='Value to replace missing values with (scalar)' + ), + 'margins': BooleanParameter( + 'Margins name', + default=False, + docstring='Add all rows/columns' + ), + 'dropna': BooleanParameter( + 'Drop NaN columns', + default=True, + docstring='Ignore columns with all NaN entries' + ), + 'margins_name': StringParameter( + 'Margins name', + default='All', + docstring='Name of the row/column that will contain the totals when margins is True' + ), + 'observed': BooleanParameter( + 'Column Name Row', + default=False, + docstring='Row number with column names (0-indexed) or "infer"' + ) + } + + def execute(self, predecessor_data, flow_vars): + try: + input_df = pd.DataFrame.from_dict(predecessor_data[0]) + output_df = pd.DataFrame.pivot_table(input_df, **self.options) + return output_df.to_json() + except Exception as e: + raise NodeException('pivot', str(e)) From 9c81d52e571828e30dde1d7d443ba5163dbfad2e Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:25:46 -0400 Subject: [PATCH 10/20] fix: Re-add intermediate Node classes to inherit color --- pyworkflow/pyworkflow/node.py | 16 ++++++++++++++++ pyworkflow/pyworkflow/nodes/io/read_csv.py | 4 ++-- pyworkflow/pyworkflow/nodes/io/write_csv.py | 4 ++-- .../pyworkflow/nodes/manipulation/filter.py | 4 ++-- pyworkflow/pyworkflow/nodes/manipulation/join.py | 4 ++-- .../pyworkflow/nodes/manipulation/pivot.py | 4 ++-- 6 files changed, 26 insertions(+), 10 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 29cd835..c9556e6 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -116,6 +116,22 @@ def get_replacement_value(self): return self.options['default_value'].get_value() +class IONode(Node): + """IONodes deal with file-handling in/out of the Workflow.""" + color = "green" + + def execute(self, predecessor_data, flow_vars): + raise NotImplementedError() + + +class ManipulationNode(Node): + """ManipulationNodes deal with data manipulation.""" + color = "goldenrod" + + def execute(self, predecessor_data, flow_vars): + raise NotImplementedError() + + class NodeException(Exception): def __init__(self, action: str, reason: str): self.action = action diff --git a/pyworkflow/pyworkflow/nodes/io/read_csv.py b/pyworkflow/pyworkflow/nodes/io/read_csv.py index e1ca9b9..f829d61 100644 --- a/pyworkflow/pyworkflow/nodes/io/read_csv.py +++ b/pyworkflow/pyworkflow/nodes/io/read_csv.py @@ -1,10 +1,10 @@ -from pyworkflow.node import Node, NodeException +from pyworkflow.node import IONode, NodeException from pyworkflow.parameters import * import pandas as pd -class ReadCsvNode(Node): +class ReadCsvNode(IONode): """ReadCsvNode Reads a CSV file into a pandas DataFrame. diff --git a/pyworkflow/pyworkflow/nodes/io/write_csv.py b/pyworkflow/pyworkflow/nodes/io/write_csv.py index 6b9f415..0ca54cf 100644 --- a/pyworkflow/pyworkflow/nodes/io/write_csv.py +++ b/pyworkflow/pyworkflow/nodes/io/write_csv.py @@ -1,10 +1,10 @@ -from pyworkflow.node import Node, NodeException +from pyworkflow.node import IONode, NodeException from pyworkflow.parameters import * import pandas as pd -class WriteCsvNode(Node): +class WriteCsvNode(IONode): """WriteCsvNode Writes the current DataFrame to a CSV file. diff --git a/pyworkflow/pyworkflow/nodes/manipulation/filter.py b/pyworkflow/pyworkflow/nodes/manipulation/filter.py index e144d94..0ee1daa 100644 --- a/pyworkflow/pyworkflow/nodes/manipulation/filter.py +++ b/pyworkflow/pyworkflow/nodes/manipulation/filter.py @@ -1,10 +1,10 @@ -from pyworkflow.node import Node, NodeException +from pyworkflow.node import ManipulationNode, NodeException from pyworkflow.parameters import * import pandas as pd -class FilterNode(Node): +class FilterNode(ManipulationNode): name = "Filter" num_in = 1 num_out = 1 diff --git a/pyworkflow/pyworkflow/nodes/manipulation/join.py b/pyworkflow/pyworkflow/nodes/manipulation/join.py index 14c7483..f0ae745 100644 --- a/pyworkflow/pyworkflow/nodes/manipulation/join.py +++ b/pyworkflow/pyworkflow/nodes/manipulation/join.py @@ -1,10 +1,10 @@ -from pyworkflow.node import Node, NodeException +from pyworkflow.node import ManipulationNode, NodeException from pyworkflow.parameters import * import pandas as pd -class JoinNode(Node): +class JoinNode(ManipulationNode): name = "Joiner" num_in = 2 num_out = 1 diff --git a/pyworkflow/pyworkflow/nodes/manipulation/pivot.py b/pyworkflow/pyworkflow/nodes/manipulation/pivot.py index 27a4ac8..b7fe43f 100644 --- a/pyworkflow/pyworkflow/nodes/manipulation/pivot.py +++ b/pyworkflow/pyworkflow/nodes/manipulation/pivot.py @@ -1,10 +1,10 @@ -from pyworkflow.node import Node, NodeException +from pyworkflow.node import ManipulationNode, NodeException from pyworkflow.parameters import * import pandas as pd -class PivotNode(Node): +class PivotNode(ManipulationNode): name = "Pivoting" num_in = 1 num_out = 3 From 316af5980acb67d3d3b8f63cea8257e42c749c5f Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:26:07 -0400 Subject: [PATCH 11/20] test: Update test to use new 'node_type' value --- pyworkflow/pyworkflow/tests/test_workflow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyworkflow/pyworkflow/tests/test_workflow.py b/pyworkflow/pyworkflow/tests/test_workflow.py index c036481..24f542f 100644 --- a/pyworkflow/pyworkflow/tests/test_workflow.py +++ b/pyworkflow/pyworkflow/tests/test_workflow.py @@ -10,7 +10,7 @@ def setUp(self): self.read_csv_node = { "name": "Read CSV", "node_id": "1", - "node_type": "IONode", + "node_type": "io", "node_key": "ReadCsvNode", "is_global": False, "options": { @@ -21,7 +21,7 @@ def setUp(self): self.write_csv_node = { "name": "Write CSV", "node_id": "2", - "node_type": "IONode", + "node_type": "io", "node_key": "WriteCsvNode", "is_global": False, "options": { @@ -32,7 +32,7 @@ def setUp(self): self.join_node = { "name": "Joiner", "node_id": "3", - "node_type": "ManipulationNode", + "node_type": "manipulation", "node_key": "JoinNode", "is_global": False, "options": { From 071d4a4c90aa195402d3919f91a12f34f451aa8a Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:28:24 -0400 Subject: [PATCH 12/20] refactor: Generalize `upload` endpoint to accept Node and data files --- pyworkflow/pyworkflow/workflow.py | 19 +++++----- vp/workflow/urls.py | 1 - vp/workflow/views.py | 61 ++++++++----------------------- 3 files changed, 25 insertions(+), 56 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 06f3ee6..1a00458 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -18,21 +18,21 @@ class Workflow: Attributes: name: Name of the workflow root_dir: Used for reading/writing files to/from disk - custom_node_dir: Location of custom nodes + node_dir: Location of custom nodes graph: A NetworkX Directed Graph flow_vars: Global flow variables associated with workflow """ - def __init__(self, name="Untitled", root_dir=None, custom_node_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): + def __init__(self, name="Untitled", root_dir=None, node_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): self._name = name self._root_dir = WorkflowUtils.set_root_dir(root_dir) - self._custom_node_dir = WorkflowUtils.set_custom_nodes_dir(custom_node_dir) + self._node_dir = WorkflowUtils.set_custom_nodes_dir(node_dir) self._graph = graph self._flow_vars = flow_vars @property - def custom_node_dir(self): - return self._custom_node_dir + def node_dir(self): + return self._node_dir @property def graph(self): @@ -41,6 +41,9 @@ def graph(self): def path(self, file_name): return os.path.join(self.root_dir, file_name) + def node_path(self, node_type, file_name): + return os.path.join(self.node_dir, node_type, file_name) + @property def root_dir(self): return self._root_dir @@ -374,11 +377,9 @@ def execution_order(self): except RuntimeError as e: raise WorkflowException('execution order', 'The graph was changed while generating the execution order') - def upload_file(self, uploaded_file, node_id): + @staticmethod + def upload_file(uploaded_file, to_open): try: - file_name = f"{node_id}-{uploaded_file.name}" - to_open = self.path(file_name) - # TODO: Change to a stream/other method for large files? with open(to_open, 'wb') as f: f.write(uploaded_file.read()) diff --git a/vp/workflow/urls.py b/vp/workflow/urls.py index afc0643..fccaba6 100644 --- a/vp/workflow/urls.py +++ b/vp/workflow/urls.py @@ -11,6 +11,5 @@ path('globals', views.global_vars, name="retrieve global variables"), path('upload', views.upload_file, name='upload file'), path('download', views.download_file, name='download file'), - path('custom_node', views.add_custom_node, name='add custom node'), path('nodes', views.retrieve_nodes_for_user, name='retrieve node list'), ] diff --git a/vp/workflow/views.py b/vp/workflow/views.py index be5013c..8145e5b 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -10,6 +10,7 @@ from modulefinder import ModuleFinder + @swagger_auto_schema(method='post', operation_summary='Create a new workflow.', operation_description='Creates a new workflow with empty DiGraph.', @@ -218,36 +219,6 @@ def get_successors(request, node_id): return JsonResponse(order, safe=False) -@swagger_auto_schema(method='post', - operation_summary='Uploads a custom node to server.', - operation_description='Uploads a custom node to server location.', - responses={ - 200: 'File uploaded', - 404: 'No specified file' - }) -@api_view(['POST']) -def add_custom_node(request): - node_file = request.FILES.get('file') - - if node_file is None: - return JsonResponse("Empty content", status=404) - - to_open = os.path.join(request.pyworkflow.custom_node_dir, node_file.name) - - try: - with open(to_open, 'wb') as f: - f.write(node_file.read()) - except OSError as e: - return JsonResponse({'message': str(e)}, status=500) - - node_file.close() - - return JsonResponse({ - "filename": to_open, - "missing_packages": check_missing_packages(to_open) - }, status=201, safe=False) - - @swagger_auto_schema(method='post', operation_summary='Uploads a file to server.', operation_description='Uploads a new file to server location.', @@ -262,14 +233,23 @@ def upload_file(request): if f is None: return JsonResponse("Empty content", status=404) - node_id = request.POST.get('nodeId', '') - try: - save_name = request.pyworkflow.upload_file(f, node_id) - return JsonResponse({"filename": save_name}, status=201, safe=False) + node_id = request.POST.get('nodeId') + + if node_id is None: + # custom node file + file_path = request.pyworkflow.node_path('custom_node', f.name) + else: + # node data file + file_path = request.pyworkflow.path(f"{node_id}-{f.name}") + + save_name = Workflow.upload_file(f, file_path) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) + return JsonResponse({"filename": save_name}, status=201, safe=False) + + @swagger_auto_schema(method='get', operation_summary='Retrieve a list of installed Nodes', operation_description='Retrieves a list of installed Nodes, in JSON.', @@ -289,6 +269,7 @@ def retrieve_nodes_for_user(request): data.move_to_end('Custom Nodes') return JsonResponse(data, safe=False) + @swagger_auto_schema(method='post', operation_summary='Downloads a file from the server', operation_description='Downloads a file associated with Node from server.', @@ -324,15 +305,3 @@ def download_file(request): status=404) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) - - -def check_missing_packages(node_path): - finder = ModuleFinder(node_path) - finder.run_script(node_path) - - uninstalled = list() - for missing_package in finder.badmodules.keys(): - if missing_package not in sys.modules: - uninstalled.append(missing_package) - - return uninstalled From 551f8c2672239b27cff457d6756d1c044b4d3f96 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:28:59 -0400 Subject: [PATCH 13/20] refactor: Clean up Node parsing; remove print statements --- pyworkflow/pyworkflow/workflow.py | 108 ++++++++++++++++-------------- vp/workflow/views.py | 2 - 2 files changed, 58 insertions(+), 52 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 1a00458..fbfe6b1 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -54,54 +54,40 @@ def flow_vars(self): def get_packaged_nodes(self, root_path=None, node_type=None): if root_path is None: - root_path = self.custom_node_dir - print(f"\n\n{root_path}\n-----------------------") - # Get list of files in path + root_path = self.node_dir + try: files = os.listdir(root_path) - except OSError as e: + except OSError: return None - print(files) nodes = list() data = OrderedDict() for file in files: - # Check file is not a dir - node_path = os.path.join(root_path, file) - if os.path.isdir(node_path): - # Recurse per `node_type` - alt_node_type = file.replace('_', ' ').title() + file_path = os.path.join(root_path, file) - data[alt_node_type] = self.get_packaged_nodes(node_path, file) + if os.path.isdir(file_path): + # Recurse if file is a directory + display_name = WorkflowUtils.get_display_name(file) + data[display_name] = self.get_packaged_nodes(file_path, file) continue + # Otherwise, try parsing file for a Node class node, ext = os.path.splitext(file) + # Skip init files or non-Python files if node == '__init__' or ext != '.py': continue - try: - module = importlib.import_module('pyworkflow.nodes.' + node_type + '.' + node) - except ModuleNotFoundError as e: - print("MODULE NOT FOUND") - nodes.append({ - "filename": node, - "missing_packages": WorkflowUtils.check_missing_packages(node_path) - }) - continue - - for name, klass in inspect.getmembers(module): - if inspect.isclass(klass) and klass.__module__.startswith('pyworkflow.nodes.' + node_type): - parsed_node = WorkflowUtils.extract_node_info(node_type, klass) - if node_type == 'custom_nodes': - parsed_node['filename'] = node - - nodes.append(parsed_node) + nodes.append(WorkflowUtils.extract_node_info(file_path, node, node_type)) - if root_path == self.custom_node_dir: + if root_path == self.node_dir: + # When traversal returns to `node_dir` return the entire OrderedDict() + data.move_to_end('Custom Nodes') return data else: + # Otherwise, return list containing all Nodes of a `node_type` return nodes def get_node(self, node_id): @@ -547,6 +533,13 @@ def execute_workflow(workflow_location): class WorkflowUtils: + @staticmethod + def get_display_name(file): + if file == 'io': + return 'I/O' + else: + return file.replace('_', ' ').title() + @staticmethod def set_custom_nodes_dir(custom_node_path): if custom_node_path is None: @@ -572,36 +565,51 @@ def check_missing_packages(node_path): finder = ModuleFinder(node_path) finder.run_script(node_path) - print("CHECKING PACKAGES") - print(finder.badmodules) uninstalled = list() for missing_package in finder.badmodules.keys(): if missing_package not in sys.modules: uninstalled.append(missing_package) - print(uninstalled) return uninstalled @staticmethod - def extract_node_info(parent, node): - # TODO: check attribute(s) accessing is handled correctly + def extract_node_info(file_path, node, node_type): + # Check Node file for missing packages try: - color = node.color - except AttributeError: - color = 'black' - - return { - 'name': node.name, - 'node_key': node.__name__, - 'node_type': parent, - 'num_in': node.num_in, - 'num_out': node.num_out, - 'color': color, - 'doc': node.__doc__, - 'options': {k: v.get_value() for k, v in node.options.items()}, - 'option_types': node.option_types, - 'download_result': getattr(node, "download_result", False) - } + module = importlib.import_module('pyworkflow.nodes.' + node_type + '.' + node) + except ModuleNotFoundError: + return { + "filename": node, + "missing_packages": WorkflowUtils.check_missing_packages(file_path) + } + + # Parse module for Node Class information + for name, klass in inspect.getmembers(module): + if inspect.isclass(klass) and klass.__module__.startswith('pyworkflow.nodes.' + node_type): + try: + color = klass.color + except AttributeError: + color = 'black' + + parsed_node = { + 'name': klass.name, + 'node_key': klass.__name__, + 'node_type': node_type, + 'num_in': klass.num_in, + 'num_out': klass.num_out, + 'color': color, + 'doc': klass.__doc__, + 'options': {k: v.get_value() for k, v in klass.options.items()}, + 'option_types': klass.option_types, + 'download_result': getattr(klass, "download_result", False) + } + + if node_type == 'custom_nodes': + parsed_node['filename'] = node + + return parsed_node + + return None class WorkflowException(Exception): diff --git a/vp/workflow/views.py b/vp/workflow/views.py index 8145e5b..6ad5283 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -264,9 +264,7 @@ def retrieve_nodes_for_user(request): List is split into 'types' (e.g., 'IO' and 'Manipulation') and 'keys', or individual command Nodes (e.g., 'ReadCsv', 'Pivot'). """ - packaged_nodes = os.path.join(os.getcwd(), '../pyworkflow/pyworkflow/nodes') data = request.pyworkflow.get_packaged_nodes() - data.move_to_end('Custom Nodes') return JsonResponse(data, safe=False) From 8202042f54a6215d353b9380d4ab3074a62a26b9 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:51:03 -0400 Subject: [PATCH 14/20] chore: Update Pipfile.lock after pipenv install --- Pipfile.lock | 123 ++++++++++++++++++++++++--------------------------- 1 file changed, 59 insertions(+), 64 deletions(-) diff --git a/Pipfile.lock b/Pipfile.lock index e766295..68bb8be 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "a1add4e8cf6f80713942e74aabd4616cf10e546d88aff23e3d4ce8ba47a1d5df" + "sha256": "dce2012d65e3fe72d898cc6effffb73c3a37eeb894bfe86f1dd216d65352cc13" }, "pipfile-spec": 6, "requires": { @@ -25,17 +25,17 @@ }, "autopep8": { "hashes": [ - "sha256:0f592a0447acea0c2b0a9602be1e4e3d86db52badd2e3c84f0193bfd89fd3a43" + "sha256:152fd8fe47d02082be86e05001ec23d6f420086db56b17fc883f3f965fb34954" ], "index": "pypi", - "version": "==1.5" + "version": "==1.5.2" }, "certifi": { "hashes": [ - "sha256:017c25db2a153ce562900032d5bc68e9f191e44e9a0f762f373977de9df1fbb3", - "sha256:25b64c7da4cd7479594d035c08c2d809eb4aab3a26e5a990ea98cc450c320f1f" + "sha256:1d987a998c75633c40847cc966fcf5904906c920a7f17ef374f5aa4282abd304", + "sha256:51fcb31174be6e6664c5f69e3e1691a2d72a1a12e90f872cbdb1567eb47b6519" ], - "version": "==2019.11.28" + "version": "==2020.4.5.1" }, "chardet": { "hashes": [ @@ -44,6 +44,18 @@ ], "version": "==3.0.4" }, + "cli": { + "editable": true, + "path": "./CLI" + }, + "click": { + "hashes": [ + "sha256:8a18b4ea89d8820c5d0c7da8a64b2c324b4dabb695804dbfea19b9be9d88c0cc", + "sha256:e345d143d80bf5ee7534056164e5e112ea5e22716bbb1ce727941f4c8b471b9a" + ], + "index": "pypi", + "version": "==7.1.1" + }, "coreapi": { "hashes": [ "sha256:46145fcc1f7017c076a2ef684969b641d18a2991051fddec9458ad3f78ffc1cb", @@ -67,11 +79,11 @@ }, "django": { "hashes": [ - "sha256:50b781f6cbeb98f673aa76ed8e572a019a45e52bdd4ad09001072dfd91ab07c8", - "sha256:89e451bfbb815280b137e33e454ddd56481fdaa6334054e6e031041ee1eda360" + "sha256:642d8eceab321ca743ae71e0f985ff8fdca59f07aab3a9fb362c617d23e33a76", + "sha256:d4666c2edefa38c5ede0ec1655424c56dc47ceb04b6d8d62a7eac09db89545c1" ], "index": "pypi", - "version": "==3.0.4" + "version": "==3.0.5" }, "djangorestframework": { "hashes": [ @@ -98,22 +110,24 @@ }, "inflection": { "hashes": [ - "sha256:18ea7fb7a7d152853386523def08736aa8c32636b047ade55f7578c4edeb16ca" + "sha256:32a5c3341d9583ec319548b9015b7fbdf8c429cbcb575d326c33ae3a0e90d52c", + "sha256:9a15d3598f01220e93f2207c432cfede50daff53137ce660fb8be838ef1ca6cc" ], - "version": "==0.3.1" + "version": "==0.4.0" }, "itypes": { "hashes": [ - "sha256:c6e77bb9fd68a4bfeb9d958fea421802282451a25bac4913ec94db82a899c073" + "sha256:03da6872ca89d29aef62773672b2d408f490f80db48b23079a4b194c86dd04c6", + "sha256:af886f129dea4a2a1e3d36595a2d139589e4dd287f5cab0b40e799ee81570ff1" ], - "version": "==1.1.0" + "version": "==1.2.0" }, "jinja2": { "hashes": [ - "sha256:93187ffbc7808079673ef52771baa950426fd664d3aad1d0fa3e95644360e250", - "sha256:b0eaf100007721b5c16c1fc1eecb87409464edc10469ddc9a22a27a99123be49" + "sha256:89aab215427ef59c34ad58735269eb58b1a5808103067f7bb9d5836c651b3bb0", + "sha256:f0a4641d3cf955324a89c04f3d94663aa4d638abe8f733ecd3582848e1c37035" ], - "version": "==2.11.1" + "version": "==2.11.2" }, "markupsafe": { "hashes": [ @@ -163,29 +177,29 @@ }, "numpy": { "hashes": [ - "sha256:1598a6de323508cfeed6b7cd6c4efb43324f4692e20d1f76e1feec7f59013448", - "sha256:1b0ece94018ae21163d1f651b527156e1f03943b986188dd81bc7e066eae9d1c", - "sha256:2e40be731ad618cb4974d5ba60d373cdf4f1b8dcbf1dcf4d9dff5e212baf69c5", - "sha256:4ba59db1fcc27ea31368af524dcf874d9277f21fd2e1f7f1e2e0c75ee61419ed", - "sha256:59ca9c6592da581a03d42cc4e270732552243dc45e87248aa8d636d53812f6a5", - "sha256:5e0feb76849ca3e83dd396254e47c7dba65b3fa9ed3df67c2556293ae3e16de3", - "sha256:6d205249a0293e62bbb3898c4c2e1ff8a22f98375a34775a259a0523111a8f6c", - "sha256:6fcc5a3990e269f86d388f165a089259893851437b904f422d301cdce4ff25c8", - "sha256:82847f2765835c8e5308f136bc34018d09b49037ec23ecc42b246424c767056b", - "sha256:87902e5c03355335fc5992a74ba0247a70d937f326d852fc613b7f53516c0963", - "sha256:9ab21d1cb156a620d3999dd92f7d1c86824c622873841d6b080ca5495fa10fef", - "sha256:a1baa1dc8ecd88fb2d2a651671a84b9938461e8a8eed13e2f0a812a94084d1fa", - "sha256:a244f7af80dacf21054386539699ce29bcc64796ed9850c99a34b41305630286", - "sha256:a35af656a7ba1d3decdd4fae5322b87277de8ac98b7d9da657d9e212ece76a61", - "sha256:b1fe1a6f3a6f355f6c29789b5927f8bd4f134a4bd9a781099a7c4f66af8850f5", - "sha256:b5ad0adb51b2dee7d0ee75a69e9871e2ddfb061c73ea8bc439376298141f77f5", - "sha256:ba3c7a2814ec8a176bb71f91478293d633c08582119e713a0c5351c0f77698da", - "sha256:cd77d58fb2acf57c1d1ee2835567cd70e6f1835e32090538f17f8a3a99e5e34b", - "sha256:cdb3a70285e8220875e4d2bc394e49b4988bdb1298ffa4e0bd81b2f613be397c", - "sha256:deb529c40c3f1e38d53d5ae6cd077c21f1d49e13afc7936f7f868455e16b64a0", - "sha256:e7894793e6e8540dbeac77c87b489e331947813511108ae097f1715c018b8f3d" - ], - "version": "==1.18.2" + "sha256:0aa2b318cf81eb1693fcfcbb8007e95e231d7e1aa24288137f3b19905736c3ee", + "sha256:163c78c04f47f26ca1b21068cea25ed7c5ecafe5f5ab2ea4895656a750582b56", + "sha256:1e37626bcb8895c4b3873fcfd54e9bfc5ffec8d0f525651d6985fcc5c6b6003c", + "sha256:264fd15590b3f02a1fbc095e7e1f37cdac698ff3829e12ffdcffdce3772f9d44", + "sha256:3d9e1554cd9b5999070c467b18e5ae3ebd7369f02706a8850816f576a954295f", + "sha256:40c24960cd5cec55222963f255858a1c47c6fa50a65a5b03fd7de75e3700eaaa", + "sha256:46f404314dbec78cb342904f9596f25f9b16e7cf304030f1339e553c8e77f51c", + "sha256:4847f0c993298b82fad809ea2916d857d0073dc17b0510fbbced663b3265929d", + "sha256:48e15612a8357393d176638c8f68a19273676877caea983f8baf188bad430379", + "sha256:6725d2797c65598778409aba8cd67077bb089d5b7d3d87c2719b206dc84ec05e", + "sha256:99f0ba97e369f02a21bb95faa3a0de55991fd5f0ece2e30a9e2eaebeac238921", + "sha256:a41f303b3f9157a31ce7203e3ca757a0c40c96669e72d9b6ee1bce8507638970", + "sha256:a4305564e93f5c4584f6758149fd446df39fd1e0a8c89ca0deb3cce56106a027", + "sha256:a551d8cc267c634774830086da42e4ba157fa41dd3b93982bc9501b284b0c689", + "sha256:a6bc9432c2640b008d5f29bad737714eb3e14bb8854878eacf3d7955c4e91c36", + "sha256:c60175d011a2e551a2f74c84e21e7c982489b96b6a5e4b030ecdeacf2914da68", + "sha256:e46e2384209c91996d5ec16744234d1c906ab79a701ce1a26155c9ec890b8dc8", + "sha256:e607b8cdc2ae5d5a63cd1bec30a15b5ed583ac6a39f04b7ba0f03fcfbf29c05b", + "sha256:e94a39d5c40fffe7696009dbd11bc14a349b377e03a384ed011e03d698787dd3", + "sha256:eb2286249ebfe8fcb5b425e5ec77e4736d53ee56d3ad296f8947f67150f495e3", + "sha256:fdee7540d12519865b423af411bd60ddb513d2eb2cd921149b732854995bbf8b" + ], + "version": "==1.18.3" }, "packaging": { "hashes": [ @@ -216,25 +230,6 @@ "index": "pypi", "version": "==1.0.3" }, - "psycopg2": { - "hashes": [ - "sha256:4212ca404c4445dc5746c0d68db27d2cbfb87b523fe233dc84ecd24062e35677", - "sha256:47fc642bf6f427805daf52d6e52619fe0637648fe27017062d898f3bf891419d", - "sha256:72772181d9bad1fa349792a1e7384dde56742c14af2b9986013eb94a240f005b", - "sha256:8396be6e5ff844282d4d49b81631772f80dabae5658d432202faf101f5283b7c", - "sha256:893c11064b347b24ecdd277a094413e1954f8a4e8cdaf7ffbe7ca3db87c103f0", - "sha256:92a07dfd4d7c325dd177548c4134052d4842222833576c8391aab6f74038fc3f", - "sha256:965c4c93e33e6984d8031f74e51227bd755376a9df6993774fd5b6fb3288b1f4", - "sha256:9ab75e0b2820880ae24b7136c4d230383e07db014456a476d096591172569c38", - "sha256:b0845e3bdd4aa18dc2f9b6fb78fbd3d9d371ad167fd6d1b7ad01c0a6cdad4fc6", - "sha256:dca2d7203f0dfce8ea4b3efd668f8ea65cd2b35112638e488a4c12594015f67b", - "sha256:ed686e5926929887e2c7ae0a700e32c6129abb798b4ad2b846e933de21508151", - "sha256:ef6df7e14698e79c59c7ee7cf94cd62e5b869db369ed4b1b8f7b729ea825712a", - "sha256:f898e5cc0a662a9e12bde6f931263a1bbd350cfb18e1d5336a12927851825bb6" - ], - "index": "pypi", - "version": "==2.8.4" - }, "pycodestyle": { "hashes": [ "sha256:95a2219d12372f05704562a14ec30bc76b05a5b297b21a5dfe3f6fac3491ae56", @@ -244,10 +239,10 @@ }, "pyparsing": { "hashes": [ - "sha256:4c830582a84fb022400b85429791bc551f1f4871c33f23e44f353119e92f969f", - "sha256:c342dccb5250c08d45fd6f8b4a559613ca603b57498511740e65cd11a2e7dcec" + "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1", + "sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b" ], - "version": "==2.4.6" + "version": "==2.4.7" }, "python-dateutil": { "hashes": [ @@ -337,10 +332,10 @@ }, "urllib3": { "hashes": [ - "sha256:2f3db8b19923a873b3e5256dc9c2dedfa883e33d87c690d9c7913e1f40673cdc", - "sha256:87716c2d2a7121198ebcb7ce7cccf6ce5e9ba539041cfbaeecfb641dc0bf6acc" + "sha256:3018294ebefce6572a474f0604c2021e33b3fd8006ecd11d62107a5d2a963527", + "sha256:88206b0eb87e6d677d424843ac5209e3fb9d0190d0ee169599165ec25e9d9115" ], - "version": "==1.25.8" + "version": "==1.25.9" } }, "develop": {} From e54b897fe1dba9496eefe65f3a9b00db5869adc9 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:51:26 -0400 Subject: [PATCH 15/20] doc: Add information about Node-parsing methods --- pyworkflow/pyworkflow/workflow.py | 59 +++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index fbfe6b1..162c950 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -53,6 +53,34 @@ def flow_vars(self): return self._flow_vars def get_packaged_nodes(self, root_path=None, node_type=None): + """Retrieve list of Nodes available to the Workflow. + + Recursively searches the `root_path` where Nodes are located. If none + specified, the default is `self.node_path`. + + Each directory represents a given `node_type` (e.g. 'manipulation', + 'io', etc.). Individual Node classes are defined in files within these + directories. Any custom nodes that the user has installed are included + in this search, given they are located in the 'custom_nodes' directory. + + Args: + root_path: Root location where Nodes are defined. + node_type: The type of Node, defined by sub-directory name. + + Returns: + OrderedDict() of Nodes, structured like the following + + { + 'I/O': [ + {node1}, + {node2}, + ], + 'Manipulation': [ + ... + ] + ... + } + """ if root_path is None: root_path = self.node_dir @@ -80,7 +108,7 @@ def get_packaged_nodes(self, root_path=None, node_type=None): if node == '__init__' or ext != '.py': continue - nodes.append(WorkflowUtils.extract_node_info(file_path, node, node_type)) + nodes.append(WorkflowUtils.extract_node_info(node_type, node, file_path)) if root_path == self.node_dir: # When traversal returns to `node_dir` return the entire OrderedDict() @@ -562,6 +590,18 @@ def set_root_dir(root_dir): @staticmethod def check_missing_packages(node_path): + """Check Python file for uninstalled packages. + + When compiling the list of installed Nodes for a Workflow, if any + module throws a `ModuleNotFoundError`, this method is called to + compile a list of missing packages. + + Args: + node_path: Location of the Node file. + + Returns: + list of package names that are not installed + """ finder = ModuleFinder(node_path) finder.run_script(node_path) @@ -573,7 +613,22 @@ def check_missing_packages(node_path): return uninstalled @staticmethod - def extract_node_info(file_path, node, node_type): + def extract_node_info(node_type, node, file_path): + """Extract information about a Node Class from a Python file. + + Takes an individual Python file, representing a Node subclass and + extracts the attributes needed. If a module has packages that are not + installed, the filename and missing package names are returned. + + Args: + node_type: The type of Node. + node: Name of the specific Node. + file_path: Where the Node's file is located. + + Returns: + dict-like with extracted Node information. On `ModuleNotFoundError` + the filename and missing packages are returned. + """ # Check Node file for missing packages try: module = importlib.import_module('pyworkflow.nodes.' + node_type + '.' + node) From d6eb93420678c2c9c66a6bd9c6ad672fdff6125f Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 11:54:34 -0400 Subject: [PATCH 16/20] fix: Change Workflow accessors from private->public attributes --- pyworkflow/pyworkflow/workflow.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 162c950..36c7033 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -217,10 +217,10 @@ def add_edge(self, node_from: Node, node_to: Node): from_id = node_from.node_id to_id = node_to.node_id - if self._graph.has_edge(from_id, to_id): + if self.graph.has_edge(from_id, to_id): raise WorkflowException('add_node', 'Edge between nodes already exists.') - self._graph.add_edge(from_id, to_id) + self.graph.add_edge(from_id, to_id) return (from_id, to_id) @@ -237,7 +237,7 @@ def remove_edge(self, node_from: Node, node_to: Node): to_id = node_to.node_id try: - self._graph.remove_edge(from_id, to_id) + self.graph.remove_edge(from_id, to_id) except nx.NetworkXError: raise WorkflowException('remove_edge', 'Edge from %s to %s does not exist in graph.' % (from_id, to_id)) @@ -260,13 +260,13 @@ def remove_node(self, node): def get_node_successors(self, node_id): try: - return list(self._graph.successors(node_id)) + return list(self.graph.successors(node_id)) except nx.NetworkXError as e: raise WorkflowException('get node successors', str(e)) def get_node_predecessors(self, node_id): try: - return list(self._graph.predecessors(node_id)) + return list(self.graph.predecessors(node_id)) except nx.NetworkXError as e: raise WorkflowException('get node predecessors', str(e)) @@ -385,7 +385,7 @@ def load_input_data(self, node_id): def execution_order(self): try: - return list(nx.topological_sort(self._graph)) + return list(nx.topological_sort(self.graph)) except (nx.NetworkXError, nx.NetworkXUnfeasible) as e: raise WorkflowException('execution order', str(e)) except RuntimeError as e: From d64d9a3767f2bcfc8dc03220c53573d60fca6196 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 12:15:05 -0400 Subject: [PATCH 17/20] fix: Use filename attribute for all Nodes --- pyworkflow/pyworkflow/node.py | 2 +- pyworkflow/pyworkflow/workflow.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index c9556e6..c59c521 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -14,7 +14,7 @@ def __init__(self, node_info): self.node_type = node_info.get('node_type') self.node_key = node_info.get('node_key') self.data = node_info.get('data') - + self.filename = node_info.get('filename') self.is_global = node_info.get('is_global') is True self.option_values = dict() diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 36c7033..b548cec 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -653,14 +653,15 @@ def extract_node_info(node_type, node, file_path): 'num_in': klass.num_in, 'num_out': klass.num_out, 'color': color, + 'filename': node, 'doc': klass.__doc__, 'options': {k: v.get_value() for k, v in klass.options.items()}, 'option_types': klass.option_types, 'download_result': getattr(klass, "download_result", False) } - if node_type == 'custom_nodes': - parsed_node['filename'] = node + # if node_type == 'custom_nodes': + # parsed_node['filename'] = node return parsed_node From 4961eb0d49fc04dabb44c1609c179fb48398f31f Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 24 Apr 2020 13:08:28 -0400 Subject: [PATCH 18/20] test: Update Postman runner with new `node_type` IDs --- Postman/PyWorkflow-runner.postman_collection.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Postman/PyWorkflow-runner.postman_collection.json b/Postman/PyWorkflow-runner.postman_collection.json index 2af6bcb..a575eab 100644 --- a/Postman/PyWorkflow-runner.postman_collection.json +++ b/Postman/PyWorkflow-runner.postman_collection.json @@ -88,7 +88,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"1\",\n \"node_type\": \"IONode\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"1\",\n \"node_type\": \"io\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample.csv\",\n \t\"sep\": \",\"\n }\n}", "options": { "raw": { "language": "json" @@ -132,7 +132,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"2\",\n \"node_type\": \"IONode\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample2.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"2\",\n \"node_type\": \"io\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample2.csv\"\n }\n}", "options": { "raw": { "language": "json" @@ -180,7 +180,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"3\",\n \"node_type\": \"ManipulationNode\",\n \"node_key\": \"JoinNode\",\n \"options\": {\n \t\"on\": \"key\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"3\",\n \"node_type\": \"manipulation\",\n \"node_key\": \"JoinNode\",\n \"options\": {\n \t\"on\": \"key\"\n }\n}", "options": { "raw": { "language": "json" @@ -228,7 +228,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"IONode\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample_out.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"io\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample_out.csv\"\n }\n}", "options": { "raw": { "language": "json" @@ -276,7 +276,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"IONode\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"path_or_buf\": \"/tmp/sample_out.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"io\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"path_or_buf\": \"/tmp/sample_out.csv\"\n }\n}", "options": { "raw": { "language": "json" From 3b970af3a513462743f91b6a52eb8375e5db0932 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sat, 25 Apr 2020 14:20:56 -0400 Subject: [PATCH 19/20] refactor: unified `set_dir()` to make dirs; catch OSError on init --- pyworkflow/pyworkflow/workflow.py | 51 +++++++++++++++++++------------ 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index b548cec..43db599 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -23,12 +23,20 @@ class Workflow: flow_vars: Global flow variables associated with workflow """ - def __init__(self, name="Untitled", root_dir=None, node_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): - self._name = name - self._root_dir = WorkflowUtils.set_root_dir(root_dir) - self._node_dir = WorkflowUtils.set_custom_nodes_dir(node_dir) - self._graph = graph - self._flow_vars = flow_vars + DEFAULT_ROOT_PATH = os.getcwd() + DEFAULT_NODE_PATH = os.path.join(os.getcwd(), '../pyworkflow/pyworkflow/nodes') + + def __init__(self, name="Untitled", root_dir=DEFAULT_ROOT_PATH, + node_dir=DEFAULT_NODE_PATH, graph=nx.DiGraph(), + flow_vars=nx.Graph()): + try: + self._name = name + self._root_dir = WorkflowUtils.set_dir(root_dir) + self._node_dir = WorkflowUtils.set_dir(node_dir, custom_nodes=True) + self._graph = graph + self._flow_vars = flow_vars + except OSError as e: + raise WorkflowException('init workflow', str(e)) @property def node_dir(self): @@ -569,24 +577,29 @@ def get_display_name(file): return file.replace('_', ' ').title() @staticmethod - def set_custom_nodes_dir(custom_node_path): - if custom_node_path is None: - custom_node_path = os.path.join(os.getcwd(), '../pyworkflow/pyworkflow/nodes') + def set_dir(dir_path, custom_nodes=False): + """Makes directories to ensure path is valid. - if not os.path.exists(custom_node_path): - os.makedirs(custom_node_path) + To prevent OSErrors for missing directories, especially in the case of + non-default Workflow locations, this method ensures the `dir_path` + specified exists. - return custom_node_path + Args: + dir_path: Fully-qualified directory to check/make + custom_nodes: Creates a 'custom_nodes' sub directory when True - @staticmethod - def set_root_dir(root_dir): - if root_dir is None: - root_dir = os.getcwd() + Returns: + The `dir_path` requested, with guarantee the path exists. + """ + if custom_nodes: + make_dir = os.path.join(dir_path, 'custom_nodes') + else: + make_dir = dir_path - if not os.path.exists(root_dir): - os.makedirs(root_dir) + if not os.path.exists(make_dir): + os.makedirs(make_dir) - return root_dir + return dir_path @staticmethod def check_missing_packages(node_path): From 983a3ae5bc0992b9e9d019aa4d67897aaa2b6b31 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Sat, 25 Apr 2020 16:59:20 -0400 Subject: [PATCH 20/20] fix: Typo in 'custom_nodes' --- vp/workflow/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vp/workflow/views.py b/vp/workflow/views.py index 6ad5283..15769a1 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -238,7 +238,7 @@ def upload_file(request): if node_id is None: # custom node file - file_path = request.pyworkflow.node_path('custom_node', f.name) + file_path = request.pyworkflow.node_path('custom_nodes', f.name) else: # node data file file_path = request.pyworkflow.path(f"{node_id}-{f.name}")