Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2980 from cmaf/tracing-shimv2-create-nodebug
Browse files Browse the repository at this point in the history
shimv2: Add tracing
  • Loading branch information
amshinde authored Dec 2, 2020
2 parents 2a98f43 + 0279c81 commit c1fd6c0
Showing 1 changed file with 92 additions and 1 deletion.
93 changes: 92 additions & 1 deletion containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/opencontainers/runtime-spec/specs-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -72,13 +73,29 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
vci.SetLogger(ctx, logger)
katautils.SetLogger(ctx, logger, logger.Logger.Level)

// load runtime config so that tracing can start if enabled
_, runtimeConfig, err := katautils.LoadConfiguration("", false, true)
if err != nil {
return nil, err
}

// create tracer
_, err = katautils.CreateTracer("kata")
if err != nil {
return nil, err
}
// create span
span, ctx := trace(ctx, "New")
defer span.Finish()

ctx, cancel := context.WithCancel(ctx)

s := &service{
id: id,
pid: uint32(os.Getpid()),
ctx: ctx,
containers: make(map[string]*container),
config: &runtimeConfig,
events: make(chan interface{}, chSize),
ec: make(chan exit, bufferSize),
cancel: cancel,
Expand Down Expand Up @@ -161,6 +178,13 @@ func newCommand(ctx context.Context, containerdBinary, id, containerdAddress str
// StartShim willl start a kata shimv2 daemon which will implemented the
// ShimV2 APIs such as create/start/update etc containers.
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
// Stop tracing here since a new tracer will be created the next time New()
// is called again after StartShim()
defer katautils.StopTracing(s.ctx)

span, _ := trace(s.ctx, "StartShim")
defer span.Finish()

bundlePath, err := os.Getwd()
if err != nil {
return "", err
Expand Down Expand Up @@ -274,7 +298,22 @@ func getTopic(e interface{}) string {
return cdruntime.TaskUnknownTopic
}

func trace(ctx context.Context, name string) (opentracing.Span, context.Context) {
if ctx == nil {
logrus.WithField("type", "bug").Error("trace called before context set")
ctx = context.Background()
}

span, ctx := opentracing.StartSpanFromContext(ctx, name)
span.SetTag("source", "runtime")

return span, ctx
}

func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) {
span, _ := trace(s.ctx, "Cleanup")
defer span.Finish()

//Since the binary cleanup will return the DeleteResponse from stdout to
//containerd, thus we must make sure there is no any outputs in stdout except
//the returned response, thus here redirect the log to stderr in case there's
Expand Down Expand Up @@ -330,6 +369,9 @@ func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err e

// Create a new sandbox or container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
span, _ := trace(s.ctx, "Create")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -381,6 +423,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) {
span, _ := trace(s.ctx, "Start")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -427,6 +472,9 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAP

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) {
span, _ := trace(s.ctx, "Delete")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -474,6 +522,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *task

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Exec")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -507,6 +558,9 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *p

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "ResizePty")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -541,6 +595,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) {
span, _ := trace(s.ctx, "State")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -584,11 +641,13 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAP
Terminal: execs.tty.terminal,
ExitStatus: uint32(execs.exitCode),
}, nil

}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Pause")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -623,6 +682,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Resume")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -655,6 +717,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptyp

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Kill")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -707,6 +772,9 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E
// Since for kata, it cannot get the process's pid from VM,
// thus only return the Shim's pid directly.
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) {
span, _ := trace(s.ctx, "Pids")
defer span.Finish()

var processes []*task.ProcessInfo

defer func() {
Expand All @@ -725,6 +793,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "CloseIO")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -761,6 +832,9 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Checkpoint")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand All @@ -770,6 +844,9 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) {
span, _ := trace(s.ctx, "Connect")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand All @@ -785,6 +862,8 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *ta
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Shutdown")

defer func() {
err = toGRPC(err)
}()
Expand All @@ -796,6 +875,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
}
s.mu.Unlock()

span.Finish()
katautils.StopTracing(s.ctx)

s.cancel()

os.Exit(0)
Expand All @@ -806,6 +888,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) {
span, _ := trace(s.ctx, "Stats")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand All @@ -830,6 +915,9 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAP

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Update")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -857,6 +945,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) {
span, _ := trace(s.ctx, "Wait")
defer span.Finish()

var ret uint32

defer func() {
Expand Down

0 comments on commit c1fd6c0

Please sign in to comment.