diff --git a/Pipfile.lock b/Pipfile.lock index e766295..68bb8be 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "a1add4e8cf6f80713942e74aabd4616cf10e546d88aff23e3d4ce8ba47a1d5df" + "sha256": "dce2012d65e3fe72d898cc6effffb73c3a37eeb894bfe86f1dd216d65352cc13" }, "pipfile-spec": 6, "requires": { @@ -25,17 +25,17 @@ }, "autopep8": { "hashes": [ - "sha256:0f592a0447acea0c2b0a9602be1e4e3d86db52badd2e3c84f0193bfd89fd3a43" + "sha256:152fd8fe47d02082be86e05001ec23d6f420086db56b17fc883f3f965fb34954" ], "index": "pypi", - "version": "==1.5" + "version": "==1.5.2" }, "certifi": { "hashes": [ - "sha256:017c25db2a153ce562900032d5bc68e9f191e44e9a0f762f373977de9df1fbb3", - "sha256:25b64c7da4cd7479594d035c08c2d809eb4aab3a26e5a990ea98cc450c320f1f" + "sha256:1d987a998c75633c40847cc966fcf5904906c920a7f17ef374f5aa4282abd304", + "sha256:51fcb31174be6e6664c5f69e3e1691a2d72a1a12e90f872cbdb1567eb47b6519" ], - "version": "==2019.11.28" + "version": "==2020.4.5.1" }, "chardet": { "hashes": [ @@ -44,6 +44,18 @@ ], "version": "==3.0.4" }, + "cli": { + "editable": true, + "path": "./CLI" + }, + "click": { + "hashes": [ + "sha256:8a18b4ea89d8820c5d0c7da8a64b2c324b4dabb695804dbfea19b9be9d88c0cc", + "sha256:e345d143d80bf5ee7534056164e5e112ea5e22716bbb1ce727941f4c8b471b9a" + ], + "index": "pypi", + "version": "==7.1.1" + }, "coreapi": { "hashes": [ "sha256:46145fcc1f7017c076a2ef684969b641d18a2991051fddec9458ad3f78ffc1cb", @@ -67,11 +79,11 @@ }, "django": { "hashes": [ - "sha256:50b781f6cbeb98f673aa76ed8e572a019a45e52bdd4ad09001072dfd91ab07c8", - "sha256:89e451bfbb815280b137e33e454ddd56481fdaa6334054e6e031041ee1eda360" + "sha256:642d8eceab321ca743ae71e0f985ff8fdca59f07aab3a9fb362c617d23e33a76", + "sha256:d4666c2edefa38c5ede0ec1655424c56dc47ceb04b6d8d62a7eac09db89545c1" ], "index": "pypi", - "version": "==3.0.4" + "version": "==3.0.5" }, "djangorestframework": { "hashes": [ @@ -98,22 +110,24 @@ }, "inflection": { "hashes": [ - 
"sha256:18ea7fb7a7d152853386523def08736aa8c32636b047ade55f7578c4edeb16ca" + "sha256:32a5c3341d9583ec319548b9015b7fbdf8c429cbcb575d326c33ae3a0e90d52c", + "sha256:9a15d3598f01220e93f2207c432cfede50daff53137ce660fb8be838ef1ca6cc" ], - "version": "==0.3.1" + "version": "==0.4.0" }, "itypes": { "hashes": [ - "sha256:c6e77bb9fd68a4bfeb9d958fea421802282451a25bac4913ec94db82a899c073" + "sha256:03da6872ca89d29aef62773672b2d408f490f80db48b23079a4b194c86dd04c6", + "sha256:af886f129dea4a2a1e3d36595a2d139589e4dd287f5cab0b40e799ee81570ff1" ], - "version": "==1.1.0" + "version": "==1.2.0" }, "jinja2": { "hashes": [ - "sha256:93187ffbc7808079673ef52771baa950426fd664d3aad1d0fa3e95644360e250", - "sha256:b0eaf100007721b5c16c1fc1eecb87409464edc10469ddc9a22a27a99123be49" + "sha256:89aab215427ef59c34ad58735269eb58b1a5808103067f7bb9d5836c651b3bb0", + "sha256:f0a4641d3cf955324a89c04f3d94663aa4d638abe8f733ecd3582848e1c37035" ], - "version": "==2.11.1" + "version": "==2.11.2" }, "markupsafe": { "hashes": [ @@ -163,29 +177,29 @@ }, "numpy": { "hashes": [ - "sha256:1598a6de323508cfeed6b7cd6c4efb43324f4692e20d1f76e1feec7f59013448", - "sha256:1b0ece94018ae21163d1f651b527156e1f03943b986188dd81bc7e066eae9d1c", - "sha256:2e40be731ad618cb4974d5ba60d373cdf4f1b8dcbf1dcf4d9dff5e212baf69c5", - "sha256:4ba59db1fcc27ea31368af524dcf874d9277f21fd2e1f7f1e2e0c75ee61419ed", - "sha256:59ca9c6592da581a03d42cc4e270732552243dc45e87248aa8d636d53812f6a5", - "sha256:5e0feb76849ca3e83dd396254e47c7dba65b3fa9ed3df67c2556293ae3e16de3", - "sha256:6d205249a0293e62bbb3898c4c2e1ff8a22f98375a34775a259a0523111a8f6c", - "sha256:6fcc5a3990e269f86d388f165a089259893851437b904f422d301cdce4ff25c8", - "sha256:82847f2765835c8e5308f136bc34018d09b49037ec23ecc42b246424c767056b", - "sha256:87902e5c03355335fc5992a74ba0247a70d937f326d852fc613b7f53516c0963", - "sha256:9ab21d1cb156a620d3999dd92f7d1c86824c622873841d6b080ca5495fa10fef", - "sha256:a1baa1dc8ecd88fb2d2a651671a84b9938461e8a8eed13e2f0a812a94084d1fa", - 
"sha256:a244f7af80dacf21054386539699ce29bcc64796ed9850c99a34b41305630286", - "sha256:a35af656a7ba1d3decdd4fae5322b87277de8ac98b7d9da657d9e212ece76a61", - "sha256:b1fe1a6f3a6f355f6c29789b5927f8bd4f134a4bd9a781099a7c4f66af8850f5", - "sha256:b5ad0adb51b2dee7d0ee75a69e9871e2ddfb061c73ea8bc439376298141f77f5", - "sha256:ba3c7a2814ec8a176bb71f91478293d633c08582119e713a0c5351c0f77698da", - "sha256:cd77d58fb2acf57c1d1ee2835567cd70e6f1835e32090538f17f8a3a99e5e34b", - "sha256:cdb3a70285e8220875e4d2bc394e49b4988bdb1298ffa4e0bd81b2f613be397c", - "sha256:deb529c40c3f1e38d53d5ae6cd077c21f1d49e13afc7936f7f868455e16b64a0", - "sha256:e7894793e6e8540dbeac77c87b489e331947813511108ae097f1715c018b8f3d" - ], - "version": "==1.18.2" + "sha256:0aa2b318cf81eb1693fcfcbb8007e95e231d7e1aa24288137f3b19905736c3ee", + "sha256:163c78c04f47f26ca1b21068cea25ed7c5ecafe5f5ab2ea4895656a750582b56", + "sha256:1e37626bcb8895c4b3873fcfd54e9bfc5ffec8d0f525651d6985fcc5c6b6003c", + "sha256:264fd15590b3f02a1fbc095e7e1f37cdac698ff3829e12ffdcffdce3772f9d44", + "sha256:3d9e1554cd9b5999070c467b18e5ae3ebd7369f02706a8850816f576a954295f", + "sha256:40c24960cd5cec55222963f255858a1c47c6fa50a65a5b03fd7de75e3700eaaa", + "sha256:46f404314dbec78cb342904f9596f25f9b16e7cf304030f1339e553c8e77f51c", + "sha256:4847f0c993298b82fad809ea2916d857d0073dc17b0510fbbced663b3265929d", + "sha256:48e15612a8357393d176638c8f68a19273676877caea983f8baf188bad430379", + "sha256:6725d2797c65598778409aba8cd67077bb089d5b7d3d87c2719b206dc84ec05e", + "sha256:99f0ba97e369f02a21bb95faa3a0de55991fd5f0ece2e30a9e2eaebeac238921", + "sha256:a41f303b3f9157a31ce7203e3ca757a0c40c96669e72d9b6ee1bce8507638970", + "sha256:a4305564e93f5c4584f6758149fd446df39fd1e0a8c89ca0deb3cce56106a027", + "sha256:a551d8cc267c634774830086da42e4ba157fa41dd3b93982bc9501b284b0c689", + "sha256:a6bc9432c2640b008d5f29bad737714eb3e14bb8854878eacf3d7955c4e91c36", + "sha256:c60175d011a2e551a2f74c84e21e7c982489b96b6a5e4b030ecdeacf2914da68", + 
"sha256:e46e2384209c91996d5ec16744234d1c906ab79a701ce1a26155c9ec890b8dc8", + "sha256:e607b8cdc2ae5d5a63cd1bec30a15b5ed583ac6a39f04b7ba0f03fcfbf29c05b", + "sha256:e94a39d5c40fffe7696009dbd11bc14a349b377e03a384ed011e03d698787dd3", + "sha256:eb2286249ebfe8fcb5b425e5ec77e4736d53ee56d3ad296f8947f67150f495e3", + "sha256:fdee7540d12519865b423af411bd60ddb513d2eb2cd921149b732854995bbf8b" + ], + "version": "==1.18.3" }, "packaging": { "hashes": [ @@ -216,25 +230,6 @@ "index": "pypi", "version": "==1.0.3" }, - "psycopg2": { - "hashes": [ - "sha256:4212ca404c4445dc5746c0d68db27d2cbfb87b523fe233dc84ecd24062e35677", - "sha256:47fc642bf6f427805daf52d6e52619fe0637648fe27017062d898f3bf891419d", - "sha256:72772181d9bad1fa349792a1e7384dde56742c14af2b9986013eb94a240f005b", - "sha256:8396be6e5ff844282d4d49b81631772f80dabae5658d432202faf101f5283b7c", - "sha256:893c11064b347b24ecdd277a094413e1954f8a4e8cdaf7ffbe7ca3db87c103f0", - "sha256:92a07dfd4d7c325dd177548c4134052d4842222833576c8391aab6f74038fc3f", - "sha256:965c4c93e33e6984d8031f74e51227bd755376a9df6993774fd5b6fb3288b1f4", - "sha256:9ab75e0b2820880ae24b7136c4d230383e07db014456a476d096591172569c38", - "sha256:b0845e3bdd4aa18dc2f9b6fb78fbd3d9d371ad167fd6d1b7ad01c0a6cdad4fc6", - "sha256:dca2d7203f0dfce8ea4b3efd668f8ea65cd2b35112638e488a4c12594015f67b", - "sha256:ed686e5926929887e2c7ae0a700e32c6129abb798b4ad2b846e933de21508151", - "sha256:ef6df7e14698e79c59c7ee7cf94cd62e5b869db369ed4b1b8f7b729ea825712a", - "sha256:f898e5cc0a662a9e12bde6f931263a1bbd350cfb18e1d5336a12927851825bb6" - ], - "index": "pypi", - "version": "==2.8.4" - }, "pycodestyle": { "hashes": [ "sha256:95a2219d12372f05704562a14ec30bc76b05a5b297b21a5dfe3f6fac3491ae56", @@ -244,10 +239,10 @@ }, "pyparsing": { "hashes": [ - "sha256:4c830582a84fb022400b85429791bc551f1f4871c33f23e44f353119e92f969f", - "sha256:c342dccb5250c08d45fd6f8b4a559613ca603b57498511740e65cd11a2e7dcec" + "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1", + 
"sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b" ], - "version": "==2.4.6" + "version": "==2.4.7" }, "python-dateutil": { "hashes": [ @@ -337,10 +332,10 @@ }, "urllib3": { "hashes": [ - "sha256:2f3db8b19923a873b3e5256dc9c2dedfa883e33d87c690d9c7913e1f40673cdc", - "sha256:87716c2d2a7121198ebcb7ce7cccf6ce5e9ba539041cfbaeecfb641dc0bf6acc" + "sha256:3018294ebefce6572a474f0604c2021e33b3fd8006ecd11d62107a5d2a963527", + "sha256:88206b0eb87e6d677d424843ac5209e3fb9d0190d0ee169599165ec25e9d9115" ], - "version": "==1.25.8" + "version": "==1.25.9" } }, "develop": {} diff --git a/Postman/PyWorkflow-runner.postman_collection.json b/Postman/PyWorkflow-runner.postman_collection.json index 2af6bcb..a575eab 100644 --- a/Postman/PyWorkflow-runner.postman_collection.json +++ b/Postman/PyWorkflow-runner.postman_collection.json @@ -88,7 +88,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"1\",\n \"node_type\": \"IONode\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"1\",\n \"node_type\": \"io\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample.csv\",\n \t\"sep\": \",\"\n }\n}", "options": { "raw": { "language": "json" @@ -132,7 +132,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"2\",\n \"node_type\": \"IONode\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample2.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"2\",\n \"node_type\": \"io\",\n \"node_key\": \"ReadCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample2.csv\"\n }\n}", "options": { "raw": { "language": "json" @@ -180,7 +180,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"3\",\n \"node_type\": \"ManipulationNode\",\n \"node_key\": \"JoinNode\",\n \"options\": {\n \t\"on\": \"key\"\n }\n}", + "raw": "{\n \"name\": 
null,\n \"node_id\": \"3\",\n \"node_type\": \"manipulation\",\n \"node_key\": \"JoinNode\",\n \"options\": {\n \t\"on\": \"key\"\n }\n}", "options": { "raw": { "language": "json" @@ -228,7 +228,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"IONode\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample_out.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"io\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"file\": \"/tmp/sample_out.csv\"\n }\n}", "options": { "raw": { "language": "json" @@ -276,7 +276,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"IONode\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"path_or_buf\": \"/tmp/sample_out.csv\"\n }\n}", + "raw": "{\n \"name\": null,\n \"node_id\": \"4\",\n \"node_type\": \"io\",\n \"node_key\": \"WriteCsvNode\",\n \"options\": {\n \t\"path_or_buf\": \"/tmp/sample_out.csv\"\n }\n}", "options": { "raw": { "language": "json" diff --git a/front-end/src/API.js b/front-end/src/API.js index 9cfc46f..fca8352 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -95,7 +95,7 @@ export async function save(diagramData) { * @returns {Promise} - server response (node menu items) */ export async function getNodes() { - return fetchWrapper("/nodes"); + return fetchWrapper("/workflow/nodes"); } diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e447899..c59c521 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,5 +1,3 @@ -import pandas as pd - from .parameters import * @@ -16,7 +14,7 @@ def __init__(self, node_info): self.node_type = node_info.get('node_type') self.node_key = node_info.get('node_key') self.data = node_info.get('data') - + self.filename = node_info.get('filename') self.is_global = node_info.get('is_global') is True 
self.option_values = dict() @@ -118,263 +116,22 @@ def get_replacement_value(self): return self.options['default_value'].get_value() -class StringNode(FlowNode): - """StringNode object - - Allows for Strings to replace 'string' fields in Nodes - """ - name = "String Input" - num_in = 1 - num_out = 1 - color = 'purple' - - OPTIONS = { - "default_value": StringParameter( - "Default Value", - docstring="Value this node will pass as a flow variable" - ), - "var_name": StringParameter( - "Variable Name", - default="my_var", - docstring="Name of the variable to use in another Node" - ) - } - - class IONode(Node): - """IONodes deal with file-handling in/out of the Workflow. - - Possible types: - Read CSV - Write CSV - """ - color = 'black' - display_name = "I/O" + """IONodes deal with file-handling in/out of the Workflow.""" + color = "green" def execute(self, predecessor_data, flow_vars): raise NotImplementedError() -class ReadCsvNode(IONode): - """ReadCsvNode - - Reads a CSV file into a pandas DataFrame. - - Raises: - NodeException: any error reading CSV file, converting - to DataFrame. - """ - name = "Read CSV" - num_in = 0 - num_out = 1 - - OPTIONS = { - "file": FileParameter( - "File", - docstring="CSV File" - ), - "sep": StringParameter( - "Delimiter", - default=",", - docstring="Column delimiter" - ), - # user-specified headers are probably integers, but haven't figured out - # arguments with multiple possible types - "header": StringParameter( - "Header Row", - default="infer", - docstring="Row number containing column names (0-indexed)" - ), - } - - def execute(self, predecessor_data, flow_vars): - try: - df = pd.read_csv( - flow_vars["file"].get_value(), - sep=flow_vars["sep"].get_value(), - header=flow_vars["header"].get_value() - ) - return df.to_json() - except Exception as e: - raise NodeException('read csv', str(e)) - - -class WriteCsvNode(IONode): - """WriteCsvNode - - Writes the current DataFrame to a CSV file. 
- - Raises: - NodeException: any error writing CSV file, converting - from DataFrame. - """ - name = "Write CSV" - num_in = 1 - num_out = 0 - download_result = True - - OPTIONS = { - "file": StringParameter( - "Filename", - docstring="CSV file to write" - ), - "sep": StringParameter( - "Delimiter", - default=",", - docstring="Column delimiter" - ), - "index": BooleanParameter( - "Write Index", - default=True, - docstring="Write index as column?" - ), - } - - def execute(self, predecessor_data, flow_vars): - try: - # Convert JSON data to DataFrame - df = pd.DataFrame.from_dict(predecessor_data[0]) - - # Write to CSV and save - df.to_csv( - flow_vars["file"].get_value(), - sep=flow_vars["sep"].get_value(), - index=flow_vars["index"].get_value() - ) - return df.to_json() - except Exception as e: - raise NodeException('write csv', str(e)) - - class ManipulationNode(Node): - """ManipulationNodes deal with data manipulation. - - Possible types: - Pivot - Filter - Multi-in - """ - display_name = "Manipulation" - color = 'goldenrod' + """ManipulationNodes deal with data manipulation.""" + color = "goldenrod" def execute(self, predecessor_data, flow_vars): raise NotImplementedError() -class PivotNode(ManipulationNode): - name = "Pivoting" - num_in = 1 - num_out = 3 - - OPTIONS = { - 'index': StringParameter( - 'Index', - docstring='Column to aggregate (column, grouper, array or list)' - ), - 'values': StringParameter( - 'Values', - docstring='Column name to use to populate new frame\'s values (column, grouper, array or list)' - ), - 'columns': StringParameter( - 'Column Name Row', - docstring='Column(s) to use for populating new frame values. 
(column, grouper, array or list)' - ), - 'aggfunc': StringParameter( - 'Aggregation function', - default='mean', - docstring='Function used for aggregation (function, list of functions, dict, default numpy.mean)' - ), - 'fill_value': StringParameter( - 'Fill value', - docstring='Value to replace missing values with (scalar)' - ), - 'margins': BooleanParameter( - 'Margins name', - default=False, - docstring='Add all rows/columns' - ), - 'dropna': BooleanParameter( - 'Drop NaN columns', - default=True, - docstring='Ignore columns with all NaN entries' - ), - 'margins_name': StringParameter( - 'Margins name', - default='All', - docstring='Name of the row/column that will contain the totals when margins is True' - ), - 'observed': BooleanParameter( - 'Column Name Row', - default=False, - docstring='Row number with column names (0-indexed) or "infer"' - ) - } - - def execute(self, predecessor_data, flow_vars): - try: - input_df = pd.DataFrame.from_dict(predecessor_data[0]) - output_df = pd.DataFrame.pivot_table(input_df, **self.options) - return output_df.to_json() - except Exception as e: - raise NodeException('pivot', str(e)) - - -class JoinNode(ManipulationNode): - name = "Joiner" - num_in = 2 - num_out = 1 - - OPTIONS = { - "on": StringParameter("Join Column", docstring="Name of column to join on") - } - - def execute(self, predecessor_data, flow_vars): - try: - first_df = pd.DataFrame.from_dict(predecessor_data[0]) - second_df = pd.DataFrame.from_dict(predecessor_data[1]) - combined_df = pd.merge( - first_df, - second_df, - on=flow_vars["on"].get_value() - ) - return combined_df.to_json() - except Exception as e: - raise NodeException('join', str(e)) - - -class FilterNode(ManipulationNode): - name = "Filter" - num_in = 1 - num_out = 1 - - OPTIONS = { - 'items': StringParameter( - 'Items', - docstring='Keep labels from axis which are in items' - ), - 'like': StringParameter( - 'Like', - docstring='Keep labels from axis for which like in label == True.' 
- ), - 'regex': StringParameter( - 'Regex', - docstring='Keep labels from axis for which re.search(regex, label) == True.' - ), - 'axis': StringParameter( - 'Axis', - docstring='The axis to filter on.' - ) - } - - def execute(self, predecessor_data, flow_vars): - try: - input_df = pd.DataFrame.from_dict(predecessor_data[0]) - output_df = pd.DataFrame.filter(input_df, **self.options) - return output_df.to_json() - except Exception as e: - raise NodeException('filter', str(e)) - - class NodeException(Exception): def __init__(self, action: str, reason: str): self.action = action diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index e295790..89830c9 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -1,4 +1,5 @@ -from .node import * +from .nodes import * +import importlib def node_factory(node_info): @@ -7,14 +8,14 @@ def node_factory(node_info): node_type = node_info.get('node_type') node_key = node_info.get('node_key') - if node_type == 'IONode': + if node_type == 'io': new_node = io_node(node_key, node_info) - elif node_type == 'ManipulationNode': + elif node_type == 'manipulation': new_node = manipulation_node(node_key, node_info) - elif node_type == 'FlowNode': + elif node_type == 'flow_control': new_node = flow_node(node_key, node_info) else: - new_node = None + new_node = custom_node(node_key, node_info) return new_node @@ -40,9 +41,20 @@ def manipulation_node(node_key, node_info): return JoinNode(node_info) elif node_key == 'PivotNode': return PivotNode(node_info) - elif node_key == 'multi-in': - return ManipulationNode(node_info) elif node_key == 'FilterNode': return FilterNode(node_info) else: return None + + +def custom_node(node_key, node_info): + try: + filename = node_info.get('filename') + module = importlib.import_module(f'pyworkflow.nodes.custom_nodes.{filename}') + my_class = getattr(module, node_key) + instance = my_class(node_info) + + return instance + except 
Exception as e: + print(str(e)) + return None diff --git a/pyworkflow/pyworkflow/nodes/__init__.py b/pyworkflow/pyworkflow/nodes/__init__.py new file mode 100644 index 0000000..31d80d9 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/__init__.py @@ -0,0 +1,3 @@ +from .io import * +from .manipulation import * +from .flow_control import * diff --git a/pyworkflow/pyworkflow/nodes/flow_control/__init__.py b/pyworkflow/pyworkflow/nodes/flow_control/__init__.py new file mode 100644 index 0000000..95d569a --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/flow_control/__init__.py @@ -0,0 +1 @@ +from .string_input import StringNode diff --git a/pyworkflow/pyworkflow/nodes/flow_control/string_input.py b/pyworkflow/pyworkflow/nodes/flow_control/string_input.py new file mode 100644 index 0000000..eaade18 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/flow_control/string_input.py @@ -0,0 +1,25 @@ +from pyworkflow.node import FlowNode, NodeException +from pyworkflow.parameters import * + + +class StringNode(FlowNode): + """StringNode object + + Allows for Strings to replace 'string' fields in Nodes + """ + name = "String Input" + num_in = 1 + num_out = 1 + color = 'purple' + + OPTIONS = { + "default_value": StringParameter( + "Default Value", + docstring="Value this node will pass as a flow variable" + ), + "var_name": StringParameter( + "Variable Name", + default="my_var", + docstring="Name of the variable to use in another Node" + ) + } diff --git a/pyworkflow/pyworkflow/nodes/io/__init__.py b/pyworkflow/pyworkflow/nodes/io/__init__.py new file mode 100644 index 0000000..bc516ac --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/io/__init__.py @@ -0,0 +1,2 @@ +from .read_csv import ReadCsvNode +from .write_csv import WriteCsvNode diff --git a/pyworkflow/pyworkflow/nodes/io/read_csv.py b/pyworkflow/pyworkflow/nodes/io/read_csv.py new file mode 100644 index 0000000..f829d61 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/io/read_csv.py @@ -0,0 +1,48 @@ +from pyworkflow.node import 
IONode, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class ReadCsvNode(IONode): + """ReadCsvNode + + Reads a CSV file into a pandas DataFrame. + + Raises: + NodeException: any error reading CSV file, converting + to DataFrame. + """ + name = "Read CSV" + num_in = 0 + num_out = 1 + + OPTIONS = { + "file": FileParameter( + "File", + docstring="CSV File" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), + # user-specified headers are probably integers, but haven't figured out + # arguments with multiple possible types + "header": StringParameter( + "Header Row", + default="infer", + docstring="Row number containing column names (0-indexed)" + ), + } + + def execute(self, predecessor_data, flow_vars): + try: + df = pd.read_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + header=flow_vars["header"].get_value() + ) + return df.to_json() + except Exception as e: + raise NodeException('read csv', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/io/write_csv.py b/pyworkflow/pyworkflow/nodes/io/write_csv.py new file mode 100644 index 0000000..0ca54cf --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/io/write_csv.py @@ -0,0 +1,51 @@ +from pyworkflow.node import IONode, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class WriteCsvNode(IONode): + """WriteCsvNode + + Writes the current DataFrame to a CSV file. + + Raises: + NodeException: any error writing CSV file, converting + from DataFrame. + """ + name = "Write CSV" + num_in = 1 + num_out = 0 + download_result = True + + OPTIONS = { + "file": StringParameter( + "Filename", + docstring="CSV file to write" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), + "index": BooleanParameter( + "Write Index", + default=True, + docstring="Write index as column?" 
+ ), + } + + def execute(self, predecessor_data, flow_vars): + try: + # Convert JSON data to DataFrame + df = pd.DataFrame.from_dict(predecessor_data[0]) + + # Write to CSV and save + df.to_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + index=flow_vars["index"].get_value() + ) + return df.to_json() + except Exception as e: + raise NodeException('write csv', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/manipulation/__init__.py b/pyworkflow/pyworkflow/nodes/manipulation/__init__.py new file mode 100644 index 0000000..2da5947 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/__init__.py @@ -0,0 +1,3 @@ +from .filter import FilterNode +from .join import JoinNode +from .pivot import PivotNode diff --git a/pyworkflow/pyworkflow/nodes/manipulation/filter.py b/pyworkflow/pyworkflow/nodes/manipulation/filter.py new file mode 100644 index 0000000..0ee1daa --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/filter.py @@ -0,0 +1,37 @@ +from pyworkflow.node import ManipulationNode, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class FilterNode(ManipulationNode): + name = "Filter" + num_in = 1 + num_out = 1 + + OPTIONS = { + 'items': StringParameter( + 'Items', + docstring='Keep labels from axis which are in items' + ), + 'like': StringParameter( + 'Like', + docstring='Keep labels from axis for which like in label == True.' + ), + 'regex': StringParameter( + 'Regex', + docstring='Keep labels from axis for which re.search(regex, label) == True.' + ), + 'axis': StringParameter( + 'Axis', + docstring='The axis to filter on.' 
+ ) + } + + def execute(self, predecessor_data, flow_vars): + try: + input_df = pd.DataFrame.from_dict(predecessor_data[0]) + output_df = pd.DataFrame.filter(input_df, **self.options) + return output_df.to_json() + except Exception as e: + raise NodeException('filter', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/manipulation/join.py b/pyworkflow/pyworkflow/nodes/manipulation/join.py new file mode 100644 index 0000000..f0ae745 --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/join.py @@ -0,0 +1,27 @@ +from pyworkflow.node import ManipulationNode, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class JoinNode(ManipulationNode): + name = "Joiner" + num_in = 2 + num_out = 1 + + OPTIONS = { + "on": StringParameter("Join Column", docstring="Name of column to join on") + } + + def execute(self, predecessor_data, flow_vars): + try: + first_df = pd.DataFrame.from_dict(predecessor_data[0]) + second_df = pd.DataFrame.from_dict(predecessor_data[1]) + combined_df = pd.merge( + first_df, + second_df, + on=flow_vars["on"].get_value() + ) + return combined_df.to_json() + except Exception as e: + raise NodeException('join', str(e)) diff --git a/pyworkflow/pyworkflow/nodes/manipulation/pivot.py b/pyworkflow/pyworkflow/nodes/manipulation/pivot.py new file mode 100644 index 0000000..b7fe43f --- /dev/null +++ b/pyworkflow/pyworkflow/nodes/manipulation/pivot.py @@ -0,0 +1,62 @@ +from pyworkflow.node import ManipulationNode, NodeException +from pyworkflow.parameters import * + +import pandas as pd + + +class PivotNode(ManipulationNode): + name = "Pivoting" + num_in = 1 + num_out = 3 + + OPTIONS = { + 'index': StringParameter( + 'Index', + docstring='Column to aggregate (column, grouper, array or list)' + ), + 'values': StringParameter( + 'Values', + docstring='Column name to use to populate new frame\'s values (column, grouper, array or list)' + ), + 'columns': StringParameter( + 'Column Name Row', + docstring='Column(s) to use for 
populating new frame values. (column, grouper, array or list)' + ), + 'aggfunc': StringParameter( + 'Aggregation function', + default='mean', + docstring='Function used for aggregation (function, list of functions, dict, default numpy.mean)' + ), + 'fill_value': StringParameter( + 'Fill value', + docstring='Value to replace missing values with (scalar)' + ), + 'margins': BooleanParameter( + 'Margins name', + default=False, + docstring='Add all rows/columns' + ), + 'dropna': BooleanParameter( + 'Drop NaN columns', + default=True, + docstring='Ignore columns with all NaN entries' + ), + 'margins_name': StringParameter( + 'Margins name', + default='All', + docstring='Name of the row/column that will contain the totals when margins is True' + ), + 'observed': BooleanParameter( + 'Column Name Row', + default=False, + docstring='Row number with column names (0-indexed) or "infer"' + ) + } + + def execute(self, predecessor_data, flow_vars): + try: + input_df = pd.DataFrame.from_dict(predecessor_data[0]) + output_df = pd.DataFrame.pivot_table(input_df, **self.options) + return output_df.to_json() + except Exception as e: + raise NodeException('pivot', str(e)) diff --git a/pyworkflow/pyworkflow/tests/test_workflow.py b/pyworkflow/pyworkflow/tests/test_workflow.py index c036481..24f542f 100644 --- a/pyworkflow/pyworkflow/tests/test_workflow.py +++ b/pyworkflow/pyworkflow/tests/test_workflow.py @@ -10,7 +10,7 @@ def setUp(self): self.read_csv_node = { "name": "Read CSV", "node_id": "1", - "node_type": "IONode", + "node_type": "io", "node_key": "ReadCsvNode", "is_global": False, "options": { @@ -21,7 +21,7 @@ def setUp(self): self.write_csv_node = { "name": "Write CSV", "node_id": "2", - "node_type": "IONode", + "node_type": "io", "node_key": "WriteCsvNode", "is_global": False, "options": { @@ -32,7 +32,7 @@ def setUp(self): self.join_node = { "name": "Joiner", "node_id": "3", - "node_type": "ManipulationNode", + "node_type": "manipulation", "node_key": "JoinNode", 
"is_global": False, "options": { diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 6ffeb44..43db599 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -1,6 +1,12 @@ +import inspect +import importlib +import json import os import networkx as nx -import json +import sys + +from collections import OrderedDict +from modulefinder import ModuleFinder from .node import Node, NodeException from .node_factory import node_factory @@ -12,15 +18,29 @@ class Workflow: Attributes: name: Name of the workflow root_dir: Used for reading/writing files to/from disk + node_dir: Location of custom nodes graph: A NetworkX Directed Graph flow_vars: Global flow variables associated with workflow """ - def __init__(self, name="Untitled", root_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()): - self._name = name - self._root_dir = WorkflowUtils.set_root_dir(root_dir) - self._graph = graph - self._flow_vars = flow_vars + DEFAULT_ROOT_PATH = os.getcwd() + DEFAULT_NODE_PATH = os.path.join(os.getcwd(), '../pyworkflow/pyworkflow/nodes') + + def __init__(self, name="Untitled", root_dir=DEFAULT_ROOT_PATH, + node_dir=DEFAULT_NODE_PATH, graph=nx.DiGraph(), + flow_vars=nx.Graph()): + try: + self._name = name + self._root_dir = WorkflowUtils.set_dir(root_dir) + self._node_dir = WorkflowUtils.set_dir(node_dir, custom_nodes=True) + self._graph = graph + self._flow_vars = flow_vars + except OSError as e: + raise WorkflowException('init workflow', str(e)) + + @property + def node_dir(self): + return self._node_dir @property def graph(self): @@ -29,6 +49,9 @@ def graph(self): def path(self, file_name): return os.path.join(self.root_dir, file_name) + def node_path(self, node_type, file_name): + return os.path.join(self.node_dir, node_type, file_name) + @property def root_dir(self): return self._root_dir @@ -37,6 +60,72 @@ def root_dir(self): def flow_vars(self): return self._flow_vars + def get_packaged_nodes(self, 
root_path=None, node_type=None): + """Retrieve list of Nodes available to the Workflow. + + Recursively searches the `root_path` where Nodes are located. If none + specified, the default is `self.node_path`. + + Each directory represents a given `node_type` (e.g. 'manipulation', + 'io', etc.). Individual Node classes are defined in files within these + directories. Any custom nodes that the user has installed are included + in this search, given they are located in the 'custom_nodes' directory. + + Args: + root_path: Root location where Nodes are defined. + node_type: The type of Node, defined by sub-directory name. + + Returns: + OrderedDict() of Nodes, structured like the following + + { + 'I/O': [ + {node1}, + {node2}, + ], + 'Manipulation': [ + ... + ] + ... + } + """ + if root_path is None: + root_path = self.node_dir + + try: + files = os.listdir(root_path) + except OSError: + return None + + nodes = list() + data = OrderedDict() + + for file in files: + file_path = os.path.join(root_path, file) + + if os.path.isdir(file_path): + # Recurse if file is a directory + display_name = WorkflowUtils.get_display_name(file) + data[display_name] = self.get_packaged_nodes(file_path, file) + continue + + # Otherwise, try parsing file for a Node class + node, ext = os.path.splitext(file) + + # Skip init files or non-Python files + if node == '__init__' or ext != '.py': + continue + + nodes.append(WorkflowUtils.extract_node_info(node_type, node, file_path)) + + if root_path == self.node_dir: + # When traversal returns to `node_dir` return the entire OrderedDict() + data.move_to_end('Custom Nodes') + return data + else: + # Otherwise, return list containing all Nodes of a `node_type` + return nodes + def get_node(self, node_id): """Retrieves Node from workflow, if exists @@ -136,10 +225,10 @@ def add_edge(self, node_from: Node, node_to: Node): from_id = node_from.node_id to_id = node_to.node_id - if self._graph.has_edge(from_id, to_id): + if self.graph.has_edge(from_id, 
to_id): raise WorkflowException('add_node', 'Edge between nodes already exists.') - self._graph.add_edge(from_id, to_id) + self.graph.add_edge(from_id, to_id) return (from_id, to_id) @@ -156,7 +245,7 @@ def remove_edge(self, node_from: Node, node_to: Node): to_id = node_to.node_id try: - self._graph.remove_edge(from_id, to_id) + self.graph.remove_edge(from_id, to_id) except nx.NetworkXError: raise WorkflowException('remove_edge', 'Edge from %s to %s does not exist in graph.' % (from_id, to_id)) @@ -179,13 +268,13 @@ def remove_node(self, node): def get_node_successors(self, node_id): try: - return list(self._graph.successors(node_id)) + return list(self.graph.successors(node_id)) except nx.NetworkXError as e: raise WorkflowException('get node successors', str(e)) def get_node_predecessors(self, node_id): try: - return list(self._graph.predecessors(node_id)) + return list(self.graph.predecessors(node_id)) except nx.NetworkXError as e: raise WorkflowException('get node predecessors', str(e)) @@ -304,17 +393,15 @@ def load_input_data(self, node_id): def execution_order(self): try: - return list(nx.topological_sort(self._graph)) + return list(nx.topological_sort(self.graph)) except (nx.NetworkXError, nx.NetworkXUnfeasible) as e: raise WorkflowException('execution order', str(e)) except RuntimeError as e: raise WorkflowException('execution order', 'The graph was changed while generating the execution order') - def upload_file(self, uploaded_file, node_id): + @staticmethod + def upload_file(uploaded_file, to_open): try: - file_name = f"{node_id}-{uploaded_file.name}" - to_open = self.path(file_name) - # TODO: Change to a stream/other method for large files? 
with open(to_open, 'wb') as f: f.write(uploaded_file.read()) @@ -483,14 +570,115 @@ def execute_workflow(workflow_location): class WorkflowUtils: @staticmethod - def set_root_dir(root_dir): - if root_dir is None: - root_dir = os.getcwd() + def get_display_name(file): + if file == 'io': + return 'I/O' + else: + return file.replace('_', ' ').title() + + @staticmethod + def set_dir(dir_path, custom_nodes=False): + """Makes directories to ensure path is valid. + + To prevent OSErrors for missing directories, especially in the case of + non-default Workflow locations, this method ensures the `dir_path` + specified exists. + + Args: + dir_path: Fully-qualified directory to check/make + custom_nodes: Creates a 'custom_nodes' sub directory when True + + Returns: + The `dir_path` requested, with guarantee the path exists. + """ + if custom_nodes: + make_dir = os.path.join(dir_path, 'custom_nodes') + else: + make_dir = dir_path + + if not os.path.exists(make_dir): + os.makedirs(make_dir) + + return dir_path + + @staticmethod + def check_missing_packages(node_path): + """Check Python file for uninstalled packages. + + When compiling the list of installed Nodes for a Workflow, if any + module throws a `ModuleNotFoundError`, this method is called to + compile a list of missing packages. + + Args: + node_path: Location of the Node file. + + Returns: + list of package names that are not installed + """ + finder = ModuleFinder(node_path) + finder.run_script(node_path) + + uninstalled = list() + for missing_package in finder.badmodules.keys(): + if missing_package not in sys.modules: + uninstalled.append(missing_package) + + return uninstalled + + @staticmethod + def extract_node_info(node_type, node, file_path): + """Extract information about a Node Class from a Python file. + + Takes an individual Python file, representing a Node subclass and + extracts the attributes needed. If a module has packages that are not + installed, the filename and missing package names are returned. 
+ + Args: + node_type: The type of Node. + node: Name of the specific Node. + file_path: Where the Node's file is located. + + Returns: + dict-like with extracted Node information. On `ModuleNotFoundError` + the filename and missing packages are returned. + """ + # Check Node file for missing packages + try: + module = importlib.import_module('pyworkflow.nodes.' + node_type + '.' + node) + except ModuleNotFoundError: + return { + "filename": node, + "missing_packages": WorkflowUtils.check_missing_packages(file_path) + } + + # Parse module for Node Class information + for name, klass in inspect.getmembers(module): + if inspect.isclass(klass) and klass.__module__.startswith('pyworkflow.nodes.' + node_type): + try: + color = klass.color + except AttributeError: + color = 'black' + + parsed_node = { + 'name': klass.name, + 'node_key': klass.__name__, + 'node_type': node_type, + 'num_in': klass.num_in, + 'num_out': klass.num_out, + 'color': color, + 'filename': node, + 'doc': klass.__doc__, + 'options': {k: v.get_value() for k, v in klass.options.items()}, + 'option_types': klass.option_types, + 'download_result': getattr(klass, "download_result", False) + } + + # if node_type == 'custom_nodes': + # parsed_node['filename'] = node - if not os.path.exists(root_dir): - os.makedirs(root_dir) + return parsed_node - return root_dir + return None class WorkflowException(Exception): diff --git a/vp/vp/urls.py b/vp/vp/urls.py index 88caaaf..41e7d65 100644 --- a/vp/vp/urls.py +++ b/vp/vp/urls.py @@ -38,7 +38,6 @@ path('redoc/', schema_view.with_ui('redoc', cache_timeout=0), name='schema-redoc'), path('admin/', admin.site.urls), path('info/', views.info), - path('nodes/', views.retrieve_nodes_for_user), path('node/', include('node.urls')), path('workflow/', include('workflow.urls')) ] diff --git a/vp/vp/views.py b/vp/vp/views.py index 898c095..ebe950d 100644 --- a/vp/vp/views.py +++ b/vp/vp/views.py @@ -2,6 +2,8 @@ from rest_framework.decorators import api_view from drf_yasg.utils 
import swagger_auto_schema from pyworkflow import Node +from modulefinder import ModuleFinder + @swagger_auto_schema(method='get', responses={200:'JSON response with data'}) @@ -21,45 +23,3 @@ def info(request): "about": "super-duper workflows!" } return JsonResponse(data) - - -@swagger_auto_schema(method='get', - operation_summary='Retrieve a list of installed Nodes', - operation_description='Retrieves a list of installed Nodes, in JSON.', - responses={ - 200: 'List of installed Nodes, in JSON', - }) -@api_view(['GET']) -def retrieve_nodes_for_user(request): - """Assembles list of Nodes accessible to workflows. - - Retrieve a list of classes from the Node module in `pyworkflow`. - List is split into 'types' (e.g., 'IO' and 'Manipulation') and - 'keys', or individual command Nodes (e.g., 'ReadCsv', 'Pivot'). - """ - data = dict() - - # Iterate through node 'types' - for parent in Node.__subclasses__(): - key = getattr(parent, "display_name", parent.__name__) - data[key] = list() - - # Iterate through node 'keys' - for child in parent.__subclasses__(): - # TODO: check attribute-scope is handled correctly - child_node = { - 'name': child.name, - 'node_key': child.__name__, - 'node_type': parent.__name__, - 'num_in': child.num_in, - 'num_out': child.num_out, - 'color': child.color or parent.color, - 'doc': child.__doc__, - 'options': {k: v.get_value() for k, v in child.options.items()}, - 'option_types': child.option_types, - 'download_result': getattr(child, "download_result", False) - } - - data[key].append(child_node) - - return JsonResponse(data) \ No newline at end of file diff --git a/vp/workflow/urls.py b/vp/workflow/urls.py index 26f1ad7..fccaba6 100644 --- a/vp/workflow/urls.py +++ b/vp/workflow/urls.py @@ -10,5 +10,6 @@ path('execute//successors', views.get_successors, name='get node successors'), path('globals', views.global_vars, name="retrieve global variables"), path('upload', views.upload_file, name='upload file'), - path('download', 
views.download_file, name='download file') + path('download', views.download_file, name='download file'), + path('nodes', views.retrieve_nodes_for_user, name='retrieve node list'), ] diff --git a/vp/workflow/views.py b/vp/workflow/views.py index f468697..15769a1 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -1,5 +1,6 @@ import os import json +import sys from django.http import JsonResponse, HttpResponse from django.conf import settings @@ -7,6 +8,8 @@ from pyworkflow import Workflow, WorkflowException from drf_yasg.utils import swagger_auto_schema +from modulefinder import ModuleFinder + @swagger_auto_schema(method='post', operation_summary='Create a new workflow.', @@ -230,14 +233,40 @@ def upload_file(request): if f is None: return JsonResponse("Empty content", status=404) - node_id = request.POST.get('nodeId', '') - try: - save_name = request.pyworkflow.upload_file(f, node_id) - return JsonResponse({"filename": save_name}, status=201, safe=False) + node_id = request.POST.get('nodeId') + + if node_id is None: + # custom node file + file_path = request.pyworkflow.node_path('custom_nodes', f.name) + else: + # node data file + file_path = request.pyworkflow.path(f"{node_id}-{f.name}") + + save_name = Workflow.upload_file(f, file_path) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) + return JsonResponse({"filename": save_name}, status=201, safe=False) + + +@swagger_auto_schema(method='get', + operation_summary='Retrieve a list of installed Nodes', + operation_description='Retrieves a list of installed Nodes, in JSON.', + responses={ + 200: 'List of installed Nodes, in JSON', + }) +@api_view(['GET']) +def retrieve_nodes_for_user(request): + """Assembles list of Nodes accessible to workflows. + + Retrieve a list of classes from the Node module in `pyworkflow`. + List is split into 'types' (e.g., 'IO' and 'Manipulation') and + 'keys', or individual command Nodes (e.g., 'ReadCsv', 'Pivot'). 
+ """ + data = request.pyworkflow.get_packaged_nodes() + return JsonResponse(data, safe=False) + @swagger_auto_schema(method='post', operation_summary='Downloads a file from the server', @@ -274,4 +303,3 @@ def download_file(request): status=404) except WorkflowException as e: return JsonResponse({e.action: e.reason}, status=500) -