Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
containerd-shim-kata-v2: add the start service support
Browse files Browse the repository at this point in the history
Add the Start api support of start a pod or
container created before.

Signed-off-by: fupan <[email protected]>
  • Loading branch information
lifupan committed Nov 28, 2018
1 parent 72fd6e0 commit 4c5b296
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
31 changes: 31 additions & 0 deletions containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,27 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
s.Lock()
defer s.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}

//start a container
if r.ExecID == "" {
err = startContainer(ctx, s, c)
if err != nil {
return nil, errdefs.ToGRPC(err)
}

return &taskAPI.StartResponse{
Pid: s.pid,
}, nil
}

//start an exec
return nil, errdefs.ErrNotImplemented
}

Expand Down Expand Up @@ -382,3 +403,13 @@ func (s *service) checkProcesses(e exit) {
}
return
}

func (s *service) getContainer(id string) (*container, error) {
c := s.containers[id]

if c == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container does not exist %s", id)
}

return c, nil
}
71 changes: 71 additions & 0 deletions containerd-shim-v2/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package containerdshim

import (
"context"
"fmt"

"github.com/containerd/containerd/api/types/task"
"github.com/kata-containers/runtime/pkg/katautils"
)

func startContainer(ctx context.Context, s *service, c *container) error {
//start a container
if c.cType == "" {
err := fmt.Errorf("Bug, the container %s type is empty", c.id)
return err
}

if s.sandbox == nil {
err := fmt.Errorf("Bug, the sandbox hasn't been created for this container %s", c.id)
return err
}

if c.cType.IsSandbox() {
err := s.sandbox.Start()
if err != nil {
return err
}
} else {
_, err := s.sandbox.StartContainer(c.id)
if err != nil {
return err
}
}

// Run post-start OCI hooks.
err := katautils.EnterNetNS(s.sandbox.GetNetNs(), func() error {
return katautils.PostStartHooks(ctx, *c.spec, s.sandbox.ID(), c.bundle)
})
if err != nil {
return err
}

c.status = task.StatusRunning

stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, c.id)
if err != nil {
return err
}

if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil {
return err
}
c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
} else {
//close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch)
}

go wait(s, c, "")

return nil
}
111 changes: 111 additions & 0 deletions containerd-shim-v2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,122 @@
package containerdshim

import (
"context"
"io"
"sync"
"syscall"

"github.com/containerd/fifo"
)

// The buffer size used to specify the buffer for IO streams copy
const bufSize = 32 << 10

var (
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, bufSize)
return &buffer
},
}
)

type ttyIO struct {
Stdin io.ReadCloser
Stdout io.Writer
Stderr io.Writer
}

func (tty *ttyIO) close() {

if tty.Stdin != nil {
tty.Stdin.Close()
}
cf := func(w io.Writer) {
if w == nil {
return
}
if c, ok := w.(io.WriteCloser); ok {
c.Close()
}
}
cf(tty.Stdout)
cf(tty.Stderr)
}

func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
var in io.ReadCloser
var outw io.Writer
var errw io.Writer
var err error

if stdin != "" {
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
}

if stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
}

if !console && stderr != "" {
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
}

ttyIO := &ttyIO{
Stdin: in,
Stdout: outw,
Stderr: errw,
}

return ttyIO, nil
}

func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
var closeOnce sync.Once

if tty.Stdin != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
wg.Done()
}()
}

if tty.Stdout != nil {
wg.Add(1)

go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
wg.Done()
closeOnce.Do(tty.close)
}()
}

if tty.Stderr != nil && stderrPipe != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
wg.Done()
}()
}

wg.Wait()
closeOnce.Do(tty.close)
close(exitch)
}
11 changes: 11 additions & 0 deletions containerd-shim-v2/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"fmt"
"os"
"time"

cdshim "github.com/containerd/containerd/runtime/v2/shim"
"github.com/kata-containers/runtime/pkg/katautils"
Expand All @@ -18,6 +19,16 @@ import (
"github.com/opencontainers/runtime-spec/specs-go"
)

func cReap(s *service, status int, id, execid string, exitat time.Time) {
s.ec <- exit{
timestamp: exitat,
pid: s.pid,
status: status,
id: id,
execid: execid,
}
}

func validBundle(containerID, bundlePath string) (string, error) {
// container ID MUST be provided.
if containerID == "" {
Expand Down
56 changes: 56 additions & 0 deletions containerd-shim-v2/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package containerdshim

import (
"time"

"github.com/containerd/containerd/api/types/task"
"github.com/sirupsen/logrus"
)

func wait(s *service, c *container, execID string) (int32, error) {
var execs *exec
var err error

processID := c.id

if execID == "" {
//wait until the io closed, then wait the container
<-c.exitIOch
}

ret, err := s.sandbox.WaitProcess(c.id, processID)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"container": c.id,
"pid": processID,
}).Error("Wait for process failed")
}

if execID == "" {
c.exitCh <- uint32(ret)
} else {
execs.exitCh <- uint32(ret)
}

timeStamp := time.Now()
c.mu.Lock()
if execID == "" {
c.status = task.StatusStopped
c.exit = uint32(ret)
c.time = timeStamp
} else {
execs.status = task.StatusStopped
execs.exitCode = ret
execs.exitTime = timeStamp
}
c.mu.Unlock()

go cReap(s, int(ret), c.id, execID, timeStamp)

return ret, nil
}

0 comments on commit 4c5b296

Please sign in to comment.