diff --git a/Gopkg.lock b/Gopkg.lock index 57b06ecedf..39a88f5c0a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -106,6 +106,14 @@ revision = "925541529c1fa6821df4e44ce2723319eb2be768" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:7b760aa6dbe426378f54934270dde1b176fda379111da2154748f030fffe4d3f" + name = "github.com/grpc-ecosystem/grpc-opentracing" + packages = ["go/otgrpc"] + pruneopts = "NUT" + revision = "8e809c8a86450a29b90dcc9efbf062d0fe6d9746" + [[projects]] branch = "master" digest = "1:63e0b20cfa3fe456480edf93a7995f776afb610e49da8e3da04d8904472a44cc" @@ -122,14 +130,14 @@ revision = "eda239928bfa12b214e9c93192d548cccf4e7f1e" [[projects]] - digest = "1:55460fbdfca464360cec902b0805126451908aa1a058fe4072b01650ebe768b3" + digest = "1:01c37fcb6e2a1fe1321a97faaef74c66ac531ea292ca3f929b7189cc400b1d47" name = "github.com/kata-containers/agent" packages = [ "protocols/client", "protocols/grpc", ] pruneopts = "NUT" - revision = "cd8f37b29332cbc081f00e4186c59944a79574fd" + revision = "46396d205bf096db4e69fcfa319525858ce8050c" [[projects]] digest = "1:04054595e5c5a35d1553a7f3464d18577caf597445d643992998643df56d4afd" @@ -405,6 +413,7 @@ "github.com/opencontainers/runc/libcontainer/utils", "github.com/opencontainers/runtime-spec/specs-go", "github.com/opentracing/opentracing-go", + "github.com/opentracing/opentracing-go/log", "github.com/safchain/ethtool", "github.com/sirupsen/logrus", "github.com/sirupsen/logrus/hooks/syslog", diff --git a/Gopkg.toml b/Gopkg.toml index 36d68e26f9..aa01dfb1f1 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,7 +56,7 @@ [[constraint]] name = "github.com/kata-containers/agent" - revision = "cd8f37b29332cbc081f00e4186c59944a79574fd" + revision = "46396d205bf096db4e69fcfa319525858ce8050c" [[constraint]] name = "github.com/containerd/cri-containerd" diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/LICENSE b/vendor/github.com/grpc-ecosystem/grpc-opentracing/LICENSE new file mode 100644 index 0000000000..abe5fe170b --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2016, gRPC Ecosystem +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of grpc-opentracing nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/PATENTS b/vendor/github.com/grpc-ecosystem/grpc-opentracing/PATENTS new file mode 100644 index 0000000000..5cfe0175ee --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/PATENTS @@ -0,0 +1,23 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the GRPC project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of GRPC, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of GRPC. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of GRPC or any code incorporated within this +implementation of GRPC constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of GRPC +shall terminate as of the date such litigation is filed. +Status API Training Shop Blog About diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/client.go b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/client.go new file mode 100644 index 0000000000..3414e55cb1 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/client.go @@ -0,0 +1,239 @@ +package otgrpc + +import ( + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "io" + "runtime" + "sync/atomic" +) + +// OpenTracingClientInterceptor returns a grpc.UnaryClientInterceptor suitable +// for use in a grpc.Dial call. +// +// For example: +// +// conn, err := grpc.Dial( +// address, +// ..., // (existing DialOptions) +// grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer))) +// +// All gRPC client spans will inject the OpenTracing SpanContext into the gRPC +// metadata; they will also look in the context.Context for an active +// in-process parent Span and establish a ChildOf reference if such a parent +// Span could be found. +func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor { + otgrpcOpts := newOptions() + otgrpcOpts.apply(optFuncs...) + return func( + ctx context.Context, + method string, + req, resp interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + var err error + var parentCtx opentracing.SpanContext + if parent := opentracing.SpanFromContext(ctx); parent != nil { + parentCtx = parent.Context() + } + if otgrpcOpts.inclusionFunc != nil && + !otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) { + return invoker(ctx, method, req, resp, cc, opts...) + } + clientSpan := tracer.StartSpan( + method, + opentracing.ChildOf(parentCtx), + ext.SpanKindRPCClient, + gRPCComponentTag, + ) + defer clientSpan.Finish() + ctx = injectSpanContext(ctx, tracer, clientSpan) + if otgrpcOpts.logPayloads { + clientSpan.LogFields(log.Object("gRPC request", req)) + } + err = invoker(ctx, method, req, resp, cc, opts...) + if err == nil { + if otgrpcOpts.logPayloads { + clientSpan.LogFields(log.Object("gRPC response", resp)) + } + } else { + SetSpanTags(clientSpan, err, true) + clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) + } + if otgrpcOpts.decorator != nil { + otgrpcOpts.decorator(clientSpan, method, req, resp, err) + } + return err + } +} + +// OpenTracingStreamClientInterceptor returns a grpc.StreamClientInterceptor suitable +// for use in a grpc.Dial call. The interceptor instruments streaming RPCs by creating +// a single span to correspond to the lifetime of the RPC's stream. +// +// For example: +// +// conn, err := grpc.Dial( +// address, +// ..., // (existing DialOptions) +// grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer))) +// +// All gRPC client spans will inject the OpenTracing SpanContext into the gRPC +// metadata; they will also look in the context.Context for an active +// in-process parent Span and establish a ChildOf reference if such a parent +// Span could be found. +func OpenTracingStreamClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamClientInterceptor { + otgrpcOpts := newOptions() + otgrpcOpts.apply(optFuncs...) + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + var err error + var parentCtx opentracing.SpanContext + if parent := opentracing.SpanFromContext(ctx); parent != nil { + parentCtx = parent.Context() + } + if otgrpcOpts.inclusionFunc != nil && + !otgrpcOpts.inclusionFunc(parentCtx, method, nil, nil) { + return streamer(ctx, desc, cc, method, opts...) + } + + clientSpan := tracer.StartSpan( + method, + opentracing.ChildOf(parentCtx), + ext.SpanKindRPCClient, + gRPCComponentTag, + ) + ctx = injectSpanContext(ctx, tracer, clientSpan) + cs, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) + SetSpanTags(clientSpan, err, true) + clientSpan.Finish() + return cs, err + } + return newOpenTracingClientStream(cs, method, desc, clientSpan, otgrpcOpts), nil + } +} + +func newOpenTracingClientStream(cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan opentracing.Span, otgrpcOpts *options) grpc.ClientStream { + finishChan := make(chan struct{}) + + isFinished := new(int32) + *isFinished = 0 + finishFunc := func(err error) { + // The current OpenTracing specification forbids finishing a span more than + // once. Since we have multiple code paths that could concurrently call + // `finishFunc`, we need to add some sort of synchronization to guard against + // multiple finishing. + if !atomic.CompareAndSwapInt32(isFinished, 0, 1) { + return + } + close(finishChan) + defer clientSpan.Finish() + if err != nil { + clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) + SetSpanTags(clientSpan, err, true) + } + if otgrpcOpts.decorator != nil { + otgrpcOpts.decorator(clientSpan, method, nil, nil, err) + } + } + go func() { + select { + case <-finishChan: + // The client span is being finished by another code path; hence, no + // action is necessary. + case <-cs.Context().Done(): + finishFunc(cs.Context().Err()) + } + }() + otcs := &openTracingClientStream{ + ClientStream: cs, + desc: desc, + finishFunc: finishFunc, + } + + // The `ClientStream` interface allows one to omit calling `Recv` if it's + // known that the result will be `io.EOF`. See + // http://stackoverflow.com/q/42915337 + // In such cases, there's nothing that triggers the span to finish. We, + // therefore, set a finalizer so that the span and the context goroutine will + // at least be cleaned up when the garbage collector is run. + runtime.SetFinalizer(otcs, func(otcs *openTracingClientStream) { + otcs.finishFunc(nil) + }) + return otcs +} + +type openTracingClientStream struct { + grpc.ClientStream + desc *grpc.StreamDesc + finishFunc func(error) +} + +func (cs *openTracingClientStream) Header() (metadata.MD, error) { + md, err := cs.ClientStream.Header() + if err != nil { + cs.finishFunc(err) + } + return md, err +} + +func (cs *openTracingClientStream) SendMsg(m interface{}) error { + err := cs.ClientStream.SendMsg(m) + if err != nil { + cs.finishFunc(err) + } + return err +} + +func (cs *openTracingClientStream) RecvMsg(m interface{}) error { + err := cs.ClientStream.RecvMsg(m) + if err == io.EOF { + cs.finishFunc(nil) + return err + } else if err != nil { + cs.finishFunc(err) + return err + } + if !cs.desc.ServerStreams { + cs.finishFunc(nil) + } + return err +} + +func (cs *openTracingClientStream) CloseSend() error { + err := cs.ClientStream.CloseSend() + if err != nil { + cs.finishFunc(err) + } + return err +} + +func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(nil) + } else { + md = md.Copy() + } + mdWriter := metadataReaderWriter{md} + err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter) + // We have no better place to record an error than the Span itself :-/ + if err != nil { + clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err)) + } + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/errors.go b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/errors.go new file mode 100644 index 0000000000..41a6346f25 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/errors.go @@ -0,0 +1,69 @@ +package otgrpc + +import ( + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// A Class is a set of types of outcomes (including errors) that will often +// be handled in the same way. +type Class string + +const ( + Unknown Class = "0xx" + // Success represents outcomes that achieved the desired results. + Success Class = "2xx" + // ClientError represents errors that were the client's fault. + ClientError Class = "4xx" + // ServerError represents errors that were the server's fault. + ServerError Class = "5xx" +) + +// ErrorClass returns the class of the given error +func ErrorClass(err error) Class { + if s, ok := status.FromError(err); ok { + switch s.Code() { + // Success or "success" + case codes.OK, codes.Canceled: + return Success + + // Client errors + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, + codes.PermissionDenied, codes.Unauthenticated, codes.FailedPrecondition, + codes.OutOfRange: + return ClientError + + // Server errors + case codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted, + codes.Unimplemented, codes.Internal, codes.Unavailable, codes.DataLoss: + return ServerError + + // Not sure + case codes.Unknown: + fallthrough + default: + return Unknown + } + } + return Unknown +} + +// SetSpanTags sets one or more tags on the given span according to the +// error. +func SetSpanTags(span opentracing.Span, err error, client bool) { + c := ErrorClass(err) + code := codes.Unknown + if s, ok := status.FromError(err); ok { + code = s.Code() + } + span.SetTag("response_code", code) + span.SetTag("response_class", c) + if err == nil { + return + } + if client || c == ServerError { + ext.Error.Set(span, true) + } +} diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/options.go b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/options.go new file mode 100644 index 0000000000..903e8382e3 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/options.go @@ -0,0 +1,76 @@ +package otgrpc + +import "github.com/opentracing/opentracing-go" + +// Option instances may be used in OpenTracing(Server|Client)Interceptor +// initialization. +// +// See this post about the "functional options" pattern: +// http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis +type Option func(o *options) + +// LogPayloads returns an Option that tells the OpenTracing instrumentation to +// try to log application payloads in both directions. +func LogPayloads() Option { + return func(o *options) { + o.logPayloads = true + } +} + +// SpanInclusionFunc provides an optional mechanism to decide whether or not +// to trace a given gRPC call. Return true to create a Span and initiate +// tracing, false to not create a Span and not trace. +// +// parentSpanCtx may be nil if no parent could be extraction from either the Go +// context.Context (on the client) or the RPC (on the server). +type SpanInclusionFunc func( + parentSpanCtx opentracing.SpanContext, + method string, + req, resp interface{}) bool + +// IncludingSpans binds a IncludeSpanFunc to the options +func IncludingSpans(inclusionFunc SpanInclusionFunc) Option { + return func(o *options) { + o.inclusionFunc = inclusionFunc + } +} + +// SpanDecoratorFunc provides an (optional) mechanism for otgrpc users to add +// arbitrary tags/logs/etc to the opentracing.Span associated with client +// and/or server RPCs. +type SpanDecoratorFunc func( + span opentracing.Span, + method string, + req, resp interface{}, + grpcError error) + +// SpanDecorator binds a function that decorates gRPC Spans. +func SpanDecorator(decorator SpanDecoratorFunc) Option { + return func(o *options) { + o.decorator = decorator + } +} + +// The internal-only options struct. Obviously overkill at the moment; but will +// scale well as production use dictates other configuration and tuning +// parameters. +type options struct { + logPayloads bool + decorator SpanDecoratorFunc + // May be nil. + inclusionFunc SpanInclusionFunc +} + +// newOptions returns the default options. +func newOptions() *options { + return &options{ + logPayloads: false, + inclusionFunc: nil, + } +} + +func (o *options) apply(opts ...Option) { + for _, opt := range opts { + opt(o) + } +} diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/package.go b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/package.go new file mode 100644 index 0000000000..4ff3d19978 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/package.go @@ -0,0 +1,5 @@ +// Package otgrpc provides OpenTracing support for any gRPC client or server. +// +// See the README for simple usage examples: +// https://github.com/grpc-ecosystem/grpc-opentracing/blob/master/go/otgrpc/README.md +package otgrpc diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/server.go b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/server.go new file mode 100644 index 0000000000..62cf54d221 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/server.go @@ -0,0 +1,141 @@ +package otgrpc + +import ( + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// OpenTracingServerInterceptor returns a grpc.UnaryServerInterceptor suitable +// for use in a grpc.NewServer call. +// +// For example: +// +// s := grpc.NewServer( +// ..., // (existing ServerOptions) +// grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer))) +// +// All gRPC server spans will look for an OpenTracing SpanContext in the gRPC +// metadata; if found, the server span will act as the ChildOf that RPC +// SpanContext. +// +// Root or not, the server Span will be embedded in the context.Context for the +// application-specific gRPC handler(s) to access. +func OpenTracingServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryServerInterceptor { + otgrpcOpts := newOptions() + otgrpcOpts.apply(optFuncs...) + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (resp interface{}, err error) { + spanContext, err := extractSpanContext(ctx, tracer) + if err != nil && err != opentracing.ErrSpanContextNotFound { + // TODO: establish some sort of error reporting mechanism here. We + // don't know where to put such an error and must rely on Tracer + // implementations to do something appropriate for the time being. + } + if otgrpcOpts.inclusionFunc != nil && + !otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, req, nil) { + return handler(ctx, req) + } + serverSpan := tracer.StartSpan( + info.FullMethod, + ext.RPCServerOption(spanContext), + gRPCComponentTag, + ) + defer serverSpan.Finish() + + ctx = opentracing.ContextWithSpan(ctx, serverSpan) + if otgrpcOpts.logPayloads { + serverSpan.LogFields(log.Object("gRPC request", req)) + } + resp, err = handler(ctx, req) + if err == nil { + if otgrpcOpts.logPayloads { + serverSpan.LogFields(log.Object("gRPC response", resp)) + } + } else { + SetSpanTags(serverSpan, err, false) + serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) + } + if otgrpcOpts.decorator != nil { + otgrpcOpts.decorator(serverSpan, info.FullMethod, req, resp, err) + } + return resp, err + } +} + +// OpenTracingStreamServerInterceptor returns a grpc.StreamServerInterceptor suitable +// for use in a grpc.NewServer call. The interceptor instruments streaming RPCs by +// creating a single span to correspond to the lifetime of the RPC's stream. +// +// For example: +// +// s := grpc.NewServer( +// ..., // (existing ServerOptions) +// grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer))) +// +// All gRPC server spans will look for an OpenTracing SpanContext in the gRPC +// metadata; if found, the server span will act as the ChildOf that RPC +// SpanContext. +// +// Root or not, the server Span will be embedded in the context.Context for the +// application-specific gRPC handler(s) to access. +func OpenTracingStreamServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamServerInterceptor { + otgrpcOpts := newOptions() + otgrpcOpts.apply(optFuncs...) + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + spanContext, err := extractSpanContext(ss.Context(), tracer) + if err != nil && err != opentracing.ErrSpanContextNotFound { + // TODO: establish some sort of error reporting mechanism here. We + // don't know where to put such an error and must rely on Tracer + // implementations to do something appropriate for the time being. + } + if otgrpcOpts.inclusionFunc != nil && + !otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, nil, nil) { + return handler(srv, ss) + } + + serverSpan := tracer.StartSpan( + info.FullMethod, + ext.RPCServerOption(spanContext), + gRPCComponentTag, + ) + defer serverSpan.Finish() + ss = &openTracingServerStream{ + ServerStream: ss, + ctx: opentracing.ContextWithSpan(ss.Context(), serverSpan), + } + err = handler(srv, ss) + if err != nil { + SetSpanTags(serverSpan, err, false) + serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) + } + if otgrpcOpts.decorator != nil { + otgrpcOpts.decorator(serverSpan, info.FullMethod, nil, nil, err) + } + return err + } +} + +type openTracingServerStream struct { + grpc.ServerStream + ctx context.Context +} + +func (ss *openTracingServerStream) Context() context.Context { + return ss.ctx +} + +func extractSpanContext(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + md = metadata.New(nil) + } + return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md}) +} diff --git a/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/shared.go b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/shared.go new file mode 100644 index 0000000000..9abd5eaa62 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc/shared.go @@ -0,0 +1,42 @@ +package otgrpc + +import ( + "strings" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "google.golang.org/grpc/metadata" +) + +var ( + // Morally a const: + gRPCComponentTag = opentracing.Tag{string(ext.Component), "gRPC"} +) + +// metadataReaderWriter satisfies both the opentracing.TextMapReader and +// opentracing.TextMapWriter interfaces. +type metadataReaderWriter struct { + metadata.MD +} + +func (w metadataReaderWriter) Set(key, val string) { + // The GRPC HPACK implementation rejects any uppercase keys here. + // + // As such, since the HTTP_HEADERS format is case-insensitive anyway, we + // blindly lowercase the key (which is guaranteed to work in the + // Inject/Extract sense per the OpenTracing spec). + key = strings.ToLower(key) + w.MD[key] = append(w.MD[key], val) +} + +func (w metadataReaderWriter) ForeachKey(handler func(key, val string) error) error { + for k, vals := range w.MD { + for _, v := range vals { + if err := handler(k, v); err != nil { + return err + } + } + } + + return nil +} diff --git a/vendor/github.com/kata-containers/agent/protocols/client/client.go b/vendor/github.com/kata-containers/agent/protocols/client/client.go index c385a1bf04..6e386fe129 100644 --- a/vendor/github.com/kata-containers/agent/protocols/client/client.go +++ b/vendor/github.com/kata-containers/agent/protocols/client/client.go @@ -14,8 +14,10 @@ import ( "strings" "time" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/hashicorp/yamux" "github.com/mdlayher/vsock" + opentracing "github.com/opentracing/opentracing-go" "google.golang.org/grpc" "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" @@ -54,14 +56,28 @@ type dialer func(string, time.Duration) (net.Conn, error) // - unix:// // - vsock://: // - -func NewAgentClient(sock string, enableYamux bool) (*AgentClient, error) { +func NewAgentClient(ctx context.Context, sock string, enableYamux bool) (*AgentClient, error) { grpcAddr, parsedAddr, err := parse(sock) if err != nil { return nil, err } dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux))) - ctx := context.Background() + + var tracer opentracing.Tracer + + span := opentracing.SpanFromContext(ctx) + + // If the context contains a trace span, trace all client comms + if span != nil { + tracer = span.Tracer() + + dialOpts = append(dialOpts, + grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer))) + dialOpts = append(dialOpts, + grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer))) + } + ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout) defer cancel() conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...) diff --git a/virtcontainers/kata_agent.go b/virtcontainers/kata_agent.go index 4c90ebcea9..e5f3cee30d 100644 --- a/virtcontainers/kata_agent.go +++ b/virtcontainers/kata_agent.go @@ -1231,7 +1231,7 @@ func (k *kataAgent) connect() error { } k.Logger().WithField("url", k.state.URL).Info("New client") - client, err := kataclient.NewAgentClient(k.state.URL, k.proxyBuiltIn) + client, err := kataclient.NewAgentClient(k.ctx, k.state.URL, k.proxyBuiltIn) if err != nil { return err }