Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
shimv2: Send task events to containerd/cri
Browse files Browse the repository at this point in the history
The Runtime v2 supports an async event model. In order for the an upstream
caller (such as Docker) to get these events in the correct order a Runtime
v2 shim MUST implement some events.

For much more info, please see:
https://github.com/containerd/containerd/blob/master/runtime/v2/README.md#events

Fixes:#1204

Signed-off-by: fupan <[email protected]>
  • Loading branch information
lifupan committed Feb 11, 2019
1 parent 29dae85 commit 96e524d
Showing 1 changed file with 101 additions and 36 deletions.
137 changes: 101 additions & 36 deletions containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ type exit struct {

// service is the shim implementation of a remote shim over GRPC
type service struct {
sync.Mutex
mu sync.Mutex
eventSendMu sync.Mutex

// pid Since this shimv2 cannot get the container processes pid from VM,
// thus for the returned values needed pid, just return this shim's
Expand Down Expand Up @@ -211,6 +212,21 @@ func (s *service) forward(publisher events.Publisher) {
}
}

func (s *service) send(evt interface{}) {
// for unit test, it will not initialize s.events
if s.events != nil {
s.events <- evt
}
}

func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
if s.events != nil {
s.events <- evt
}
s.eventSendMu.Unlock()
}

func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
case *eventstypes.TaskCreate:
Expand Down Expand Up @@ -291,8 +307,8 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)

// Create a new sandbox or container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

//the network namespace created by cni plugin
netns, err := namespaces.NamespaceRequired(ctx)
Expand Down Expand Up @@ -328,33 +344,60 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *

s.containers[r.ID] = container

s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: s.pid,
})

return &taskAPI.CreateTaskResponse{
Pid: s.pid,
}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}

// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
defer s.eventSendMu.Unlock()

//start a container
if r.ExecID == "" {
err = startContainer(ctx, s, c)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskStart{
ContainerID: c.id,
Pid: s.pid,
})
} else {
//start an exec
_, err = startExec(ctx, s, r.ID, r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskExecStarted{
ContainerID: c.id,
ExecID: r.ExecID,
Pid: s.pid,
})
}

return &taskAPI.StartResponse{
Expand All @@ -364,8 +407,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand Down Expand Up @@ -394,6 +437,13 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
}

s.send(&eventstypes.TaskDelete{
ContainerID: s.id,
Pid: s.pid,
ExitStatus: c.exit,
ExitedAt: c.time,
})

return &taskAPI.DeleteResponse{
ExitStatus: c.exit,
ExitedAt: c.time,
Expand All @@ -417,8 +467,8 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand All @@ -436,13 +486,18 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty

c.execs[r.ExecID] = execs

s.send(&eventstypes.TaskExecAdded{
ContainerID: c.id,
ExecID: r.ExecID,
})

return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand Down Expand Up @@ -471,8 +526,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand Down Expand Up @@ -515,8 +570,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand All @@ -536,13 +591,17 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
c.status = task.StatusUnknown
}

s.send(&eventstypes.TaskPaused{
c.id,
})

return empty, err
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand All @@ -560,13 +619,17 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
c.status = task.StatusUnknown
}

s.send(&eventstypes.TaskResumed{
c.id,
})

return empty, err
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

signum := syscall.Signal(r.Signal)

Expand Down Expand Up @@ -617,8 +680,8 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand Down Expand Up @@ -650,8 +713,8 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

return &taskAPI.ConnectResponse{
ShimPid: s.pid,
Expand All @@ -661,12 +724,12 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.Lock()
s.mu.Lock()
if len(s.containers) != 0 {
s.Unlock()
s.mu.Unlock()
return empty, nil
}
s.Unlock()
s.mu.Unlock()

os.Exit(0)

Expand All @@ -676,8 +739,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

c, err := s.getContainer(r.ID)
if err != nil {
Expand All @@ -696,8 +759,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

var resources *specs.LinuxResources
v, err := typeurl.UnmarshalAny(r.Resources)
Expand All @@ -721,9 +784,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
var ret uint32

s.Lock()
s.mu.Lock()
c, err := s.getContainer(r.ID)
s.Unlock()
s.mu.Unlock()

if err != nil {
return nil, err
Expand Down Expand Up @@ -760,20 +823,22 @@ func (s *service) processExits() {
}

func (s *service) checkProcesses(e exit) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

id := e.execid
if id == "" {
id = e.id
}
s.events <- &eventstypes.TaskExit{

s.sendL(&eventstypes.TaskExit{
ContainerID: e.id,
ID: id,
Pid: e.pid,
ExitStatus: uint32(e.status),
ExitedAt: e.timestamp,
}
})

return
}

Expand Down

0 comments on commit 96e524d

Please sign in to comment.