From c5ae44cf3b164fbc4d81cd7e1300213df0b0e39a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 21 Nov 2018 17:36:25 +0300 Subject: [PATCH] Add amqp.Delivery to the request func in order to have the ability to change incoming message before decode function in case of zipkin context --- .../{encode-decode.go => encode_decode.go} | 0 transport/amqp/publisher.go | 3 ++- transport/amqp/request_response_func.go | 20 +++++++++---------- transport/amqp/subscriber.go | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) rename transport/amqp/{encode-decode.go => encode_decode.go} (100%) diff --git a/transport/amqp/encode-decode.go b/transport/amqp/encode_decode.go similarity index 100% rename from transport/amqp/encode-decode.go rename to transport/amqp/encode_decode.go diff --git a/transport/amqp/publisher.go b/transport/amqp/publisher.go index a28ee941d..f2d62e18f 100644 --- a/transport/amqp/publisher.go +++ b/transport/amqp/publisher.go @@ -82,7 +82,8 @@ func (p Publisher) Endpoint() endpoint.Endpoint { } for _, f := range p.before { - ctx = f(ctx, &pub) + // Affect only amqp.Publishing + ctx = f(ctx, &pub, nil) } deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub) diff --git a/transport/amqp/request_response_func.go b/transport/amqp/request_response_func.go index a6f730f25..1409240a6 100644 --- a/transport/amqp/request_response_func.go +++ b/transport/amqp/request_response_func.go @@ -10,7 +10,7 @@ import ( // RequestFunc may take information from a publisher request and put it into a // request context. In Subscribers, RequestFuncs are executed prior to invoking // the endpoint. -type RequestFunc func(context.Context, *amqp.Publishing) context.Context +type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context // SubscriberResponseFunc may take information from a request context and use it to // manipulate a Publisher. SubscriberResponseFuncs are only executed in @@ -29,7 +29,7 @@ type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context // SetPublishExchange returns a RequestFunc that sets the Exchange field // of an AMQP Publish call. func SetPublishExchange(publishExchange string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyExchange, publishExchange) } } @@ -37,7 +37,7 @@ func SetPublishExchange(publishExchange string) RequestFunc { // SetPublishKey returns a RequestFunc that sets the Key field // of an AMQP Publish call. func SetPublishKey(publishKey string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyPublishKey, publishKey) } } @@ -45,7 +45,7 @@ func SetPublishKey(publishKey string) RequestFunc { // SetPublishDeliveryMode sets the delivery mode of a Publishing. // Please refer to AMQP delivery mode constants in the AMQP package. func SetPublishDeliveryMode(dmode uint8) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.DeliveryMode = dmode return ctx } @@ -57,7 +57,7 @@ func SetPublishDeliveryMode(dmode uint8) RequestFunc { // One example is the SingleNackRequeueErrorEncoder. // It is designed to be used by Subscribers. func SetNackSleepDuration(duration time.Duration) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyNackSleepDuration, duration) } } @@ -68,7 +68,7 @@ func SetNackSleepDuration(duration time.Duration) RequestFunc { // a matching correlationId. // It is designed to be used by Publishers. func SetConsumeAutoAck(autoAck bool) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyAutoAck, autoAck) } } @@ -77,7 +77,7 @@ func SetConsumeAutoAck(autoAck bool) RequestFunc { // function. // It is designed to be used by Publishers. func SetConsumeArgs(args amqp.Table) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyConsumeArgs, args) } } @@ -85,7 +85,7 @@ func SetConsumeArgs(args amqp.Table) RequestFunc { // SetContentType returns a RequestFunc that sets the ContentType field of // an AMQP Publishing. func SetContentType(contentType string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.ContentType = contentType return ctx } @@ -94,7 +94,7 @@ func SetContentType(contentType string) RequestFunc { // SetContentEncoding returns a RequestFunc that sets the ContentEncoding field // of an AMQP Publishing. func SetContentEncoding(contentEncoding string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.ContentEncoding = contentEncoding return ctx } @@ -103,7 +103,7 @@ func SetContentEncoding(contentEncoding string) RequestFunc { // SetCorrelationID returns a RequestFunc that sets the CorrelationId field // of an AMQP Publishing. func SetCorrelationID(cid string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.CorrelationId = cid return ctx } diff --git a/transport/amqp/subscriber.go b/transport/amqp/subscriber.go index 17e1b0f59..d728aa4ed 100644 --- a/transport/amqp/subscriber.go +++ b/transport/amqp/subscriber.go @@ -84,7 +84,7 @@ func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) { pub := amqp.Publishing{} for _, f := range s.before { - ctx = f(ctx, &pub) + ctx = f(ctx, &pub, deliv) } request, err := s.dec(ctx, deliv)