Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
3 changes: 2 additions & 1 deletion transport/amqp/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (p Publisher) Endpoint() endpoint.Endpoint {
}

for _, f := range p.before {
ctx = f(ctx, &pub)
// Affect only amqp.Publishing
ctx = f(ctx, &pub, nil)
}

deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub)
Expand Down
20 changes: 10 additions & 10 deletions transport/amqp/request_response_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// RequestFunc may take information from a publisher request and put it into a
// request context. In Subscribers, RequestFuncs are executed prior to invoking
// the endpoint.
type RequestFunc func(context.Context, *amqp.Publishing) context.Context
type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context

// SubscriberResponseFunc may take information from a request context and use it to
// manipulate a Publisher. SubscriberResponseFuncs are only executed in
Expand All @@ -29,23 +29,23 @@ type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context
// SetPublishExchange returns a RequestFunc that sets the Exchange field
// of an AMQP Publish call.
func SetPublishExchange(publishExchange string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyExchange, publishExchange)
}
}

// SetPublishKey returns a RequestFunc that sets the Key field
// of an AMQP Publish call.
func SetPublishKey(publishKey string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
}
}

// SetPublishDeliveryMode sets the delivery mode of a Publishing.
// Please refer to AMQP delivery mode constants in the AMQP package.
func SetPublishDeliveryMode(dmode uint8) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.DeliveryMode = dmode
return ctx
}
Expand All @@ -57,7 +57,7 @@ func SetPublishDeliveryMode(dmode uint8) RequestFunc {
// One example is the SingleNackRequeueErrorEncoder.
// It is designed to be used by Subscribers.
func SetNackSleepDuration(duration time.Duration) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
}
}
Expand All @@ -68,7 +68,7 @@ func SetNackSleepDuration(duration time.Duration) RequestFunc {
// a matching correlationId.
// It is designed to be used by Publishers.
func SetConsumeAutoAck(autoAck bool) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
}
}
Expand All @@ -77,15 +77,15 @@ func SetConsumeAutoAck(autoAck bool) RequestFunc {
// function.
// It is designed to be used by Publishers.
func SetConsumeArgs(args amqp.Table) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyConsumeArgs, args)
}
}

// SetContentType returns a RequestFunc that sets the ContentType field of
// an AMQP Publishing.
func SetContentType(contentType string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.ContentType = contentType
return ctx
}
Expand All @@ -94,7 +94,7 @@ func SetContentType(contentType string) RequestFunc {
// SetContentEncoding returns a RequestFunc that sets the ContentEncoding field
// of an AMQP Publishing.
func SetContentEncoding(contentEncoding string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.ContentEncoding = contentEncoding
return ctx
}
Expand All @@ -103,7 +103,7 @@ func SetContentEncoding(contentEncoding string) RequestFunc {
// SetCorrelationID returns a RequestFunc that sets the CorrelationId field
// of an AMQP Publishing.
func SetCorrelationID(cid string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing) context.Context {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.CorrelationId = cid
return ctx
}
Expand Down
2 changes: 1 addition & 1 deletion transport/amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {
pub := amqp.Publishing{}

for _, f := range s.before {
ctx = f(ctx, &pub)
ctx = f(ctx, &pub, deliv)
}

request, err := s.dec(ctx, deliv)
Expand Down