diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 6912851361..8f81e08d3a 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" "github.com/opencontainers/runtime-spec/specs-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -72,6 +73,21 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi vci.SetLogger(ctx, logger) katautils.SetLogger(ctx, logger, logger.Logger.Level) + // load runtime config so that tracing can start if enabled + _, runtimeConfig, err := katautils.LoadConfiguration("", false, true) + if err != nil { + return nil, err + } + + // create tracer + _, err = katautils.CreateTracer("kata") + if err != nil { + return nil, err + } + // create span + span, ctx := trace(ctx, "New") + defer span.Finish() + ctx, cancel := context.WithCancel(ctx) s := &service{ @@ -79,6 +95,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi pid: uint32(os.Getpid()), ctx: ctx, containers: make(map[string]*container), + config: &runtimeConfig, events: make(chan interface{}, chSize), ec: make(chan exit, bufferSize), cancel: cancel, @@ -161,6 +178,13 @@ func newCommand(ctx context.Context, containerdBinary, id, containerdAddress str // StartShim willl start a kata shimv2 daemon which will implemented the // ShimV2 APIs such as create/start/update etc containers. func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { + // Stop tracing here since a new tracer will be created the next time New() + // is called again after StartShim() + defer katautils.StopTracing(s.ctx) + + span, _ := trace(s.ctx, "StartShim") + defer span.Finish() + bundlePath, err := os.Getwd() if err != nil { return "", err @@ -274,7 +298,22 @@ func getTopic(e interface{}) string { return cdruntime.TaskUnknownTopic } +func trace(ctx context.Context, name string) (opentracing.Span, context.Context) { + if ctx == nil { + logrus.WithField("type", "bug").Error("trace called before context set") + ctx = context.Background() + } + + span, ctx := opentracing.StartSpanFromContext(ctx, name) + span.SetTag("source", "runtime") + + return span, ctx +} + func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) { + span, _ := trace(s.ctx, "Cleanup") + defer span.Finish() + //Since the binary cleanup will return the DeleteResponse from stdout to //containerd, thus we must make sure there is no any outputs in stdout except //the returned response, thus here redirect the log to stderr in case there's @@ -330,6 +369,9 @@ func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err e // Create a new sandbox or container with the underlying OCI runtime func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + span, _ := trace(s.ctx, "Create") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -381,6 +423,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) { + span, _ := trace(s.ctx, "Start") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -427,6 +472,9 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAP // Delete the initial process and container func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) { + span, _ := trace(s.ctx, "Delete") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -474,6 +522,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *task // Exec an additional process inside the container func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Exec") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -507,6 +558,9 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *p // ResizePty of a process func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "ResizePty") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -541,6 +595,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ // State returns runtime state information for a process func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) { + span, _ := trace(s.ctx, "State") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -584,11 +641,13 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAP Terminal: execs.tty.terminal, ExitStatus: uint32(execs.exitCode), }, nil - } // Pause the container func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Pause") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -623,6 +682,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes // Resume the container func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Resume") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -655,6 +717,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptyp // Kill a process with the provided signal func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Kill") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -707,6 +772,9 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E // Since for kata, it cannot get the process's pid from VM, // thus only return the Shim's pid directly. func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) { + span, _ := trace(s.ctx, "Pids") + defer span.Finish() + var processes []*task.ProcessInfo defer func() { @@ -725,6 +793,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI. // CloseIO of a process func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "CloseIO") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -761,6 +832,9 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt // Checkpoint the container func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Checkpoint") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -770,6 +844,9 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque // Connect returns shim information such as the shim's pid func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) { + span, _ := trace(s.ctx, "Connect") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -785,6 +862,8 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *ta } func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Shutdown") + defer func() { err = toGRPC(err) }() @@ -796,6 +875,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ * } s.mu.Unlock() + span.Finish() + katautils.StopTracing(s.ctx) + s.cancel() os.Exit(0) @@ -806,6 +888,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ * } func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) { + span, _ := trace(s.ctx, "Stats") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -830,6 +915,9 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAP // Update a running container func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) { + span, _ := trace(s.ctx, "Update") + defer span.Finish() + defer func() { err = toGRPC(err) }() @@ -857,6 +945,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ * // Wait for a process to exit func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) { + span, _ := trace(s.ctx, "Wait") + defer span.Finish() + var ret uint32 defer func() {