Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 43 additions & 27 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,30 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
virtual void continueDecoding() PURE;

/**
* @return Buffer::InstancePtr& the currently buffered data as buffered by this filter or previous
* ones in the filter chain. May be nullptr if nothing has been buffered yet. Callers
* are free to remove, reallocate, and generally modify the buffered data.
* @return const Buffer::Instance* the currently buffered data as buffered by this filter or
* previous ones in the filter chain. May be nullptr if nothing has been buffered yet.
*/
virtual const Buffer::Instance* decodingBuffer() PURE;

/**
* Add buffered body data. This method is used in advanced cases where returning
* StopIterationAndBuffer from decodeData() is not sufficient.
*
* NOTE: For common buffering cases, there is no need for each filter to manually handle
* buffering. If decodeData() returns StopIterationAndBuffer, the filter manager will
* buffer the data passed to the callback on behalf of the filter.
* 1) If a headers only request needs to be turned into a request with a body, this method can
* be called to add body in the decodeHeaders() callback. Subsequent filters will receive
* decodeHeaders(..., false) followed by decodeData(..., true). This works both in the direct
* iteration as well as the continuation case.
*
* NOTE: In complex cases, the filter may wish to manually modify the buffer. One example
* of this is switching a header only request to a request with body data. If a filter
* receives decodeHeaders(..., true), it has the option of filling decodingBuffer() with
* body data. Subsequent filters will receive decodeHeaders(..., false) followed by
* decodeData(..., true). This works both in the direct iteration as well as the
* continuation case.
* 2) If additional buffered body data needs to be added by a filter before continuation of
* data to further filters (outside of callback context).
*
* 3) If additional data needs to be added in the decodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive decodeData(..., false)
* followed by decodeTrailers().
*
* It is an error to call this method in any other case.
*/
virtual Buffer::InstancePtr& decodingBuffer() PURE;
virtual void addDecodedData(Buffer::Instance& data) PURE;

/**
* Called with headers to be encoded, optionally indicating end of stream.
Expand Down Expand Up @@ -246,22 +254,30 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
virtual void continueEncoding() PURE;

/**
* @return Buffer::InstancePtr& the currently buffered data as buffered by this filter or previous
* ones in the filter chain. May be nullptr if nothing has been buffered yet. Callers
* are free to remove, reallocate, and generally modify the buffered data.
* @return const Buffer::Instance* the currently buffered data as buffered by this filter or
* previous ones in the filter chain. May be nullptr if nothing has been buffered yet.
*/
virtual const Buffer::Instance* encodingBuffer() PURE;

/**
* Add buffered body data. This method is used in advanced cases where returning
* StopIterationAndBuffer from encodeData() is not sufficient.
*
* 1) If a headers only response needs to be turned into a response with a body, this method can
* be called to add body in the encodeHeaders() callback. Subsequent filters will receive
* encodeHeaders(..., false) followed by encodeData(..., true). This works both in the direct
* iteration as well as the continuation case.
*
* NOTE: For common buffering cases, there is no need for each filter to manually handle
* buffering. If encodeData() returns StopIterationAndBuffer, the filter manager will
* buffer the data passed to the callback on behalf of the filter.
* 2) If additional buffered body data needs to be added by a filter before continuation of
* data to further filters (outside of callback context).
*
* NOTE: In complex cases, the filter may wish to manually modify the buffer. One example
* of this is switching a header only request to a request with body data. If a filter
* receives encodeHeaders(..., true), it has the option of filling encodingBuffer() with
* body data. Subsequent filters will receive encodeHeaders(..., false) followed by
* encodeData(..., true). This works both in the direct iteration as well as the
* continuation case.
*/
virtual Buffer::InstancePtr& encodingBuffer() PURE;
* 3) If additional data needs to be added in the encodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive encodeData(..., false)
* followed by encodeTrailers().
*
* It is an error to call this method in any other case.
*/
virtual void addEncodedData(Buffer::Instance& data) PURE;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions source/common/dynamo/dynamo_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Http::FilterTrailersStatus DynamoFilter::decodeTrailers(Http::HeaderMap&) {
}

void DynamoFilter::onDecodeComplete(const Buffer::Instance& data) {
std::string body = buildBody(decoder_callbacks_->decodingBuffer().get(), data);
std::string body = buildBody(decoder_callbacks_->decodingBuffer(), data);
if (!body.empty()) {
try {
Json::ObjectPtr json_body = Json::Factory::loadFromString(body);
Expand All @@ -68,7 +68,7 @@ void DynamoFilter::onEncodeComplete(const Buffer::Instance& data) {
uint64_t status = Http::Utility::getResponseStatus(*response_headers_);
chargeBasicStats(status);

std::string body = buildBody(encoder_callbacks_->encodingBuffer().get(), data);
std::string body = buildBody(encoder_callbacks_->encodingBuffer(), data);
if (!body.empty()) {
try {
Json::ObjectPtr json_body = Json::Factory::loadFromString(body);
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
Tracing::Span& activeSpan() override { return active_span_; }
const std::string& downstreamAddress() override { return EMPTY_STRING; }
void continueDecoding() override { NOT_IMPLEMENTED; }
Buffer::InstancePtr& decodingBuffer() override {
void addDecodedData(Buffer::Instance&) override { NOT_IMPLEMENTED; }
const Buffer::Instance* decodingBuffer() override {
throw EnvoyException("buffering is not supported in streaming");
}
void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
Expand Down Expand Up @@ -229,7 +230,7 @@ class AsyncRequestImpl final : public AsyncClient::Request,
void onReset() override;

// Http::StreamDecoderFilterCallbacks
Buffer::InstancePtr& decodingBuffer() override { return request_->body(); }
const Buffer::Instance* decodingBuffer() override { return request_->body().get(); }

MessagePtr request_;
AsyncClient::Callbacks& callbacks_;
Expand Down
65 changes: 65 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ ConnectionManagerImpl::ActiveStream::~ActiveStream() {
*this);
}
}

ASSERT(state_.filter_call_state_ == 0);
}

void ConnectionManagerImpl::ActiveStream::addStreamDecoderFilter(
Expand Down Expand Up @@ -482,8 +484,11 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte
}

for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
FilterHeadersStatus status = (*entry)->handle_->decodeHeaders(
headers, end_stream && continue_data_entry == decoder_filters_.end());
state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
stream_log_trace("decode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterHeadersCallback(status) &&
Expand Down Expand Up @@ -537,7 +542,10 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter*
}

for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeData));
state_.filter_call_state_ |= FilterCallState::DecodeData;
FilterDataStatus status = (*entry)->handle_->decodeData(data, end_stream);
state_.filter_call_state_ &= ~FilterCallState::DecodeData;
stream_log_trace("decode data called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterDataCallback(status, data)) {
Expand All @@ -546,6 +554,24 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter*
}
}

void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilter& filter,
Buffer::Instance& data) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::DecodeHeaders)) {
// If no call is happening or we are in the decode headers callback, buffer the data. Inline
// processing happens in the decodeHeaders() callback if necessary.
filter.commonHandleBufferData(data);
} else if (state_.filter_call_state_ & FilterCallState::DecodeTrailers) {
// In this case we need to inline dispatch the data to further filters. If those filters
// choose to buffer/stop iteration that's fine.
decodeData(&filter, data, false);
} else {
// TODO(mattklein123): Formalize error handling for filters and add tests. Should probably
// throw an exception here.
NOT_IMPLEMENTED;
}
}

void ConnectionManagerImpl::ActiveStream::decodeTrailers(HeaderMapPtr&& trailers) {
request_trailers_ = std::move(trailers);
ASSERT(!state_.remote_complete_);
Expand All @@ -568,7 +594,10 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(ActiveStreamDecoderFilt
}

for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeTrailers));
state_.filter_call_state_ |= FilterCallState::DecodeTrailers;
FilterTrailersStatus status = (*entry)->handle_->decodeTrailers(trailers);
state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers;
stream_log_trace("decode trailers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterTrailersCallback(status)) {
Expand Down Expand Up @@ -609,8 +638,11 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
std::list<ActiveStreamEncoderFilterPtr>::iterator continue_data_entry = encoder_filters_.end();

for (; entry != encoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders));
state_.filter_call_state_ |= FilterCallState::EncodeHeaders;
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(
headers, end_stream && continue_data_entry == encoder_filters_.end());
state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders;
stream_log_trace("encode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterHeadersCallback(status)) {
Expand Down Expand Up @@ -690,11 +722,32 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
}
}

void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilter& filter,
Buffer::Instance& data) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::EncodeHeaders)) {
// If no call is happening or we are in the decode headers callback, buffer the data. Inline
// processing happens in the decodeHeaders() callback if necessary.
filter.commonHandleBufferData(data);
} else if (state_.filter_call_state_ & FilterCallState::EncodeTrailers) {
// In this case we need to inline dispatch the data to further filters. If those filters
// choose to buffer/stop iteration that's fine.
encodeData(&filter, data, false);
} else {
// TODO(mattklein123): Formalize error handling for filters and add tests. Should probably
// throw an exception here.
NOT_IMPLEMENTED;
}
}

void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* filter,
Buffer::Instance& data, bool end_stream) {
std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, end_stream);
for (; entry != encoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeData));
state_.filter_call_state_ |= FilterCallState::EncodeData;
FilterDataStatus status = (*entry)->handle_->encodeData(data, end_stream);
state_.filter_call_state_ &= ~FilterCallState::EncodeData;
stream_log_trace("encode data called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterDataCallback(status, data)) {
Expand All @@ -714,7 +767,10 @@ void ConnectionManagerImpl::ActiveStream::encodeTrailers(ActiveStreamEncoderFilt
HeaderMap& trailers) {
std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, true);
for (; entry != encoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeTrailers));
state_.filter_call_state_ |= FilterCallState::EncodeTrailers;
FilterTrailersStatus status = (*entry)->handle_->encodeTrailers(trailers);
state_.filter_call_state_ &= ~FilterCallState::EncodeTrailers;
stream_log_trace("encode trailers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterTrailersCallback(status)) {
Expand Down Expand Up @@ -772,6 +828,7 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::addResetStreamCallback(
}

void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() {
// TODO(mattklein123): Raise an error if this is called during a callback.
stream_log_trace("continuing filter chain: filter={}", parent_, static_cast<const void*>(this));
ASSERT(stopped_);
stopped_ = false;
Expand Down Expand Up @@ -895,6 +952,10 @@ Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route
return parent_.cached_route_.value();
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::Instance& data) {
parent_.addDecodedData(*this, data);
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); }

void ConnectionManagerImpl::ActiveStreamDecoderFilter::encodeHeaders(HeaderMapPtr&& headers,
Expand All @@ -913,6 +974,10 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::encodeTrailers(HeaderMapP
parent_.encodeTrailers(nullptr, *parent_.response_trailers_);
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data) {
return parent_.addEncodedData(*this, data);
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::continueEncoding() { commonContinue(); }

void ConnectionManagerImpl::ActiveStreamFilterBase::resetStream() {
Expand Down
30 changes: 26 additions & 4 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
const HeaderMapPtr& trailers() override { return parent_.request_trailers_; }

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance& data) override;
void continueDecoding() override;
Buffer::InstancePtr& decodingBuffer() override { return parent_.buffered_request_data_; }
const Buffer::Instance* decodingBuffer() override {
return parent_.buffered_request_data_.get();
}
void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(HeaderMapPtr&& trailers) override;
Expand Down Expand Up @@ -345,8 +348,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
const HeaderMapPtr& trailers() override { return parent_.response_trailers_; }

// Http::StreamEncoderFilterCallbacks
void addEncodedData(Buffer::Instance& data) override;
void continueEncoding() override;
Buffer::InstancePtr& encodingBuffer() override { return parent_.buffered_response_data_; }
const Buffer::Instance* encodingBuffer() override {
return parent_.buffered_response_data_.get();
}

StreamEncoderFilterSharedPtr handle_;
};
Expand All @@ -371,9 +377,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream);
uint64_t connectionId();
Ssl::Connection* ssl();
void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data);
void decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream);
void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream);
void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers);
void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data);
void encodeHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers, bool end_stream);
void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream);
void encodeTrailers(ActiveStreamEncoderFilter* filter, HeaderMap& trailers);
Expand All @@ -398,11 +406,25 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
virtual Tracing::OperationName operationName() const override;
virtual const std::vector<Http::LowerCaseString>& requestHeadersForTags() const override;

// All state for the stream. Put here for readability. We could move this to a bit field
// eventually if we want.
/**
* Flags that keep track of which filter calls are currently in progress.
*/
// clang-format off
struct FilterCallState {
static constexpr uint32_t DecodeHeaders = 0x01;
static constexpr uint32_t DecodeData = 0x02;
static constexpr uint32_t DecodeTrailers = 0x04;
static constexpr uint32_t EncodeHeaders = 0x08;
static constexpr uint32_t EncodeData = 0x10;
static constexpr uint32_t EncodeTrailers = 0x20;
};
// clang-format on

// All state for the stream. Put here for readability.
struct State {
State() : remote_complete_(false), local_complete_(false), saw_connection_close_(false) {}

uint32_t filter_call_state_{0};
bool remote_complete_ : 1;
bool local_complete_ : 1;
bool saw_connection_close_ : 1;
Expand Down
Loading