diff --git a/containerd-shim-v2/container.go b/containerd-shim-v2/container.go index 6b5e994824..cc57487611 100644 --- a/containerd-shim-v2/container.go +++ b/containerd-shim-v2/container.go @@ -6,6 +6,7 @@ package containerdshim import ( + "io" "time" "github.com/containerd/containerd/api/types/task" @@ -17,23 +18,25 @@ import ( ) type container struct { - s *service - ttyio *ttyIO - spec *specs.Spec - exitTime time.Time - execs map[string]*exec - exitIOch chan struct{} - exitCh chan uint32 - id string - stdin string - stdout string - stderr string - bundle string - cType vc.ContainerType - exit uint32 - status task.Status - terminal bool - mounted bool + s *service + ttyio *ttyIO + spec *specs.Spec + exitTime time.Time + execs map[string]*exec + exitIOch chan struct{} + stdinPipe io.WriteCloser + stdinCloser chan struct{} + exitCh chan uint32 + id string + stdin string + stdout string + stderr string + bundle string + cType vc.ContainerType + exit uint32 + status task.Status + terminal bool + mounted bool } func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) { @@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con } c := &container{ - s: s, - spec: spec, - id: r.ID, - bundle: r.Bundle, - stdin: r.Stdin, - stdout: r.Stdout, - stderr: r.Stderr, - terminal: r.Terminal, - cType: containerType, - execs: make(map[string]*exec), - status: task.StatusCreated, - exitIOch: make(chan struct{}), - exitCh: make(chan uint32, 1), - mounted: mounted, + s: s, + spec: spec, + id: r.ID, + bundle: r.Bundle, + stdin: r.Stdin, + stdout: r.Stdout, + stderr: r.Stderr, + terminal: r.Terminal, + cType: containerType, + execs: make(map[string]*exec), + status: task.StatusCreated, + exitIOch: make(chan struct{}), + exitCh: make(chan uint32, 1), + stdinCloser: make(chan struct{}), + mounted: mounted, } return c, nil } diff --git a/containerd-shim-v2/exec.go b/containerd-shim-v2/exec.go index fcbb95e885..25718208a7 100644 --- a/containerd-shim-v2/exec.go +++ b/containerd-shim-v2/exec.go @@ -7,6 +7,7 @@ package containerdshim import ( "fmt" + "io" "strings" "time" @@ -32,6 +33,9 @@ type exec struct { exitIOch chan struct{} exitCh chan uint32 + stdinCloser chan struct{} + stdinPipe io.WriteCloser + exitTime time.Time } @@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g } exec := &exec{ - container: c, - cmds: cmds, - tty: tty, - exitCode: exitCode255, - exitIOch: make(chan struct{}), - exitCh: make(chan uint32, 1), - status: task.StatusCreated, + container: c, + cmds: cmds, + tty: tty, + exitCode: exitCode255, + exitIOch: make(chan struct{}), + stdinCloser: make(chan struct{}), + exitCh: make(chan uint32, 1), + status: task.StatusCreated, } return exec, nil diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 2ca40558ff..14291a8add 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -725,19 +725,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt return nil, err } - tty := c.ttyio + stdin := c.stdinPipe + stdinCloser := c.stdinCloser + if r.ExecID != "" { execs, err := c.getExec(r.ExecID) if err != nil { return nil, err } - tty = execs.ttyio + stdin = execs.stdinPipe + stdinCloser = execs.stdinCloser } - if tty != nil && tty.Stdin != nil { - if err := tty.Stdin.Close(); err != nil { - return nil, errors.Wrap(err, "close stdin") - } + // wait until the stdin io copy terminated, otherwise + // some contents would not be forwarded to the process. + <-stdinCloser + if err := stdin.Close(); err != nil { + return nil, errors.Wrap(err, "close stdin") } return empty, nil diff --git a/containerd-shim-v2/start.go b/containerd-shim-v2/start.go index eddb15943d..1595ef357f 100644 --- a/containerd-shim-v2/start.go +++ b/containerd-shim-v2/start.go @@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error { return err } + c.stdinPipe = stdin + if c.stdin != "" || c.stdout != "" || c.stderr != "" { tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) if err != nil { return err } c.ttyio = tty - go ioCopy(c.exitIOch, tty, stdin, stdout, stderr) + go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr) } else { //close the io exit channel, since there is no io for this container, //otherwise the following wait goroutine will hang on this channel. close(c.exitIOch) + //close the stdin closer channel to notify that it's safe to close process's + // io. + close(c.stdinCloser) } go wait(s, c, "") @@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex if err != nil { return nil, err } + + execs.stdinPipe = stdin + tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) if err != nil { return nil, err } execs.ttyio = tty - go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr) + go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr) go wait(s, c, execID) diff --git a/containerd-shim-v2/stream.go b/containerd-shim-v2/stream.go index a16cbba0c4..5f3f3c2a91 100644 --- a/containerd-shim-v2/stream.go +++ b/containerd-shim-v2/stream.go @@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) ( return ttyIO, nil } -func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { +func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { var wg sync.WaitGroup var closeOnce sync.Once @@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(stdinPipe, tty.Stdin, *p) + // notify that we can close process's io safely. + close(stdinCloser) wg.Done() }() }