ProtoApiScrubber: Add support to handle streaming requests #39681

Merged
adisuissa merged 43 commits into envoyproxy:main from sumitkmr2:scrubbing
Jun 12, 2025

Conversation

@sumitkmr2
Contributor

Commit Message: ProtoApiScrubber: Add support to handle streaming requests
Additional Description: Use gRPC field extraction filter's message converter library to buffer streaming messages, convert them to StreamMessage and convert them back to EnvoyBuffer. The scrubbing logic to scrub the StreamMessage would be added in subsequent PRs.
Risk Level: NONE.
Testing: UTs added.
Docs Changes: NONE.
Release Notes: NONE.
Platform Specific Features:
[Optional Runtime guard:]
[Optional Fixes #Issue]
[Optional Fixes commit #PR or SHA]
[Optional Deprecated:]
[Optional API Considerations:]

sumitkmr2 added 27 commits May 8, 2025 11:02
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
# Conflicts:
#	source/extensions/filters/http/proto_api_scrubber/BUILD
#	source/extensions/filters/http/proto_api_scrubber/filter_config.cc
#	source/extensions/filters/http/proto_api_scrubber/filter_config.h

Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
# Conflicts:
#	source/extensions/filters/http/proto_api_scrubber/filter_config.cc
#	source/extensions/filters/http/proto_api_scrubber/filter_config.h
#	test/extensions/filters/http/proto_api_scrubber/filter_config_test.cc

Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
@repokitteh-read-only

As a reminder, PRs marked as draft will not be automatically assigned reviewers,
or be handled by maintainer-oncall triage.

Please mark your PR as ready when you want it to be reviewed!

🐱

Caused by: #39681 was opened by sumitkmr2.


sumitkmr2 added 2 commits June 2, 2025 05:52
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
@sumitkmr2 sumitkmr2 marked this pull request as ready for review June 2, 2025 09:46
@sumitkmr2 sumitkmr2 requested a review from adisuissa as a code owner June 2, 2025 09:46
field_restrictions[std::make_pair(std::string(method_name), std::string(field_mask))] =
factory_cb.value()();
} else {
return absl::InvalidArgumentError(fmt::format(
Contributor Author

Note that the code in the else branch has been removed, as it would never get executed: matcher_factory.create doesn't return an optional. It was accidentally copied over from another filter that I was referring to for the matcher code.
I tried writing a UT to cover the else branch and then realized this.

sumitkmr2 added 3 commits June 3, 2025 06:21
… config.cc

Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Contributor

@adisuissa adisuissa left a comment

Thanks!
Left a few comments as a first pass

"//source/common/http:codes_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//source/extensions/filters/http/grpc_field_extraction/message_converter:message_converter_lib",
Contributor

This seems to be coupling the 2 extensions (one is required for the other)? Is that intentional?
Would it make more sense to move the shared functionality to a common code base that is used by both filters? (e.g., under https://github.com/envoyproxy/envoy/tree/main/source/extensions/common or https://github.com/envoyproxy/envoy/tree/main/source/extensions/filters/common)

Contributor Author

Yeah, the message_converter library dependency is intentional, but the grpc_field_extraction filter is not required alongside the proto_api_scrubber filter in the filter chain. I thought about moving it to shared code, but when I started making the changes it became too big, since the proto_message_extraction filter depends on message_converter as well, so I kept it like this in the interest of time.

I can take this up as a follow-up item after the functionality of proto_api_scrubber is completed: refactor and move this library to a common place and update its dependencies. SG?

Contributor

Sure, a follow-up should be OK.

ENVOY_STREAM_LOG(trace, "Accumulated {} messages. Starting scrubbing on each of them one by one.",
*decoder_callbacks_, messages->size());
for (size_t msg_idx = 0; msg_idx < messages->size(); ++msg_idx) {
std::unique_ptr<StreamMessage> stream_message = std::move(messages->at(msg_idx));
Contributor

Just to make sure, is it safe to change the contents of messages (moving entries out of it) while iterating over it?

Contributor Author

Yeah, it's safe, as the move doesn't alter the array size or shuffle the entries within the array.
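A minimal sketch of why this is safe, using std::string as a hypothetical stand-in for StreamMessage: moving an element out of the vector leaves a null unique_ptr in its slot, but the vector's size and the positions of the other entries are untouched, so index-based iteration remains valid.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-in for the scrubbing loop above: each element is moved out exactly
// once while iterating by index. The move nulls out the slot but never
// resizes or reorders the vector, so msg_idx stays valid throughout.
std::vector<std::string> drainMessages(std::vector<std::unique_ptr<std::string>>& messages) {
  std::vector<std::string> scrubbed;
  for (size_t msg_idx = 0; msg_idx < messages.size(); ++msg_idx) {
    std::unique_ptr<std::string> msg = std::move(messages[msg_idx]);
    scrubbed.push_back(*msg); // safe: this slot has not been moved from before
  }
  return scrubbed;
}
```

After the loop, the vector still has its original size; every slot simply holds a null unique_ptr.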

// MessageConverter uses an empty StreamMessage to denote the end.
if (stream_message->message() == nullptr) {
// Expect end_stream=true when the MessageConverter signals a stream end.
DCHECK(end_stream);
Contributor

nit: s/DCHECK/ASSERT/ or ENVOY_BUG (depending on what the intention here is)

Contributor Author

Replaced with ASSERT. These should never be true, but in case a bug is introduced in the message converter, they could fire, so I think ASSERT is better suited here.

bool end_stream) override;
// Request message converter which converts Envoy Buffer data to StreamMessage (for scrubbing) and
// vice-versa.
GrpcFieldExtraction::MessageConverterPtr request_msg_converter_ = nullptr;
Contributor

nit:

Suggested change
GrpcFieldExtraction::MessageConverterPtr request_msg_converter_ = nullptr;
GrpcFieldExtraction::MessageConverterPtr request_msg_converter_{nullptr};

Contributor Author

Removed it altogether.

Matcher::MatchTreeFactory<HttpMatchingData, ProtoApiScrubberRemoveFieldAction> matcher_factory(
remove_field_action, server_factory_context, validation_visitor);
Matcher::MatchTreeFactoryCb<HttpMatchingData> match_tree_factory_cb =
matcher_factory.create(matcher);
Contributor

What enforces that the matcher will be of type ProtoApiScrubberRemoveFieldAction?

Contributor Author

Sorry, I didn't understand this. Do you mean what in the function signature enforces this?

ENVOY_STREAM_LOG(debug, "Called ProtoApiScrubber::decodeData: data size={} end_stream={}",
*decoder_callbacks_, data.length(), end_stream);

// Buffer the data to complete the request message.
Contributor

Suggested change
// Buffer the data to complete the request message.
// Move the data to internal gRPC buffer messages representation.


auto buf_convert_status =
request_msg_converter_->convertBackToBuffer(std::move(stream_message));
RELEASE_ASSERT(buf_convert_status.ok(), "failed to convert message back to envoy buffer");
Contributor

Should this be a release-assert (crash) or just reject the message?

Contributor Author

Since we are reverse-transcoding a message that was generated (or transcoded) by the message converter itself, it's expected not to fail. Had we been handcrafting this message, then rejecting would be appropriate.

*decoder_callbacks_, data.length(), end_stream);

// Buffer the data to complete the request message.
auto messages = request_msg_converter_->accumulateMessages(data, end_stream);
Contributor

IIUC this function uses the msg-converter in a specific way:

  1. Moves the entire data into the converter
  2. Moves all the messages-data out of the converter

There seems to be no real need to keep request_msg_converter_ as a field here, as no context is kept in it (i.e., the state of the converter is essentially reset on every decodeData() invocation). If that's the case, what were the reasons to keep the converter as part of the state?

Contributor Author

Yes, you're right, that's what it does. There's no particular reason to keep this in state; I followed the pattern used in the proto_message_extraction filter. I've removed it from the state now and kept it as a local variable.

Contributor Author

I went through the internals of the message converter and now I understand it better. Basically, it doesn't reinitialize every time on decodeData(); rather, it accumulates the messages within itself. For a streaming request, only when the last message is received (i.e., on the last call to decodeData()) does it return all the messages accumulated so far. Hence, it needs to be stored in state. I have added it back now. Reference: https://github.com/envoyproxy/envoy/blob/main/source/extensions/filters/http/grpc_field_extraction/message_converter/message_converter.h#L48-L72

Contributor

Ok, so just so I understand it better: the message converter keeps some state, and the purpose of the code below is to send some of the data to the upstream even if end_stream isn't set.
So, can the code below hit a case where (stream_message->message() == nullptr) is true, but end_stream is false, and this isn't the last message (just a partial message)?

Contributor Author

Slight correction: the purpose of the code below is to form a complete message ready for scrubbing (not to send it upstream); a partial message can't be scrubbed.

Regarding the condition (stream_message->message() == nullptr): it is basically a sanity check that the message converter is working correctly. It is only true for the last message in the list of messages received from the message converter, and for that last message, end_stream has to be true.

Let me know if that makes sense?

Contributor

I think there might be an issue if decodeData() is invoked with end_stream=true, but the message converter isn't happy with the contents and returns an empty messages object. The main confusion is probably over whether accumulateMessages() returns messages even when end_stream is false.

Are you certain that a partial message can't be scrubbed (or that the converter will only return non-empty messages when it has finished receiving all the data)?
FWIW, my reading of the link you sent above (https://github.com/envoyproxy/envoy/blob/main/source/extensions/filters/http/grpc_field_extraction/message_converter/message_converter.h#L48-L72) is that accumulateMessages may return some messages that can be processed with the data it got so far, and will return the rest when the next batch of data is received and it has sufficient data to return more. I think this is true for all streaming algorithms that I'm aware of; if it isn't so, then the documentation for that function should be updated (and maybe the API as well: it would be better to have two functions, one accumulating data and the other fetching the full response when it is done).

If you are certain that accumulateMessages() will always return an empty messages object when end_stream is false, I suggest adding an explicit check in the code below that fails the request if messages isn't empty and end_stream is false.

Contributor Author

@sumitkmr2 sumitkmr2 Jun 12, 2025

I think there's some confusion regarding messages vs. frames, and also about how MessageConverter works. Let me try to explain in a little more detail.

There are two methods on MessageConverter:

  1. accumulateMessage() - for a single message which is divided into multiple frames.
  2. accumulateMessages() - for multiple messages which are divided into multiple frames.

We are using (2) here.

Now, let's consider the following example: two messages in the overall request, M1 and M2, with each message divided into two frames:

  • M1 -> F1, F2
  • M2 -> F3, F4

The frames would be transmitted over the stream in the following order:

  1. F1 (end_stream=false)
  2. F2 (end_stream=false)
  3. F3 (end_stream=false)
  4. F4 (end_stream=true)

accumulateMessages() would be called in the following order, with these parameters and return values:

  1. accumulateMessages(F1, false) -> {OK, [] (no complete message yet)}
  2. accumulateMessages(F2, false) -> {OK, [M1]}
  3. accumulateMessages(F3, false) -> {OK, [] (no complete message yet)}
  4. accumulateMessages(F4, true) -> {OK, [M2, <empty message>]}

Note that the last call returns an extra empty message (that's its defined behavior). Hence, we have the ASSERTs in place for the empty message.

Responding inline:

Are you certain that a partial message can't be scrubbed?

Yes, we need to form a complete protobuf payload in order to scrub it.

Let me know if this makes it clear or if you have any further comments/clarifications?
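The frame/message behavior above can be sketched with a toy accumulator (illustrative only: the class name, the framing helper, and the API below are assumptions, not Envoy's actual MessageConverter interface). Like the real converter, it keeps partial frames buffered across calls, emits a message only once all of its frames have arrived, and appends an empty sentinel on end_stream. Each message here is framed gRPC-style as [1-byte flag][4-byte big-endian length][payload].

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Helper to build one frame (flag byte 0) around a payload.
inline std::string frame(const std::string& payload) {
  uint32_t n = payload.size();
  std::string f(1, '\0');
  f += char(n >> 24); f += char(n >> 16); f += char(n >> 8); f += char(n);
  return f + payload;
}

class ToyAccumulator {
public:
  // Returns the complete messages parseable from the bytes seen so far;
  // partial frames stay buffered in buffer_ across calls (which is why the
  // accumulator must live in filter state). On end_stream, an empty
  // sentinel (std::nullopt) is appended, mirroring the extra empty
  // StreamMessage described above.
  std::vector<std::optional<std::string>> accumulate(const std::string& data, bool end_stream) {
    buffer_ += data;
    std::vector<std::optional<std::string>> out;
    while (buffer_.size() >= 5) {
      uint32_t len = (uint32_t(uint8_t(buffer_[1])) << 24) | (uint32_t(uint8_t(buffer_[2])) << 16) |
                     (uint32_t(uint8_t(buffer_[3])) << 8) | uint32_t(uint8_t(buffer_[4]));
      if (buffer_.size() < 5 + len) break;     // partial frame: keep buffering
      out.push_back(buffer_.substr(5, len));   // a complete message
      buffer_.erase(0, 5 + len);
    }
    if (end_stream) out.push_back(std::nullopt); // empty sentinel = stream end
    return out;
  }

private:
  std::string buffer_; // carries partial frames between calls
};
```

Feeding it the F1..F4 sequence from the example reproduces the return pattern described above: nothing after F1 and F3, [M1] after F2, and [M2, empty] after F4.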

Orthogonal:
I was waiting for this PR to be merged so that I could raise the next PR, but since it's taking time, I've raised it anyway (#39815). Note that it contains the changes from this PR as well, so it's not the best reviewing experience; the files of interest are only these three:

If you can review that PR in parallel, it would be great! Otherwise, I'll wait for this to be merged and update #39815 afterwards.

Contributor

Ok, as long as it is understood that multiple messages (or zero) may be pushed back to the data-buffer (in the data.move(*buf_convert_status.value()) line below) on a single invocation of decodeData(), and that in some cases there will be decodeData() invocations that will not add bytes back to data, then that should be ok.

FWIW, I think what confuses me is that the code says "Scrubbing completed successfully." while there may still be pending messages that need to be scrubbed.

If you think this all works then I guess LGTM.

Contributor Author

Got it, thanks for sharing the concern! I'll see if I can make it more readable in one of the subsequent PRs.

sumitkmr2 added 5 commits June 9, 2025 06:37
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
# Conflicts:
#	source/extensions/filters/http/proto_api_scrubber/filter.cc
#	source/extensions/filters/http/proto_api_scrubber/filter.h

Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
@KBaichoo
Contributor

KBaichoo commented Jun 9, 2025

/wait-any

Needs another round of review by @adisuissa, and @sumitkmr2 has some merge conflicts to address.

"//source/common/http:codes_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//source/extensions/filters/http/grpc_field_extraction/message_converter:message_converter_lib",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, follow should be ok

ProtoApiScrubberFilter::decodeHeaders(Envoy::Http::RequestHeaderMap& headers, bool) {
ENVOY_STREAM_LOG(trace, "Called ProtoApiScrubber Filter : {}", *decoder_callbacks_, __func__);

if (!Envoy::Grpc::Common::isGrpcRequestHeaders(headers)) {
Contributor

I'm probably missing something:
IIUC the headers handling "fails silently" if the message is not a gRPC one (doesn't contain the required gRPC headers).
What should happen in this case when decodeData() is invoked on the data?

Contributor Author

No, you're not missing anything, you're right. The code that initializes a few variables in decodeHeaders() is planned to be added in upcoming PRs; those variables are checked to be non-null as a prerequisite of decodeData(). For now, I'll add a dummy variable so that the filter remains in a correct state.
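As a simplified illustration of the gating discussed above (the function name below is hypothetical, and Envoy's real Envoy::Grpc::Common::isGrpcRequestHeaders performs additional header checks beyond this one), a gRPC request is identified by a content-type that begins with "application/grpc"; anything else passes through the filter untouched.

```cpp
#include <string>

// Hypothetical stand-in for the gRPC gate in decodeHeaders(): only captures
// the content-type rule. "application/grpc", "application/grpc+proto",
// "application/grpc-web", etc. all share this prefix.
inline bool looksLikeGrpcContentType(const std::string& content_type) {
  return content_type.rfind("application/grpc", 0) == 0; // prefix match at position 0
}
```

rfind(prefix, 0) == 0 is the standard idiom for a prefix check on pre-C++20 std::string (starts_with is only available from C++20).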

Signed-off-by: Sumit Kumar <sumitkmr@google.com>
# Conflicts:
#	source/extensions/filters/http/proto_api_scrubber/filter.cc
#	source/extensions/filters/http/proto_api_scrubber/filter.h

Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Signed-off-by: Sumit Kumar <sumitkmr@google.com>
Contributor

@adisuissa adisuissa left a comment

LGTM


@adisuissa adisuissa merged commit 741e47c into envoyproxy:main Jun 12, 2025
24 checks passed