Skip to content
10 changes: 8 additions & 2 deletions front-end/src/API.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,16 @@ export async function getNodes() {

/**
* Start a new workflow on the server
* @param {DiagramModel} model - Diagram model
* @returns {Promise<Object>} - server response
*/
export async function initWorkflow() {
return fetchWrapper("/workflow/new");
export async function initWorkflow(model) {
const options = {
method: "POST",
body: JSON.stringify(model.options.id)
};

return fetchWrapper("/workflow/new", options);
}


Expand Down
6 changes: 3 additions & 3 deletions front-end/src/components/Workspace.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Workspace extends React.Component {
API.getNodes()
.then(nodes => this.setState({nodes: nodes}))
.catch(err => console.log(err));
API.initWorkflow().catch(err => console.log(err));
API.initWorkflow(this.model).catch(err => console.log(err));
}

/**
Expand All @@ -51,7 +51,7 @@ class Workspace extends React.Component {
clear() {
if (window.confirm("Clear diagram? You will lose all work.")) {
this.model.getNodes().forEach(n => n.remove());
API.initWorkflow().catch(err => console.log(err));
API.initWorkflow(this.model).catch(err => console.log(err));
this.engine.repaintCanvas();
}
}
Expand Down Expand Up @@ -128,7 +128,7 @@ function FileUpload(props) {
const form = new FormData();
form.append("file", file);
API.uploadWorkflow(form).then(json => {
props.handleData(json.react);
props.handleData(json);
}).catch(err => {
console.log(err);
});
Expand Down
140 changes: 58 additions & 82 deletions pyworkflow/pyworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,34 @@
from .node import Node
from .node_factory import node_factory

from django.conf import settings


class Workflow:
""" Workflow object

Attributes:
name: Name of the workflow
root_dir: Used for reading/writing files to/from disk
graph: A NetworkX Directed Graph
file_path: Location of a workflow file
flow_vars: Global flow variables associated with workflow
"""

def __init__(self, graph=nx.DiGraph(), file_path=None, name='a-name', flow_vars=nx.Graph(), root_dir=settings.MEDIA_ROOT):
#TODO: need to discuss a way to generating the workflow name. For now passing a default name.
self._graph = graph
self._file_path = file_path
def __init__(self, name="Untitled", root_dir=None, graph=nx.DiGraph(), flow_vars=nx.Graph()):
self._name = name
self._root_dir = WorkflowUtils.set_root_dir(root_dir)
self._graph = graph
self._flow_vars = flow_vars

if not os.path.exists(root_dir):
os.makedirs(root_dir)
self._root_dir = root_dir

@property
def graph(self):
return self._graph

@graph.setter
def graph(self, graph):
self._graph = graph

def path(self, file_name):
return os.path.join(self.root_dir, file_name)

@property
def root_dir(self):
return self._root_dir

@property
def file_path(self):
return self._file_path

@file_path.setter
def file_path(self, file_path: str):
if file_path is None or file_path[-5:] == '.json':
self._file_path = file_path
else:
raise WorkflowException('set_file_path', 'File ' + file_path + ' is not JSON.')

@property
def flow_vars(self):
return self._flow_vars
Expand Down Expand Up @@ -116,6 +96,10 @@ def name(self):
def name(self, name: str):
self._name = name

@property
def filename(self):
return self.name + '.json'

def add_edge(self, node_from: Node, node_to: Node):
""" Add a Node object to the graph.

Expand Down Expand Up @@ -247,7 +231,7 @@ def execution_order(self):
def upload_file(self, uploaded_file, node_id):
try:
file_name = f"{node_id}-{uploaded_file.name}"
to_open = os.path.join(self.root_dir, file_name)
to_open = self.path(file_name)

# TODO: Change to a stream/other method for large files?
with open(to_open, 'wb') as f:
Expand All @@ -265,14 +249,13 @@ def download_file(self, node_id):

try:
# TODO: Change to generic "file" option to allow for more than WriteCsv
to_open = os.path.join(self.root_dir, node.options["path_or_buf"])
to_open = self.path(node.options['path_or_buf'])
return open(to_open)
except KeyError:
raise WorkflowException('download_file', '%s does not have an associated file' % node_id)
except OSError as e:
raise WorkflowException('download_file', str(e))


@staticmethod
def store_node_data(workflow, node_id, data):
"""Store Node data
Expand All @@ -288,9 +271,10 @@ def store_node_data(workflow, node_id, data):

"""
file_name = Workflow.generate_file_name(workflow, node_id)
file_path = workflow.path(file_name)

try:
with open(file_name, 'w') as f:
with open(file_path, 'w') as f:
f.write(data)
return file_name
except Exception as e:
Expand Down Expand Up @@ -326,79 +310,57 @@ def retrieve_node_data(node_to_retrieve):
raise WorkflowException('retrieve node data', str(e))

@staticmethod
def read_graph_json(file_like):
def read_graph_json(json_data):
"""Deserialize JSON NetworkX graph

Args:
file_like: file-like object from which to read JSON-serialized graph
json_data: JSON data from which to read JSON-serialized graph

Returns:
NetworkX DiGraph object

Raises:
OSError: on file error
NetworkXError: on issue with loading JSON graph data
"""
json_data = json.load(file_like)
return nx.readwrite.json_graph.node_link_graph(json_data)

@staticmethod
def generate_file_name(workflow, node_id):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here re. static method. In fact, this could probably be an instance method on the Node class. But maybe not worth it depending how many places call it (and their callers).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is only used once, in store_node_data(), and I thought about removing it completely. Once generated and used for writing data, they can be looked up by accessing the data attribute on a Node, which I think is all we need (basically your idea of an instance method on the Node class?).

If there is a reason to keep this, I think it makes more sense in the Workflow class since the file name includes the Workflow name.

"""Generates a file name for saving intermediate execution data.

Current format is workflow_name - node_id
Current format is 'workflow_name - node_id'

Args:
workflow: the workflow
node_id: the id of the workflow
"""
#TODO: need to add validation
file_name = workflow.name if workflow.name else 'a-name'
return os.path.join(workflow.root_dir, file_name + '-' + str(node_id))
return f"{workflow.name}-{node_id}"

@classmethod
def from_session(cls, data):
"""Create instance from graph (JSON) data and filename

Typically takes Django session as argument, which contains
`graph` and `file_path` keys.
def from_json(cls, json_data):
"""Load Workflow from JSON data.

Args:
data: dict-like with keys `file_path` and `graph`
"""
file_path = data.get('file_path')
graph_data = data.get('graph')
name = data.get('name')
flow_vars_data = data.get('flow_vars')
root_dir = data.get('root_dir')

if graph_data is None:
graph = None
else:
graph = nx.readwrite.json_graph.node_link_graph(graph_data)

if flow_vars_data is None:
flow_vars = None
else:
flow_vars = nx.readwrite.json_graph.node_link_graph(flow_vars_data)

return cls(graph, file_path, name, flow_vars, root_dir)

@classmethod
def from_file(cls, file_like):
"""

"""
graph = cls.read_graph_json(file_like)
return cls(graph)
json_data: JSON-like data from session, or uploaded file

@classmethod
def from_request(cls, json_data):
"""
Returns:
New Workflow object

Raises:
WorkflowException: on missing data (KeyError) or on
malformed NetworkX graph data (NetworkXError)
"""
graph = nx.readwrite.json_graph.node_link_graph(json_data)
return cls(graph)
try:
name = json_data['name']
root_dir = json_data['root_dir']
graph = Workflow.read_graph_json(json_data['graph'])
flow_vars = Workflow.read_graph_json(json_data['flow_vars'])

return cls(name=name, root_dir=root_dir, graph=graph, flow_vars=flow_vars)
except KeyError as e:
raise WorkflowException('from_json', str(e))
except nx.NetworkXError as e:
raise WorkflowException('from_json', str(e))

@staticmethod
def to_graph_json(graph):
Expand All @@ -407,13 +369,27 @@ def to_graph_json(graph):
def to_session_dict(self):
"""Store Workflow information in the Django session.
"""
out = dict()
out['graph'] = Workflow.to_graph_json(self.graph)
out['file_path'] = self.file_path
out['name'] = self.name
out['flow_vars'] = Workflow.to_graph_json(self.flow_vars)
out['root_dir'] = self.root_dir
return out
try:
out = dict()
out['name'] = self.name
out['root_dir'] = self.root_dir
out['graph'] = Workflow.to_graph_json(self.graph)
out['flow_vars'] = Workflow.to_graph_json(self.flow_vars)
return out
except nx.NetworkXError as e:
raise WorkflowException('to_session_dict', str(e))


class WorkflowUtils:
@staticmethod
def set_root_dir(root_dir):
if root_dir is None:
root_dir = os.getcwd()

if not os.path.exists(root_dir):
os.makedirs(root_dir)

return root_dir


class WorkflowException(Exception):
Expand Down
19 changes: 11 additions & 8 deletions vp/workflow/middleware.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pyworkflow import Workflow
from pyworkflow import Workflow, WorkflowException
from django.http import JsonResponse


Expand All @@ -25,13 +25,16 @@ def __call__(self, request):
pass
else:
# All other cases, load workflow from session
request.pyworkflow = Workflow.from_session(request.session)

# Check if a graph is present
if request.pyworkflow.graph is None:
return JsonResponse({
'message': 'A workflow has not been created yet.'
}, status=404)
try:
request.pyworkflow = Workflow.from_json(request.session)

# Check if a graph is present
if request.pyworkflow.graph is None:
return JsonResponse({
'message': 'A workflow has not been created yet.'
}, status=404)
except WorkflowException as e:
return JsonResponse({e.action: e.reason}, status=500)

response = self.get_response(request)

Expand Down
1 change: 1 addition & 0 deletions vp/workflow/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
urlpatterns = [
path('new', views.new_workflow, name='new workflow'),
path('open', views.open_workflow, name='open workflow'),
path('edit', views.edit_workflow, name='edit workflow'),
path('save', views.save_workflow, name='save'),
path('execute', views.execute_workflow, name='execute workflow'),
path('execute/<str:node_id>/successors', views.get_successors, name='get node successors'),
Expand Down
Loading