Conversation
untitaker commented Jan 13, 2026
```rust
crate::testutils::initialize_python();

let mut router = create_simple_router(c_str!("lambda x: {}[0]"), Noop {});
import_py_dep("sentry_streams.pipeline.exception", "InvalidMessageError");
```
untitaker (Member, Author)
I think this test was wrong previously; it tested the same thing as the test before it.
fpacifici (Collaborator) reviewed Jan 14, 2026
Some high-level questions.
```python
if self._dlq_config is not None:
    topic_override = self._dlq_config.get("topic")

self._dlq_producer.produce(metadata, topic_override)
```
Collaborator
What about just producing the payload we got at the specific step directly on the DLQ as well? Metadata could be either a message wrapper or headers.
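A minimal sketch of the "metadata as headers" variant suggested above, assuming a Kafka-style record shape; `build_dlq_record` and the `"dlq"` fallback topic are hypothetical names, not from the codebase:

```python
# Hypothetical sketch: the original payload is produced to the DLQ
# unchanged, and the step metadata travels alongside it as headers.
def build_dlq_record(payload, metadata, topic_override=None):
    # Headers keep the payload byte-for-byte identical to what the step saw.
    headers = [(key, str(value).encode("utf-8")) for key, value in metadata.items()]
    return {
        "topic": topic_override or "dlq",  # "dlq" is an assumed default topic
        "value": payload,
        "headers": headers,
    }

record = build_dlq_record(
    b'{"user": 123}',
    {"step_name": "my_parser", "original_offset": 12345},
)
```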
```python
    _metrics_wrapped_function, step.name, application_function
)
# Wrap with DLQ handling first, then metrics
dlq_wrapped = self.__wrap_with_dlq(step.name, application_function, stream)
```
Collaborator
High-level design questions.
- Would we have a DLQ for aggregate steps and batching steps?
- If we did not support aggregate steps (which are lossy), this may be OK; still, what about invalid messages for the following steps?
- For batch scenarios, we should at least identify the specific invalid messages and DLQ them, as they still retain their original identity. If we did not support this, the DLQ would not be useful for SBC either, since we batch first and process everything in multiprocess in batches.
I think the semantics of aggregate steps could be this:
- An invalid message is DLQed when we try to add it to an aggregate and fail. This does not compromise the existing aggregate.
- DLQ in batched messages should still work as if the messages were not batched. Batching is generally an optimization; it should not change the semantics of the application, unlike other aggregations.
- We should support DLQ even after a batch/aggregate step. At this point the original identity may be lost, so I think we may want to DLQ the aggregate payload entirely. This would be viable in the new architecture, as we could reprocess data from a specific step of the pipeline: the steps have identity and are statically connected.
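The per-message batch semantics above could be sketched like this; `InvalidMessageError` here is a local stand-in for `sentry_streams.pipeline.exception.InvalidMessageError`, and `dlq_produce` is a hypothetical callback:

```python
class InvalidMessageError(Exception):
    """Local stand-in for sentry_streams.pipeline.exception.InvalidMessageError."""

def process_batch(batch, step_fn, dlq_produce):
    # Each message still has its own identity inside the batch, so an
    # invalid one can be DLQed individually without dropping the rest.
    results = []
    for msg in batch:
        try:
            results.append(step_fn(msg))
        except InvalidMessageError:
            dlq_produce(msg)
    return results
```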
Configuration Schema
DLQ Message Format
```json
{
  "original_topic": "source-topic",
  "original_partition": 5,
  "original_offset": 12345,
  "original_key": "user-123",
  "step_name": "my_parser",
  "consumer_group": "pipeline-my_source",
  "error": "Schema validation failed",
  "timestamp": 1704067200.123
}
```
Implementation
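A sketch of assembling that metadata dict; the function name and flat parameter list are illustrative assumptions, only the field names come from the format above:

```python
import time

def build_dlq_metadata(topic, partition, offset, key, step_name, consumer_group, error):
    # Mirrors the DLQ message format: the failed message's position in the
    # source topic, plus which step failed and why.
    return {
        "original_topic": topic,
        "original_partition": partition,
        "original_offset": offset,
        "original_key": key,
        "step_name": step_name,
        "consumer_group": consumer_group,
        "error": error,
        "timestamp": time.time(),
    }
```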
The implementation will still change drastically. Right now, exceptions from userland are mostly caught in Python and forwarded directly to a DLQ producer, then reraised to skip the message. The producer should move to Rust to avoid it becoming a bottleneck.
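The current catch, forward, and reraise flow could look roughly like this; a sketch, not the actual implementation, with `InvalidMessageError` again a local stand-in and all names illustrative:

```python
class InvalidMessageError(Exception):
    """Local stand-in for the userland exception."""

def wrap_with_dlq(fn, dlq_produce):
    # Catch the userland exception, forward the message to the DLQ
    # producer, then reraise so the runtime skips the message.
    def wrapped(msg):
        try:
            return fn(msg)
        except InvalidMessageError:
            dlq_produce(msg)
            raise
    return wrapped
```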