Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/entrypoint/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ The following flags are available:
- `-wait_file_content`: expects the `wait_file` to contain actual
contents. It will continue watching for `wait_file` until it has
content.
- `-stdout_path`: If specified, the stdout of the sub-process will be
copied to the given path on the local filesystem.
- `-stderr_path`: If specified, the stderr of the sub-process will be
copied to the given path on the local filesystem. It can be set to the
same value as `{{stdout_path}}` so both streams are copied to the same
file. However, there is no ordering guarantee on data copied from both
streams.

Any extra positional arguments are passed to the original entrypoint command.

Expand Down
105 changes: 105 additions & 0 deletions cmd/entrypoint/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"errors"
"io"
"math"
"os"
"time"
)

type ioResult struct {
numBytes int
err error
}

// readAsync implements a non-blocking read.
func readAsync(r io.Reader, p []byte) <-chan ioResult {
resultCh := make(chan ioResult, 1)
go func() {
defer close(resultCh)
n, err := r.Read(p)
resultCh <- ioResult{n, err}
}()
return resultCh
}

// copyAsync performs a non-blocking copy from src to dst.
func copyAsync(dst io.Writer, src io.Reader, stopCh <-chan struct{}) <-chan ioResult {
resultCh := make(chan ioResult, 1)
go func() {
defer close(resultCh)

buf := make([]byte, 1024)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest putting 1024 into a named constant with any relevant info about its value in an accompanying comment.

result := ioResult{}
readCh := readAsync(src, buf)
stopped := false
done := false
timer := time.NewTimer(time.Duration(math.MaxInt64))
defer timer.Stop()

for !done {
// If the stop channel is signalled, continue the loop to read the rest of the available
// data with a short timeout instead of a non-blocking read to mitigate the race between
// this loop and Read() running in another goroutine.
if stopped {
if !timer.Stop() {
<-timer.C
}
timer.Reset(100 * time.Millisecond)
}
select {
case r := <-readCh:
if r.numBytes != 0 {
nw, err := dst.Write(buf[:r.numBytes])
result.numBytes += nw
if err != nil {
result.err = err
done = true
} else if nw < r.numBytes {
result.err = io.ErrShortWrite
done = true
}
}
if r.err != nil {
if !errors.Is(r.err, io.EOF) {
result.err = r.err
}
done = true
}
if !done {
readCh = readAsync(src, buf)
}
case <-stopCh:
stopped = true
stopCh = nil
case <-timer.C:
done = true
}
}

resultCh <- result
}()
return resultCh
}

// asyncWriter creates a write that duplicates its writes to the provided writer asynchronously.
func asyncWriter(w io.Writer, stopCh <-chan struct{}) (io.Writer, <-chan error, error) {
pr, pw, err := os.Pipe()
if err != nil {
return nil, nil, err
}

doneCh := make(chan error, 1)
go func() {
defer close(doneCh)

if err := (<-copyAsync(w, pr, stopCh)).err; err != nil {
doneCh <- err
}
pr.Close()
pw.Close()
}()

return pw, doneCh, nil
}
76 changes: 76 additions & 0 deletions cmd/entrypoint/io_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"bytes"
"errors"
"io"
"testing"
)

func TestCopyAsyncEOF(t *testing.T) {
stopCh := make(chan struct{}, 1)
defer close(stopCh)

pr, pw := io.Pipe()
defer pr.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
pw.Write([]byte(expectedString))
pw.Close()

if c := <-copyCh; c.err != nil {
t.Fatalf("Unexpected error: %v", c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}

func TestCopyAsyncStop(t *testing.T) {
stopCh := make(chan struct{}, 1)

pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
pw.Write([]byte(expectedString))

close(stopCh)

if c := <-copyCh; c.err != nil {
t.Fatalf("Unexpected error: %v", c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}

func TestCopyAsyncError(t *testing.T) {
stopCh := make(chan struct{}, 1)
defer close(stopCh)

pr, pw := io.Pipe()
defer pr.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
expectedError := errors.New("test error")
pw.Write([]byte(expectedString))
pw.CloseWithError(expectedError)

if c := <-copyCh; !errors.Is(c.err, expectedError) {
t.Errorf("Expected error %v but got %v", expectedError, c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}
13 changes: 9 additions & 4 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
postFile = flag.String("post_file", "", "If specified, file to write upon completion")
terminationPath = flag.String("termination_path", "/tekton/termination", "If specified, file to write upon termination")
results = flag.String("results", "", "If specified, list of file names that might contain task results")
stdoutPath = flag.String("stdout_path", "", "If specified, file to copy stdout to")
stderrPath = flag.String("stderr_path", "", "If specified, file to copy stdout to")
timeout = flag.Duration("timeout", time.Duration(0), "If specified, sets timeout for step")
)

Expand Down Expand Up @@ -104,10 +106,13 @@ func main() {
TerminationPath: *terminationPath,
Args: flag.Args(),
Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval},
Runner: &realRunner{},
PostWriter: &realPostWriter{},
Results: strings.Split(*results, ","),
Timeout: timeout,
Runner: &realRunner{
stdoutPath: *stdoutPath,
stderrPath: *stderrPath,
},
PostWriter: &realPostWriter{},
Results: strings.Split(*results, ","),
Timeout: timeout,
}

// Copy any creds injected by the controller into the $HOME directory of the current
Expand Down
59 changes: 58 additions & 1 deletion cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"io"
"log"
"os"
"os/exec"
"os/signal"
Expand All @@ -15,7 +17,9 @@ import (

// realRunner actually runs commands.
type realRunner struct {
signals chan os.Signal
signals chan os.Signal
stdoutPath string
stderrPath string
}

var _ entrypoint.Runner = (*realRunner)(nil)
Expand All @@ -35,8 +39,61 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
defer signal.Reset()

cmd := exec.CommandContext(ctx, name, args...)
stopCh := make(chan struct{}, 1)
defer close(stopCh)

cmd.Stdout = os.Stdout
var stdoutFile *os.File
if rr.stdoutPath != "" {
var err error
var doneCh <-chan error
if stdoutFile, err = os.Create(rr.stdoutPath); err != nil {
return err
}
// We use os.Pipe in asyncWriter to copy stdout instead of cmd.StdoutPipe or providing an
// io.Writer directly because otherwise Go would wait for the underlying fd to be closed by the
// child process before returning from cmd.Wait even if the process is no longer running. This
// would cause a deadlock if the child spawns a long running descendant process before exiting.
if cmd.Stdout, doneCh, err = asyncWriter(io.MultiWriter(os.Stdout, stdoutFile), stopCh); err != nil {
return err
}
go func() {
if err := <-doneCh; err != nil {
log.Fatalf("Copying stdout: %v", err)
}
stdoutFile.Close()
}()
}

cmd.Stderr = os.Stderr
var stderrFile *os.File
if rr.stderrPath != "" {
var err error
var doneCh <-chan error
if rr.stderrPath == rr.stdoutPath {
fd, err := syscall.Dup(int(stdoutFile.Fd()))
if err != nil {
return err
}
stderrFile = os.NewFile(uintptr(fd), rr.stderrPath)
} else if stderrFile, err = os.Create(rr.stderrPath); err != nil {
return err
}
// We use os.Pipe in asyncWriter to copy stderr instead of cmd.StderrPipe or providing an
// io.Writer directly because otherwise Go would wait for the underlying fd to be closed by the
// child process before returning from cmd.Wait even if the process is no longer running. This
// would cause a deadlock if the child spawns a long running descendant process before exiting.
if cmd.Stderr, doneCh, err = asyncWriter(io.MultiWriter(os.Stderr, stderrFile), stopCh); err != nil {
return err
}
go func() {
if err := <-doneCh; err != nil {
log.Fatalf("Copying stderr: %v", err)
}
stderrFile.Close()
}()
}

// dedicated PID group used to forward signals to
// main process and all children
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
Expand Down
Loading