-
Notifications
You must be signed in to change notification settings - Fork 1.3k
PollComponent and PollActivityExecution #8563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cb5f326 to
21cdb37
Compare
ded2e1b to
efc4722
Compare
3281080 to
9c65e47
Compare
ce95cbe to
29a0bd9
Compare
29a0bd9 to
33e981e
Compare
40e0b6b to
28665bb
Compare
bergundy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't review the tests very closely.
There are still come open comments, please address before merging but I do not feel like I need another pass here.
chasm/lib/activity/handler.go
Outdated
| // TODO(dan): include execution key in error message; we may do this at the CHASM | ||
| // framework level. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH I don't think this is needed. Not blocking the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the comment
| if len(token) == 0 { | ||
| return chasm.ReadComponent(ctx, ref, (*Activity).buildPollActivityExecutionResponse, req, nil) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems more like an invalid argument to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, we will address when we split the API
service/history/chasm_notifier.go
Outdated
| type ( | ||
| // ChasmNotifier allows subscribers to receive notifications relating to a CHASM execution. | ||
| ChasmNotifier struct { | ||
| executions map[chasm.EntityKey]*subscriptionTracker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not blocking, but maybe put a TODO here to use the sharded map which will guarantee less lock contention?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, and internally tracking need for an audit of TODOs in code
service/history/history_engine.go
Outdated
| e.eventNotifier.NotifyNewHistoryEvent(notification) | ||
| } | ||
|
|
||
| func (e *historyEngineImpl) ChasmEngine() chasm.Engine { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this called anywhere? Maybe I missed it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, removed
tests/standalone_activity_test.go
Outdated
| ) | ||
|
|
||
| var ( | ||
| defaultInput = &commonpb.Payloads{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mentioned this before I believe, use payloads.EncodeString() from common
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done (not specifically related to this PR so it may conflict, but good to do now in case we forget)
tests/standalone_activity_test.go
Outdated
|
|
||
| func (s *standaloneActivityTestSuite) SetupSuite() { | ||
| s.FunctionalTestBase.SetupSuite() | ||
| s.tv = testvars.New(s.T()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to be done in SetupTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| } | ||
| input := createDefaultInput() | ||
| taskQueue := uuid.New().String() | ||
| taskQueue := testcore.RandomizeStr(t.Name()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you're already using testvars, might as well fully use it. IMHO that utility doesn't give us much but leaving it up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've switched them over
Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: Roey Berman <roey@temporal.io>
yycptt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also need to change all EntityKey -> ExecutionKey
chasm/ref.go
Outdated
| } | ||
| var pRef persistencespb.ChasmComponentRef | ||
| if err := pRef.Unmarshal(data); err != nil { | ||
| return ComponentRef{}, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrMalformedComponentRef?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, done
chasm/ref.go
Outdated
| // ErrMalformedComponentRef is returned when component ref bytes cannot be deserialized. | ||
| var ErrMalformedComponentRef = errors.New("malformed component ref") | ||
|
|
||
| // ErrInvalidComponentRef is returned when component ref bytes deserialize to an invalid component ref. | ||
| var ErrInvalidComponentRef = errors.New("invalid component ref") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd return invalidRequest here unless we are sure all api handlers have proper error conversion logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, done.
service/history/interfaces/engine.go
Outdated
|
|
||
| NotifyNewHistoryEvent(event *events.Notification) | ||
| NotifyNewTasks(tasks map[tasks.Category][]tasks.Task) | ||
| ChasmEngine() chasm.Engine |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is this one required/used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, thanks!
| // Notify for current workflow if it has CHASM updates | ||
| if len(currentWorkflowMutation.UpsertChasmNodes) > 0 || | ||
| len(currentWorkflowMutation.DeleteChasmNodes) > 0 { | ||
| engine.NotifyChasmExecution(chasm.EntityKey{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's do it in ConflictResolveExecution as well. Create execution is probably fine, I guess there won't be any poller before execution is created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm I haven't think through if it needs to be in OperationPossiblySucceeded. Can you elaborate your thoughts a bit here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, let's address this in one more PR to target standalone-activity. It will all arrive in main at the same time.
| if ref != nil { | ||
| return ref, nil | ||
| } | ||
| case <-ctx.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have some tail room here to return an empty response and avoid a timeout error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the design that @bergundy and I have settled on, the caller sets the tail room. See PollActivityExecution in chasm/lib/activity/handler.go.
| // behind the requested reference. However, getExecutionLease does not currently guarantee that | ||
| // execution VT >= ref VT, therefore we call IsStale() again here and return any error (which at | ||
| // this point must be ErrStaleState; ErrStaleReference has already been eliminated). | ||
| err := chasmTree.IsStale(ref) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is already checked in getExecutionLease? or it's for fixing the bug we discussed before that getExecutionLease needs to do another stale check after reload?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's right. We can remove this when that bug is fixed.
Right, we will do that shortly when we merge/rebase. It will be nice to have that done. |
083e3b0 to
ba38880
Compare
chasm/lib/activity/handler.go
Outdated
| req *activitypb.PollActivityExecutionRequest, | ||
| ) (*activitypb.PollActivityExecutionResponse, bool, error) { | ||
| // TODO(dan): check for terminal activity states | ||
| panic("pollActivityExecutionWaitCompletion is not implemented") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Panic in WaitCompletion handler crashes server
The PollActivityExecution handler contains a panic("pollActivityExecutionWaitCompletion is not implemented") statement in the WaitCompletion case branch. If a user sends a PollActivityExecutionRequest with a WaitCompletion wait policy, this will crash the server. This panic should be replaced with returning a proper error like serviceerror.NewUnimplemented("WaitCompletion is not yet implemented") to avoid server crashes.
| waitPolicy := req.GetFrontendRequest().GetWaitPolicy() | ||
|
|
||
| if waitPolicy == nil { | ||
| return chasm.ReadComponent(ctx, ref, (*Activity).buildPollActivityExecutionResponse, req, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Deferred error transformation bypassed on early returns
The deferred function at lines 80-85 transforms NotFound errors into a user-friendly message by modifying the named return variable err. However, the direct return chasm.ReadComponent(...) statements on lines 90 and 109 bypass the named return variable entirely. In Go, when using return expr1, expr2 with named returns, the expressions go directly to the caller without updating the named variables. This means NotFound errors from the waitPolicy == nil and len(token) == 0 code paths won't be transformed to "activity execution not found".
Additional Locations (1)
| ref, err := DeserializeComponentRef(refBytes) | ||
| if err != nil { | ||
| return false, ErrMalformedComponentRef | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: ExecutionStateChanged always returns ErrMalformedComponentRef discarding original error type
The function ExecutionStateChanged unconditionally returns ErrMalformedComponentRef when DeserializeComponentRef fails, but DeserializeComponentRef can return either ErrMalformedComponentRef or ErrInvalidComponentRef (for empty data or missing fields). The function's doc comment claims it "may return ErrInvalidComponentRef or ErrMalformedComponentRef" but the implementation always substitutes ErrMalformedComponentRef on deserialization error. The original error should be returned directly (return false, err) instead of always returning ErrMalformedComponentRef.
- Add `history.ChasmNotifier` for subscribing to CHASM execution state transitions - Implement `chasm.PollComponent` - Add `PollActivityExecution` API handler - Needed for standalone activity - Needed for long-poll of other CHASM archetypes - [x] built - [x] added new unit test(s) - [x] added new functional test(s)
What changed?
history.ChasmNotifierfor subscribing to CHASM execution state transitionschasm.PollComponentPollActivityExecutionAPI handlerWhy?
How did you test it?
Note
Implements long‑polling for CHASM components and standalone activities via new PollComponent API, PollActivityExecution endpoint, and an execution notifier, with validation, ref handling, and tests.
Engine.PollComponent(withNotifyExecution) and helperExecutionStateChanged; extendContextwithstructuredRef.ChasmNotifierfor execution-level subscriptions; wire into history engine; emit notifications on CHASM mutations.ErrMalformedComponentRef/ErrInvalidComponentRefand validation; exposestructuredRefin tree.PollActivityExecutionfrontend and history handler usingPollComponent; buildActivityExecutionInfoand response assembly.LongPollTimeout,LongPollBuffer); provide via FX modules.PollActivityExecution{Request,Response}messages and service RPC; generate client/grpc helpers.PollComponent(no-wait, wait, stale).PollComponentsignature andNotifyExecution; history engine interface addsNotifyChasmExecution.Written by Cursor Bugbot for commit 1d59e66. This will update automatically on new commits. Configure here.