Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions transport/amqp/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package amqp implements an AMQP transport.
package amqp
22 changes: 22 additions & 0 deletions transport/amqp/encode-decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package amqp

import (
"context"
"github.com/streadway/amqp"
)

// DecodeRequestFunc extracts a user-domain request object from
// an AMQP Delivery object. It is designed to be used in AMQP Subscribers.
type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error)

// EncodeRequestFunc encodes the passed request object into
// an AMQP Publishing object. It is designed to be used in AMQP Publishers.
type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error

// EncodeResponseFunc encodes the passed reponse object to
// an AMQP Publishing object. It is designed to be used in AMQP Subscribers.
type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error

// DecodeResponseFunc extracts a user-domain response object from
// an AMQP Delivery object. It is designed to be used in AMQP Publishers.
type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error)
152 changes: 152 additions & 0 deletions transport/amqp/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package amqp

import (
"context"
"time"

"github.com/go-kit/kit/endpoint"
"github.com/streadway/amqp"
)

// The golang AMQP implementation requires the []byte representation of
// correlation id strings to have a maximum length of 255 bytes.
const maxCorrelationIdLength = 255

// Publisher wraps an AMQP channel and queue, and provides a method that
// implements endpoint.Endpoint.
type Publisher struct {
ch Channel
q *amqp.Queue
enc EncodeRequestFunc
dec DecodeResponseFunc
before []RequestFunc
after []PublisherResponseFunc
timeout time.Duration
}

// NewPublisher constructs a usable Publisher for a single remote method.
func NewPublisher(
ch Channel,
q *amqp.Queue,
enc EncodeRequestFunc,
dec DecodeResponseFunc,
options ...PublisherOption,
) *Publisher {
p := &Publisher{
ch: ch,
q: q,
enc: enc,
dec: dec,
timeout: 10 * time.Second,
}
for _, option := range options {
option(p)
}
return p
}

// PublisherOption sets an optional parameter for clients.
type PublisherOption func(*Publisher)

// PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP
// request before it's invoked.
func PublisherBefore(before ...RequestFunc) PublisherOption {
return func(p *Publisher) { p.before = append(p.before, before...) }
}

// PublisherAfter sets the ClientResponseFuncs applied to the incoming AMQP
// request prior to it being decoded. This is useful for obtaining anything off
// of the response and adding onto the context prior to decoding.
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
return func(p *Publisher) { p.after = append(p.after, after...) }
}

// PublisherTimeout sets the available timeout for an AMQP request.
func PublisherTimeout(timeout time.Duration) PublisherOption {
return func(p *Publisher) { p.timeout = timeout }
}

// Endpoint returns a usable endpoint that invokes the remote endpoint.
func (p Publisher) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()

pub := amqp.Publishing{
ReplyTo: p.q.Name,
CorrelationId: randomString(randInt(5, maxCorrelationIdLength)),
}

if err := p.enc(ctx, &pub, request); err != nil {
return nil, err
}

for _, f := range p.before {
ctx = f(ctx, &pub)
}

deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub)
if err != nil {
return nil, err
}

for _, f := range p.after {
ctx = f(ctx, deliv)
}
response, err := p.dec(ctx, deliv)
if err != nil {
return nil, err
}

return response, nil
}
}

// publishAndConsumeFirstMatchingResponse publishes the specified Publishing
// and returns the first Delivery object with the matching correlationId.
// If the context times out while waiting for a reply, an error will be returned.
func (p Publisher) publishAndConsumeFirstMatchingResponse(
ctx context.Context,
pub *amqp.Publishing,
) (*amqp.Delivery, error) {
err := p.ch.Publish(
getPublishExchange(ctx),
getPublishKey(ctx),
false, //mandatory
false, //immediate
*pub,
)
if err != nil {
return nil, err
}
autoAck := getConsumeAutoAck(ctx)

msg, err := p.ch.Consume(
p.q.Name,
"", //consumer
autoAck,
false, //exclusive
false, //noLocal
false, //noWait
getConsumeArgs(ctx),
)
if err != nil {
return nil, err
}

for {
select {
case d := <-msg:
if d.CorrelationId == pub.CorrelationId {
if !autoAck {
d.Ack(false) //multiple
}
return &d, nil
}

case <-ctx.Done():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case <-ctx.Done():
case <-ctx.Done():

return nil, ctx.Err()
}
}

}
Loading