Skip to content

Commit

Permalink
api: add sandbox Monitor API
Browse files Browse the repository at this point in the history
It monitors the sandbox status and returns an error channel to let
caller watch it.

Fixes: kata-containers#251

Signed-off-by: Peng Tao <[email protected]>
  • Loading branch information
bergwolf committed Apr 25, 2018
1 parent 45e3f85 commit aa80fbe
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 0 deletions.
3 changes: 3 additions & 0 deletions virtcontainers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type agent interface {
// supported by the agent.
capabilities() capabilities

// check will check the agent liveness
check() error

// disconnect will disconnect the connection to the agent
disconnect() error

Expand Down
5 changes: 5 additions & 0 deletions virtcontainers/hyperstart_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,3 +801,8 @@ func (h *hyper) onlineCPUMem(cpus uint32) error {
// cc-agent uses udev to online CPUs automatically
return nil
}

func (h *hyper) check() error {
// cc-agent does not support check
return nil
}
1 change: 1 addition & 0 deletions virtcontainers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type VCSandbox interface {
Pause() error
Resume() error
Release() error
Monitor() (chan error, error)
Delete() error
Status() SandboxStatus
CreateContainer(contConfig ContainerConfig) (VCContainer, error)
Expand Down
11 changes: 11 additions & 0 deletions virtcontainers/kata_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"syscall"
"time"

kataclient "github.com/kata-containers/agent/protocols/client"
"github.com/kata-containers/agent/protocols/grpc"
Expand Down Expand Up @@ -934,6 +935,11 @@ func (k *kataAgent) disconnect() error {
return nil
}

func (k *kataAgent) check() error {
_, err := k.sendReq(&grpc.CheckRequest{})
return err
}

func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
if err := k.connect(); err != nil {
return nil, err
Expand All @@ -943,6 +949,11 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
}

switch req := request.(type) {
case *grpc.CheckRequest:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := k.client.Check(ctx, req)
return nil, err
case *grpc.ExecProcessRequest:
_, err := k.client.ExecProcess(context.Background(), req)
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions virtcontainers/kata_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,20 @@ func (p *gRPCProxy) OnlineCPUMem(ctx context.Context, req *pb.OnlineCPUMemReques
return emptyResp, nil
}

func (p *gRPCProxy) Check(ctx context.Context, req *pb.CheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{}, nil
}

func (p *gRPCProxy) Version(ctx context.Context, req *pb.CheckRequest) (*pb.VersionCheckResponse, error) {
return &pb.VersionCheckResponse{}, nil

}

func gRPCRegister(s *grpc.Server, srv interface{}) {
switch g := srv.(type) {
case *gRPCProxy:
pb.RegisterAgentServiceServer(s, g)
pb.RegisterHealthServer(s, g)
}
}

Expand All @@ -206,6 +216,7 @@ var reqList = []interface{}{
&pb.StartContainerRequest{},
&pb.RemoveContainerRequest{},
&pb.SignalProcessRequest{},
&pb.CheckRequest{},
}

func TestKataAgentSendReq(t *testing.T) {
Expand Down
95 changes: 95 additions & 0 deletions virtcontainers/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"sync"
"time"
)

const defaultCheckInterval = 1 * time.Second

type monitor struct {
sync.Mutex
sandbox *Sandbox

checkInterval time.Duration
watchers []chan error
running bool
}

func newMonitor(s *Sandbox) *monitor {
return &monitor{
sandbox: s,
checkInterval: defaultCheckInterval}
}

func (m *monitor) newWatcher() (chan error, error) {
m.Lock()
defer m.Unlock()

watcher := make(chan error, 1)
m.watchers = append(m.watchers, watcher)

if !m.running {
// create and start agent watcher
go func() {
for {
if !m.running {
break
}
m.watchAgent()
time.Sleep(m.checkInterval)
}
}()
m.running = true
}

return watcher, nil
}

func (m *monitor) notify(err error) {
m.Lock()
defer m.Unlock()

// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()

for _, c := range m.watchers {
c <- err
}
}

func (m *monitor) stop() {
m.Lock()
defer m.Unlock()

// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
virtLog.Warnf("watcher closed channel: %v", x)
}
}()

for _, c := range m.watchers {
close(c)
}
m.watchers = nil
m.running = false
}

func (m *monitor) watchAgent() {
err := m.sandbox.agent.check()
if err != nil {
m.notify(err)
}
}
62 changes: 62 additions & 0 deletions virtcontainers/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMonitorSuccess(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)

// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()

m := newMonitor(s)

ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)

fakeErr := errors.New("foobar error")
m.notify(fakeErr)
resultErr := <-ch
assert.True(t, resultErr == fakeErr, "monitor notification mismatch %v vs. %v", resultErr, fakeErr)

m.stop()
}

func TestMonitorClosedChannel(t *testing.T) {
contID := "505"
contConfig := newTestContainerConfigNoop(contID)
hConfig := newHypervisorConfig(nil, nil)

// create a sandbox
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{contConfig}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()

m := newMonitor(s)

ch, err := m.newWatcher()
assert.Nil(t, err, "newWatcher failed: %v", err)

close(ch)
fakeErr := errors.New("foobar error")
m.notify(fakeErr)

m.stop()
}
5 changes: 5 additions & 0 deletions virtcontainers/noop_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (n *noopAgent) processListContainer(sandbox Sandbox, c Container, options P
func (n *noopAgent) onlineCPUMem(cpus uint32) error {
return nil
}

// check is the Noop agent health checker. It does nothing.
func (n *noopAgent) check() error {
return nil
}
5 changes: 5 additions & 0 deletions virtcontainers/pkg/vcmock/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,8 @@ func (p *Sandbox) Status() vc.SandboxStatus {
func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer, *vc.Process, error) {
return &Container{}, &vc.Process{}, nil
}

// Monitor implements the VCSandbox function of the same name.
func (p *Sandbox) Monitor() (chan error, error) {
return nil, nil
}
26 changes: 26 additions & 0 deletions virtcontainers/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,12 @@ func unlockSandbox(lockFile *os.File) error {
type Sandbox struct {
id string

sync.Mutex
hypervisor hypervisor
agent agent
storage resourceStorage
network network
monitor *monitor

config *SandboxConfig

Expand Down Expand Up @@ -532,6 +534,9 @@ func (s *Sandbox) GetContainer(containerID string) VCContainer {
// Release closes the agent connection and removes sandbox from internal list.
func (s *Sandbox) Release() error {
globalSandboxList.removeSandbox(s.id)
if s.monitor != nil {
s.monitor.stop()
}
return s.agent.disconnect()
}

Expand Down Expand Up @@ -561,6 +566,23 @@ func (s *Sandbox) Status() SandboxStatus {
}
}

// Monitor returns a error channel for watcher to watch at
func (s *Sandbox) Monitor() (chan error, error) {
if s.state.State != StateRunning {
return nil, fmt.Errorf("Sandbox not running")
}

if s.monitor == nil {
s.Lock()
if s.monitor == nil {
s.monitor = newMonitor(s)
}
s.Unlock()
}

return s.monitor.newWatcher()
}

func createAssets(sandboxConfig *SandboxConfig) error {
kernel, err := newAsset(sandboxConfig, kernelAsset)
if err != nil {
Expand Down Expand Up @@ -808,6 +830,10 @@ func (s *Sandbox) Delete() error {

globalSandboxList.removeSandbox(s.id)

if s.monitor != nil {
s.monitor.stop()
}

return s.storage.deleteSandboxResources(s.id, nil)
}

Expand Down
20 changes: 20 additions & 0 deletions virtcontainers/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,3 +1388,23 @@ func TestEnterContainer(t *testing.T) {
_, _, err = s.EnterContainer(contID, cmd)
assert.Nil(t, err, "Enter container failed: %v", err)
}

func TestMonitor(t *testing.T) {
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
defer cleanUp()

_, err = s.Monitor()
assert.NotNil(t, err, "Monitoring non-running container should fail")

err = s.start()
assert.Nil(t, err, "Failed to start sandbox: %v", err)

_, err = s.Monitor()
assert.Nil(t, err, "Monitor sandbox failed: %v", err)

_, err = s.Monitor()
assert.Nil(t, err, "Monitor sandbox again failed: %v", err)

s.monitor.stop()
}

0 comments on commit aa80fbe

Please sign in to comment.