Skip to content
Merged
14 changes: 10 additions & 4 deletions front-end/src/API.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,26 @@ export async function uploadDataFile(formData) {

/**
* Download file by name from server
* @param {string} fileName - name of file to download
* @param {CustomNodeModel} node - node containing file to download
* @returns {Promise<void>}
*/
export async function downloadDataFile(fileName) {
export async function downloadDataFile(node) {
// TODO: make this not a giant security problem
let contentType;

const payload = {...node.options, options: node.config};

// can't use fetchWrapper because it assumes JSON response
fetch(`/workflow/download?filename=${fileName}`)
fetch(`/workflow/download`, {
method: "POST",
body: JSON.stringify(payload)
})
.then(async resp => {
if (!resp.ok) return Promise.reject(await resp.json());
contentType = resp.headers.get("content-type");
if (contentType.startsWith("text")) {
resp.text().then(data => {
downloadFile(data, contentType, fileName);
downloadFile(data, contentType, node.config["path_or_buf"]);
})
}
}).catch(err => console.log(err));
Expand Down
2 changes: 1 addition & 1 deletion front-end/src/components/Workspace.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Workspace extends React.Component {
node.setSelected(false);
if (node.options.download_result) {
// TODO: make this work for non-WriteCsvNode nodes
API.downloadDataFile(node.config["path_or_buf"])
API.downloadDataFile(node)
.catch(err => console.log(err));
}
} catch {
Expand Down
2 changes: 1 addition & 1 deletion pyworkflow/pyworkflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .workflow import Workflow, WorkflowException
from .node import *
from .node_factory import node_factory, create_node
from .node_factory import node_factory
10 changes: 2 additions & 8 deletions pyworkflow/pyworkflow/node.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import pandas as pd
from django.core.files.storage import FileSystemStorage
from django.conf import settings

fs = FileSystemStorage(location=settings.MEDIA_ROOT)


class Node:
Expand All @@ -16,7 +12,7 @@ def __init__(self, node_info, options=None):
self.node_key = node_info.get('node_key')
self.data = node_info.get('data')

self.is_global = True if node_info.get('is_global') else False
self.is_global = node_info.get('is_global') is True

# Execution options are passed up from children
self.options = options or dict()
Expand Down Expand Up @@ -217,9 +213,7 @@ def execute(self, predecessor_data, flow_vars):

# Write to CSV and save
opts = self.options
# TODO: Remove use of Django file storage from pyworkflow nodes
fname = fs.path(opts["path_or_buf"])
df.to_csv(fname, sep=opts["sep"], index=opts["index"])
df.to_csv(opts["path_or_buf"], sep=opts["sep"], index=opts["index"])
return df.to_json()
except Exception as e:
raise NodeException('write csv', str(e))
Expand Down
9 changes: 0 additions & 9 deletions pyworkflow/pyworkflow/node_factory.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
from .node import *
import json


def create_node(payload):
    """Parse a JSON payload and build a Node from it.

    Args:
        payload: JSON text (``str``/``bytes``) describing the node; the
            decoded object is handed to ``node_factory`` unchanged.

    Returns:
        Whatever ``node_factory`` returns for the decoded payload.

    Raises:
        NodeException: if the payload is not valid JSON, or if
            ``node_factory`` fails with an ``OSError``.
    """
    # Decoding must happen inside a try: json.loads signals malformed input
    # with json.JSONDecodeError (a ValueError), never with OSError.
    try:
        json_data = json.loads(payload)
    except ValueError as e:
        raise NodeException('create_node', 'Problem parsing JSON') from e

    try:
        return node_factory(json_data)
    except OSError as e:
        raise NodeException('create_node', 'Problem parsing JSON') from e

def node_factory(node_info):
# Create a new Node with info
# TODO: should perform error-checking or add default values if missing
Expand Down
77 changes: 60 additions & 17 deletions pyworkflow/pyworkflow/workflow.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import os
import networkx as nx
import json

from .node import Node
from .node_factory import node_factory

from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage

fs = FileSystemStorage(location=settings.MEDIA_ROOT)


class Workflow:
Expand All @@ -19,13 +16,17 @@ class Workflow:
file_path: Location of a workflow file
"""

def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph()):
def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph(), root_dir=settings.MEDIA_ROOT):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried writing a pyworkflow unit test that imported `django.conf.settings`, and it complained that Django was not configured. So I would either make `/tmp` the default, or default `root_dir` to `None` and fall back to `os.getcwd()` when `__init__` checks whether the directory exists. That would let someone use pyworkflow as a regular old Python module and have all their output land where they are working.

#TODO: need to discuss a way to generate the workflow name. For now, a default name is passed.
self._graph = graph
self._file_path = file_path
self._name = name
self._flow_vars = flow_vars

if not os.path.exists(root_dir):
os.makedirs(root_dir)
self._root_dir = root_dir

# Accessor for the workflow's graph: returns the object stored in
# self._graph, which __init__ assigns from its `graph` argument.
@property
def graph(self):
return self._graph
Expand All @@ -34,6 +35,13 @@ def graph(self):
def graph(self, graph):
self._graph = graph

def path(self, file_name):
    """Return the location of ``file_name`` inside this workflow's root directory.

    Args:
        file_name: name (or relative path) to place under ``root_dir``

    Returns:
        ``root_dir`` and ``file_name`` combined with ``os.path.join``.
    """
    base_dir = self.root_dir
    return os.path.join(base_dir, file_name)

# Accessor for the directory this workflow reads and writes files in:
# returns self._root_dir, which __init__ assigns from its `root_dir`
# argument after creating the directory if it does not exist.
@property
def root_dir(self):
return self._root_dir

# Accessor for the workflow file location: returns self._file_path,
# which __init__ assigns from its `file_path` argument (None by default).
@property
def file_path(self):
return self._file_path
Expand Down Expand Up @@ -236,6 +244,35 @@ def execution_order(self):
except RuntimeError as e:
raise WorkflowException('execution order', 'The graph was changed while generating the execution order')

def upload_file(self, uploaded_file, node_id):
    """Save an uploaded file into the workflow's root directory.

    The file is stored as ``<node_id>-<name>``, where ``<name>`` is the final
    path component of ``uploaded_file.name``. Directory components in the
    supplied name are dropped so the upload cannot be written outside
    ``root_dir``.

    Args:
        uploaded_file: file-like object exposing ``name``, ``read()`` and
            ``close()``
        node_id: id used as the prefix of the stored file name

    Returns:
        Path of the file that was written.

    Raises:
        WorkflowException: if the destination cannot be written.
    """
    try:
        try:
            # The name comes from the client; keep only its last component.
            safe_name = os.path.basename(uploaded_file.name)
            to_open = os.path.join(self.root_dir, f"{node_id}-{safe_name}")

            # TODO: Change to a stream/other method for large files?
            with open(to_open, 'wb') as f:
                f.write(uploaded_file.read())
        finally:
            # Release the upload even when the write above fails.
            uploaded_file.close()
        return to_open
    except OSError as e:
        raise WorkflowException('upload_file', str(e))

def download_file(self, node_id):
    """Open the file associated with a node for reading.

    Args:
        node_id: id of the node whose ``path_or_buf`` option names the file

    Returns:
        An open file object for the node's file, or ``None`` when
        ``get_node`` finds no node with that id. The caller is responsible
        for closing the returned file.

    Raises:
        WorkflowException: if the node has no ``path_or_buf`` option, or the
            file cannot be opened.
    """
    target = self.get_node(node_id)
    if target is None:
        return None

    # TODO: Change to generic "file" option to allow for more than WriteCsv
    try:
        configured_name = target.options["path_or_buf"]
    except KeyError:
        raise WorkflowException('download_file', '%s does not have an associated file' % node_id)

    try:
        return open(os.path.join(self.root_dir, configured_name))
    except OSError as e:
        raise WorkflowException('download_file', str(e))


@staticmethod
def store_node_data(workflow, node_id, data):
"""Store Node data
Expand All @@ -250,10 +287,12 @@ def store_node_data(workflow, node_id, data):
Returns:

"""
file_name = Workflow.generate_file_name(workflow.name, node_id)
file_name = Workflow.generate_file_name(workflow, node_id)

try:
return fs.save(file_name, ContentFile(data))
with open(file_name, 'w') as f:
f.write(data)
return file_name
except Exception as e:
return None

Expand All @@ -274,12 +313,15 @@ def retrieve_node_data(node_to_retrieve):
problem parsing the file.
"""
try:
with fs.open(node_to_retrieve.data) as f:
with open(node_to_retrieve.data) as f:
return json.load(f)
except OSError as e:
raise WorkflowException('retrieve node data', str(e))
except TypeError as e:
raise WorkflowException('retrieve node data', str(e))
except TypeError:
raise WorkflowException(
'retrieve node data',
'Node %s has not yet been executed. No data to retrieve.' % node_to_retrieve.node_id
)
except json.JSONDecodeError as e:
raise WorkflowException('retrieve node data', str(e))

Expand All @@ -301,20 +343,18 @@ def read_graph_json(file_like):
return nx.readwrite.json_graph.node_link_graph(json_data)

@staticmethod
def generate_file_name(workflow_name, node_id):
def generate_file_name(workflow, node_id):
"""Generates a file name for saving intermediate execution data.

Current format is workflow_name - node_id

Args:
workflow_name: the name of the workflow
workflow: the workflow
node_id: the id of the node
"""
#TODO: need to add validation
if workflow_name is None:
workflow_name = "a-name"

return workflow_name + '-' + str(node_id)
file_name = workflow.name if workflow.name else 'a-name'
return os.path.join(workflow.root_dir, file_name + '-' + str(node_id))

@classmethod
def from_session(cls, data):
Expand All @@ -330,6 +370,8 @@ def from_session(cls, data):
graph_data = data.get('graph')
name = data.get('name')
flow_vars_data = data.get('flow_vars')
root_dir = data.get('root_dir')

if graph_data is None:
graph = None
else:
Expand All @@ -340,7 +382,7 @@ def from_session(cls, data):
else:
flow_vars = nx.readwrite.json_graph.node_link_graph(flow_vars_data)

return cls(graph, file_path, name, flow_vars)
return cls(graph, file_path, name, flow_vars, root_dir)

@classmethod
def from_file(cls, file_like):
Expand Down Expand Up @@ -370,6 +412,7 @@ def to_session_dict(self):
out['file_path'] = self.file_path
out['name'] = self.name
out['flow_vars'] = Workflow.to_graph_json(self.flow_vars)
out['root_dir'] = self.root_dir
return out


Expand Down
19 changes: 7 additions & 12 deletions vp/node/views.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import json

from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory, create_node
from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory
from rest_framework.decorators import api_view
from drf_yasg.utils import swagger_auto_schema
from django.conf import settings

fs = FileSystemStorage(location=settings.MEDIA_ROOT)


@swagger_auto_schema(method='post',
Expand All @@ -35,7 +30,7 @@ def node(request):
"""
# Extract request info for node creation
try:
new_node = create_node(request.body)
new_node = create_node(request)
except NodeException as e:
return JsonResponse({e.action: e.reason}, status=400)

Expand Down Expand Up @@ -177,7 +172,7 @@ def handle_node(request, node_id):
if request.method == 'GET':
response = JsonResponse(retrieved_node.__dict__, safe=False)
elif request.method == 'POST':
updated_node = create_node(request.body)
updated_node = create_node(request)

# Nodes need to be the same type to update
if type(retrieved_node) != type(updated_node):
Expand Down Expand Up @@ -258,17 +253,17 @@ def retrieve_data(request, node_id):
return JsonResponse({e.action: e.reason}, status=500)


def create_node(payload):
def create_node(request):
"""Pass all request info to Node Factory.

"""
json_data = json.loads(payload)
json_data = json.loads(request.body)
# for options with type 'file', replace value with FileStorage path
for field, info in json_data.get("option_types", dict()).items():
if info["type"] == "file":
if info["type"] == "file" or info["name"] == "Filename":
opt_value = json_data["options"][field]
if opt_value is not None:
json_data["options"][field] = fs.path(opt_value)
json_data["options"][field] = request.pyworkflow.path(opt_value)

try:
return node_factory(json_data)
Expand Down
Loading