From 3a0cd87dae9669447be189846c1f821874113712 Mon Sep 17 00:00:00 2001 From: "fupan.lfp" Date: Tue, 21 Jul 2020 20:48:47 +0800 Subject: [PATCH] shimv2: fix the issue of close IO stream It should wait until the stdin io copy termianted to close the process's io stream, otherwise, it would miss forwarding some contents to process stdin. Fixes: #2884 Signed-off-by: fupan.lfp --- containerd-shim-v2/container.go | 66 +++++++++++++++++---------------- containerd-shim-v2/exec.go | 19 ++++++---- containerd-shim-v2/service.go | 16 +++++--- containerd-shim-v2/start.go | 12 +++++- containerd-shim-v2/stream.go | 4 +- 5 files changed, 70 insertions(+), 47 deletions(-) diff --git a/containerd-shim-v2/container.go b/containerd-shim-v2/container.go index 6b5e994824..cc57487611 100644 --- a/containerd-shim-v2/container.go +++ b/containerd-shim-v2/container.go @@ -6,6 +6,7 @@ package containerdshim import ( + "io" "time" "github.com/containerd/containerd/api/types/task" @@ -17,23 +18,25 @@ import ( ) type container struct { - s *service - ttyio *ttyIO - spec *specs.Spec - exitTime time.Time - execs map[string]*exec - exitIOch chan struct{} - exitCh chan uint32 - id string - stdin string - stdout string - stderr string - bundle string - cType vc.ContainerType - exit uint32 - status task.Status - terminal bool - mounted bool + s *service + ttyio *ttyIO + spec *specs.Spec + exitTime time.Time + execs map[string]*exec + exitIOch chan struct{} + stdinPipe io.WriteCloser + stdinCloser chan struct{} + exitCh chan uint32 + id string + stdin string + stdout string + stderr string + bundle string + cType vc.ContainerType + exit uint32 + status task.Status + terminal bool + mounted bool } func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) { @@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con } c := &container{ - s: s, - spec: spec, - id: r.ID, - bundle: r.Bundle, - stdin: r.Stdin, - stdout: r.Stdout, - stderr: r.Stderr, - terminal: r.Terminal, - cType: containerType, - execs: make(map[string]*exec), - status: task.StatusCreated, - exitIOch: make(chan struct{}), - exitCh: make(chan uint32, 1), - mounted: mounted, + s: s, + spec: spec, + id: r.ID, + bundle: r.Bundle, + stdin: r.Stdin, + stdout: r.Stdout, + stderr: r.Stderr, + terminal: r.Terminal, + cType: containerType, + execs: make(map[string]*exec), + status: task.StatusCreated, + exitIOch: make(chan struct{}), + exitCh: make(chan uint32, 1), + stdinCloser: make(chan struct{}), + mounted: mounted, } return c, nil } diff --git a/containerd-shim-v2/exec.go b/containerd-shim-v2/exec.go index fcbb95e885..25718208a7 100644 --- a/containerd-shim-v2/exec.go +++ b/containerd-shim-v2/exec.go @@ -7,6 +7,7 @@ package containerdshim import ( "fmt" + "io" "strings" "time" @@ -32,6 +33,9 @@ type exec struct { exitIOch chan struct{} exitCh chan uint32 + stdinCloser chan struct{} + stdinPipe io.WriteCloser + exitTime time.Time } @@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g } exec := &exec{ - container: c, - cmds: cmds, - tty: tty, - exitCode: exitCode255, - exitIOch: make(chan struct{}), - exitCh: make(chan uint32, 1), - status: task.StatusCreated, + container: c, + cmds: cmds, + tty: tty, + exitCode: exitCode255, + exitIOch: make(chan struct{}), + stdinCloser: make(chan struct{}), + exitCh: make(chan uint32, 1), + status: task.StatusCreated, } return exec, nil diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 2ca40558ff..14291a8add 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -725,19 +725,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt return nil, err } - tty := c.ttyio + stdin := c.stdinPipe + stdinCloser := c.stdinCloser + if r.ExecID != "" { execs, err := c.getExec(r.ExecID) if err != nil { return nil, err } - tty = execs.ttyio + stdin = execs.stdinPipe + stdinCloser = execs.stdinCloser } - if tty != nil && tty.Stdin != nil { - if err := tty.Stdin.Close(); err != nil { - return nil, errors.Wrap(err, "close stdin") - } + // wait until the stdin io copy terminated, otherwise + // some contents would not be forwarded to the process. + <-stdinCloser + if err := stdin.Close(); err != nil { + return nil, errors.Wrap(err, "close stdin") } return empty, nil diff --git a/containerd-shim-v2/start.go b/containerd-shim-v2/start.go index eddb15943d..1595ef357f 100644 --- a/containerd-shim-v2/start.go +++ b/containerd-shim-v2/start.go @@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error { return err } + c.stdinPipe = stdin + if c.stdin != "" || c.stdout != "" || c.stderr != "" { tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) if err != nil { return err } c.ttyio = tty - go ioCopy(c.exitIOch, tty, stdin, stdout, stderr) + go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr) } else { //close the io exit channel, since there is no io for this container, //otherwise the following wait goroutine will hang on this channel. close(c.exitIOch) + //close the stdin closer channel to notify that it's safe to close process's + // io. + close(c.stdinCloser) } go wait(s, c, "") @@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex if err != nil { return nil, err } + + execs.stdinPipe = stdin + tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) if err != nil { return nil, err } execs.ttyio = tty - go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr) + go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr) go wait(s, c, execID) diff --git a/containerd-shim-v2/stream.go b/containerd-shim-v2/stream.go index a16cbba0c4..5f3f3c2a91 100644 --- a/containerd-shim-v2/stream.go +++ b/containerd-shim-v2/stream.go @@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) ( return ttyIO, nil } -func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { +func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { var wg sync.WaitGroup var closeOnce sync.Once @@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(stdinPipe, tty.Stdin, *p) + // notify that we can close process's io safely. + close(stdinCloser) wg.Done() }() }