Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
d3ac21a
Initial draft for python SDK
RyanLettieri Apr 18, 2023
575bba0
Merge branch 'master' of https://github.com/dapr/python-sdk into work…
RyanLettieri Apr 18, 2023
9acdf2c
Adding workflow code to aio client
RyanLettieri Apr 18, 2023
3e23139
Fixing protos and small fixes to workflow imports
RyanLettieri Apr 18, 2023
c4e9ef7
Workflow Authoring
DeepanshuA Apr 25, 2023
487e97f
Add example
DeepanshuA Apr 25, 2023
0733570
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA Apr 25, 2023
26380ee
lint
DeepanshuA Apr 25, 2023
eb995b8
is it wheel fix
DeepanshuA Apr 25, 2023
cd044d1
fix lint
DeepanshuA Apr 25, 2023
6fde40c
Updating proto and more small fixes to workflow
RyanLettieri Apr 27, 2023
128a1fa
Add tests and client APIs
DeepanshuA Apr 27, 2023
e074e58
Updating proto and merging in other changes
RyanLettieri Apr 27, 2023
2682d72
Updating proto
RyanLettieri Apr 27, 2023
9361b30
lint
DeepanshuA Apr 27, 2023
49c78d7
Removing super init
RyanLettieri Apr 28, 2023
0516400
tox fix
RyanLettieri Apr 28, 2023
f82bb8e
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA Apr 28, 2023
ffde1df
Add dtf python dependency
DeepanshuA Apr 28, 2023
8d62488
correction
DeepanshuA Apr 28, 2023
28690f0
Some implementation of workflow into fake server and other cleanup
RyanLettieri Apr 30, 2023
3015073
Fixing input for workflow
RyanLettieri Apr 30, 2023
de23696
More workflow cleanup
RyanLettieri Apr 30, 2023
15938b4
Fixing up workflow options to be optional
RyanLettieri May 1, 2023
19ea43f
Few more updates to workflow
RyanLettieri May 2, 2023
40fe904
Few more updates to workflow merging
RyanLettieri May 2, 2023
1be2447
Remove get-pip.py
DeepanshuA May 2, 2023
ab2d201
Updating examples test for workflow and cleaning up methods
RyanLettieri May 2, 2023
a537862
Update durabletask dependency version
DeepanshuA May 2, 2023
75ae2bf
Extra line - to be deleted
DeepanshuA May 3, 2023
7a31c66
test compatible with 3.7
DeepanshuA May 3, 2023
3c11d6c
Fixing comment
RyanLettieri May 3, 2023
f5c7796
Merge branch 'master' of https://github.com/dapr/python-sdk into work…
RyanLettieri May 3, 2023
325dedb
Merge branch 'master' into workflow_authoring
yaron2 May 4, 2023
b8bca56
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA May 9, 2023
2f81812
Incorporate review comments
DeepanshuA May 10, 2023
3185586
Merge branch 'workflow_authoring' of https://github.com/DeepanshuA/py…
DeepanshuA May 10, 2023
2ff152e
lint
DeepanshuA May 11, 2023
347c0bf
Ut fix
DeepanshuA May 11, 2023
2ebf20c
validate demo_workflow
DeepanshuA May 11, 2023
a8fa501
App Readme
DeepanshuA May 11, 2023
05194c8
fix step md
DeepanshuA May 11, 2023
b2aff5c
Adding in exceptions to workflow methods
RyanLettieri May 11, 2023
cc25bd7
Encoding workflow start data and fixing test
RyanLettieri May 11, 2023
1a8af1d
Cleaning up some workflow methods
RyanLettieri May 11, 2023
7598dd1
Validate demo workflow example
DeepanshuA May 12, 2023
74dbc9f
Remove demo actor temporarily
DeepanshuA May 12, 2023
bf69dea
Include raise event test and assertions
DeepanshuA May 16, 2023
0968b8e
Rename
DeepanshuA May 16, 2023
94f915b
Adressing some workflow comments
RyanLettieri May 17, 2023
4886c9e
Incorporate Review comments
DeepanshuA May 18, 2023
18bc883
Lint, validate
DeepanshuA May 18, 2023
9d8ec7f
test correction
DeepanshuA May 18, 2023
49f960a
Fake class method correction
DeepanshuA May 18, 2023
c64a091
Check expected std output in validate example
DeepanshuA May 18, 2023
f3207fc
Remove extra port check
DeepanshuA May 19, 2023
2bfc44a
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA May 19, 2023
82b33e5
Temporary - Verify Workflow Example first
DeepanshuA May 19, 2023
de33a9b
Requirements
DeepanshuA May 19, 2023
363a0df
Remove line
DeepanshuA May 19, 2023
ba2087a
Add back removed validate examples
DeepanshuA May 19, 2023
a3fb750
Update examples/demo_workflow/demo_workflow/requirements.txt
berndverst May 22, 2023
135d3d7
Change running order of wf
DeepanshuA May 22, 2023
a9be483
Commit to re-run example
DeepanshuA May 22, 2023
0e0203d
Merge branch 'master' of https://github.com/dapr/python-sdk into work…
RyanLettieri May 22, 2023
003e25b
Merging in authoring for workflow
RyanLettieri May 23, 2023
b9865b3
Merging in master
RyanLettieri May 23, 2023
0a0e94b
Addressing some review comments
RyanLettieri May 23, 2023
23ae6f7
Few updates for workflow PR
RyanLettieri May 24, 2023
9cc7e4a
More workflow PR fixes
RyanLettieri May 24, 2023
30ab277
Merge branch 'master' into workflow-sdk
RyanLettieri May 24, 2023
da401e8
More workflow typing fixes
RyanLettieri May 25, 2023
8b7dec9
Merge branch 'workflow-sdk' of https://github.com/RyanLettieri/python…
RyanLettieri May 25, 2023
e1d050c
Test and aio client fixes for workflow
RyanLettieri May 25, 2023
d7e9ebd
Whitespace fix
RyanLettieri May 25, 2023
9f5f138
Making workflow test more deterministic
RyanLettieri May 25, 2023
d08907e
Fixing broken workflow test
RyanLettieri May 26, 2023
722e5b3
Addressing some workflow comments
RyanLettieri May 26, 2023
57af017
linting
RyanLettieri May 26, 2023
40da262
Addressing more comments
RyanLettieri May 29, 2023
4c985f0
Even more fixes to workflow
RyanLettieri May 29, 2023
a4769d5
Fixing workflow example
RyanLettieri May 29, 2023
cbed6a2
Making workflow example more consitent
RyanLettieri May 29, 2023
739f145
Making workflow example more consitent again
RyanLettieri May 29, 2023
6932f97
Merging in master
RyanLettieri May 30, 2023
9c6ae6f
Fixing serialization on workflow
RyanLettieri May 30, 2023
efd8420
Code formatting for workflow
RyanLettieri May 30, 2023
ff513f2
Updating workflow documentation
RyanLettieri May 30, 2023
04be191
Fixing input to start workflow
RyanLettieri May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 306 additions & 1 deletion dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import asyncio
import time
import socket
import json
import uuid

from datetime import datetime
from urllib.parse import urlencode

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
Expand All @@ -35,7 +38,9 @@
StreamStreamClientInterceptor
)

from dapr.clients.exceptions import DaprInternalError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -71,6 +76,8 @@
ConfigurationWatcher,
TryLockResponse,
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
)


Expand Down Expand Up @@ -1103,6 +1110,304 @@ async def unlock(
return UnlockResponse(status=UnlockResponseStatus(response.status),
headers=await call.initial_metadata())

async def start_workflow(
self,
workflow_component: str,
workflow_name: str,
input: Optional[Union[Any, bytes]] = None,
instance_id: Optional[str] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add parameter send_raw_bytes: bool = False

workflow_options: Optional[Dict[str, str]] = dict(),
send_raw_bytes: bool = False) -> StartWorkflowResponse:
"""Starts a workflow.

Args:
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
workflow_name (str): the name of the workflow that will be executed.
input (Optional[Union[Any, bytes]]): the input that the workflow will receive.
The input value will be serialized to JSON
by default. Use the send_raw_bytes param
to send unencoded binary input.
instance_id (Optional[str]): the name of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_options (Optional[Dict[str, str]]): the key-value options
that the workflow will receive.
send_raw_bytes (bool) if true, no serialization will be performed on the input
bytes

Returns:
:class:`StartWorkflowResponse`: Instance ID associated with the started workflow
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name)

if instance_id is None:
instance_id = str(uuid.uuid4())

if isinstance(input, bytes) and send_raw_bytes:
encoded_data = input
else:
try:
encoded_data = json.dumps(input).encode(
"utf-8") if input is not None else bytes([])
except TypeError:
raise DaprInternalError("start_workflow: input data must be JSON serializable")
except ValueError as e:
raise DaprInternalError(f"start_workflow JSON serialization error: {e}")

# Actual start workflow invocation
req = api_v1.StartWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name,
options=workflow_options,
input=encoded_data)

try:
response = self._stub.StartWorkflowAlpha1(req)
return StartWorkflowResponse(instance_id=response.instance_id)
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def get_workflow(
self,
instance_id: str,
workflow_component: str) -> GetWorkflowResponse:
"""Gets information on a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`GetWorkflowResponse`: Instance ID associated with the started workflow
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual get workflow invocation
req = api_v1.GetWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
resp = self._stub.GetWorkflowAlpha1(req)
if resp.created_at is None:
resp.created_at = datetime.now
if resp.last_updated_at is None:
resp.last_updated_at = datetime.now
return GetWorkflowResponse(instance_id=instance_id,
workflow_name=resp.workflow_name,
created_at=resp.created_at,
last_updated_at=resp.last_updated_at,
runtime_status=getWorkflowRuntimeStatus(resp.runtime_status),
properties=resp.properties)
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def terminate_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Terminates a workflow.

Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee

"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual terminate workflow invocation
req = api_v1.TerminateWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.TerminateWorkflowAlpha1.with_call(req)
return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def raise_workflow_event(
self,
instance_id: str,
workflow_component: str,
event_name: str,
event_data: Optional[Union[Any, bytes]] = None,
send_raw_bytes: bool = False) -> DaprResponse:
"""Raises an event on a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
event_name (str): the name of the event to be raised on
the workflow.
event_data (Optional[Union[Any, bytes]]): the input that the workflow will receive.
The input value will be serialized to JSON
by default. Use the send_raw_bytes param
to send unencoded binary input.
send_raw_bytes (bool) if true, no serialization will be performed on the input
bytes

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component,
event_name=event_name)
if isinstance(event_data, bytes) and send_raw_bytes:
encoded_data = event_data
else:
if event_data is not None:
try:
encoded_data = json.dumps(event_data).encode(
"utf-8") if event_data is not None else bytes([])
except TypeError:
raise DaprInternalError("raise_workflow_event:\
event_data must be JSON serializable")
except ValueError as e:
raise DaprInternalError(f"raise_workflow_event JSON serialization error: {e}")
encoded_data = json.dumps(event_data).encode("utf-8")
else:
encoded_data = bytes([])
# Actual workflow raise event invocation
req = api_v1.raise_workflow_event(
instance_id=instance_id,
workflow_component=workflow_component,
event_name=event_name,
event_data=encoded_data)

try:
_, call = self._stub.RaiseEventWorkflowAlpha1.with_call(req)
return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def pause_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Pause a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee

"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual pause workflow invocation
req = api_v1.PauseWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.PauseWorkflowAlpha1.with_call(req)

return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def resume_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Resumes a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual resume workflow invocation
req = api_v1.ResumeWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.ResumeWorkflowAlpha1.with_call(req)

return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def purge_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Purges a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual purge workflow invocation
req = api_v1.PurgeWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.PurgeWorkflowAlpha1.with_call(req)

return DaprResponse(
headers=call.initial_metadata())

except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def wait(self, timeout_s: float):
"""Waits for sidecar to be available within the timeout.

Expand Down
20 changes: 19 additions & 1 deletion dapr/clients/grpc/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from collections import namedtuple
from typing import Dict, List, Union, Tuple, Optional

from enum import Enum
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.message import Message as GrpcMessage
from grpc import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore
Expand Down Expand Up @@ -180,3 +180,21 @@ def validateNotBlankString(**kwargs: Optional[str]):
for field_name, value in kwargs.items():
if not value or not value.strip():
raise ValueError(f"{field_name} name cannot be empty or blank")


class WorkflowRuntimeStatus(Enum):
UNKNOWN = "Unknown"
RUNNING = "Running"
COMPLETED = "Completed"
FAILED = "Failed"
TERMINATED = "Terminated"
PENDING = "Pending"
SUSPENDED = "Suspended"


# Will return the enum entry if it is present, otherwise returns "unknown"
def getWorkflowRuntimeStatus(inputString):
try:
return WorkflowRuntimeStatus[inputString].value
except KeyError:
return WorkflowRuntimeStatus.UNKNOWN
Loading