From e5310964d7b99e1cd675b2cc2b80b3f15a7918f3 Mon Sep 17 00:00:00 2001 From: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Date: Thu, 15 Feb 2024 11:01:38 +0100 Subject: [PATCH 1/5] fix: concurrency safe `net.UnixConn` Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> --- cli-plugins/socket/socket.go | 38 ++++++++++++++++++-------- cli-plugins/socket/socket_darwin.go | 2 +- cli-plugins/socket/socket_nodarwin.go | 2 +- cli-plugins/socket/socket_test.go | 39 ++++++++++++++------------- cmd/docker/docker.go | 17 +++++++----- 5 files changed, 59 insertions(+), 39 deletions(-) diff --git a/cli-plugins/socket/socket.go b/cli-plugins/socket/socket.go index 67ba11562e38..332d6d99ce50 100644 --- a/cli-plugins/socket/socket.go +++ b/cli-plugins/socket/socket.go @@ -16,15 +16,18 @@ const EnvKey = "DOCKER_CLI_PLUGIN_SOCKET" // SetupConn sets up a Unix socket listener, establishes a goroutine to handle connections // and update the conn pointer, and returns the listener for the socket (which the caller // is responsible for closing when it's no longer needed). -func SetupConn(conn **net.UnixConn) (*net.UnixListener, error) { +func SetupConn() (*net.UnixListener, <-chan *net.UnixConn, error) { listener, err := listen("docker_cli_" + randomID()) if err != nil { - return nil, err + return nil, nil, err } - accept(listener, conn) + // accept starts a background goroutine + // to accept a new connection + // once accepted, the connChan will be updated. + connChan := accept(listener) - return listener, nil + return listener, connChan, nil } func randomID() string { @@ -35,16 +38,29 @@ func randomID() string { return hex.EncodeToString(b) } -func accept(listener *net.UnixListener, conn **net.UnixConn) { - go func() { +// accept creates a new Unix socket connection +// and sends it to the *net.UnixConn channel +// it allows reconnects +func accept(listener *net.UnixListener) <-chan *net.UnixConn { + connChan := make(chan *net.UnixConn, 1) + + go func(connChan chan<- *net.UnixConn) { for { - // ignore error here, if we failed to accept a connection, - // conn is nil and we fallback to previous behavior - *conn, _ = listener.AcceptUnix() + // this is a blocking call and will wait + // until a new connection is accepted + c, err := listener.AcceptUnix() + // perform any platform-specific actions on accept (e.g. unlink non-abstract sockets) - onAccept(*conn, listener) + onAccept(listener) + // retry accepting a connection if there was an error + if err != nil { + continue + } + connChan <- c } - }() + }(connChan) + + return connChan } // ConnectAndWait connects to the socket passed via well-known env var, diff --git a/cli-plugins/socket/socket_darwin.go b/cli-plugins/socket/socket_darwin.go index 17ab6aa69e6e..2531a9c8044a 100644 --- a/cli-plugins/socket/socket_darwin.go +++ b/cli-plugins/socket/socket_darwin.go @@ -14,6 +14,6 @@ func listen(socketname string) (*net.UnixListener, error) { }) } -func onAccept(conn *net.UnixConn, listener *net.UnixListener) { +func onAccept(listener *net.UnixListener) { syscall.Unlink(listener.Addr().String()) } diff --git a/cli-plugins/socket/socket_nodarwin.go b/cli-plugins/socket/socket_nodarwin.go index aa6065ecb446..ed5d4cbcc6f0 100644 --- a/cli-plugins/socket/socket_nodarwin.go +++ b/cli-plugins/socket/socket_nodarwin.go @@ -13,7 +13,7 @@ func listen(socketname string) (*net.UnixListener, error) { }) } -func onAccept(conn *net.UnixConn, listener *net.UnixListener) { +func onAccept(listener *net.UnixListener) { // do nothing // while on darwin and OpenBSD we would unlink here; // on non-darwin the socket is abstract and not present on the filesystem diff --git a/cli-plugins/socket/socket_test.go b/cli-plugins/socket/socket_test.go index 409eb689485c..8019567d1928 100644 --- a/cli-plugins/socket/socket_test.go +++ b/cli-plugins/socket/socket_test.go @@ -15,8 +15,7 @@ import ( func TestSetupConn(t *testing.T) { t.Run("updates conn when connected", func(t *testing.T) { - var conn *net.UnixConn - listener, err := SetupConn(&conn) + listener, conn, err := SetupConn() assert.NilError(t, err) assert.Check(t, listener != nil, "returned nil listener but no error") addr, err := net.ResolveUnixAddr("unix", listener.Addr().String()) @@ -25,12 +24,11 @@ func TestSetupConn(t *testing.T) { _, err = net.DialUnix("unix", nil, addr) assert.NilError(t, err, "failed to dial returned listener") - pollConnNotNil(t, &conn) + pollConnNotNil(t, conn) }) t.Run("allows reconnects", func(t *testing.T) { - var conn *net.UnixConn - listener, err := SetupConn(&conn) + listener, _, err := SetupConn() assert.NilError(t, err) assert.Check(t, listener != nil, "returned nil listener but no error") addr, err := net.ResolveUnixAddr("unix", listener.Addr().String()) @@ -46,8 +44,7 @@ func TestSetupConn(t *testing.T) { }) t.Run("does not leak sockets to local directory", func(t *testing.T) { - var conn *net.UnixConn - listener, err := SetupConn(&conn) + listener, _, err := SetupConn() assert.NilError(t, err) assert.Check(t, listener != nil, "returned nil listener but no error") checkDirNoPluginSocket(t) @@ -78,8 +75,7 @@ func checkDirNoPluginSocket(t *testing.T) { func TestConnectAndWait(t *testing.T) { t.Run("calls cancel func on EOF", func(t *testing.T) { - var conn *net.UnixConn - listener, err := SetupConn(&conn) + listener, connChan, err := SetupConn() assert.NilError(t, err, "failed to setup listener") done := make(chan struct{}) @@ -87,8 +83,9 @@ func TestConnectAndWait(t *testing.T) { cancelFunc := func() { done <- struct{}{} } + ConnectAndWait(cancelFunc) - pollConnNotNil(t, &conn) + conn := pollConnNotNil(t, connChan) conn.Close() select { @@ -101,8 +98,7 @@ func TestConnectAndWait(t *testing.T) { // TODO: this test cannot be executed with `t.Parallel()`, due to // relying on goroutine numbers to ensure correct behaviour t.Run("connect goroutine exits after EOF", func(t *testing.T) { - var conn *net.UnixConn - listener, err := SetupConn(&conn) + listener, connChan, err := SetupConn() assert.NilError(t, err, "failed to setup listener") t.Setenv(EnvKey, listener.Addr().String()) numGoroutines := runtime.NumGoroutine() @@ -110,8 +106,9 @@ func TestConnectAndWait(t *testing.T) { ConnectAndWait(func() {}) assert.Equal(t, runtime.NumGoroutine(), numGoroutines+1) - pollConnNotNil(t, &conn) + conn := pollConnNotNil(t, connChan) conn.Close() + poll.WaitOn(t, func(t poll.LogT) poll.Result { if runtime.NumGoroutine() > numGoroutines+1 { return poll.Continue("waiting for connect goroutine to exit") @@ -121,13 +118,17 @@ func TestConnectAndWait(t *testing.T) { }) } -func pollConnNotNil(t *testing.T, conn **net.UnixConn) { +func pollConnNotNil(t *testing.T, conn <-chan *net.UnixConn) *net.UnixConn { t.Helper() - poll.WaitOn(t, func(t poll.LogT) poll.Result { - if *conn == nil { - return poll.Continue("waiting for conn to not be nil") + select { + case c := <-conn: + if c == nil { + t.Fatal("conn is nil") } - return poll.Success() - }, poll.WithDelay(1*time.Millisecond), poll.WithTimeout(10*time.Millisecond)) + return c + case <-time.After(10 * time.Millisecond): + t.Fatal("timeout waiting for conn to be set") + } + return nil } diff --git a/cmd/docker/docker.go b/cmd/docker/docker.go index cfc53a6fa170..1febe64139f2 100644 --- a/cmd/docker/docker.go +++ b/cmd/docker/docker.go @@ -2,12 +2,12 @@ package main import ( "fmt" - "net" "os" "os/exec" "os/signal" "strings" "syscall" + "time" "github.com/docker/cli/cli" pluginmanager "github.com/docker/cli/cli-plugins/manager" @@ -222,8 +222,7 @@ func tryPluginRun(dockerCli command.Cli, cmd *cobra.Command, subcommand string, } // Establish the plugin socket, adding it to the environment under a well-known key if successful. - var conn *net.UnixConn - listener, err := socket.SetupConn(&conn) + listener, conn, err := socket.SetupConn() if err == nil { envs = append(envs, socket.EnvKey+"="+listener.Addr().String()) defer listener.Close() @@ -247,11 +246,15 @@ func tryPluginRun(dockerCli command.Cli, cmd *cobra.Command, subcommand string, // receive signals due to sharing a pgid with the parent CLI continue } - if conn != nil { - if err := conn.Close(); err != nil { - _, _ = fmt.Fprintf(dockerCli.Err(), "failed to signal plugin to close: %v\n", err) + select { + case c := <-conn: + if c != nil { + if err := c.Close(); err != nil { + _, _ = fmt.Fprintf(dockerCli.Err(), "failed to signal plugin to close: %v\n", err) + } } - conn = nil + case <-time.After(100 * time.Millisecond): + // fallthrough and continue with the loop. } retries++ if retries >= exitLimit { From c920d2d0dcbe3500fb0416eb450ca632ce8d15ff Mon Sep 17 00:00:00 2001 From: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Date: Tue, 20 Feb 2024 12:02:32 +0100 Subject: [PATCH 2/5] fix: infinite socket connection retries Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> --- cli-plugins/socket/socket.go | 28 ++++++++++++++++++++-------- cli-plugins/socket/socket_openbsd.go | 2 +- cmd/docker/docker.go | 9 ++++++--- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/cli-plugins/socket/socket.go b/cli-plugins/socket/socket.go index 332d6d99ce50..932d2b0c940d 100644 --- a/cli-plugins/socket/socket.go +++ b/cli-plugins/socket/socket.go @@ -7,6 +7,7 @@ import ( "io" "net" "os" + "time" ) // EnvKey represents the well-known environment variable used to pass the plugin being @@ -44,21 +45,32 @@ func randomID() string { func accept(listener *net.UnixListener) <-chan *net.UnixConn { connChan := make(chan *net.UnixConn, 1) - go func(connChan chan<- *net.UnixConn) { - for { + go func() { + const maxRetries = 10 + const waitBetweenRetries = 100 * time.Millisecond + + var conn *net.UnixConn + var err error + + // retry accepting a connection if there was an error + for i := 0; i < maxRetries; i++ { // this is a blocking call and will wait // until a new connection is accepted - c, err := listener.AcceptUnix() + // or until the timout is reached + conn, err = listener.AcceptUnix() - // perform any platform-specific actions on accept (e.g. unlink non-abstract sockets) - onAccept(listener) - // retry accepting a connection if there was an error if err != nil { + time.Sleep(waitBetweenRetries) continue } - connChan <- c + break } - }(connChan) + // perform any platform-specific actions on accept (e.g. unlink non-abstract sockets) + onAccept(listener) + connChan <- conn + // close the channel to signal we won't accept any more connections + close(connChan) + }() return connChan } diff --git a/cli-plugins/socket/socket_openbsd.go b/cli-plugins/socket/socket_openbsd.go index 17ab6aa69e6e..2531a9c8044a 100644 --- a/cli-plugins/socket/socket_openbsd.go +++ b/cli-plugins/socket/socket_openbsd.go @@ -14,6 +14,6 @@ func listen(socketname string) (*net.UnixListener, error) { }) } -func onAccept(conn *net.UnixConn, listener *net.UnixListener) { +func onAccept(listener *net.UnixListener) { syscall.Unlink(listener.Addr().String()) } diff --git a/cmd/docker/docker.go b/cmd/docker/docker.go index 1febe64139f2..e2ba6a4917f0 100644 --- a/cmd/docker/docker.go +++ b/cmd/docker/docker.go @@ -222,7 +222,7 @@ func tryPluginRun(dockerCli command.Cli, cmd *cobra.Command, subcommand string, } // Establish the plugin socket, adding it to the environment under a well-known key if successful. - listener, conn, err := socket.SetupConn() + listener, connChan, err := socket.SetupConn() if err == nil { envs = append(envs, socket.EnvKey+"="+listener.Addr().String()) defer listener.Close() @@ -247,13 +247,16 @@ func tryPluginRun(dockerCli command.Cli, cmd *cobra.Command, subcommand string, continue } select { - case c := <-conn: + // connChan will close itself once we receive the connection + // thus further loops will not block on connChan + case c := <-connChan: if c != nil { if err := c.Close(); err != nil { _, _ = fmt.Fprintf(dockerCli.Err(), "failed to signal plugin to close: %v\n", err) } } - case <-time.After(100 * time.Millisecond): + // timeout here if for some reason the channel is still open and not populated. + case <-time.After(1 * time.Millisecond): // fallthrough and continue with the loop. } retries++ From ad778cf743c1876098671035bc183aa39762827d Mon Sep 17 00:00:00 2001 From: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Date: Thu, 22 Feb 2024 10:34:24 +0100 Subject: [PATCH 3/5] refactor: remove reconnect behavior Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> --- cli-plugins/socket/socket.go | 23 ++++------------------- cli-plugins/socket/socket_test.go | 16 ---------------- 2 files changed, 4 insertions(+), 35 deletions(-) diff --git a/cli-plugins/socket/socket.go b/cli-plugins/socket/socket.go index 932d2b0c940d..81d61caf9dd1 100644 --- a/cli-plugins/socket/socket.go +++ b/cli-plugins/socket/socket.go @@ -7,7 +7,6 @@ import ( "io" "net" "os" - "time" ) // EnvKey represents the well-known environment variable used to pass the plugin being @@ -46,25 +45,11 @@ func accept(listener *net.UnixListener) <-chan *net.UnixConn { connChan := make(chan *net.UnixConn, 1) go func() { - const maxRetries = 10 - const waitBetweenRetries = 100 * time.Millisecond + // this is a blocking call and will wait + // until a new connection is accepted + // or until the timout is reached + conn, _ := listener.AcceptUnix() - var conn *net.UnixConn - var err error - - // retry accepting a connection if there was an error - for i := 0; i < maxRetries; i++ { - // this is a blocking call and will wait - // until a new connection is accepted - // or until the timout is reached - conn, err = listener.AcceptUnix() - - if err != nil { - time.Sleep(waitBetweenRetries) - continue - } - break - } // perform any platform-specific actions on accept (e.g. unlink non-abstract sockets) onAccept(listener) connChan <- conn diff --git a/cli-plugins/socket/socket_test.go b/cli-plugins/socket/socket_test.go index 8019567d1928..d9947a7ab881 100644 --- a/cli-plugins/socket/socket_test.go +++ b/cli-plugins/socket/socket_test.go @@ -27,22 +27,6 @@ func TestSetupConn(t *testing.T) { pollConnNotNil(t, conn) }) - t.Run("allows reconnects", func(t *testing.T) { - listener, _, err := SetupConn() - assert.NilError(t, err) - assert.Check(t, listener != nil, "returned nil listener but no error") - addr, err := net.ResolveUnixAddr("unix", listener.Addr().String()) - assert.NilError(t, err, "failed to resolve listener address") - - otherConn, err := net.DialUnix("unix", nil, addr) - assert.NilError(t, err, "failed to dial returned listener") - - otherConn.Close() - - _, err = net.DialUnix("unix", nil, addr) - assert.NilError(t, err, "failed to redial listener") - }) - t.Run("does not leak sockets to local directory", func(t *testing.T) { listener, _, err := SetupConn() assert.NilError(t, err) From 13c6b68e4218b4179f431214ad1530762226984f Mon Sep 17 00:00:00 2001 From: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Date: Thu, 22 Feb 2024 15:58:55 +0100 Subject: [PATCH 4/5] chore: update docstring and address nit Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> --- cli-plugins/socket/socket.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cli-plugins/socket/socket.go b/cli-plugins/socket/socket.go index 81d61caf9dd1..f46cf3ce6d1d 100644 --- a/cli-plugins/socket/socket.go +++ b/cli-plugins/socket/socket.go @@ -13,9 +13,10 @@ import ( // executed the socket name it should listen on to coordinate with the host CLI. const EnvKey = "DOCKER_CLI_PLUGIN_SOCKET" -// SetupConn sets up a Unix socket listener, establishes a goroutine to handle connections -// and update the conn pointer, and returns the listener for the socket (which the caller -// is responsible for closing when it's no longer needed). +// SetupConn sets up a Unix socket listener, establishes a goroutine to handle connections. +// A listener is returned, along with a connection channel that will receive the established +// connection. The channel *may* return a nil connection and should be checked once received. +// The caller is responsible for closing the listener when it's no longer needed. func SetupConn() (*net.UnixListener, <-chan *net.UnixConn, error) { listener, err := listen("docker_cli_" + randomID()) if err != nil { @@ -40,11 +41,12 @@ func randomID() string { // accept creates a new Unix socket connection // and sends it to the *net.UnixConn channel -// it allows reconnects func accept(listener *net.UnixListener) <-chan *net.UnixConn { connChan := make(chan *net.UnixConn, 1) go func() { + // close the channel to signal we won't accept any more connections + defer close(connChan) // this is a blocking call and will wait // until a new connection is accepted // or until the timout is reached @@ -53,8 +55,6 @@ func accept(listener *net.UnixListener) <-chan *net.UnixConn { // perform any platform-specific actions on accept (e.g. unlink non-abstract sockets) onAccept(listener) connChan <- conn - // close the channel to signal we won't accept any more connections - close(connChan) }() return connChan From e1a028ced4c143bce14986eb1429e12074ee06ea Mon Sep 17 00:00:00 2001 From: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Date: Fri, 1 Mar 2024 13:03:48 +0100 Subject: [PATCH 5/5] fix: fallthrough for non-blocking channel Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> --- cmd/docker/docker.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/docker/docker.go b/cmd/docker/docker.go index e2ba6a4917f0..addf674aa2c1 100644 --- a/cmd/docker/docker.go +++ b/cmd/docker/docker.go @@ -7,7 +7,6 @@ import ( "os/signal" "strings" "syscall" - "time" "github.com/docker/cli/cli" pluginmanager "github.com/docker/cli/cli-plugins/manager" @@ -255,8 +254,7 @@ func tryPluginRun(dockerCli command.Cli, cmd *cobra.Command, subcommand string, _, _ = fmt.Fprintf(dockerCli.Err(), "failed to signal plugin to close: %v\n", err) } } - // timeout here if for some reason the channel is still open and not populated. - case <-time.After(1 * time.Millisecond): + default: // fallthrough and continue with the loop. } retries++