Skip to content

Commit

Permalink
api: add sandbox Monitor API
Browse files Browse the repository at this point in the history
It monitors the sandbox status and returns an error channel that lets the
caller watch it.

Fixes: kata-containers#251

Signed-off-by: Peng Tao <[email protected]>
  • Loading branch information
bergwolf committed Apr 26, 2018
1 parent 45e3f85 commit 9f4c4d6
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 0 deletions.
3 changes: 3 additions & 0 deletions virtcontainers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type agent interface {
// supported by the agent.
capabilities() capabilities

// check will check the agent liveness
check() error

// disconnect will disconnect the connection to the agent
disconnect() error

Expand Down
5 changes: 5 additions & 0 deletions virtcontainers/hyperstart_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,3 +801,8 @@ func (h *hyper) onlineCPUMem(cpus uint32) error {
// cc-agent uses udev to online CPUs automatically
return nil
}

// check is the hyperstart agent's liveness check. It performs no work and
// always returns nil.
func (h *hyper) check() error {
// cc-agent does not support check
return nil
}
1 change: 1 addition & 0 deletions virtcontainers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type VCSandbox interface {
Pause() error
Resume() error
Release() error
Monitor() (chan error, error)
Delete() error
Status() SandboxStatus
CreateContainer(contConfig ContainerConfig) (VCContainer, error)
Expand Down
11 changes: 11 additions & 0 deletions virtcontainers/kata_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"syscall"
"time"

kataclient "github.com/kata-containers/agent/protocols/client"
"github.com/kata-containers/agent/protocols/grpc"
Expand Down Expand Up @@ -934,6 +935,11 @@ func (k *kataAgent) disconnect() error {
return nil
}

// check probes the agent by sending an empty CheckRequest through sendReq and
// reports whatever error that request produced.
func (k *kataAgent) check() error {
	if _, err := k.sendReq(&grpc.CheckRequest{}); err != nil {
		return err
	}
	return nil
}

func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
if err := k.connect(); err != nil {
return nil, err
Expand All @@ -943,6 +949,11 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
}

switch req := request.(type) {
case *grpc.CheckRequest:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := k.client.Check(ctx, req)
return nil, err
case *grpc.ExecProcessRequest:
_, err := k.client.ExecProcess(context.Background(), req)
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions virtcontainers/kata_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,20 @@ func (p *gRPCProxy) OnlineCPUMem(ctx context.Context, req *pb.OnlineCPUMemReques
return emptyResp, nil
}

// Check is the test proxy's health-check handler. It ignores the request and
// always answers with an empty response and a nil error.
func (p *gRPCProxy) Check(ctx context.Context, req *pb.CheckRequest) (*pb.HealthCheckResponse, error) {
	resp := new(pb.HealthCheckResponse)
	return resp, nil
}

// Version is the test proxy's version-check handler. It ignores the request
// and always answers with an empty response and a nil error.
func (p *gRPCProxy) Version(ctx context.Context, req *pb.CheckRequest) (*pb.VersionCheckResponse, error) {
	resp := new(pb.VersionCheckResponse)
	return resp, nil
}

// gRPCRegister registers srv on the gRPC server s as both the agent service
// and the health service when srv is a *gRPCProxy. Any other type is ignored.
func gRPCRegister(s *grpc.Server, srv interface{}) {
	proxy, ok := srv.(*gRPCProxy)
	if !ok {
		return
	}

	pb.RegisterAgentServiceServer(s, proxy)
	pb.RegisterHealthServer(s, proxy)
}

Expand All @@ -206,6 +216,7 @@ var reqList = []interface{}{
&pb.StartContainerRequest{},
&pb.RemoveContainerRequest{},
&pb.SignalProcessRequest{},
&pb.CheckRequest{},
}

func TestKataAgentSendReq(t *testing.T) {
Expand Down
120 changes: 120 additions & 0 deletions virtcontainers/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"sync"
"time"
)

// defaultCheckInterval is the check period newMonitor assigns to a monitor.
const defaultCheckInterval = 10 * time.Second

// monitor periodically checks a sandbox's agent and forwards any error to the
// channels it has handed out to watchers.
type monitor struct {
// Mutex is held by newWatcher, notify and stop while they touch the
// watcher list and the running flag.
sync.Mutex

// sandbox is the sandbox whose agent is checked.
sandbox *Sandbox
// checkInterval is the period of the ticker that drives the checks.
checkInterval time.Duration
// watchers holds every channel returned by newWatcher since the last stop.
watchers []chan error
// running is true while the checking goroutine is active.
running bool
// stopCh carries the request for the checking goroutine to exit.
stopCh chan bool
// wg lets stop wait until the checking goroutine has finished.
wg sync.WaitGroup
}

// newMonitor builds an idle monitor for sandbox s. It uses the default check
// interval and a one-slot stop channel; no goroutine is started until the
// first watcher is registered.
func newMonitor(s *Sandbox) *monitor {
	m := new(monitor)
	m.sandbox = s
	m.checkInterval = defaultCheckInterval
	m.stopCh = make(chan bool, 1)
	return m
}

// newWatcher registers and returns a new one-slot error channel. When the
// monitor is not yet running it also launches the goroutine that calls
// watchAgent on every tick of checkInterval until a value arrives on stopCh.
// The returned error is always nil.
func (m *monitor) newWatcher() (chan error, error) {
	m.Lock()
	defer m.Unlock()

	ch := make(chan error, 1)
	m.watchers = append(m.watchers, ch)

	// The checking goroutine already exists: only the registration was needed.
	if m.running {
		return ch, nil
	}

	m.running = true
	m.wg.Add(1)

	// create and start agent watcher
	go func() {
		ticker := time.NewTicker(m.checkInterval)
		for {
			select {
			case <-ticker.C:
				m.watchAgent()
			case <-m.stopCh:
				ticker.Stop()
				m.wg.Done()
				return
			}
		}
	}()

	return ch, nil
}

// notify delivers err to every registered watcher channel. It does nothing
// when the monitor is not running.
//
// Delivery never blocks: notify runs with the monitor lock held, so waiting on
// a watcher that has not drained its channel would also block stop() forever.
// When a watcher's buffer is full the error is dropped for that watcher, which
// still holds the earlier, unread error.
//
// Each send is guarded individually, so a watcher that wrongly closed its
// channel does not prevent the remaining watchers from being notified.
func (m *monitor) notify(err error) {
	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	for _, c := range m.watchers {
		func() {
			// a watcher is not supposed to close the channel
			// but just in case...
			defer func() {
				if x := recover(); x != nil {
					virtLog.Warnf("watcher closed channel: %v", x)
				}
			}()

			select {
			case c <- err:
			default:
				virtLog.Warnf("watcher channel is full, dropping error: %v", err)
			}
		}()
	}
}

// stop closes every watcher channel, asks the checking goroutine to exit and
// waits for it to finish. It is a no-op when the monitor is not running.
//
// Each close is guarded individually, so a watcher that wrongly closed its own
// channel does not leave the channels registered after it open forever.
func (m *monitor) stop() {
	// wait outside of monitor lock for the checking goroutine to exit.
	defer m.wg.Wait()

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	for _, c := range m.watchers {
		func() {
			// a watcher is not supposed to close the channel
			// but just in case...
			defer func() {
				if x := recover(); x != nil {
					virtLog.Warnf("watcher closed channel: %v", x)
				}
			}()

			close(c)
		}()
	}

	// stopCh has one slot of buffer, so this send does not wait for the
	// goroutine, which may itself be waiting for the monitor lock.
	m.stopCh <- true
	m.watchers = nil
	m.running = false
}

// watchAgent runs one check of the sandbox's agent and forwards a failure, if
// any, to the watchers through notify.
func (m *monitor) watchAgent() {
	if checkErr := m.sandbox.agent.check(); checkErr != nil {
		m.notify(checkErr)
	}
}
62 changes: 62 additions & 0 deletions virtcontainers/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

// TestMonitorSuccess verifies that an error handed to notify comes out
// unchanged on a channel obtained from newWatcher.
func TestMonitorSuccess(t *testing.T) {
	const containerID = "505"
	containerConfig := newTestContainerConfigNoop(containerID)
	hypervisorConfig := newHypervisorConfig(nil, nil)

	// create a sandbox
	sandbox, createErr := testCreateSandbox(t, testSandboxID, MockHypervisor, hypervisorConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{containerConfig}, nil)
	if createErr != nil {
		t.Fatal(createErr)
	}
	defer cleanUp()

	mon := newMonitor(sandbox)

	watcher, watchErr := mon.newWatcher()
	assert.Nil(t, watchErr, "newWatcher failed: %v", watchErr)

	want := errors.New("foobar error")
	mon.notify(want)
	got := <-watcher
	assert.True(t, got == want, "monitor notification mismatch %v vs. %v", got, want)

	mon.stop()
}

// TestMonitorClosedChannel verifies that notify and stop both return normally
// when a watcher has closed its own channel.
func TestMonitorClosedChannel(t *testing.T) {
	const containerID = "505"
	containerConfig := newTestContainerConfigNoop(containerID)
	hypervisorConfig := newHypervisorConfig(nil, nil)

	// create a sandbox
	sandbox, createErr := testCreateSandbox(t, testSandboxID, MockHypervisor, hypervisorConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{containerConfig}, nil)
	if createErr != nil {
		t.Fatal(createErr)
	}
	defer cleanUp()

	mon := newMonitor(sandbox)

	watcher, watchErr := mon.newWatcher()
	assert.Nil(t, watchErr, "newWatcher failed: %v", watchErr)

	// Misbehave on purpose: only the monitor is supposed to close this channel.
	close(watcher)
	mon.notify(errors.New("foobar error"))

	mon.stop()
}
5 changes: 5 additions & 0 deletions virtcontainers/noop_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (n *noopAgent) processListContainer(sandbox Sandbox, c Container, options P
func (n *noopAgent) onlineCPUMem(cpus uint32) error {
return nil
}

// check is the Noop agent health checker. It does nothing and always
// returns nil.
func (n *noopAgent) check() error {
return nil
}
5 changes: 5 additions & 0 deletions virtcontainers/pkg/vcmock/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,8 @@ func (p *Sandbox) Status() vc.SandboxStatus {
func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) {
return &Container{}, &vc.Process{}, nil
}

// Monitor implements the VCSandbox function of the same name.
// The mock returns a nil channel and a nil error.
func (p *Sandbox) Monitor() (chan error, error) {
return nil, nil
}
26 changes: 26 additions & 0 deletions virtcontainers/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,12 @@ func unlockSandbox(lockFile *os.File) error {
type Sandbox struct {
id string

sync.Mutex
hypervisor hypervisor
agent agent
storage resourceStorage
network network
monitor *monitor

config *SandboxConfig

Expand Down Expand Up @@ -532,6 +534,9 @@ func (s *Sandbox) GetContainer(containerID string) VCContainer {
// Release removes the sandbox from the internal list, stops its monitor when
// one has been created, and closes the agent connection. The error returned
// is the one produced by disconnecting the agent.
func (s *Sandbox) Release() error {
	globalSandboxList.removeSandbox(s.id)

	if mon := s.monitor; mon != nil {
		mon.stop()
	}

	return s.agent.disconnect()
}

Expand Down Expand Up @@ -561,6 +566,23 @@ func (s *Sandbox) Status() SandboxStatus {
}
}

// Monitor returns an error channel for the caller to watch. The sandbox's
// monitor is created on first use and shared by all later calls; each call
// registers and returns a new watcher channel.
//
// It fails when the sandbox is not in the running state.
func (s *Sandbox) Monitor() (chan error, error) {
	if s.state.State != StateRunning {
		return nil, fmt.Errorf("Sandbox is not running")
	}

	// Always take the lock before looking at s.monitor: checking the field
	// first without the lock is a data race with a concurrent call that is
	// assigning it.
	s.Lock()
	if s.monitor == nil {
		s.monitor = newMonitor(s)
	}
	mon := s.monitor
	s.Unlock()

	return mon.newWatcher()
}

func createAssets(sandboxConfig *SandboxConfig) error {
kernel, err := newAsset(sandboxConfig, kernelAsset)
if err != nil {
Expand Down Expand Up @@ -808,6 +830,10 @@ func (s *Sandbox) Delete() error {

globalSandboxList.removeSandbox(s.id)

if s.monitor != nil {
s.monitor.stop()
}

return s.storage.deleteSandboxResources(s.id, nil)
}

Expand Down
20 changes: 20 additions & 0 deletions virtcontainers/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,3 +1388,23 @@ func TestEnterContainer(t *testing.T) {
_, _, err = s.EnterContainer(contID, cmd)
assert.Nil(t, err, "Enter container failed: %v", err)
}

// TestMonitor checks that Sandbox.Monitor fails before the sandbox is started
// and succeeds, repeatedly, once it has been started.
func TestMonitor(t *testing.T) {
	s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
	defer cleanUp()
	// Abort right away on failure: every later step dereferences s.
	if err != nil {
		t.Fatalf("Failed to create sandbox: %v", err)
	}

	_, err = s.Monitor()
	assert.NotNil(t, err, "Monitoring non-running sandbox should fail")

	err = s.start()
	assert.Nil(t, err, "Failed to start sandbox: %v", err)

	_, err = s.Monitor()
	assert.Nil(t, err, "Monitor sandbox failed: %v", err)

	_, err = s.Monitor()
	assert.Nil(t, err, "Monitor sandbox again failed: %v", err)

	// The monitor only exists if one of the calls above got past the state
	// check, so guard against a nil monitor when an earlier step failed.
	if s.monitor != nil {
		s.monitor.stop()
	}
}

0 comments on commit 9f4c4d6

Please sign in to comment.