Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"strings"
"sync"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/moby/buildkit/identity"
Expand Down Expand Up @@ -36,14 +37,16 @@ type Attachable interface {

// Session is a long running connection between client and a daemon
type Session struct {
id string
name string
sharedKey string
ctx context.Context
cancelCtx func()
done chan struct{}
grpcServer *grpc.Server
conn net.Conn
mu sync.Mutex // synchronizes conn run and close
id string
name string
sharedKey string
ctx context.Context
cancelCtx func()
done chan struct{}
grpcServer *grpc.Server
conn net.Conn
closeCalled bool
}

// NewSession returns a new long running session
Expand Down Expand Up @@ -99,6 +102,11 @@ func (s *Session) ID() string {

// Run activates the session
func (s *Session) Run(ctx context.Context, dialer Dialer) error {
s.mu.Lock()
if s.closeCalled {
s.mu.Unlock()
return nil
}
ctx, cancel := context.WithCancel(ctx)
s.cancelCtx = cancel
s.done = make(chan struct{})
Expand All @@ -118,22 +126,27 @@ func (s *Session) Run(ctx context.Context, dialer Dialer) error {
}
conn, err := dialer(ctx, "h2c", meta)
if err != nil {
s.mu.Unlock()
return errors.Wrap(err, "failed to dial gRPC")
}
s.conn = conn
s.mu.Unlock()
serve(ctx, s.grpcServer, conn)
return nil
}

// Close closes the session
func (s *Session) Close() error {
s.mu.Lock()
if s.cancelCtx != nil && s.done != nil {
if s.conn != nil {
s.conn.Close()
}
s.grpcServer.Stop()
<-s.done
}
s.closeCalled = true
s.mu.Unlock()
return nil
}

Expand Down