Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
agent: add default timeout for grpc requests
Browse files Browse the repository at this point in the history
If guest is malfunctioning, we need a way to bail out. Add
a default timeout for most of the grpc requests so that the
runtime does not wait indefinitely.

Fixes: #1952
Signed-off-by: Peng Tao <[email protected]>
  • Loading branch information
bergwolf committed Aug 13, 2019
1 parent 9ea469b commit debc7d9
Showing 1 changed file with 81 additions and 32 deletions.
113 changes: 81 additions & 32 deletions virtcontainers/kata_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (

var (
checkRequestTimeout = 30 * time.Second
defaultRequestTimeout = 60 * time.Second
defaultKataSocketName = "kata.sock"
defaultKataChannel = "agent.channel.0"
defaultKataDeviceID = "channel0"
Expand Down Expand Up @@ -98,6 +99,38 @@ const (
defaultAgentTraceType = agentTraceTypeIsolated
)

const (
grpcCheckRequest = "grpc.CheckRequest"
grpcExecProcessRequest = "grpc.ExecProcessRequest"
grpcCreateSandboxRequest = "grpc.CreateSandboxRequest"
grpcDestroySandboxRequest = "grpc.DestroySandboxRequest"
grpcCreateContainerRequest = "grpc.CreateContainerRequest"
grpcStartContainerRequest = "grpc.StartContainerRequest"
grpcRemoveContainerRequest = "grpc.RemoveContainerRequest"
grpcSignalProcessRequest = "grpc.SignalProcessRequest"
grpcUpdateRoutesRequest = "grpc.UpdateRoutesRequest"
grpcUpdateInterfaceRequest = "grpc.UpdateInterfaceRequest"
grpcListInterfacesRequest = "grpc.ListInterfacesRequest"
grpcListRoutesRequest = "grpc.ListRoutesRequest"
grpcOnlineCPUMemRequest = "grpc.OnlineCPUMemRequest"
grpcListProcessesRequest = "grpc.ListProcessesRequest"
grpcUpdateContainerRequest = "grpc.UpdateContainerRequest"
grpcWaitProcessRequest = "grpc.WaitProcessRequest"
grpcTtyWinResizeRequest = "grpc.TtyWinResizeRequest"
grpcWriteStreamRequest = "grpc.WriteStreamRequest"
grpcCloseStdinRequest = "grpc.CloseStdinRequest"
grpcStatsContainerRequest = "grpc.StatsContainerRequest"
grpcPauseContainerRequest = "grpc.PauseContainerRequest"
grpcResumeContainerRequest = "grpc.ResumeContainerRequest"
grpcReseedRandomDevRequest = "grpc.ReseedRandomDevRequest"
grpcGuestDetailsRequest = "grpc.GuestDetailsRequest"
grpcMemHotplugByProbeRequest = "grpc.MemHotplugByProbeRequest"
grpcCopyFileRequest = "grpc.CopyFileRequest"
grpcSetGuestDateTimeRequest = "grpc.SetGuestDateTimeRequest"
grpcStartTracingRequest = "grpc.StartTracingRequest"
grpcStopTracingRequest = "grpc.StopTracingRequest"
)

// KataAgentConfig is a structure storing information needed
// to reach the Kata Containers agent.
type KataAgentConfig struct {
Expand Down Expand Up @@ -1741,97 +1774,109 @@ type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (inter

func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers = make(map[string]reqFunc)
k.reqHandlers["grpc.CheckRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, checkRequestTimeout)
defer cancel()
k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...)
}
k.reqHandlers["grpc.ExecProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...)
}
k.reqHandlers["grpc.CreateSandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...)
}
k.reqHandlers["grpc.DestroySandboxRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...)
}
k.reqHandlers["grpc.CreateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...)
}
k.reqHandlers["grpc.StartContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...)
}
k.reqHandlers["grpc.RemoveContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...)
}
k.reqHandlers["grpc.SignalProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...)
}
k.reqHandlers["grpc.UpdateRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...)
}
k.reqHandlers["grpc.UpdateInterfaceRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...)
}
k.reqHandlers["grpc.ListInterfacesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest), opts...)
}
k.reqHandlers["grpc.ListRoutesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ListRoutes(ctx, req.(*grpc.ListRoutesRequest), opts...)
}
k.reqHandlers["grpc.OnlineCPUMemRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...)
}
k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcListProcessesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
}
k.reqHandlers["grpc.UpdateContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest), opts...)
}
k.reqHandlers["grpc.WaitProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...)
}
k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
}
k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
}
k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
}
k.reqHandlers["grpc.StatsContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.StatsContainer(ctx, req.(*grpc.StatsContainerRequest), opts...)
}
k.reqHandlers["grpc.PauseContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.PauseContainer(ctx, req.(*grpc.PauseContainerRequest), opts...)
}
k.reqHandlers["grpc.ResumeContainerRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest), opts...)
}
k.reqHandlers["grpc.ReseedRandomDevRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest), opts...)
}
k.reqHandlers["grpc.GuestDetailsRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest), opts...)
}
k.reqHandlers["grpc.MemHotplugByProbeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest), opts...)
}
k.reqHandlers["grpc.CopyFileRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CopyFile(ctx, req.(*grpc.CopyFileRequest), opts...)
}
k.reqHandlers["grpc.SetGuestDateTimeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest), opts...)
}
k.reqHandlers["grpc.StartTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcStartTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.StartTracing(ctx, req.(*grpc.StartTracingRequest), opts...)
}
k.reqHandlers["grpc.StopTracingRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
k.reqHandlers[grpcStopTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.StopTracing(ctx, req.(*grpc.StopTracingRequest), opts...)
}
}

func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
ctx = context.Background()
switch reqName {
case grpcWaitProcessRequest:
// Wait has no timeout
case grpcCheckRequest:
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
default:
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
}

return ctx, cancel
}

func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
span, _ := k.trace("sendReq")
span.SetTag("request", request)
Expand All @@ -1850,9 +1895,13 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
return nil, errors.New("Invalid request type")
}
message := request.(proto.Message)
ctx, cancel := k.getReqContext(msgName)
if cancel != nil {
defer cancel()
}
k.Logger().WithField("name", msgName).WithField("req", message.String()).Debug("sending request")

return handler(k.ctx, request)
return handler(ctx, request)
}

// readStdout and readStderr are special that we cannot differentiate them with the request types...
Expand Down

0 comments on commit debc7d9

Please sign in to comment.