diff --git a/log/concurrency_test.go b/log/concurrency_test.go index e68d16a3e..95a749e77 100644 --- a/log/concurrency_test.go +++ b/log/concurrency_test.go @@ -1,8 +1,7 @@ package log_test import ( - "strconv" - "sync" + "math" "testing" "github.com/go-kit/kit/log" @@ -10,19 +9,32 @@ import ( // These test are designed to be run with the race detector. -func testConcurrency(t *testing.T, logger log.Logger) { - for _, n := range []int{10, 100, 500} { - wg := sync.WaitGroup{} - wg.Add(n) - for i := 0; i < n; i++ { - go func() { spam(logger); wg.Done() }() +func testConcurrency(t *testing.T, logger log.Logger, total int) { + n := int(math.Sqrt(float64(total))) + share := total / n + + errC := make(chan error, n) + + for i := 0; i < n; i++ { + go func() { + errC <- spam(logger, share) + }() + } + + for i := 0; i < n; i++ { + err := <-errC + if err != nil { + t.Fatalf("concurrent logging error: %v", err) } - wg.Wait() } } -func spam(logger log.Logger) { - for i := 0; i < 100; i++ { - logger.Log("key", strconv.FormatInt(int64(i), 10)) +func spam(logger log.Logger, count int) error { + for i := 0; i < count; i++ { + err := logger.Log("key", i) + if err != nil { + return err + } } + return nil } diff --git a/log/example_test.go b/log/example_test.go index 174161548..9ecf2769a 100644 --- a/log/example_test.go +++ b/log/example_test.go @@ -1,13 +1,31 @@ package log_test import ( + "net/url" "os" "github.com/go-kit/kit/log" ) +func Example_stdout() { + w := log.NewSyncWriter(os.Stdout) + logger := log.NewLogfmtLogger(w) + + reqUrl := &url.URL{ + Scheme: "https", + Host: "github.com", + Path: "/go-kit/kit", + } + + logger.Log("method", "GET", "url", reqUrl) + + // Output: + // method=GET url=https://github.com/go-kit/kit +} + func ExampleContext() { - logger := log.NewLogfmtLogger(os.Stdout) + w := log.NewSyncWriter(os.Stdout) + logger := log.NewLogfmtLogger(w) logger.Log("foo", 123) ctx := log.NewContext(logger).With("level", "info") ctx.Log() diff --git a/log/json_logger_test.go b/log/json_logger_test.go index 291157721..42df70c1c 100644 --- a/log/json_logger_test.go +++ b/log/json_logger_test.go @@ -100,6 +100,7 @@ func (textstringer) String() string { } func TestJSONLoggerStringValue(t *testing.T) { + t.Parallel() tests := []struct { v interface{} expected string @@ -152,5 +153,6 @@ func BenchmarkJSONLoggerContextual(b *testing.B) { } func TestJSONLoggerConcurrency(t *testing.T) { - testConcurrency(t, log.NewJSONLogger(ioutil.Discard)) + t.Parallel() + testConcurrency(t, log.NewJSONLogger(ioutil.Discard), 10000) } diff --git a/log/log.go b/log/log.go index 25e76cb74..fed019407 100644 --- a/log/log.go +++ b/log/log.go @@ -2,12 +2,15 @@ // // The fundamental interface is Logger. Loggers create log events from // key/value data. +// +// Concurrent Safety +// +// Applications with multiple goroutines want each log event written to the +// same logger to remain separate from other log events. Package log provides +// multiple solutions for concurrent safe logging. package log -import ( - "errors" - "sync/atomic" -) +import "errors" // Logger is the fundamental interface for all log operations. Log creates a // log event from keyvals, a variadic sequence of alternating keys and values. @@ -149,33 +152,3 @@ type LoggerFunc func(...interface{}) error func (f LoggerFunc) Log(keyvals ...interface{}) error { return f(keyvals...) } - -// SwapLogger wraps another logger that may be safely replaced while other -// goroutines use the SwapLogger concurrently. The zero value for a SwapLogger -// will discard all log events without error. -// -// SwapLogger serves well as a package global logger that can be changed by -// importers. -type SwapLogger struct { - logger atomic.Value -} - -type loggerStruct struct { - Logger -} - -// Log implements the Logger interface by forwarding keyvals to the currently -// wrapped logger. It does not log anything if the wrapped logger is nil. -func (l *SwapLogger) Log(keyvals ...interface{}) error { - s, ok := l.logger.Load().(loggerStruct) - if !ok || s.Logger == nil { - return nil - } - return s.Log(keyvals...) -} - -// Swap replaces the currently wrapped logger with logger. Swap may be called -// concurrently with calls to Log from other goroutines. -func (l *SwapLogger) Swap(logger Logger) { - l.logger.Store(loggerStruct{logger}) -} diff --git a/log/log_test.go b/log/log_test.go index 7cd084411..7c44095b3 100644 --- a/log/log_test.go +++ b/log/log_test.go @@ -71,6 +71,7 @@ func TestContextMissingValue(t *testing.T) { // whether Context.Log is called via an interface typed variable or a concrete // typed variable. func TestContextStackDepth(t *testing.T) { + t.Parallel() fn := fmt.Sprintf("%n", stack.Caller(0)) var output []interface{} @@ -207,49 +208,3 @@ func BenchmarkTenWith(b *testing.B) { lc.Log("k", "v") } } - -func TestSwapLogger(t *testing.T) { - var logger log.SwapLogger - - // Zero value does not panic or error. - err := logger.Log("k", "v") - if got, want := err, error(nil); got != want { - t.Errorf("got %v, want %v", got, want) - } - - buf := &bytes.Buffer{} - json := log.NewJSONLogger(buf) - logger.Swap(json) - - if err := logger.Log("k", "v"); err != nil { - t.Error(err) - } - if got, want := buf.String(), `{"k":"v"}`+"\n"; got != want { - t.Errorf("got %v, want %v", got, want) - } - - buf.Reset() - prefix := log.NewLogfmtLogger(buf) - logger.Swap(prefix) - - if err := logger.Log("k", "v"); err != nil { - t.Error(err) - } - if got, want := buf.String(), "k=v\n"; got != want { - t.Errorf("got %v, want %v", got, want) - } - - buf.Reset() - logger.Swap(nil) - - if err := logger.Log("k", "v"); err != nil { - t.Error(err) - } - if got, want := buf.String(), ""; got != want { - t.Errorf("got %v, want %v", got, want) - } -} - -func TestSwapLoggerConcurrency(t *testing.T) { - testConcurrency(t, &log.SwapLogger{}) -} diff --git a/log/logfmt_logger_test.go b/log/logfmt_logger_test.go index 185e94851..91bbca15c 100644 --- a/log/logfmt_logger_test.go +++ b/log/logfmt_logger_test.go @@ -11,6 +11,7 @@ import ( ) func TestLogfmtLogger(t *testing.T) { + t.Parallel() buf := &bytes.Buffer{} logger := log.NewLogfmtLogger(buf) @@ -47,7 +48,8 @@ func BenchmarkLogfmtLoggerContextual(b *testing.B) { } func TestLogfmtLoggerConcurrency(t *testing.T) { - testConcurrency(t, log.NewLogfmtLogger(ioutil.Discard)) + t.Parallel() + testConcurrency(t, log.NewLogfmtLogger(ioutil.Discard), 10000) } type mymap map[int]int diff --git a/log/nop_logger_test.go b/log/nop_logger_test.go index 043553e62..25af1836d 100644 --- a/log/nop_logger_test.go +++ b/log/nop_logger_test.go @@ -7,6 +7,7 @@ import ( ) func TestNopLogger(t *testing.T) { + t.Parallel() logger := log.NewNopLogger() if err := logger.Log("abc", 123); err != nil { t.Error(err) diff --git a/log/sync.go b/log/sync.go new file mode 100644 index 000000000..3ac21f8a2 --- /dev/null +++ b/log/sync.go @@ -0,0 +1,233 @@ +package log + +import ( + "errors" + "io" + "sync" + "sync/atomic" +) + +// SwapLogger wraps another logger that may be safely replaced while other +// goroutines use the SwapLogger concurrently. The zero value for a SwapLogger +// will discard all log events without error. +// +// SwapLogger serves well as a package global logger that can be changed by +// importers. +type SwapLogger struct { + logger atomic.Value +} + +type loggerStruct struct { + Logger +} + +// Log implements the Logger interface by forwarding keyvals to the currently +// wrapped logger. It does not log anything if the wrapped logger is nil. +func (l *SwapLogger) Log(keyvals ...interface{}) error { + s, ok := l.logger.Load().(loggerStruct) + if !ok || s.Logger == nil { + return nil + } + return s.Log(keyvals...) +} + +// Swap replaces the currently wrapped logger with logger. Swap may be called +// concurrently with calls to Log from other goroutines. +func (l *SwapLogger) Swap(logger Logger) { + l.logger.Store(loggerStruct{logger}) +} + +// SyncWriter synchronizes concurrent writes to an io.Writer. +type SyncWriter struct { + mu sync.Mutex + w io.Writer +} + +// NewSyncWriter returns a new SyncWriter. The returned writer is safe for +// concurrent use by multiple goroutines. +func NewSyncWriter(w io.Writer) *SyncWriter { + return &SyncWriter{w: w} +} + +// Write writes p to the underlying io.Writer. If another write is already in +// progress, the calling goroutine blocks until the SyncWriter is available. +func (w *SyncWriter) Write(p []byte) (n int, err error) { + w.mu.Lock() + n, err = w.w.Write(p) + w.mu.Unlock() + return n, err +} + +// syncLogger provides concurrent safe logging for another Logger. +type syncLogger struct { + mu sync.Mutex + logger Logger +} + +// NewSyncLogger returns a logger that synchronizes concurrent use of the +// wrapped logger. When multiple goroutines use the SyncLogger concurrently +// only one goroutine will be allowed to log to the wrapped logger at a time. +// The other goroutines will block until the logger is available. +func NewSyncLogger(logger Logger) Logger { + return &syncLogger{logger: logger} +} + +// Log logs keyvals to the underlying Logger. If another log is already in +// progress, the calling goroutine blocks until the syncLogger is available. +func (l *syncLogger) Log(keyvals ...interface{}) error { + l.mu.Lock() + err := l.logger.Log(keyvals...) + l.mu.Unlock() + return err +} + +// AsyncLogger provides buffered asynchronous and concurrent safe logging for +// another logger. +// +// Errors returned by the wrapped logger are ignored, therefore the wrapped +// logger must handle all errors appropriately. +type AsyncLogger struct { + logger Logger + keyvalsC chan []interface{} +} + +// NewAsyncLogger returns a new AsyncLogger that logs to logger and can buffer +// up to size log events before its Log method blocks. +func NewAsyncLogger(logger Logger, size int) *AsyncLogger { + l := &AsyncLogger{ + logger: logger, + keyvalsC: make(chan []interface{}, size), + } + go l.run() + return l +} + +// run forwards log events from l.keyvalsC to l.logger. +func (l *AsyncLogger) run() { + for keyvals := range l.keyvalsC { + l.logger.Log(keyvals...) + } +} + +// Log queues keyvals for logging by the wrapped Logger. Log may be called +// concurrently by multiple goroutines. If the the buffer is full, Log will +// block until space is available. Log always returns a nil error. +func (l *AsyncLogger) Log(keyvals ...interface{}) error { + l.keyvalsC <- keyvals + return nil +} + +// Len returns a snapshot of the number of buffered log events. The returned +// count should only be used for monitoring purposes as it becomes stale +// quickly. +func (l *AsyncLogger) Len() int { + return len(l.keyvalsC) +} + +// Cap returns the maximum capacity of the buffer. +func (l *AsyncLogger) Cap() int { + return cap(l.keyvalsC) +} + +// NonblockingLogger provides buffered asynchronous and concurrent safe +// logging for another logger. +// +// Errors returned by the wrapped logger are ignored, therefore the wrapped +// logger must handle all errors appropriately. +type NonblockingLogger struct { + logger Logger + keyvalsC chan []interface{} + + mu sync.Mutex + stopping chan struct{} // must be closed before keyvalsC + + stopped chan struct{} // closed when run loop exits +} + +// NewNonblockingLogger returns a new NonblockingLogger that logs to logger +// and can buffer up to size log events before overflowing. +func NewNonblockingLogger(logger Logger, size int) *NonblockingLogger { + l := &NonblockingLogger{ + logger: logger, + keyvalsC: make(chan []interface{}, size), + stopping: make(chan struct{}), + stopped: make(chan struct{}), + } + go l.run() + return l +} + +// run forwards log events from l.keyvalsC to l.logger. +func (l *NonblockingLogger) run() { + defer close(l.stopped) + for keyvals := range l.keyvalsC { + l.logger.Log(keyvals...) + } +} + +// Stop stops the NonblockingLogger. After stop returns the logger will not +// accept new log events. Log events queued prior to calling Stop will be +// logged. +func (l *NonblockingLogger) Stop() { + l.mu.Lock() + select { + case <-l.stopping: + // already stopping, do nothing + default: + close(l.stopping) + close(l.keyvalsC) + } + l.mu.Unlock() +} + +// Log queues keyvals for logging by the wrapped Logger. Log may be called +// concurrently by multiple goroutines. If the the buffer is full, Log will +// return ErrNonblockingLoggerOverflow and the keyvals are not queued. If the +// NonblockingLogger is stopping, Log will return +// ErrNonblockingLoggerStopping. +func (l *NonblockingLogger) Log(keyvals ...interface{}) error { + l.mu.Lock() + defer l.mu.Unlock() + + select { + case <-l.stopping: + return ErrNonblockingLoggerStopping + default: + } + + select { + case l.keyvalsC <- keyvals: + return nil + default: + return ErrNonblockingLoggerOverflow + } +} + +// Errors returned by NonblockingLogger. +var ( + ErrNonblockingLoggerStopping = errors.New("aysnc logger: logger stopped") + ErrNonblockingLoggerOverflow = errors.New("aysnc logger: log buffer overflow") +) + +// Stopping returns a channel that is closed after Stop is called. +func (l *NonblockingLogger) Stopping() <-chan struct{} { + return l.stopping +} + +// Stopped returns a channel that is closed after Stop is called and all log +// events have been sent to the wrapped logger. +func (l *NonblockingLogger) Stopped() <-chan struct{} { + return l.stopped +} + +// Len returns a snapshot of the number of buffered log events. The returned +// count should only be used for monitoring purposes as it becomes stale +// quickly. +func (l *NonblockingLogger) Len() int { + return len(l.keyvalsC) +} + +// Cap returns the maximum capacity of the buffer. +func (l *NonblockingLogger) Cap() int { + return cap(l.keyvalsC) +} diff --git a/log/sync_test.go b/log/sync_test.go new file mode 100644 index 000000000..dadacf8c6 --- /dev/null +++ b/log/sync_test.go @@ -0,0 +1,200 @@ +package log_test + +import ( + "bytes" + "io" + "testing" + + "github.com/go-kit/kit/log" +) + +func TestSwapLogger(t *testing.T) { + t.Parallel() + var logger log.SwapLogger + + // Zero value does not panic or error. + err := logger.Log("k", "v") + if got, want := err, error(nil); got != want { + t.Errorf("got %v, want %v", got, want) + } + + buf := &bytes.Buffer{} + json := log.NewJSONLogger(buf) + logger.Swap(json) + + if err := logger.Log("k", "v"); err != nil { + t.Error(err) + } + if got, want := buf.String(), `{"k":"v"}`+"\n"; got != want { + t.Errorf("got %v, want %v", got, want) + } + + buf.Reset() + prefix := log.NewLogfmtLogger(buf) + logger.Swap(prefix) + + if err := logger.Log("k", "v"); err != nil { + t.Error(err) + } + if got, want := buf.String(), "k=v\n"; got != want { + t.Errorf("got %v, want %v", got, want) + } + + buf.Reset() + logger.Swap(nil) + + if err := logger.Log("k", "v"); err != nil { + t.Error(err) + } + if got, want := buf.String(), ""; got != want { + t.Errorf("got %v, want %v", got, want) + } +} + +func TestSwapLoggerConcurrency(t *testing.T) { + t.Parallel() + testConcurrency(t, &log.SwapLogger{}, 10000) +} + +func TestSyncWriterConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + w = log.NewSyncWriter(w) + testConcurrency(t, log.NewLogfmtLogger(w), 10000) +} + +func TestSyncLoggerConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + logger = log.NewSyncLogger(logger) + testConcurrency(t, logger, 10000) +} + +func TestAsyncLoggerConcurrency(t *testing.T) { + for _, size := range []int{1, 100, 1000, 10000} { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + logger = log.NewAsyncLogger(logger, size) + testConcurrency(t, logger, 10000) + } +} + +func TestAsyncLoggerLogs(t *testing.T) { + t.Parallel() + output := make(chan []interface{}) + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output <- keyvals + return nil + }) + + const size = 4 + const logcnt = size * 20 + al := log.NewAsyncLogger(logger, size) + + go func() { + for i := 0; i < logcnt; i++ { + al.Log("key", i) + } + }() + + for i := 0; i < logcnt; i++ { + e := <-output + if got, want := e[1], i; got != want { + t.Errorf("log event mismatch, got %v, want %v", got, want) + } + } +} + +func TestNonblockingLoggerConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + al := log.NewNonblockingLogger(logger, 10000) + testConcurrency(t, al, 10000) + al.Stop() + <-al.Stopped() +} + +func TestNonblockingLoggerLogs(t *testing.T) { + t.Parallel() + output := [][]interface{}{} + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output = append(output, keyvals) + return nil + }) + + const logcnt = 10 + al := log.NewNonblockingLogger(logger, logcnt) + + for i := 0; i < logcnt; i++ { + al.Log("key", i) + } + + al.Stop() + al.Stop() // stop is idempotent + <-al.Stopping() + + if got, want := al.Log("key", "late"), log.ErrNonblockingLoggerStopping; got != want { + t.Errorf(`logger err: got "%v", want "%v"`, got, want) + } + + <-al.Stopped() + al.Stop() // stop is idempotent + + if got, want := len(output), logcnt; got != want { + t.Errorf("logged events: got %v, want %v", got, want) + } + + for i, e := range output { + if got, want := e[1], i; got != want { + t.Errorf("log event mismatch, got %v, want %v", got, want) + } + } +} + +func TestNonblockingLoggerOverflow(t *testing.T) { + t.Parallel() + var ( + output = make(chan []interface{}, 10) + loggerdone = make(chan struct{}) + ) + + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output <- keyvals + <-loggerdone // block here to stall the NonblockingLogger.run loop + return nil + }) + + al := log.NewNonblockingLogger(logger, 1) + + if got, want := al.Log("k", 1), error(nil); got != want { + t.Errorf(`first log err: got "%v", want "%v"`, got, want) + } + + <-output + // Now we know the NonblockingLogger.run loop has consumed the first log event + // and will be stalled until loggerdone is closed. + + // This log event fills the buffer without error. + if got, want := al.Log("k", 2), error(nil); got != want { + t.Errorf(`second log err: got "%v", want "%v"`, got, want) + } + + // Now we test for buffer overflow. + if got, want := al.Log("k", 3), log.ErrNonblockingLoggerOverflow; got != want { + t.Errorf(`third log err: got "%v", want "%v"`, got, want) + } + + al.Stop() + <-al.Stopping() + + if got, want := al.Log("key", "late"), log.ErrNonblockingLoggerStopping; got != want { + t.Errorf(`log while stopping err: got "%v", want "%v"`, got, want) + } + + // Release the NonblockingLogger.run loop and wait for it to stop. + close(loggerdone) + <-al.Stopped() +} diff --git a/log/value_test.go b/log/value_test.go index 52773611c..44e6478af 100644 --- a/log/value_test.go +++ b/log/value_test.go @@ -9,6 +9,7 @@ import ( ) func TestValueBinding(t *testing.T) { + t.Parallel() var output []interface{} logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error { @@ -33,7 +34,7 @@ func TestValueBinding(t *testing.T) { if want, have := start.Add(time.Second), timestamp; want != have { t.Errorf("output[1]: want %v, have %v", want, have) } - if want, have := "value_test.go:28", fmt.Sprint(output[3]); want != have { + if want, have := "value_test.go:29", fmt.Sprint(output[3]); want != have { t.Errorf("output[3]: want %s, have %s", want, have) } @@ -46,12 +47,13 @@ func TestValueBinding(t *testing.T) { if want, have := start.Add(2*time.Second), timestamp; want != have { t.Errorf("output[1]: want %v, have %v", want, have) } - if want, have := "value_test.go:41", fmt.Sprint(output[3]); want != have { + if want, have := "value_test.go:42", fmt.Sprint(output[3]); want != have { t.Errorf("output[3]: want %s, have %s", want, have) } } func TestValueBinding_loggingZeroKeyvals(t *testing.T) { + t.Parallel() var output []interface{} logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {