Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
agent: fix the issue of missing close process terminal
Browse files Browse the repository at this point in the history
Kata assumes once a process exits, it's IO will also
close automatically, thus the client will wait on the
process's IO closed and then do some cleanup for this
process. But the issue is that if the process forked
some background children processes running as daemon
process, thus those children process will intherit its
parent terminal, and even its parent exited, its terminal
will not close as the children running.

So this commit will try to fix this issue by introducing
a pipe, and let the IO read epolling on one end of this pipe
and the process's pty master. Once the process exits, it
will close the other end of the pipe to notify the IO read
that the process has exited and and there's no need to wait
on its IO.

Fixes: #370

Signed-off-by: fupan <[email protected]>
  • Loading branch information
lifupan committed Sep 19, 2018
1 parent aa81883 commit fc907f6
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 1 deletion.
17 changes: 16 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type process struct {
stderr *os.File
consoleSock *os.File
termMaster *os.File
epoller *epoller
exitCodeCh chan int
sync.Once
stdinClosed bool
Expand Down Expand Up @@ -175,6 +176,10 @@ func (p *process) closePostExitFDs() {
if p.stderr != nil {
p.stderr.Close()
}

if p.epoller != nil {
p.epoller.sockR.Close()
}
}

func (c *container) setProcess(process *process) {
Expand Down Expand Up @@ -368,7 +373,17 @@ func (s *sandbox) readStdio(cid, execID string, length int, stdout bool) ([]byte

var file *os.File
if proc.termMaster != nil {
file = proc.termMaster
// The process's epoller's run() will return a file descriptor of the process's
// terminal or one end of its exited pipe. If it returns its terminal, it means
// there is data needed to be read out or it has been closed; if it returns the
// process's exited pipe, it means the process has exited and there is no data
// needed to be read out in its terminal, thus following read on it will read out
// "EOF" to terminate this process's io since the other end of this pipe has been
// closed in reap().
file, err = proc.epoller.run()
if err != nil {
return nil, err
}
} else {
if stdout {
file = proc.stdout
Expand Down
113 changes: 113 additions & 0 deletions epoll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package main

import (
"os"

"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
)

const maxEvents = 2

type epoller struct {
fd int
// sockR and sockW are a pipe's files two ends, this pipe is
// used to sync between the readStdio and the process exits.
// once the process exits, it will close one end to notify
// the readStdio that the process has exited and it should not
// wait on the process's terminal which has been inherited
// by it's children and hasn't exited.
sockR *os.File
sockW *os.File
sockMap map[int32]*os.File
}

func newEpoller() (*epoller, error) {
epollerFd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, err
}

rSock, wSock, err := os.Pipe()
if err != nil {
return nil, err
}

ep := &epoller{
fd: epollerFd,
sockW: wSock,
sockR: rSock,
sockMap: make(map[int32]*os.File),
}

if err = ep.add(rSock); err != nil {
return nil, err
}

return ep, nil
}

func (ep *epoller) add(f *os.File) error {
// add creates an epoll which is used to monitor the process's pty's master and
// one end of its exit notify pipe. Those files will be registered with level-triggered
// notification.

event := unix.EpollEvent{
Fd: int32(f.Fd()),
Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR | unix.EPOLLRDHUP,
}
ep.sockMap[int32(f.Fd())] = f
return unix.EpollCtl(ep.fd, unix.EPOLL_CTL_ADD, int(f.Fd()), &event)
}

// There will be three cases on the epoller once it run:
// a: only pty's master get an event;
// b: only the pipe get an event;
// c: both of pty and pipe have event occur;
// for case a, it means there is output in process's terminal and what needed to do is
// just read the terminal and send them out; for case b, it means the process has exited
// and there is no data in the terminal, thus just return the "EOF" to end the io;
// for case c, it means the process has exited but there is some data in the terminal which
// hasn't been send out, thus it should send those data out first and then send "EOF" last to
// end the io.
func (ep *epoller) run() (*os.File, error) {
fd := int32(ep.sockR.Fd())
events := make([]unix.EpollEvent, maxEvents)
for {
n, err := unix.EpollWait(ep.fd, events, -1)
if err != nil {
// EINTR: The call was interrupted by a signal handler before either
// any of the requested events occurred or the timeout expired
if err == unix.EINTR {
continue
}
return nil, err
}

for i := 0; i < n; i++ {
ev := &events[i]
// fd has been assigned with one end of process's exited pipe by default, and
// here to check is there any event occur on process's terminal, if "yes", it
// should be dealt first, otherwise, it means the process has exited and there
// is nothing left in the process's terminal needed to be read.
if ev.Fd != fd {
fd = ev.Fd
break
}
}
break
}

mf, exist := ep.sockMap[fd]
if !exist {
return nil, grpcStatus.Errorf(codes.NotFound, "File %d not found", fd)
}

return mf, nil
}
79 changes: 79 additions & 0 deletions epoll_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package main

import (
"os"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/sys/unix"
)

func TestNewEpoller(t *testing.T) {
assert := assert.New(t)

epoller, err := newEpoller()
assert.NoError(err)

closeEpoller(epoller)

}

func closeEpoller(ep *epoller) {
ep.sockW.Close()
ep.sockR.Close()
unix.Close(ep.fd)
}

func TestAddEpoller(t *testing.T) {
assert := assert.New(t)

epoller, _ := newEpoller()
assert.NotNil(epoller)
defer closeEpoller(epoller)

rSock, wSock, err := os.Pipe()
assert.NoError(err)
defer rSock.Close()
defer wSock.Close()

err = epoller.add(rSock)

assert.NoError(err)
}

func TestRunEpoller(t *testing.T) {
assert := assert.New(t)
wg := sync.WaitGroup{}

epoller, _ := newEpoller()
assert.NotNil(epoller)
defer closeEpoller(epoller)

content := []byte("temporary file's content")
rSock, wSock, err := os.Pipe()
assert.NoError(err)
defer rSock.Close()
defer wSock.Close()

err = epoller.add(rSock)
assert.NoError(err)

wg.Add(1)
go func() {
wg.Done()
wSock.Write(content)
}()

wg.Wait()
f, err := epoller.run()
assert.NoError(err)

assert.Equal(f.Fd(), rSock.Fd())
closeEpoller(epoller)
}
18 changes: 18 additions & 0 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ func buildProcess(agentProcess *pb.Process, procID string) (*process, error) {
proc.process.ConsoleSocket = childSock
proc.consoleSock = parentSock

epoller, err := newEpoller()
if err != nil {
return nil, err
}

proc.epoller = epoller

return proc, nil
}

Expand Down Expand Up @@ -419,6 +426,17 @@ func (a *agentGRPC) postExecProcess(ctr *container, proc *process) error {
}

proc.termMaster = termMaster

// Get process PID
pid, err := proc.process.Pid()
if err != nil {
return err
}
a.sandbox.subreaper.setEpoller(pid, proc.epoller)

if err = proc.epoller.add(proc.termMaster); err != nil {
return err
}
}

ctr.setProcess(proc)
Expand Down
10 changes: 10 additions & 0 deletions mockreaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ type mockreaper struct {
func (r *mockreaper) init() {
}

func (r *mockreaper) getEpoller(pid int) (*epoller, error) {
return nil, nil
}

func (r *mockreaper) setEpoller(pid int, epoller *epoller) {
}

func (r *mockreaper) deleteEpoller(pid int) {
}

func (r *mockreaper) getExitCodeCh(pid int) (chan<- int, error) {
return nil, nil
}
Expand Down
39 changes: 39 additions & 0 deletions reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type reaper interface {
getExitCodeCh(pid int) (chan<- int, error)
setExitCodeCh(pid int, exitCodeCh chan<- int)
deleteExitCodeCh(pid int)
getEpoller(pid int) (*epoller, error)
setEpoller(pid int, epoller *epoller)
deleteEpoller(pid int)
reap() error
start(c *exec.Cmd) (<-chan int, error)
wait(exitCodeCh <-chan int, proc waitProcess) (int, error)
Expand All @@ -40,6 +43,7 @@ type agentReaper struct {

chansLock sync.RWMutex
exitCodeChans map[int]chan<- int
epoller map[int]*epoller
}

func exitStatus(status unix.WaitStatus) int {
Expand All @@ -52,6 +56,7 @@ func exitStatus(status unix.WaitStatus) int {

func (r *agentReaper) init() {
r.exitCodeChans = make(map[int]chan<- int)
r.epoller = make(map[int]*epoller)
}

func (r *agentReaper) lock() {
Expand All @@ -62,6 +67,32 @@ func (r *agentReaper) unlock() {
r.RUnlock()
}

func (r *agentReaper) getEpoller(pid int) (*epoller, error) {
r.chansLock.RLock()
defer r.chansLock.RUnlock()

epoller, exist := r.epoller[pid]
if !exist {
return nil, fmt.Errorf("epoller doesn't exist for process %d", pid)
}

return epoller, nil
}

func (r *agentReaper) setEpoller(pid int, ep *epoller) {
r.chansLock.Lock()
defer r.chansLock.Unlock()

r.epoller[pid] = ep
}

func (r *agentReaper) deleteEpoller(pid int) {
r.chansLock.Lock()
defer r.chansLock.Unlock()

delete(r.epoller, pid)
}

func (r *agentReaper) getExitCodeCh(pid int) (chan<- int, error) {
r.chansLock.RLock()
defer r.chansLock.RUnlock()
Expand Down Expand Up @@ -140,6 +171,14 @@ func (r *agentReaper) reap() error {
// of the process and return the exit code to the
// caller of WaitProcess().
exitCodeCh <- status

epoller, err := r.getEpoller(pid)
if err == nil {
//close the socket file to notify readStdio to close terminal specifically
//in case this process's terminal has been inherited by its children.
epoller.sockW.Close()
}
r.deleteEpoller(pid)
}
}

Expand Down

0 comments on commit fc907f6

Please sign in to comment.