From db97c9e5a5f819586f634edacbf196e764a85035 Mon Sep 17 00:00:00 2001 From: Anton Kostenko Date: Wed, 16 Nov 2016 21:08:05 +0300 Subject: [PATCH 1/5] Add support of TLS connections (amqps://) --- client.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/client.go b/client.go index 14f5477..d261611 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,7 @@ package cony import ( + "crypto/tls" "errors" "sync" "sync/atomic" @@ -36,6 +37,7 @@ type Client struct { bo Backoffer attempt int32 l sync.Mutex + tlsConf *tls.Config } // Declare used to declare queues/exchanges/bindings. @@ -118,7 +120,11 @@ func (c *Client) Loop() bool { atomic.AddInt32(&c.attempt, 1) } - conn, err = amqp.Dial(c.addr) + if c.tlsConf != nil { + conn, err = amqp.DialTLS(c.addr, c.tlsConf) + } else { + conn, err = amqp.Dial(c.addr) + } if c.reportErr(err) { return true @@ -265,3 +271,10 @@ func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt { c.blocking = blockingChan } } + +// ErrorsChan is a functional option, used to setup TLS configuration +func TLS(tls *tls.Config) ClientOpt { + return func(c *Client) { + c.tlsConf = tls + } +} From 2f246b60ea7dff69364e306baa90a783855e558a Mon Sep 17 00:00:00 2001 From: Anton Kostenko Date: Wed, 16 Nov 2016 21:09:53 +0300 Subject: [PATCH 2/5] Add support of TLS connections (amqps://) --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index d261611..df464c8 100644 --- a/client.go +++ b/client.go @@ -272,7 +272,7 @@ func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt { } } -// ErrorsChan is a functional option, used to setup TLS configuration +// TLS is a functional option, used to setup TLS configuration func TLS(tls *tls.Config) ClientOpt { return func(c *Client) { c.tlsConf = tls From 1c311a99f96921a590b8b72a15d9856b5876b4d2 Mon Sep 17 00:00:00 2001 From: Anton Kostenko Date: Fri, 18 Nov 2016 14:39:18 +0300 Subject: [PATCH 3/5] Use DialConfig instead of dialTLS --- client.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index df464c8..a256fbe 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,6 @@ package cony import ( - "crypto/tls" "errors" "sync" "sync/atomic" @@ -37,7 +36,7 @@ type Client struct { bo Backoffer attempt int32 l sync.Mutex - tlsConf *tls.Config + config amqp.Config } // Declare used to declare queues/exchanges/bindings. @@ -120,11 +119,12 @@ func (c *Client) Loop() bool { atomic.AddInt32(&c.attempt, 1) } - if c.tlsConf != nil { - conn, err = amqp.DialTLS(c.addr, c.tlsConf) - } else { - conn, err = amqp.Dial(c.addr) + if c.config == nil { + c.config = amqp.Config{ + Heartbeat: 10 * time.Second, + } } + conn, err = amqp.DialConfig(c.addr, c.config) if c.reportErr(err) { return true @@ -272,9 +272,9 @@ func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt { } } -// TLS is a functional option, used to setup TLS configuration -func TLS(tls *tls.Config) ClientOpt { +// Config is a functional option, used to setup extended amqp configuration +func Config(config amqp.Config) ClientOpt { return func(c *Client) { - c.tlsConf = tls + c.config = config } } From d557f877d26d4370849e4283e011799f5b4cf3d1 Mon Sep 17 00:00:00 2001 From: Anton Kostenko Date: Fri, 18 Nov 2016 14:42:17 +0300 Subject: [PATCH 4/5] Remove missed testing lines --- client.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/client.go b/client.go index a256fbe..5d00dc0 100644 --- a/client.go +++ b/client.go @@ -118,12 +118,7 @@ func (c *Client) Loop() bool { time.Sleep(c.bo.Backoff(int(c.attempt))) atomic.AddInt32(&c.attempt, 1) } - - if c.config == nil { - c.config = amqp.Config{ - Heartbeat: 10 * time.Second, - } - } + conn, err = amqp.DialConfig(c.addr, c.config) if c.reportErr(err) { From 31a74760bb3a99312c8f0e7b8c4ec01c36f43de4 Mon Sep 17 00:00:00 2001 From: Anton Kostenko Date: Fri, 18 Nov 2016 17:51:06 +0300 Subject: [PATCH 5/5] Set default heartbeat to 10s --- client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/client.go b/client.go index 5d00dc0..dfff7a8 100644 --- a/client.go +++ b/client.go @@ -118,7 +118,12 @@ func (c *Client) Loop() bool { time.Sleep(c.bo.Backoff(int(c.attempt))) atomic.AddInt32(&c.attempt, 1) } - + + // set default Heartbeat to 10 seconds like in original amqp.Dial + if c.config.Heartbeat == 0 { + c.config.Heartbeat = 10 * time.Second + } + conn, err = amqp.DialConfig(c.addr, c.config) if c.reportErr(err) {