Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
containerd-shim-kata-v2: add the exec service support
Browse files Browse the repository at this point in the history
Add the Exec api support for exec an process in
a running container.

Signed-off-by: fupan <[email protected]>
  • Loading branch information
lifupan committed Nov 28, 2018
1 parent 4c5b296 commit 269c940
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 8 deletions.
91 changes: 91 additions & 0 deletions containerd-shim-v2/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
package containerdshim

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
googleProtobuf "github.com/gogo/protobuf/types"
vc "github.com/kata-containers/runtime/virtcontainers"
specs "github.com/opencontainers/runtime-spec/specs-go"
)

type exec struct {
Expand Down Expand Up @@ -37,3 +43,88 @@ type tty struct {
width uint32
terminal bool
}

func getEnvs(envs []string) []vc.EnvVar {
var vcEnvs = []vc.EnvVar{}
var env vc.EnvVar

for _, v := range envs {
pair := strings.SplitN(v, "=", 2)

if len(pair) == 2 {
env = vc.EnvVar{Var: pair[0], Value: pair[1]}
} else if len(pair) == 1 {
env = vc.EnvVar{Var: pair[0], Value: ""}
}

vcEnvs = append(vcEnvs, env)
}

return vcEnvs
}

func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *googleProtobuf.Any) (*exec, error) {
var height uint32
var width uint32

if jspec == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "googleProtobuf.Any points to nil")
}

// process exec request
var spec specs.Process
if err := json.Unmarshal(jspec.Value, &spec); err != nil {
return nil, err
}

if spec.ConsoleSize != nil {
height = uint32(spec.ConsoleSize.Height)
width = uint32(spec.ConsoleSize.Width)
}

tty := &tty{
stdin: stdin,
stdout: stdout,
stderr: stderr,
height: height,
width: width,
terminal: terminal,
}

cmds := &vc.Cmd{
Args: spec.Args,
Envs: getEnvs(spec.Env),
User: fmt.Sprintf("%d", spec.User.UID),
PrimaryGroup: fmt.Sprintf("%d", spec.User.GID),
WorkDir: spec.Cwd,
Interactive: terminal,
Detach: !terminal,
NoNewPrivileges: spec.NoNewPrivileges,
}

exec := &exec{
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
}

return exec, nil
}

func (c *container) getExec(id string) (*exec, error) {
if c.execs == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "exec does not exist %s", id)
}

exec := c.execs[id]

if exec == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "exec does not exist %s", id)
}

return exec, nil
}
39 changes: 31 additions & 8 deletions containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const (
// it to containerd as the containerd event format.
bufferSize = 32

chSize = 128
chSize = 128
exitCode255 = 255
)

var (
Expand Down Expand Up @@ -297,14 +298,17 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if err != nil {
return nil, errdefs.ToGRPC(err)
}

return &taskAPI.StartResponse{
Pid: s.pid,
}, nil
} else {
//start an exec
_, err = startExec(ctx, s, r.ID, r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
}

//start an exec
return nil, errdefs.ErrNotImplemented
return &taskAPI.StartResponse{
Pid: s.pid,
}, nil
}

// Delete the initial process and container
Expand All @@ -314,7 +318,26 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
return nil, errdefs.ErrNotImplemented
s.Lock()
defer s.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}

if execs := c.execs[r.ExecID]; execs != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}

execs, err := newExec(c, r.Stdin, r.Stdout, r.Stderr, r.Terminal, r.Spec)
if err != nil {
return nil, errdefs.ToGRPC(err)
}

c.execs[r.ExecID] = execs

return empty, nil
}

// ResizePty of a process
Expand Down
44 changes: 44 additions & 0 deletions containerd-shim-v2/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,47 @@ func startContainer(ctx context.Context, s *service, c *container) error {

return nil
}

func startExec(ctx context.Context, s *service, containerID, execID string) (*exec, error) {
//start an exec
c, err := s.getContainer(containerID)
if err != nil {
return nil, err
}

execs, err := c.getExec(execID)
if err != nil {
return nil, err
}

_, proc, err := s.sandbox.EnterContainer(containerID, *execs.cmds)
if err != nil {
err := fmt.Errorf("cannot enter container %s, with err %s", containerID, err)
return nil, err
}
execs.id = proc.Token

execs.status = task.StatusRunning
if execs.tty.height != 0 && execs.tty.width != 0 {
err = s.sandbox.WinsizeProcess(c.id, execs.id, execs.tty.height, execs.tty.width)
if err != nil {
return nil, err
}
}

stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, execs.id)
if err != nil {
return nil, err
}
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil {
return nil, err
}
execs.ttyio = tty

go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)

go wait(s, c, execID)

return execs, nil
}
10 changes: 10 additions & 0 deletions containerd-shim-v2/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ func wait(s *service, c *container, execID string) (int32, error) {
if execID == "" {
//wait until the io closed, then wait the container
<-c.exitIOch
} else {
execs, err = c.getExec(execID)
if err != nil {
return exitCode255, err
}
<-execs.exitIOch
//This wait could be triggered before exec start which
//will get the exec's id, thus this assignment must after
//the exec exit, to make sure it get the exec's id.
processID = execs.id
}

ret, err := s.sandbox.WaitProcess(c.id, processID)
Expand Down

0 comments on commit 269c940

Please sign in to comment.