diff --git a/Makefile b/Makefile index 9be6773..cebe26a 100644 --- a/Makefile +++ b/Makefile @@ -1,20 +1,12 @@ -PACKAGES=$(shell go list ./...) - all: lint test -init: tools - GO111MODULE=on go mod vendor +fmt: + golangci-lint run ./... --fix -lint: init +lint: golangci-lint run ./... -test: init +test: go test -race -v ./... -tools: - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(GOPATH)/bin v1.17.0 - -fmt: tools - go fmt $(PACKAGES) - -.PHONY: help lint test fmt tools +.PHONY: fmt lint test diff --git a/backends/rabbitmq/rabbitmq.go b/backends/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..2f8c47e --- /dev/null +++ b/backends/rabbitmq/rabbitmq.go @@ -0,0 +1,274 @@ +package rabbitmq + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/zerofox-oss/go-msg" +) + +type Server struct { + Queue string + Concurrency int + + conn *amqp.Connection + inFlightQueue chan struct{} + + // context used to shutdown processing of in-flight messages + receiverCtx context.Context + receiverCancelFunc context.CancelFunc + + // context used to shutdown the server + serverCtx context.Context + serverCancelFunc context.CancelFunc +} + +func (s *Server) Serve(r msg.Receiver) error { + ch, err := s.conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + notifyConnClose := s.conn.NotifyClose(make(chan *amqp.Error, 1)) + notifyChanClose := ch.NotifyClose(make(chan *amqp.Error, 1)) + + // open queue channel + msgChan, err := ch.Consume(s.Queue, "", false, false, false, false, nil) + if err != nil { + return err + } + + for { + select { + case <-s.serverCtx.Done(): + ch.Close() + close(s.inFlightQueue) + + return msg.ErrServerClosed + + // exit if conn or channel are closed + case err, ok := <-notifyConnClose: + if ok { + return fmt.Errorf("amqp connection was closed: %s, exiting", err) + } + + case err, ok := <-notifyChanClose: + 
if ok { + return fmt.Errorf("amqp channel was closed: %s", err) + } + + case m := <-msgChan: + // apparently this doesn't block if there's no messages in the queue, it just returns an empty message + if m.Acknowledger == nil { + continue + } + + s.inFlightQueue <- struct{}{} + + go func(d amqp.Delivery) { + defer func() { + <-s.inFlightQueue + }() + + if err := s.process(d, r); err != nil { + log.Printf("failed to process: %s", err) + } + }(m) + } + } +} + +func (s *Server) process(d amqp.Delivery, r msg.Receiver) error { + message := msg.Message{ + Body: bytes.NewBuffer(d.Body), + } + + if err := r.Receive(s.receiverCtx, &message); err != nil { + if err := d.Nack(false, true); err != nil { + return err + } + + throttleErr, ok := err.(msg.ErrServerThrottled) + if ok { + time.Sleep(throttleErr.Duration) + return nil + } + + return fmt.Errorf("receiver error: %w", err) + } + + if err := d.Ack(false); err != nil { + return fmt.Errorf("could not ACK message: %w", err) + } + + return nil +} + +const shutdownPollInterval = 500 * time.Millisecond + +// Shutdown stops the receipt of new messages and waits for routines +// to complete or the passed in ctx to be canceled. msg.ErrServerClosed +// will be returned upon a clean shutdown. Otherwise, the passed ctx's +// Error will be returned. 
+func (s *Server) Shutdown(ctx context.Context) error { + if ctx == nil { + panic("context not set") + } + + s.serverCancelFunc() + + ticker := time.NewTicker(shutdownPollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + s.receiverCancelFunc() + + return ctx.Err() + case <-ticker.C: + if len(s.inFlightQueue) == 0 { + return msg.ErrServerClosed + } + } + } +} + +// Option is the signature that modifies a `Server` to set some configuration +type Option func(*Server) error + +func NewServer(conn *amqp.Connection, queue string, opts ...Option) (*Server, error) { + defaultConcurrency := 10 + + serverCtx, serverCancelFunc := context.WithCancel(context.Background()) + receiverCtx, receiverCancelFunc := context.WithCancel(context.Background()) + + srv := &Server{ + Queue: queue, + Concurrency: defaultConcurrency, + + conn: conn, + inFlightQueue: make(chan struct{}, defaultConcurrency), + + receiverCtx: receiverCtx, + receiverCancelFunc: receiverCancelFunc, + serverCtx: serverCtx, + serverCancelFunc: serverCancelFunc, + } + + for _, opt := range opts { + if err := opt(srv); err != nil { + return nil, err + } + } + + return srv, nil +} + +func WithConcurrency(c int) func(*Server) error { + return func(srv *Server) error { + srv.Concurrency = c + srv.inFlightQueue = make(chan struct{}, c) + + return nil + } +} + +// Topic publishes Messages to a RabbitMQ Exchange. +type Topic struct { + Exchange string + Conn *amqp.Connection +} + +// NewWriter returns a MessageWriter. +// The MessageWriter may be used to write messages to a RabbitMQ Exchange. +func (t *Topic) NewWriter(ctx context.Context) msg.MessageWriter { + return &MessageWriter{ + ctx: ctx, + attributes: make(map[string][]string), + buf: &bytes.Buffer{}, + + exchange: t.Exchange, + conn: t.Conn, + } +} + +// MessageWriter is used to publish a single Message to a RabbitMQ Exchange. +// Once all of the data has been written and closed, it may not be used again. 
+type MessageWriter struct { + msg.MessageWriter + + exchange string + conn *amqp.Connection + + ctx context.Context + attributes msg.Attributes + buf *bytes.Buffer // internal buffer + closed bool + mux sync.Mutex +} + +// Attributes returns the attributes of the MessageWriter. +func (w *MessageWriter) Attributes() *msg.Attributes { + return &w.attributes +} + +// Write writes bytes to an internal buffer. +func (w *MessageWriter) Write(p []byte) (int, error) { + w.mux.Lock() + defer w.mux.Unlock() + + if w.closed { + return 0, msg.ErrClosedMessageWriter + } + return w.buf.Write(p) +} + +// Close publishes a Message. +// If the MessageWriter is already closed it will return an error. +func (w *MessageWriter) Close() error { + w.mux.Lock() + defer w.mux.Unlock() + + if w.closed { + return msg.ErrClosedMessageWriter + } + w.closed = true + + if w.buf.Len() > 0 { + raw, err := json.Marshal(w.buf) + if err != nil { + return err + } + + ch, err := w.conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + return ch.PublishWithContext(w.ctx, w.exchange, "", false, false, amqp.Publishing{ + Headers: attrsToHeaders(w), + Body: raw, + }) + } + + return nil +} + +func attrsToHeaders(w msg.MessageWriter) map[string]interface{} { + headers := map[string]interface{}{} + + for k, v := range *w.Attributes() { + headers[k] = v + } + + return headers +} diff --git a/backends/rabbitmq/rabbitmq_test.go b/backends/rabbitmq/rabbitmq_test.go new file mode 100644 index 0000000..5a60de4 --- /dev/null +++ b/backends/rabbitmq/rabbitmq_test.go @@ -0,0 +1,195 @@ +package rabbitmq + +import ( + "context" + "encoding/json" + "math/rand" + "sync/atomic" + "testing" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/zerofox-oss/go-msg" +) + +const rabbitmqURL = "amqp://guest:guest@localhost:5672" + +func Test_RabbitMQ(t *testing.T) { + conn, err := amqp.Dial(rabbitmqURL) + if err != nil { + t.Fatalf("could not connect to rabbitmq: %s", err) + } + defer conn.Close() + 
+ t.Log("opened connection") + + // creates a pub/sub exchange + conf := exchangeConf{ + Name: "my_new_exchange", + Type: amqp.ExchangeTopic, + Queues: []string{"queue1"}, + NumMessages: 1000000, + } + + cleanup := setupExchange(t, conn, conf) + defer cleanup() + + // setup consumers for each group + var count atomic.Uint32 + + go func() { + srvConn, err := amqp.Dial(rabbitmqURL) + if err != nil { + t.Errorf("could not connect to rabbitmq: %s", err) + } + defer srvConn.Close() + + srv1, err := NewServer(srvConn, conf.Queues[0]) + if err != nil { + t.Errorf("could not start server: %s", err) + } + + receiveFunc := msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { + t.Logf("Returning without error to ACK.") + count.Add(1) + + return nil + }) + + // sleep to allow the loop below to start + time.Sleep(1 * time.Second) + srv1.Serve(receiveFunc) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + if actual := count.Load(); actual != uint32(conf.NumMessages) { + t.Errorf("Expected %d messages to be processed, got %d", conf.NumMessages, actual) + } + + t.Fail() + return + default: + // inspect state of exchange + time.Sleep(1 * time.Second) + } + } +} + +type exchangeConf struct { + Name string + Type string + Queues []string + NumMessages int +} + +func setupExchange(t *testing.T, conn *amqp.Connection, conf exchangeConf) func() error { + ch, err := conn.Channel() + if err != nil { + t.Fatalf("could not open channel: %s", err) + } + defer ch.Close() + + if err := ch.ExchangeDeclare(conf.Name, conf.Type, true, false, false, false, nil); err != nil { + t.Errorf("could not declare exchange: %s", err) + } + + // setup queues + for _, name := range conf.Queues { + _, err := ch.QueueDeclare(name, true, false, false, false, nil) + if err != nil { + t.Fatalf("could not create queue: %s", err) + } + + if err := ch.QueueBind(name, "#", conf.Name, false, nil); err != nil { + 
t.Fatalf("could not bind queue: %s", err) + } + } + + // publish a bunch of messages to the exchange + topic := Topic{Conn: conn, Exchange: conf.Name} + batches := make(chan struct{}, 10) + batchSize := 1000 + totalWrites := 0 + + for i := 0; i < conf.NumMessages/batchSize; i++ { + if totalWrites%(batchSize*10) == 0 { + t.Logf("Wrote %d messages to exchange", totalWrites) + } + + batches <- struct{}{} + + go func() { + // generate random string once, because this isn't safe to generate in goroutines + var r *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano())) + + defer func() { + <-batches + }() + + for j := 0; j < batchSize; j++ { + // generate random string of 100KB + raw, err := json.Marshal(String(r, 100000)) + if err != nil { + t.Errorf("could not marshal message: %s", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + w := topic.NewWriter(ctx) + if _, err := w.Write(raw); err != nil { + t.Errorf("could not write message: %s", err) + } + + if err := w.Close(); err != nil { + t.Errorf("could not close message: %s", err) + } + } + + totalWrites += batchSize + }() + } + + t.Logf("Exchange=%s configured with queues=%d, messages=%d", conf.Name, len(conf.Queues), conf.NumMessages) + + return func() error { + ch, err := conn.Channel() + if err != nil { + t.Fatalf("could not open channel: %s", err) + } + defer ch.Close() + + for _, name := range conf.Queues { + if _, err := ch.QueueDelete(name, false, false, false); err != nil { + t.Errorf("could not delete queue %s: %s", name, err) + } + } + + if err := ch.ExchangeDelete(conf.Name, false, false); err != nil { + t.Errorf("could not delete exchange: %s", err) + } + + return err + } +} + +// for testing stream sizes +// https://www.calhoun.io/creating-random-strings-in-go/ +const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +func StringWithCharset(r *rand.Rand, length int, charset string) string { + b := make([]byte, length) + 
for i := range b { + b[i] = charset[r.Intn(len(charset))] + } + return string(b) +} + +func String(r *rand.Rand, length int) string { + return StringWithCharset(r, length, charset) +} diff --git a/backends/redis/README.md b/backends/redis/README.md new file mode 100644 index 0000000..a89dd63 --- /dev/null +++ b/backends/redis/README.md @@ -0,0 +1,132 @@ +# Redis Stream Implementation + +## Architecture Overview + +* A **Stream** is equivalent to an SNS topic. + +* **Consumer groups** are equivalent to an SNS topic subscription. + A **stream** may have multiple consumer groups (subscribers). + + > Note, when creating a consumer group, you can choose whether to forward all messages in the stream, or only new ones + > by specifying a message ID. "0" means send all, "$" means only send new. + +* **Consumers** are individual processes that can receive messages from a stream. + A service which scales horizontally will have multiple consumers reading from the same stream. + + > Note, consumers will use XREADGROUP to read messages from a stream. Those messages will be assigned to that consumer, + > so if the message is not ACKd, then it will be received again by the same consumer (no others will have access to it). + > This ensures that messages are not processed by more than one consumer. + > + > There is one slight caveat to this, which is that you can use XCLAIM/XAUTOCLAIM in order to re-assign any messages + > on the PEL (Pending Entries List) to another consumer. + +## Example: Multiple Consumers + +This example: + +* Creates a stream with multiple consumer groups (3) +* Creates 2 consumers for one of the groups (`group1`) +* Writes 2 messages to the stream +* Once the messages are processed, they will not be processed again (by the same or the other consumer). +* The other groups will still have 2 messages to be assigned (`group2`, `group3`). 
+ +```bash +# start local redis-server for tests +$ redis-server -v +Redis server v=7.2.5 sha=00000000:0 malloc=libc bits=64 build=bd81cd1340e80580 +$ redis-server & + +# run test +$ +2024/06/24 14:03:05 === Test #1 - Multiple Consumers of a Stream +2024/06/24 14:03:05 Stream=stream1 has messages=2, consumer_groups=2 +2024/06/24 14:03:05 XInfoGroups: [{Name:group1 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 14:03:06 XInfoGroups: [{Name:group1 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 14:03:07 Returning without error to ACK. +2024/06/24 14:03:07 Returning without error to ACK. +2024/06/24 14:03:07 XInfoGroups: [{Name:group1 Consumers:1 Pending:2 LastDeliveredID:1719252185290-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 14:03:08 XInfoGroups: [{Name:group1 Consumers:1 Pending:0 LastDeliveredID:1719252185290-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 14:03:09 XInfoGroups: [{Name:group1 Consumers:1 Pending:0 LastDeliveredID:1719252185290-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 14:03:10 XInfoGroups: [{Name:group1 Consumers:1 Pending:0 LastDeliveredID:1719252185290-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 14:03:11 Processed all messages! +``` + +## Example: Reassign messages + +This example: + +* Creates a stream with multiple consumer groups (3) +* Creates 2 consumers for one of the groups (`group1`) +* Writes 2 messages to the stream +* Srv1 will fail to process the messages +* A few seconds later, Srv2 starts up. 
It will poll for a few seconds, then will claim the messages from srv1. +* Once the messages are processed, they will not be processed again (by the same or the other consumer). +* The other groups will still have 2 messages to be assigned (`group2`, `group3`). + +```bash +# start local redis-server for tests +$ redis-server -v +Redis server v=7.2.5 sha=00000000:0 malloc=libc bits=64 build=bd81cd1340e80580 +$ redis-server & + +# run test +$ cd backends/redis +$ go run . +2024/06/24 16:07:09 === Test #2 - Reassigning failed messages with XAUTOCLAIM +2024/06/24 16:07:09 XREADGROUP +2024/06/24 16:07:09 Stream=stream2 has messages=2, consumer_groups=2 +2024/06/24 16:07:09 Starting srv1... +2024/06/24 16:07:09 XREADGROUP +2024/06/24 16:07:09 XInfoGroups: [{Name:group1 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:09 XREADGROUP +2024/06/24 16:07:09 Simulating a message failure in server1, returning error to re-assign +2024/06/24 16:07:09 Simulating a message failure in server1, returning error to re-assign +2024/06/24 16:07:10 Starting srv2... 
+2024/06/24 16:07:10 XREADGROUP +2024/06/24 16:07:10 XInfoGroups: [{Name:group1 Consumers:1 Pending:2 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:10 XREADGROUP +2024/06/24 16:07:10 XREADGROUP +2024/06/24 16:07:11 XInfoGroups: [{Name:group1 Consumers:2 Pending:2 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:11 XREADGROUP +2024/06/24 16:07:11 XREADGROUP +2024/06/24 16:07:11 XREADGROUP +2024/06/24 16:07:12 XInfoGroups: [{Name:group1 Consumers:2 Pending:2 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:12 XREADGROUP +2024/06/24 16:07:12 Returning without error to ACK. +2024/06/24 16:07:12 Returning without error to ACK. +2024/06/24 16:07:12 XREADGROUP +2024/06/24 16:07:12 XREADGROUP +2024/06/24 16:07:13 XInfoGroups: [{Name:group1 Consumers:2 Pending:0 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:13 XREADGROUP +2024/06/24 16:07:13 XREADGROUP +2024/06/24 16:07:13 XREADGROUP +2024/06/24 16:07:14 XInfoGroups: [{Name:group1 Consumers:2 Pending:0 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:14 XREADGROUP +2024/06/24 16:07:14 XREADGROUP +2024/06/24 16:07:14 XREADGROUP +2024/06/24 16:07:15 XInfoGroups: [{Name:group1 Consumers:2 Pending:0 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:15 XREADGROUP +2024/06/24 16:07:15 XREADGROUP +2024/06/24 16:07:15 XREADGROUP +2024/06/24 16:07:16 XInfoGroups: [{Name:group1 Consumers:2 Pending:0 LastDeliveredID:1719259629201-0 EntriesRead:2 
Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:16 XREADGROUP +2024/06/24 16:07:16 XREADGROUP +2024/06/24 16:07:16 XREADGROUP +2024/06/24 16:07:17 XInfoGroups: [{Name:group1 Consumers:2 Pending:0 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:17 XREADGROUP +2024/06/24 16:07:17 XREADGROUP +2024/06/24 16:07:17 XREADGROUP +2024/06/24 16:07:18 XInfoGroups: [{Name:group1 Consumers:2 Pending:0 LastDeliveredID:1719259629201-0 EntriesRead:2 Lag:0} {Name:group2 Consumers:0 Pending:0 LastDeliveredID:0-0 EntriesRead:0 Lag:2}] +2024/06/24 16:07:18 XREADGROUP +2024/06/24 16:07:18 XREADGROUP +2024/06/24 16:07:18 XREADGROUP +2024/06/24 16:07:19 Processed all messages! +``` + +## TODO + +* Despite ACKing messages, they will not be removed from the stream. Which means Redis will grow unbounded. + To resolve, we could run XTRIM on a cron (Say 3d) which will remove any messages older than that period. + This is effectively creating a message retention period. 
+ +* diff --git a/backends/redis/streams.go b/backends/redis/streams.go new file mode 100644 index 0000000..8387a81 --- /dev/null +++ b/backends/redis/streams.go @@ -0,0 +1,317 @@ +package redis + +import ( + "bytes" + "context" + "fmt" + "sync" + "time" + + "github.com/redis/go-redis/v9" + "github.com/zerofox-oss/go-msg" +) + +type Server struct { + Stream string + Group string + Consumer string // Consumer should be unique, so if we horizontally scale a component each alloc gets its own ID + Concurrency int + + conn *redis.Client + + inFlightQueue chan struct{} + + // context used to shutdown processing of in-flight messages + receiverCtx context.Context + receiverCancelFunc context.CancelFunc + + // context used to shutdown the server + serverCtx context.Context + serverCancelFunc context.CancelFunc +} + +func (s *Server) Serve(r msg.Receiver) error { + for { + select { + case <-s.serverCtx.Done(): + close(s.inFlightQueue) + + return msg.ErrServerClosed + default: + messages, err := s.xReadGroup() + if err != nil { + return err + } + + if len(messages) == 0 { + messages, err = s.xAutoclaim() + if err != nil { + return err + } + + // TODO - there isn't a concept of message retries w/ DLQ. A message could fail to be processed indefinitely. 
+ // We could recreate this by using XCLAIM+XPENDING separately (the latter will return idle time + retries) + } + + // otherwise, we have messages to process + for _, m := range messages { + s.inFlightQueue <- struct{}{} + + go func(m redis.XMessage) { + defer func() { + <-s.inFlightQueue + }() + + s.process(m, r) + }(m) + } + } + } +} + +func (s *Server) xAutoclaim() ([]redis.XMessage, error) { + ctx, cancel := context.WithTimeout(s.receiverCtx, 1*time.Second) + defer cancel() + + messages, _, err := s.conn.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + Stream: s.Stream, + Group: s.Group, + Consumer: s.Consumer, + + // MinIdle == Message Visibility Timeout (SQS) + MinIdle: 3 * time.Second, + Start: "0", + Count: int64(10), + }).Result() + if err != nil { + if err != redis.Nil { + return nil, fmt.Errorf("XAUTOCLAIM failed with: %w", err) + } + return []redis.XMessage{}, nil + } + + return messages, nil +} + +func (s *Server) xReadGroup() ([]redis.XMessage, error) { + ctx, cancel := context.WithTimeout(s.receiverCtx, 100*time.Millisecond) + defer cancel() + + resp, err := s.conn.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: s.Group, + Consumer: s.Consumer, + Streams: []string{s.Stream, ">"}, + Count: int64(10), + + // This effectively limits the server to poll at most once a second. 
+ // https://github.com/redis/go-redis/issues/1941 + // https://stackoverflow.com/questions/64801757/redis-streams-how-to-manage-perpetual-subscription-and-block-behaviour + Block: 1 * time.Second, + }).Result() + if err != nil { + if err != redis.Nil { + return nil, fmt.Errorf("failed to read messages with: %w", err) + } + + return []redis.XMessage{}, nil + } + + return resp[0].Messages, nil +} + +func (s *Server) process(m redis.XMessage, r msg.Receiver) error { + message, err := toMessage(m.Values) + if err != nil { + return fmt.Errorf("could not marshal redis.XMessage: %w", err) + } + + if err := r.Receive(s.receiverCtx, &message); err != nil { + throttleErr, ok := err.(msg.ErrServerThrottled) + if ok { + time.Sleep(throttleErr.Duration) + return nil + } + + return fmt.Errorf("receiver error: %w", err) + } + + // ACK the message to remove it + ctx, cancel := context.WithTimeout(s.receiverCtx, 2*time.Second) + defer cancel() + + resp := s.conn.XAck(ctx, s.Stream, s.Group, m.ID) + if err := resp.Err(); err != nil { + return fmt.Errorf("could not XACK message: %w", err) + } + + return nil +} + +const shutdownPollInterval = 500 * time.Millisecond + +// Shutdown stops the receipt of new messages and waits for routines +// to complete or the passed in ctx to be canceled. msg.ErrServerClosed +// will be returned upon a clean shutdown. Otherwise, the passed ctx's +// Error will be returned. 
+func (s *Server) Shutdown(ctx context.Context) error { + if ctx == nil { + panic("context not set") + } + + s.serverCancelFunc() + + ticker := time.NewTicker(shutdownPollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + s.receiverCancelFunc() + + return ctx.Err() + case <-ticker.C: + if len(s.inFlightQueue) == 0 { + return msg.ErrServerClosed + } + } + } +} + +// Option is the signature that modifies a `Server` to set some configuration +type Option func(*Server) error + +func NewServer(client *redis.Client, stream, group, consumer string, opts ...Option) (*Server, error) { + defaultConcurrency := 10 + + serverCtx, serverCancelFunc := context.WithCancel(context.Background()) + receiverCtx, receiverCancelFunc := context.WithCancel(context.Background()) + + srv := &Server{ + Stream: stream, + Group: group, + Consumer: consumer, + Concurrency: defaultConcurrency, + + conn: client, + inFlightQueue: make(chan struct{}, defaultConcurrency), + + receiverCtx: receiverCtx, + receiverCancelFunc: receiverCancelFunc, + serverCtx: serverCtx, + serverCancelFunc: serverCancelFunc, + } + + for _, opt := range opts { + if err := opt(srv); err != nil { + return nil, err + } + } + + return srv, nil +} + +func WithConcurrency(c int) func(*Server) error { + return func(srv *Server) error { + srv.Concurrency = c + srv.inFlightQueue = make(chan struct{}, c) + + return nil + } +} + +// Topic publishes Messages to a Redis Stream. +type Topic struct { + Stream string + Conn *redis.Client +} + +// NewWriter returns a MessageWriter. +// The MessageWriter may be used to write messages to a Reis Stream. +func (t *Topic) NewWriter(ctx context.Context) msg.MessageWriter { + return &MessageWriter{ + ctx: ctx, + attributes: make(map[string][]string), + buf: &bytes.Buffer{}, + + stream: t.Stream, + conn: t.Conn, + } +} + +// MessageWriter is used to publish a single Message to a Redis Stream. +// Once all of the data has been written and closed, it may not be used again. 
+type MessageWriter struct { + msg.MessageWriter + + stream string + conn *redis.Client + + ctx context.Context + attributes msg.Attributes + buf *bytes.Buffer // internal buffer + closed bool + mux sync.Mutex +} + +// Attributes returns the attributes of the MessageWriter. +func (w *MessageWriter) Attributes() *msg.Attributes { + return &w.attributes +} + +// Close publishes a Message. +// If the MessageWriter is already closed it will return an error. +func (w *MessageWriter) Close() error { + w.mux.Lock() + defer w.mux.Unlock() + + if w.closed { + return msg.ErrClosedMessageWriter + } + w.closed = true + + if w.buf.Len() > 0 { + message, err := toMap(msg.Message{ + Attributes: *w.Attributes(), + Body: w.buf, + }) + if err != nil { + return err + } + + res := w.conn.XAdd(w.ctx, &redis.XAddArgs{ + Stream: w.stream, + ID: "*", + Values: message, + }) + + return res.Err() + } + + return nil +} + +// Write writes bytes to an internal buffer. +func (w *MessageWriter) Write(p []byte) (int, error) { + w.mux.Lock() + defer w.mux.Unlock() + + if w.closed { + return 0, msg.ErrClosedMessageWriter + } + return w.buf.Write(p) +} + +func toMap(m msg.Message) (map[string]interface{}, error) { + return map[string]interface{}{ + "Attributes": nil, + "Body": fmt.Sprint(m.Body), + }, nil +} + +// converts map[string]interface{} to a Message +func toMessage(m map[string]interface{}) (msg.Message, error) { + return msg.Message{ + Attributes: nil, + Body: bytes.NewBufferString(fmt.Sprint(m["Body"])), + }, nil +} diff --git a/backends/redis/streams_test.go b/backends/redis/streams_test.go new file mode 100644 index 0000000..438fafe --- /dev/null +++ b/backends/redis/streams_test.go @@ -0,0 +1,251 @@ +package redis + +import ( + "context" + "encoding/json" + "errors" + "math/rand" + "sync/atomic" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/zerofox-oss/go-msg" +) + +var redisURL = "localhost:6379" + +func Test_Redis(t *testing.T) { + client := 
redis.NewClient(&redis.Options{Addr: redisURL}) + + stream := "stream1" + groups := []string{"group1", "group2"} + + // delete stream once test is complete + cleanup := setupStream(t, client, streamConf{ + Stream: stream, + Groups: groups, + NumMessages: 1000000, + }) + defer cleanup() + + // setup consumers for each group + var count atomic.Uint32 + + go func() { + srv1, err := NewServer(client, stream, "group1", "consumer1") + if err != nil { + t.Errorf("could not start server: %s", err) + } + + receiveFunc := msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { + t.Logf("Returning without error to ACK.") + count.Add(1) + + return nil + }) + + // sleep to allow the loop below to start + time.Sleep(1 * time.Second) + srv1.Serve(receiveFunc) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + if actual := count.Load(); actual != uint32(2) { + t.Errorf("Expected 2 messages to be processed, got %d", actual) + } + + return + default: + resp := client.XInfoGroups(ctx, stream) + if err := resp.Err(); err != nil { + t.Errorf("XInfoGroups failed with: %s", err) + } + + t.Logf("XInfoGroups: %+v", resp.Val()) + time.Sleep(1 * time.Second) + } + } +} + +func Test_ServerClaimPendingMessages(t *testing.T) { + client := redis.NewClient(&redis.Options{Addr: redisURL}) + + stream := "stream2" + groups := []string{"group1", "group2"} + + // delete stream once test is complete + cleanup := setupStream(t, client, streamConf{ + Stream: stream, + Groups: groups, + NumMessages: 100, + }) + defer cleanup() + + // setup consumers for each group + var count atomic.Uint32 + + // srv1 - fails to process messages + go func() { + srv1, err := NewServer(client, stream, "group1", "consumer1") + if err != nil { + t.Errorf("could not start server: %s", err) + return + } + + receiveFunc := msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { + t.Logf("Simulating a message failure 
in server1, returning error to re-assign") + + return errors.New("message timed out") + }) + + if err := srv1.Serve(receiveFunc); err != nil { + t.Logf("srv1 failed with: %s", err) + } + }() + + // srv2 - starts a few seconds after srv1 to let srv1 claim both messages. + go func() { + srv2, err := NewServer(client, stream, "group1", "consumer2") + if err != nil { + t.Errorf("could not start server: %s", err) + return + } + + time.Sleep(1 * time.Second) + + receiveFunc := msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { + t.Logf("Returning without error to ACK.") + + count.Add(1) + + return nil + }) + + if err := srv2.Serve(receiveFunc); err != nil { + t.Logf("srv2 failed with: %s", err) + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + if actual := count.Load(); actual != uint32(2) { + t.Errorf("Expected 2 messages to be processed, got %d", actual) + } + + return + default: + resp, err := client.XInfoGroups(ctx, stream).Result() + if err != nil { + t.Errorf("XInfoGroups failed with: %s", err) + } + + t.Logf("XInfoGroups: %+v", resp) + time.Sleep(1 * time.Second) + } + } +} + +type streamConf struct { + Stream string + Groups []string + NumMessages int +} + +func setupStream(t *testing.T, client *redis.Client, conf streamConf) func() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Consumer groups should only receive new messages - therefore specify $ as the ID + // See https://redis.io/docs/latest/commands/xgroup-create/ + for _, group := range conf.Groups { + resp := client.XGroupCreateMkStream(ctx, conf.Stream, group, "$") + if err := resp.Err(); err != nil { + t.Fatalf("could not create consumer group (%s): %s", group, err) + } + } + + // publish a bunch of messages to the stream + topic := Topic{Stream: conf.Stream, Conn: client} + batches := make(chan struct{}, 10) + batchSize := 1000 + totalWrites := 0 + + 
for i := 0; i < conf.NumMessages/batchSize; i++ { + if totalWrites%(batchSize*10) == 0 { + t.Logf("Wrote %d messages to stream", totalWrites) + } + + batches <- struct{}{} + + go func() { + // generate random string once, because this isn't safe to generate in goroutines + var r *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano())) + + defer func() { + <-batches + }() + + for j := 0; j < batchSize; j++ { + // generate random string of 100KB + raw, err := json.Marshal(String(r, 100000)) + if err != nil { + t.Errorf("could not marshal message: %s", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + w := topic.NewWriter(ctx) + if _, err := w.Write(raw); err != nil { + t.Errorf("could not write message: %s", err) + } + + if err := w.Close(); err != nil { + t.Errorf("could not close message: %s", err) + } + } + + totalWrites += batchSize + }() + } + + t.Logf("Stream=%s configured with consumer_groups=%d, messages=%d", conf.Stream, len(conf.Groups), conf.NumMessages) + + return func() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := client.Del(ctx, conf.Stream).Err(); err != nil { + t.Errorf("DEL %s failed with: %s", conf.Stream, err) + return err + } + + return nil + } +} + +// for testing stream sizes +// https://www.calhoun.io/creating-random-strings-in-go/ +const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +func StringWithCharset(r *rand.Rand, length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[r.Intn(len(charset))] + } + return string(b) +} + +func String(r *rand.Rand, length int) string { + return StringWithCharset(r, length, charset) +} diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000..2f55bf3 --- /dev/null +++ b/docker/README.md @@ -0,0 +1,75 @@ +# A comparison of Message Brokers + +## Setup + +### RabbitMQ/Prometheus/Grafana Stack + 
+Setup: +```bash +docker-compose -f docker/docker-compose.rabbitmq.yml up -d --remove-orphans +``` + +Teardown: + +```bash +docker-compose -f docker/docker-compose.rabbitmq.yml down -v +``` + +> This will tear down attached volumes to reset Prometheus/Grafana metrics. + +While Docker is running, you can run tests: + +```bash +go test -v --run "Test_RabbitMQ" ./backends/rabbitmq --timeout 20m +``` + +Go to http://localhost:3000/d/5qyVNpQIk/memory-disk-usage?orgId=1&from=now-15m&to=now&refresh=10s +to monitor RabbitMQ while the tests are running. + +### Redis/Prometheus/Grafana Stack + +Setup: + +```bash +docker-compose -f docker/docker-compose.redis.yml up -d --remove-orphans +``` + +Teardown: + +```bash +docker-compose -f docker/docker-compose.redis.yml down -v +``` + +> This will tear down attached volumes to reset Prometheus/Grafana metrics. + +While Docker is running, you can run tests: + +```bash +go test -v --run "Test_Redis" ./backends/redis --timeout 20m +``` + + +## Results + +The test is as follows: + +* Write 1M (million) messages to the broker. +* Each message is a random alphanumeric string of size 100KB. +* Process them using a go-msg server (no errors). + +Each test runs a broker (RabbitMQ or Redis), Prometheus, Grafana, and an optional exporter (to scrape metrics for Prometheus). +Each broker is configured for the default Docker resource allocations: + +![](./results/docker-resource-allocation.png) + +We're going to compare resource usage between the message brokers, to see if one is more viable. + +**Redis** wrote ~100K messages before it got an OOM error (at 8GB memory), crashed, and then the tests failed. + + +![](./results/redis-test-results.png) + +**RabbitMQ** wrote all 1M messages while only using 500MB memory and ~200MB disk. +All messages were processed. 
+ +![](./results/rabbitmq-test-results.png) diff --git a/docker/docker-compose.rabbitmq.yml b/docker/docker-compose.rabbitmq.yml new file mode 100644 index 0000000..eb6efa8 --- /dev/null +++ b/docker/docker-compose.rabbitmq.yml @@ -0,0 +1,69 @@ +# https://docs.docker.com/compose/compose-file/#networks +networks: + rabbitmq_prometheus: + +# https://docs.docker.com/compose/compose-file/#volumes +volumes: + prometheus_prometheus: + prometheus_grafana: + +services: + rabbitmq: + image: rabbitmq:3.13.3 + networks: + - "rabbitmq_prometheus" + ports: + - "5672:5672" + # https://unix.stackexchange.com/questions/71940/killing-tcp-connection-in-linux + # https://en.wikipedia.org/wiki/Tcpkill + # https://www.digitalocean.com/community/tutorials/iptables-essentials-common-firewall-rules-and-commands#block-an-ip-address + cap_add: + - ALL + environment: + RABBITMQ_ERLANG_COOKIE: prometheus + # Uncomment the following line if you want debug logs & colour + # RABBITMQ_LOG: debug,+color + volumes: + - ./rabbitmq/overview.conf:/etc/rabbitmq/rabbitmq.conf:ro + - ./rabbitmq/overview-definitions.json:/etc/rabbitmq/rabbitmq-definitions.json:ro + + grafana: + # https://hub.docker.com/r/grafana/grafana/tags + image: grafana/grafana:8.3.4 + ports: + - "3000:3000" + networks: + - "rabbitmq_prometheus" + volumes: + - prometheus_grafana:/var/lib/grafana + - ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yaml + - ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yaml + - ./grafana/dashboards:/dashboards + environment: + GF_INSTALL_PLUGINS: "flant-statusmap-panel,grafana-piechart-panel" + + prometheus: + # https://hub.docker.com/r/prom/prometheus/tags + image: prom/prometheus:v2.28.1 + networks: + - "rabbitmq_prometheus" + ports: + - "9090:9090" + volumes: + - prometheus_prometheus:/prometheus + - ./prometheus.yml:/etc/prometheus/prometheus.yml + + cadvisor: + image: gcr.io/cadvisor/cadvisor:v0.49.1 + networks: + - "rabbitmq_prometheus" + 
ports: + - "8080:8080" + volumes: + - /:/rootfs:ro + - /sys:/sys:ro + - /var/lib/docker/:/var/lib/docker:ro + - /var/run:/var/run:ro + - /var/run/docker.sock:/var/run/docker.sock:ro + command: + - '--store_container_labels=true' \ No newline at end of file diff --git a/docker/docker-compose.redis.yml b/docker/docker-compose.redis.yml new file mode 100644 index 0000000..07643a6 --- /dev/null +++ b/docker/docker-compose.redis.yml @@ -0,0 +1,73 @@ +# https://docs.docker.com/compose/compose-file/#networks +networks: + redis_prometheus: + +# https://docs.docker.com/compose/compose-file/#volumes +volumes: + prometheus_prometheus: + prometheus_grafana: + +services: + redis: + image: redis:7.2.5 + container_name: redis + networks: + - "redis_prometheus" + ports: + - "6379:6379" + + grafana: + # https://hub.docker.com/r/grafana/grafana/tags + image: grafana/grafana:8.3.4 + container_name: grafana + ports: + - "3000:3000" + networks: + - "redis_prometheus" + volumes: + - prometheus_grafana:/var/lib/grafana + - ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yaml + - ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yaml + - ./grafana/dashboards:/dashboards + environment: + GF_INSTALL_PLUGINS: "flant-statusmap-panel,grafana-piechart-panel" + + prometheus: + # https://hub.docker.com/r/prom/prometheus/tags + image: prom/prometheus:v2.28.1 + container_name: prometheus + networks: + - "redis_prometheus" + ports: + - "9090:9090" + volumes: + - prometheus_prometheus:/prometheus + - ./prometheus.yml:/etc/prometheus/prometheus.yml + + cadvisor: + image: gcr.io/cadvisor/cadvisor:v0.49.1 + networks: + - "redis_prometheus" + ports: + - "8080:8080" + volumes: + - /:/rootfs:ro + - /sys:/sys:ro + - /var/lib/docker/:/var/lib/docker:ro + - /var/run:/var/run:ro + - /var/run/docker.sock:/var/run/docker.sock:ro + command: + - '--store_container_labels=true' + + # https://github.com/oliver006/redis_exporter + redis-exporter: + image: 
oliver006/redis_exporter:v1.61.0 + container_name: redis-exporter + networks: + - redis_prometheus + ports: + - "9121:9121" + environment: + REDIS_ADDR: redis://redis:6379 + REDIS_EXPORTER_CHECK_SINGLE_STREAMS: stream1 + REDIS_EXPORTER_INCL_SYSTEM_METRICS: "true" \ No newline at end of file diff --git a/docker/grafana/README.md b/docker/grafana/README.md new file mode 100644 index 0000000..c0270fe --- /dev/null +++ b/docker/grafana/README.md @@ -0,0 +1,39 @@ +# RabbitMQ Grafana Dashboards + +## Creating or updating dashboards + +### Making changes + +First, from the `rabbitmq_prometheus` directory, run `make overview metrics` to spin up a local environment using +`docker-compose`. This will provide you with a full Prometheus + Grafana stack, and sample workloads to make the +dashboards 'come alive' with non-zero metrics. To tear down this infrastructure, run `make down`. + +Login to `localhost:3000` with the credentials `admin`:`admin` and begin creating / updating your dashboard in the +Grafana UI. + +Once you have finished editing your dashboard, navigate to the 'Share' menu next to the name of the dashboard, go to +'Export', and be sure to tick 'Export for sharing externally'. Either save the dashboard JSON to a file or click 'View +JSON' and copy-paste over into this repo. + +At this point, you can test your changes work on different versions of Grafana, by editing `services.grafana.image` in +[docker-compose-metrics.yml](../docker-compose-metrics.yml) and running `make metrics` to rebuild the Grafana server. +Be sure to test on the latest version publicly available. + +### If creating a new dashboard + +You will need some additional files for a new dashboard: + +- A description of the dashboard; see [the Erlang-Distribution one](publish/erlang-distribution-11352.md) for example +- Screenshots of the dashboard in action, saved in `./publish/` + +### Create a PR + +Create a pull request in `rabbitmq-server` with the above changes. 
+ +## Making a dashboard available on grafana.com + +If you are on the RabbitMQ team, navigate to our GrafanaLabs dashboard at https://grafana.com/orgs/rabbitmq/dashboards, +and log in with the team credentials. + +Once a PR is merged, either create a new revision of a dashboard from the JSON checked in under `./dashboards/`, or +create a new dashboard from the JSON. \ No newline at end of file diff --git a/docker/grafana/dashboards.yml b/docker/grafana/dashboards.yml new file mode 100644 index 0000000..4b99d54 --- /dev/null +++ b/docker/grafana/dashboards.yml @@ -0,0 +1,10 @@ +apiVersion: 1 + +providers: + - name: 'rabbitmq' + orgId: 1 + folder: '' + type: file + disableDeletion: true + options: + path: /dashboards diff --git a/docker/grafana/dashboards/RabbitMQ.json b/docker/grafana/dashboards/RabbitMQ.json new file mode 100644 index 0000000..f960b37 --- /dev/null +++ b/docker/grafana/dashboards/RabbitMQ.json @@ -0,0 +1,929 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "description": "", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 7, + "title": "Row title", + "type": "row" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + 
"spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 0, + "y": 1 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_connections", + "interval": "", + "legendFormat": "connections", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_channels", + "hide": false, + "interval": "", + "legendFormat": "channels", + "refId": "B" + } + ], + "title": "Connections + Channels", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 1 + }, + "id": 14, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + 
"placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_queue_messages_ready", + "interval": "", + "legendFormat": "ready", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_queue_messages_persistent", + "hide": false, + "interval": "", + "legendFormat": "persistent", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_queue_messages_unacked", + "hide": false, + "interval": "", + "legendFormat": "unacked", + "refId": "C" + } + ], + "title": "Queue Size", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 16, + "y": 1 + }, + "id": 13, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": 
"sum(rate(rabbitmq_queue_disk_reads_total[$__rate_interval])) ", + "interval": "", + "legendFormat": "reads", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "sum(rate(rabbitmq_queue_disk_writes_total[$__rate_interval])) ", + "hide": false, + "interval": "", + "legendFormat": "writes", + "refId": "B" + } + ], + "title": "Disk Reds + Writes (per/sec)", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 7 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_process_resident_memory_bytes", + "interval": "", + "legendFormat": "{{ instance }}", + "refId": "A" + } + ], + "title": "Memory Usage (bytes)", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": 
"line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 7 + }, + "id": 4, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_disk_space_available_bytes", + "hide": false, + "interval": "", + "legendFormat": "{{ instance }}", + "refId": "A" + } + ], + "title": "Disk Usage (bytes)", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 
16 + }, + "id": 10, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_queue_messages_ram_bytes", + "interval": "", + "legendFormat": "messages_ram", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_queue_messages_paged_out_bytes", + "hide": false, + "interval": "", + "legendFormat": "messages_paged_out", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_queue_process_memory_bytes", + "hide": false, + "interval": "", + "legendFormat": "process_memory", + "refId": "C" + } + ], + "title": "Queue Size (bytes)", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + 
"datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "sum(rate(rabbitmq_connection_incoming_bytes_total[$__rate_interval])) ", + "interval": "", + "legendFormat": "incoming", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "sum(rate(rabbitmq_connection_outgoing_bytes_total[$__rate_interval])) ", + "hide": false, + "interval": "", + "legendFormat": "outgoing", + "refId": "B" + } + ], + "title": "Incoming + Outgoing (bytes/sec)", + "type": "timeseries" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 25 + }, + "id": 9, + "panels": [ + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 11 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes", + "interval": "", + "legendFormat": "{{ instance }}", + "refId": "A" + } + ], + 
"title": "Memory Usage %", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 11 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "1 - (\n rabbitmq_disk_space_available_bytes / (rabbitmq_disk_space_available_limit_bytes + rabbitmq_disk_space_available_bytes)\n)", + "hide": false, + "interval": "", + "legendFormat": "{{ instance }}", + "refId": "A" + } + ], + "title": "Disk Usage %", + "type": "timeseries" + } + ], + "title": "Row title", + "type": "row" + } + ], + "refresh": "10s", + "schemaVersion": 34, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "RabbitMQ", + "uid": "5qyVNpQIk", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/docker/grafana/dashboards/Redis.json b/docker/grafana/dashboards/Redis.json new file mode 100644 index 0000000..122d2ae --- /dev/null +++ 
b/docker/grafana/dashboards/Redis.json @@ -0,0 +1,316 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 2, + "links": [], + "liveNow": false, + "panels": [ + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 0, + "y": 0 + }, + "id": 4, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "redis_connected_clients", + "interval": "", + "legendFormat": "connections", + "refId": "A" + } + ], + "title": "Connections", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + 
"drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 8, + "y": 0 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "redis_stream_length", + "interval": "", + "legendFormat": "size", + "refId": "A" + } + ], + "title": "Stream Size", + "type": "timeseries" + }, + { + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 0 + }, + "id": 10, + "options": { + 
"legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "multi" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "redis_memory_used_bytes", + "interval": "", + "legendFormat": "used", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": true, + "expr": "redis_total_system_memory_bytes", + "hide": false, + "interval": "", + "legendFormat": "total", + "refId": "B" + } + ], + "title": "Memory Usage (bytes)", + "type": "timeseries" + } + ], + "refresh": false, + "schemaVersion": 34, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Redis", + "uid": "NyBFUoQIz", + "version": 2, + "weekStart": "" +} \ No newline at end of file diff --git a/docker/grafana/datasources.yml b/docker/grafana/datasources.yml new file mode 100644 index 0000000..916e507 --- /dev/null +++ b/docker/grafana/datasources.yml @@ -0,0 +1,44 @@ +apiVersion: 1 + +datasources: + # name of the datasource. Required + - name: prometheus + # datasource type. Required + type: prometheus + # access mode. direct or proxy. Required + access: proxy + # org id. will default to orgId 1 if not specified + orgId: 1 + # url + url: http://prometheus:9090 + # database password, if used + # password: + # database user, if used + # user: + # database name, if used + # database: + # enable/disable basic auth + # basicAuth: + # basic auth username + # basicAuthUser: + # basic auth password + # basicAuthPassword: + # enable/disable with credentials headers + # withCredentials: + # mark as default datasource. 
Max one per org + isDefault: true + # fields that will be converted to json and stored in json_data + # jsonData: + # graphiteVersion: "1.1" + # tlsAuth: true + # tlsAuthWithCACert: true + # httpHeaderName1: "Authorization" + # json object of data that will be encrypted. + # secureJsonData: + # tlsCACert: "..." + # tlsClientCert: "..." + # tlsClientKey: "..." + # httpHeaderValue1: "Bearer xf5yhfkpsnmgo" + version: 1 + # allow users to edit datasources from the UI. + editable: false diff --git a/docker/prometheus.yml b/docker/prometheus.yml new file mode 100644 index 0000000..d4c3fa7 --- /dev/null +++ b/docker/prometheus.yml @@ -0,0 +1,52 @@ +# https://prometheus.io/docs/prometheus/latest/configuration/configuration/ +global: + # This is higher than RabbitMQ's collect_statistics_interval, + # but still close enough to capture metrics that were refreshed within this interval + # This value determines the range that we use with rate(): + # https://www.robustperception.io/what-range-should-i-use-with-rate + scrape_interval: 2s # Default is every 1 minute. + # scrape_timeout: 10s # Default is 10 seconds. + # evaluation_interval: 60s # Default is every 1 minute. + +# Alertmanager configuration +alerting: + alertmanagers: + - static_configs: + - targets: + # - 'alertmanager:9093' + +# Load rules once and periodically evaluate them according to the global 'evaluation_interval'. +rule_files: + # - "first_rules.yml" + # - "second_rules.yml" + +scrape_configs: + # The job name is added as a label `job=` to any timeseries scraped from this config. 
+ - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + - job_name: 'docker' + static_configs: + - targets: ['docker.for.mac.localhost:9323'] + + - job_name: 'rabbitmq-server' + static_configs: + - targets: + - 'rabbitmq:15692' + + - job_name: 'rabbitmq-server-detailed' + metrics_path: "/metrics/detailed" + params: + family: ["queue_coarse_metrics"] + static_configs: + - targets: + - 'rabbitmq:15692' + + - job_name: redis_exporter + static_configs: + - targets: ['redis-exporter:9121'] + + - job_name: 'cadvisor' + static_configs: + - targets: ['cadvisor:8080'] \ No newline at end of file diff --git a/docker/rabbitmq/overview-definitions.json b/docker/rabbitmq/overview-definitions.json new file mode 100644 index 0000000..57267e1 --- /dev/null +++ b/docker/rabbitmq/overview-definitions.json @@ -0,0 +1,49 @@ +{ + "global_parameters": [ + {"name": "cluster_name", "value": "rabbitmq-overview"} + ], + "permissions": [ + { + "configure": ".*", + "read": ".*", + "user": "guest", + "vhost": "/", + "write": ".*" + } + ], + "policies": [ + { + "apply-to": "queues", + "definition": {"ha-mode": "exactly", "ha-params": 1}, + "name": "ha1", + "pattern": "ha1.*", + "priority": 0, + "vhost": "/" + }, + { + "apply-to": "queues", + "definition": {"ha-mode": "exactly", "ha-params": 2}, + "name": "ha2", + "pattern": "ha2.*", + "priority": 0, + "vhost": "/" + }, + { + "apply-to": "queues", + "definition": {"ha-mode": "exactly", "ha-params": 3}, + "name": "ha3", + "pattern": "ha3.*", + "priority": 0, + "vhost": "/" + } + ], + "users": [ + { + "hashing_algorithm": "rabbit_password_hashing_sha256", + "name": "guest", + "password_hash": "hENva+fxJ7gnmaBK/WhwNHOYbvB53/QjNcqhtF4KqF7p21+x", + "tags": "administrator" + } + ], + "vhosts": [{"name": "/"}] + } \ No newline at end of file diff --git a/docker/rabbitmq/overview.conf b/docker/rabbitmq/overview.conf new file mode 100644 index 0000000..8bc0af4 --- /dev/null +++ b/docker/rabbitmq/overview.conf @@ -0,0 +1,32 @@ +# 
https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example +loopback_users.guest = false +listeners.tcp.default = 5672 +management.listener.port = 15672 +management.listener.ssl = false + +vm_memory_high_watermark.absolute = 768MiB +vm_memory_high_watermark_paging_ratio = 0.2 + +# cluster_name = rabbitmq-overview + +# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +# cluster_formation.classic_config.nodes.1 = rabbit@rmq0 +# cluster_formation.classic_config.nodes.2 = rabbit@rmq1 +# cluster_formation.classic_config.nodes.3 = rabbit@rmq2 + +load_definitions = /etc/rabbitmq/rabbitmq-definitions.json + +# background_gc_enabled = true + +# Increase the 5s default so that we are below Prometheus' scrape interval, +# but still refresh in time for Prometheus scrape +# This is linked to Prometheus scrape interval & range used with rate() +collect_statistics_interval = 10000 + +# Run RabbitMQ Management in Management-only mode, no stats +# https://github.com/rabbitmq/rabbitmq-management/pull/707 +# management.disable_stats = true + +# Return per-object metrics (unaggregated) +# https://github.com/rabbitmq/rabbitmq-prometheus/pull/28 +# prometheus.return_per_object_metrics = true \ No newline at end of file diff --git a/docker/results/docker-resource-allocation.png b/docker/results/docker-resource-allocation.png new file mode 100644 index 0000000..e47a0f0 Binary files /dev/null and b/docker/results/docker-resource-allocation.png differ diff --git a/docker/results/rabbitmq-test-results.png b/docker/results/rabbitmq-test-results.png new file mode 100644 index 0000000..dd7d0ee Binary files /dev/null and b/docker/results/rabbitmq-test-results.png differ diff --git a/docker/results/redis-test-results.png b/docker/results/redis-test-results.png new file mode 100644 index 0000000..e6e8a31 Binary files /dev/null and b/docker/results/redis-test-results.png differ diff --git a/go.mod b/go.mod index 8cde497..a23ab7f 100644 --- 
a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/google/go-cmp v0.6.0 github.com/pierrec/lz4/v4 v4.1.8 + github.com/rabbitmq/amqp091-go v1.10.0 + github.com/redis/go-redis/v9 v9.5.3 go.opencensus.io v0.24.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/bridge/opencensus v1.24.0 @@ -13,6 +15,8 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/go.sum b/go.sum index 35764e3..e6a746a 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,24 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -45,6 +51,10 @@ github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuR github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= 
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -52,46 +62,23 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= -go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= -go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= -go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= go.opentelemetry.io/otel/bridge/opencensus v1.24.0 h1:Vlhy5ee5k5R0zASpH+9AgHiJH7xnKACI3XopO1tUZfY= go.opentelemetry.io/otel/bridge/opencensus v1.24.0/go.mod h1:jRjVXV/X38jyrnHtvMGN8+9cejZB21JvXAAvooF2s+Q= -go.opentelemetry.io/otel/bridge/opencensus v1.25.0 h1:0o/9KwAgxjK+3pMV0pwIF5toYHqDsPmQhfrBvKaG6mU= -go.opentelemetry.io/otel/bridge/opencensus v1.25.0/go.mod h1:rZyTdpmRqoV+PpUn6QlruxJp/kE4765rPy0pP6mRDk8= 
-go.opentelemetry.io/otel/bridge/opencensus v1.26.0 h1:DZzxj9QjznMVoehskOJnFP2gsTCWtDTFBDvFhPAY7nc= -go.opentelemetry.io/otel/bridge/opencensus v1.26.0/go.mod h1:rJiX0KrF5m8Tm1XE8jLczpAv5zUaDcvhKecFG0ZoFG4= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= -go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= -go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= -go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= -go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= -go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8= -go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs= go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= -go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= -go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= -go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y= -go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace 
v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= -go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= -go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= -go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/msg.go b/msg.go index daa5ee7..6349032 100644 --- a/msg.go +++ b/msg.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net/textproto" + "time" ) // Attributes represent the key-value metadata for a Message. @@ -111,9 +113,19 @@ func (f ReceiverFunc) Receive(ctx context.Context, m *Message) error { return f(ctx, m) } -// ErrServerClosed represents a completed Shutdown +// ErrServerClosed represents a completed shutdown var ErrServerClosed = errors.New("msg: server closed") +// ErrServerThrottled signals that the server should sleep before resuming work. +type ErrServerThrottled struct { + Message string + Duration time.Duration +} + +func (e ErrServerThrottled) Error() string { + return fmt.Sprintf("error: %s - server throttled for %s", e.Message, e.Duration) +} + // A Server serves messages to a receiver. type Server interface { // Serve is a blocking function that gets data from an input stream,