Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge pull request #252 from bergwolf/sandbox_api_1
Browse files Browse the repository at this point in the history
API: support sandbox monitor operation
  • Loading branch information
Sebastien Boeuf authored May 1, 2018
2 parents 70b3c77 + 9d1311d commit 87aa1d7
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 34 deletions.
3 changes: 3 additions & 0 deletions virtcontainers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type agent interface {
// supported by the agent.
capabilities() capabilities

// check will check the agent liveness
check() error

// disconnect will disconnect the connection to the agent
disconnect() error

Expand Down
5 changes: 5 additions & 0 deletions virtcontainers/hyperstart_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,3 +801,8 @@ func (h *hyper) onlineCPUMem(cpus uint32) error {
// cc-agent uses udev to online CPUs automatically
return nil
}

func (h *hyper) check() error {
// cc-agent does not support check
return nil
}
1 change: 1 addition & 0 deletions virtcontainers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type VCSandbox interface {
Pause() error
Resume() error
Release() error
Monitor() (chan error, error)
Delete() error
Status() SandboxStatus
CreateContainer(contConfig ContainerConfig) (VCContainer, error)
Expand Down
95 changes: 61 additions & 34 deletions virtcontainers/kata_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"strconv"
"strings"
"syscall"
"time"

"github.com/gogo/protobuf/proto"
kataclient "github.com/kata-containers/agent/protocols/client"
"github.com/kata-containers/agent/protocols/grpc"
vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
Expand Down Expand Up @@ -73,6 +75,7 @@ type kataAgent struct {
shim shim
proxy proxy
client *kataclient.AgentClient
reqHandlers map[string]reqFunc
state KataAgentState
keepConn bool
proxyBuiltIn bool
Expand Down Expand Up @@ -916,6 +919,7 @@ func (k *kataAgent) connect() error {
return err
}

k.installReqFunc(client)
k.client = client

return nil
Expand All @@ -929,11 +933,62 @@ func (k *kataAgent) disconnect() error {
if err := k.client.Close(); err != nil && err != golangGrpc.ErrClientConnClosing {
return err
}

k.client = nil
k.reqHandlers = nil

return nil
}

func (k *kataAgent) check() error {
_, err := k.sendReq(&grpc.CheckRequest{})
return err
}

type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error)

func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers = make(map[string]reqFunc)
k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...)
}
k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...)
}
k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...)
}
k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...)
}
k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...)
}
k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...)
}
k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...)
}
k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...)
}
k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...)
}
k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...)
}
k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...)
}
k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
}
}

func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
if err := k.connect(); err != nil {
return nil, err
Expand All @@ -942,39 +997,11 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
defer k.disconnect()
}

switch req := request.(type) {
case *grpc.ExecProcessRequest:
_, err := k.client.ExecProcess(context.Background(), req)
return nil, err
case *grpc.CreateSandboxRequest:
_, err := k.client.CreateSandbox(context.Background(), req)
return nil, err
case *grpc.DestroySandboxRequest:
_, err := k.client.DestroySandbox(context.Background(), req)
return nil, err
case *grpc.CreateContainerRequest:
_, err := k.client.CreateContainer(context.Background(), req)
return nil, err
case *grpc.StartContainerRequest:
_, err := k.client.StartContainer(context.Background(), req)
return nil, err
case *grpc.RemoveContainerRequest:
_, err := k.client.RemoveContainer(context.Background(), req)
return nil, err
case *grpc.SignalProcessRequest:
_, err := k.client.SignalProcess(context.Background(), req)
return nil, err
case *grpc.UpdateRoutesRequest:
_, err := k.client.UpdateRoutes(context.Background(), req)
return nil, err
case *grpc.UpdateInterfaceRequest:
ifc, err := k.client.UpdateInterface(context.Background(), req)
return ifc, err
case *grpc.OnlineCPUMemRequest:
return k.client.OnlineCPUMem(context.Background(), req)
case *grpc.ListProcessesRequest:
return k.client.ListProcesses(context.Background(), req)
default:
return nil, fmt.Errorf("Unknown gRPC type %T", req)
msgName := proto.MessageName(request.(proto.Message))
handler := k.reqHandlers[msgName]
if msgName == "" || handler == nil {
return nil, fmt.Errorf("Invalid request type")
}

return handler(context.Background(), request)
}
11 changes: 11 additions & 0 deletions virtcontainers/kata_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,20 @@ func (p *gRPCProxy) OnlineCPUMem(ctx context.Context, req *pb.OnlineCPUMemReques
return emptyResp, nil
}

func (p *gRPCProxy) Check(ctx context.Context, req *pb.CheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{}, nil
}

func (p *gRPCProxy) Version(ctx context.Context, req *pb.CheckRequest) (*pb.VersionCheckResponse, error) {
return &pb.VersionCheckResponse{}, nil

}

func gRPCRegister(s *grpc.Server, srv interface{}) {
switch g := srv.(type) {
case *gRPCProxy:
pb.RegisterAgentServiceServer(s, g)
pb.RegisterHealthServer(s, g)
}
}

Expand All @@ -206,6 +216,7 @@ var reqList = []interface{}{
&pb.StartContainerRequest{},
&pb.RemoveContainerRequest{},
&pb.SignalProcessRequest{},
&pb.CheckRequest{},
}

func TestKataAgentSendReq(t *testing.T) {
Expand Down
120 changes: 120 additions & 0 deletions virtcontainers/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"sync"
"time"
)

const defaultCheckInterval = 10 * time.Second

type monitor struct {
sync.Mutex

sandbox *Sandbox
checkInterval time.Duration
watchers []chan error
running bool
stopCh chan bool
wg sync.WaitGroup
}

func newMonitor(s *Sandbox) *monitor {
return &monitor{
sandbox: s,
checkInterval: defaultCheckInterval,
stopCh: make(chan bool, 1),
}
}

func (m *monitor) newWatcher() (chan error, error) {
m.Lock()
defer m.Unlock()

watcher := make(chan error, 1)
m.watchers = append(m.watchers, watcher)

if !m.running {
m.running = true
m.wg.Add(1)

// create and start agent watcher
go func() {
tick := time.NewTicker(m.checkInterval)
for {
select {
case <-m.stopCh:
tick.Stop()
m.wg.Done()
return
case <-tick.C:
m.watchAgent()
}
}
}()
}

return watcher, nil
}

func (m *monitor) notify(err error) {
m.Lock()
defer m.Unlock()

if !m.running {
return
}

// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()

for _, c := range m.watchers {
c <- err
}
}

func (m *monitor) stop() {
// wait outside of monitor lock for the watcher channel to exit.
defer m.wg.Wait()

m.Lock()
defer m.Unlock()

if !m.running {
return
}

defer func() {
m.stopCh <- true
m.watchers = nil
m.running = false
}()

// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()

for _, c := range m.watchers {
close(c)
}
}

func (m *monitor) watchAgent() {
err := m.sandbox.agent.check()
if err != nil {
m.notify(err)
}
}
62 changes: 62 additions & 0 deletions virtcontainers/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMonitorSuccess(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)

// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()

m := newMonitor(s)

ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)

fakeErr := errors.New("foobar error")
m.notify(fakeErr)
resultErr := <-ch
assert.True(t, resultErr == fakeErr, "monitor notification mismatch %v vs. %v", resultErr, fakeErr)

m.stop()
}

func TestMonitorClosedChannel(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)

// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()

m := newMonitor(s)

ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)

close(ch)
fakeErr := errors.New("foobar error")
m.notify(fakeErr)

m.stop()
}
5 changes: 5 additions & 0 deletions virtcontainers/noop_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (n *noopAgent) processListContainer(sandbox *Sandbox, c Container, options
func (n *noopAgent) onlineCPUMem(cpus uint32) error {
return nil
}

// check is the Noop agent health checker. It does nothing.
func (n *noopAgent) check() error {
return nil
}
5 changes: 5 additions & 0 deletions virtcontainers/pkg/vcmock/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,8 @@ func (p *Sandbox) Status() vc.SandboxStatus {
func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) {
return &Container{}, &vc.Process{}, nil
}

// Monitor implements the VCSandbox function of the same name.
func (p *Sandbox) Monitor() (chan error, error) {
return nil, nil
}
Loading

0 comments on commit 87aa1d7

Please sign in to comment.