Skip to content

Retry After doesn't work for MT Channel Based Broker #5935

@pierDipi

Description

@pierDipi

Describe the bug

The new experimental feature introduced in #5813 doesn't work for MT Channel-based Brokers since retries are handled by the backing channel and the filter deployment doesn't propagate response headers back to the channel.

Knative release version

devel

Additional context

  • [Experimental] DeliverySpec RetryAfter #5811
  • Filter response handling:
    func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target string, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) {
    // send the event to trigger's subscriber
    response, err := h.sendEvent(ctx, headers, target, event, reportArgs)
    if err != nil {
    h.logger.Error("failed to send event", zap.Error(err))
    writer.WriteHeader(http.StatusInternalServerError)
    _ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError)
    return
    }
    h.logger.Debug("Successfully dispatched message", zap.Any("target", target))
    // If there is an event in the response write it to the response
    statusCode, err := h.writeResponse(ctx, writer, response, ttl, target)
    if err != nil {
    h.logger.Error("failed to write response", zap.Error(err))
    }
    _ = h.reporter.ReportEventCount(reportArgs, statusCode)
    }
    func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target string, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, error) {
    // Send the event to the subscriber
    req, err := h.sender.NewCloudEventRequestWithTarget(ctx, target)
    if err != nil {
    return nil, fmt.Errorf("failed to create the request: %w", err)
    }
    message := binding.ToMessage(event)
    defer message.Finish(nil)
    additionalHeaders := utils.PassThroughHeaders(headers)
    // Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events
    additionalHeaders.Set("prefer", "reply")
    err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
    if err != nil {
    return nil, fmt.Errorf("failed to write request: %w", err)
    }
    start := time.Now()
    resp, err := h.sender.Send(req)
    dispatchTime := time.Since(start)
    if err != nil {
    err = fmt.Errorf("failed to dispatch message: %w", err)
    }
    sc := 0
    if resp != nil {
    sc = resp.StatusCode
    }
    _ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime)
    return resp, err
    }
    // The return values are the status
    func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) {
    response := cehttp.NewMessageFromHttpResponse(resp)
    defer response.Finish(nil)
    if response.ReadEncoding() == binding.EncodingUnknown {
    // Response doesn't have a ce-specversion header nor a content-type matching a cloudevent event format
    // Just read a byte out of the reader to see if it's non-empty, we don't care what it is,
    // just that it is not empty. This means there was a response and it's not valid, so treat
    // as delivery failure.
    body := make([]byte, 1)
    n, _ := response.BodyReader.Read(body)
    response.BodyReader.Close()
    if n != 0 {
    // Note that we could just use StatusInternalServerError, but to distinguish
    // between the failure cases, we use a different code here.
    writer.WriteHeader(http.StatusBadGateway)
    return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be either empty or a valid CloudEvent")
    }
    h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
    writer.WriteHeader(resp.StatusCode)
    return resp.StatusCode, nil
    }
    event, err := binding.ToEvent(ctx, response)
    if err != nil {
    // Like in the above case, we could just use StatusInternalServerError, but to distinguish
    // between the failure cases, we use a different code here.
    writer.WriteHeader(http.StatusBadGateway)
    // Malformed event, reply with err
    return http.StatusBadGateway, err
    }
    // Reattach the TTL (with the same value) to the response event before sending it to the Broker.
    if err := broker.SetTTL(event.Context, ttl); err != nil {
    writer.WriteHeader(http.StatusInternalServerError)
    return http.StatusInternalServerError, fmt.Errorf("failed to reset TTL: %w", err)
    }
    eventResponse := binding.ToMessage(event)
    defer eventResponse.Finish(nil)
    if err := cehttp.WriteResponseWriter(ctx, eventResponse, resp.StatusCode, writer); err != nil {
    return http.StatusInternalServerError, fmt.Errorf("failed to write response event: %w", err)
    }
    h.logger.Debug("Replied with a CloudEvent response", zap.Any("target", target))
    return resp.StatusCode, nil
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    area/brokerskind/bugCategorizes issue or PR as related to a bug.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions