diff --git a/front-end/src/API.js b/front-end/src/API.js index 2d457f1..93050e4 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -164,20 +164,26 @@ export async function uploadDataFile(formData) { /** * Download file by name from server - * @param {string} fileName - name of file to download + * @param {CustomNodeModel} node - node containing file to download * @returns {Promise} */ -export async function downloadDataFile(fileName) { +export async function downloadDataFile(node) { // TODO: make this not a giant security problem let contentType; + + const payload = {...node.options, options: node.config}; + // can't use fetchWrapper because it assumes JSON response - fetch(`/workflow/download?filename=${fileName}`) + fetch(`/workflow/download`, { + method: "POST", + body: JSON.stringify(payload) + }) .then(async resp => { if (!resp.ok) return Promise.reject(await resp.json()); contentType = resp.headers.get("content-type"); if (contentType.startsWith("text")) { resp.text().then(data => { - downloadFile(data, contentType, fileName); + downloadFile(data, contentType, node.config["path_or_buf"]); }) } }).catch(err => console.log(err)); diff --git a/front-end/src/components/Workspace.js b/front-end/src/components/Workspace.js index 9231cdf..7f9dca1 100644 --- a/front-end/src/components/Workspace.js +++ b/front-end/src/components/Workspace.js @@ -83,7 +83,7 @@ class Workspace extends React.Component { node.setSelected(false); if (node.options.download_result) { // TODO: make this work for non-WriteCsvNode nodes - API.downloadDataFile(node.config["path_or_buf"]) + API.downloadDataFile(node) .catch(err => console.log(err)); } } catch { diff --git a/pyworkflow/pyworkflow/__init__.py b/pyworkflow/pyworkflow/__init__.py index 6674170..ae0673b 100644 --- a/pyworkflow/pyworkflow/__init__.py +++ b/pyworkflow/pyworkflow/__init__.py @@ -1,3 +1,3 @@ from .workflow import Workflow, WorkflowException from .node import * -from .node_factory import node_factory, create_node +from .node_factory import node_factory diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 711cb29..002b2df 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -1,8 +1,4 @@ import pandas as pd -from django.core.files.storage import FileSystemStorage -from django.conf import settings - -fs = FileSystemStorage(location=settings.MEDIA_ROOT) class Node: @@ -16,7 +12,7 @@ def __init__(self, node_info, options=None): self.node_key = node_info.get('node_key') self.data = node_info.get('data') - self.is_global = True if node_info.get('is_global') else False + self.is_global = node_info.get('is_global') is True # Execution options are passed up from children self.options = options or dict() @@ -217,9 +213,7 @@ def execute(self, predecessor_data, flow_vars): # Write to CSV and save opts = self.options - # TODO: Remove use of Django file storage from pyworkflow nodes - fname = fs.path(opts["path_or_buf"]) - df.to_csv(fname, sep=opts["sep"], index=opts["index"]) + df.to_csv(opts["path_or_buf"], sep=opts["sep"], index=opts["index"]) return df.to_json() except Exception as e: raise NodeException('write csv', str(e)) diff --git a/pyworkflow/pyworkflow/node_factory.py b/pyworkflow/pyworkflow/node_factory.py index f05be97..f0f8a6f 100644 --- a/pyworkflow/pyworkflow/node_factory.py +++ b/pyworkflow/pyworkflow/node_factory.py @@ -1,15 +1,6 @@ from .node import * -import json -def create_node(payload): - json_data = json.loads(payload) - - try: - return node_factory(json_data) - except OSError as e: - raise NodeException('create_node', 'Problem parsing JSON') - def node_factory(node_info): # Create a new Node with info # TODO: should perform error-checking or add default values if missing diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 2d8e31d..effe898 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -1,3 +1,4 @@ +import os import networkx as nx import json @@ -5,10 +6,6 @@ from .node_factory import node_factory from django.conf import settings -from django.core.files.base import ContentFile -from django.core.files.storage import FileSystemStorage - -fs = FileSystemStorage(location=settings.MEDIA_ROOT) class Workflow: @@ -19,13 +16,17 @@ class Workflow: file_path: Location of a workflow file """ - def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph()): + def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph(), root_dir=settings.MEDIA_ROOT): #TODO: need to discuss a way to generating the workflow name. For now passing a default name. self._graph = graph self._file_path = file_path self._name = name self._flow_vars = flow_vars + if not os.path.exists(root_dir): + os.makedirs(root_dir) + self._root_dir = root_dir + @property def graph(self): return self._graph @@ -34,6 +35,13 @@ def graph(self): def graph(self, graph): self._graph = graph + def path(self, file_name): + return os.path.join(self.root_dir, file_name) + + @property + def root_dir(self): + return self._root_dir + @property def file_path(self): return self._file_path @@ -236,6 +244,35 @@ def execution_order(self): except RuntimeError as e: raise WorkflowException('execution order', 'The graph was changed while generating the execution order') + def upload_file(self, uploaded_file, node_id): + try: + file_name = f"{node_id}-{uploaded_file.name}" + to_open = os.path.join(self.root_dir, file_name) + + # TODO: Change to a stream/other method for large files? + with open(to_open, 'wb') as f: + f.write(uploaded_file.read()) + + uploaded_file.close() + return to_open + except OSError as e: + raise WorkflowException('upload_file', str(e)) + + def download_file(self, node_id): + node = self.get_node(node_id) + if node is None: + return None + + try: + # TODO: Change to generic "file" option to allow for more than WriteCsv + to_open = os.path.join(self.root_dir, node.options["path_or_buf"]) + return open(to_open) + except KeyError: + raise WorkflowException('download_file', '%s does not have an associated file' % node_id) + except OSError as e: + raise WorkflowException('download_file', str(e)) + + @staticmethod def store_node_data(workflow, node_id, data): """Store Node data @@ -250,10 +287,12 @@ def store_node_data(workflow, node_id, data): Returns: """ - file_name = Workflow.generate_file_name(workflow.name, node_id) + file_name = Workflow.generate_file_name(workflow, node_id) try: - return fs.save(file_name, ContentFile(data)) + with open(file_name, 'w') as f: + f.write(data) + return file_name except Exception as e: return None @@ -274,12 +313,15 @@ def retrieve_node_data(node_to_retrieve): problem parsing the file. """ try: - with fs.open(node_to_retrieve.data) as f: + with open(node_to_retrieve.data) as f: return json.load(f) except OSError as e: raise WorkflowException('retrieve node data', str(e)) - except TypeError as e: - raise WorkflowException('retrieve node data', str(e)) + except TypeError: + raise WorkflowException( + 'retrieve node data', + 'Node %s has not yet been executed. No data to retrieve.' % node_to_retrieve.node_id + ) except json.JSONDecodeError as e: raise WorkflowException('retrieve node data', str(e)) @@ -301,20 +343,18 @@ def read_graph_json(file_like): return nx.readwrite.json_graph.node_link_graph(json_data) @staticmethod - def generate_file_name(workflow_name, node_id): + def generate_file_name(workflow, node_id): """Generates a file name for saving intermediate execution data. Current format is workflow_name - node_id Args: - workflow_name: the name of the workflow + workflow: the workflow node_id: the id of the workflow """ #TODO: need to add validation - if workflow_name is None: - workflow_name = "a-name" - - return workflow_name + '-' + str(node_id) + file_name = workflow.name if workflow.name else 'a-name' + return os.path.join(workflow.root_dir, file_name + '-' + str(node_id)) @classmethod def from_session(cls, data): @@ -330,6 +370,8 @@ def from_session(cls, data): graph_data = data.get('graph') name = data.get('name') flow_vars_data = data.get('flow_vars') + root_dir = data.get('root_dir') + if graph_data is None: graph = None else: @@ -340,7 +382,7 @@ def from_session(cls, data): else: flow_vars = nx.readwrite.json_graph.node_link_graph(flow_vars_data) - return cls(graph, file_path, name, flow_vars) + return cls(graph, file_path, name, flow_vars, root_dir) @classmethod def from_file(cls, file_like): @@ -370,6 +412,7 @@ def to_session_dict(self): out['file_path'] = self.file_path out['name'] = self.name out['flow_vars'] = Workflow.to_graph_json(self.flow_vars) + out['root_dir'] = self.root_dir return out diff --git a/vp/node/views.py b/vp/node/views.py index 20baa4f..f365ad4 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -1,15 +1,10 @@ import json -from django.core.files.base import ContentFile -from django.core.files.storage import FileSystemStorage from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt -from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory, create_node +from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory from rest_framework.decorators import api_view from drf_yasg.utils import swagger_auto_schema -from django.conf import settings - -fs = FileSystemStorage(location=settings.MEDIA_ROOT) @swagger_auto_schema(method='post', @@ -35,7 +30,7 @@ def node(request): """ # Extract request info for node creation try: - new_node = create_node(request.body) + new_node = create_node(request) except NodeException as e: return JsonResponse({e.action: e.reason}, status=400) @@ -177,7 +172,7 @@ def handle_node(request, node_id): if request.method == 'GET': response = JsonResponse(retrieved_node.__dict__, safe=False) elif request.method == 'POST': - updated_node = create_node(request.body) + updated_node = create_node(request) # Nodes need to be the same type to update if type(retrieved_node) != type(updated_node): @@ -258,17 +253,17 @@ def retrieve_data(request, node_id): return JsonResponse({e.action: e.reason}, status=500) -def create_node(payload): +def create_node(request): """Pass all request info to Node Factory. """ - json_data = json.loads(payload) + json_data = json.loads(request.body) # for options with type 'file', replace value with FileStorage path for field, info in json_data.get("option_types", dict()).items(): - if info["type"] == "file": + if info["type"] == "file" or info["name"] == "Filename": opt_value = json_data["options"][field] if opt_value is not None: - json_data["options"][field] = fs.path(opt_value) + json_data["options"][field] = request.pyworkflow.path(opt_value) try: return node_factory(json_data) diff --git a/vp/workflow/views.py b/vp/workflow/views.py index ee2a478..c54f370 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -1,16 +1,12 @@ import os import json -import csv from django.http import JsonResponse, HttpResponse from django.conf import settings -from django.core.files.storage import FileSystemStorage from rest_framework.decorators import api_view from pyworkflow import Workflow, WorkflowException from drf_yasg.utils import swagger_auto_schema -fs = FileSystemStorage(location=settings.MEDIA_ROOT) - @swagger_auto_schema(method='get', operation_summary='Create a new workflow.', @@ -28,7 +24,7 @@ def new_workflow(request): 200 - Created new DiGraph """ # Create new Workflow - request.pyworkflow = Workflow() + request.pyworkflow = Workflow(root_dir=settings.MEDIA_ROOT) request.session.update(request.pyworkflow.to_session_dict()) return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) @@ -68,15 +64,11 @@ def open_workflow(request): 500 - Missing JSON data or """ try: - # If multi-part form-data, use this # TODO: file is parsed into JSON in memory; # may want to save to 'fs' for large files - uploaded_file = request.FILES['file'] + uploaded_file = request.FILES.get('file') combined_json = json.load(uploaded_file) - # If file is passed in as raw JSON, use this - # combined_json = json.loads(request.body) - request.pyworkflow = Workflow.from_request(combined_json['networkx']) request.session.update(request.pyworkflow.to_session_dict()) react = combined_json['react'] @@ -122,7 +114,8 @@ def save_workflow(request): 'react': json.loads(request.body), 'networkx': Workflow.to_graph_json(request.pyworkflow.graph), 'flow_vars': Workflow.to_graph_json(request.pyworkflow.flow_vars), - 'filename': request.pyworkflow.file_path + 'filename': request.pyworkflow.file_path, + 'root_dir': request.pyworkflow.root_dir, }) return HttpResponse(combined_json, content_type='application/json') @@ -205,37 +198,53 @@ def get_successors(request, node_id): }) @api_view(['POST']) def upload_file(request): - if 'file' not in request.data: + f = request.FILES.get('file') + + if f is None: return JsonResponse("Empty content", status=404) - f = request.data['file'] - node_id = request.data.get('nodeId', '') - save_name = f"{node_id}-{f.name}" - fs.save(save_name, f) - return JsonResponse({"filename": save_name}, status=201, safe=False) + node_id = request.POST.get('nodeId', '') -@swagger_auto_schema(method='get', + try: + save_name = request.pyworkflow.upload_file(f, node_id) + return JsonResponse({"filename": save_name}, status=201, safe=False) + except WorkflowException as e: + return JsonResponse({e.action: e.reason}, status=500) + + +@swagger_auto_schema(method='post', operation_summary='Downloads a file from the server', - operation_description='Downloads a file by name from the server.', + operation_description='Downloads a file associated with Node from server.', responses={ 200: 'File downloaded', 404: 'Could not read specified file' }) -@api_view(['GET']) +@api_view(['POST']) def download_file(request): - fname = request.GET["filename"] - _, ext = os.path.splitext(fname) - content = "application/octet-stream" - if ext == ".csv": - content = "text/csv" - elif ext == ".json": - content = "application/json" - response = HttpResponse(content_type=content) try: - with fs.open(fname) as f: - response.write(f.read()) + # Retrieve Node info, and related File object + json_data = json.loads(request.body) + f = request.pyworkflow.download_file(json_data['node_id']) + + # Parse file type + _, ext = os.path.splitext(f.name) + if ext == ".csv": + content = "text/csv" + elif ext == ".json": + content = "application/json" + else: + content = "application/octet-stream" + + # Construct response + response = HttpResponse(content_type=content) + response.write(f.read()) + + # File not opened with `with`; need to close + f.close() return response except OSError: return JsonResponse({"message": "Could not find or read file"}, status=404) + except WorkflowException as e: + return JsonResponse({e.action: e.reason}, status=500)