Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2885 from lifupan/master
Browse files Browse the repository at this point in the history
shimv2: fix the issue of close IO stream (backport from 2.0)
  • Loading branch information
bergwolf authored Aug 19, 2020
2 parents 7827155 + 3a0cd87 commit b9501dc
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 47 deletions.
66 changes: 35 additions & 31 deletions containerd-shim-v2/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package containerdshim

import (
"io"
"time"

"github.com/containerd/containerd/api/types/task"
Expand All @@ -17,23 +18,25 @@ import (
)

type container struct {
s *service
ttyio *ttyIO
spec *specs.Spec
exitTime time.Time
execs map[string]*exec
exitIOch chan struct{}
exitCh chan uint32
id string
stdin string
stdout string
stderr string
bundle string
cType vc.ContainerType
exit uint32
status task.Status
terminal bool
mounted bool
s *service
ttyio *ttyIO
spec *specs.Spec
exitTime time.Time
execs map[string]*exec
exitIOch chan struct{}
stdinPipe io.WriteCloser
stdinCloser chan struct{}
exitCh chan uint32
id string
stdin string
stdout string
stderr string
bundle string
cType vc.ContainerType
exit uint32
status task.Status
terminal bool
mounted bool
}

func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) {
Expand All @@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
}

c := &container{
s: s,
spec: spec,
id: r.ID,
bundle: r.Bundle,
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
cType: containerType,
execs: make(map[string]*exec),
status: task.StatusCreated,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
mounted: mounted,
s: s,
spec: spec,
id: r.ID,
bundle: r.Bundle,
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
cType: containerType,
execs: make(map[string]*exec),
status: task.StatusCreated,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
stdinCloser: make(chan struct{}),
mounted: mounted,
}
return c, nil
}
19 changes: 12 additions & 7 deletions containerd-shim-v2/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package containerdshim

import (
"fmt"
"io"
"strings"
"time"

Expand All @@ -32,6 +33,9 @@ type exec struct {
exitIOch chan struct{}
exitCh chan uint32

stdinCloser chan struct{}
stdinPipe io.WriteCloser

exitTime time.Time
}

Expand Down Expand Up @@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
}

exec := &exec{
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
stdinCloser: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
}

return exec, nil
Expand Down
16 changes: 10 additions & 6 deletions containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,19 +725,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
return nil, err
}

tty := c.ttyio
stdin := c.stdinPipe
stdinCloser := c.stdinCloser

if r.ExecID != "" {
execs, err := c.getExec(r.ExecID)
if err != nil {
return nil, err
}
tty = execs.ttyio
stdin = execs.stdinPipe
stdinCloser = execs.stdinCloser
}

if tty != nil && tty.Stdin != nil {
if err := tty.Stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}
// wait until the stdin io copy terminated, otherwise
// some contents would not be forwarded to the process.
<-stdinCloser
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}

return empty, nil
Expand Down
12 changes: 10 additions & 2 deletions containerd-shim-v2/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
return err
}

c.stdinPipe = stdin

if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil {
return err
}
c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
} else {
//close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch)
//close the stdin closer channel to notify that it's safe to close process's
// io.
close(c.stdinCloser)
}

go wait(s, c, "")
Expand Down Expand Up @@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
if err != nil {
return nil, err
}

execs.stdinPipe = stdin

tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil {
return nil, err
}
execs.ttyio = tty

go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)
go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)

go wait(s, c, execID)

Expand Down
4 changes: 3 additions & 1 deletion containerd-shim-v2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
return ttyIO, nil
}

func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
var closeOnce sync.Once

Expand All @@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done()
}()
}
Expand Down

0 comments on commit b9501dc

Please sign in to comment.