Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
api: add sandbox iostream API
Browse files Browse the repository at this point in the history
It returns stdin, stdout and stderr stream of the specified process in
the container.

Fixes: #258

Signed-off-by: Peng Tao <[email protected]>
  • Loading branch information
bergwolf committed May 4, 2018
1 parent bf4ef43 commit 1bb6ab9
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 1 deletion.
12 changes: 12 additions & 0 deletions virtcontainers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ type agent interface {
// winsizeProcess will tell the agent to set a process' tty size
winsizeProcess(c *Container, processID string, height, width uint32) error

// writeProcessStdin will tell the agent to write a process stdin
writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error)

// closeProcessStdin will tell the agent to close a process stdin
closeProcessStdin(c *Container, ProcessID string) error

// readProcessStdout will tell the agent to read a process stdout
readProcessStdout(c *Container, processID string, data []byte) (int, error)

// readProcessStderr will tell the agent to read a process stderr
readProcessStderr(c *Container, processID string, data []byte) (int, error)

// processListContainer will list the processes running inside the container
processListContainer(sandbox *Sandbox, c Container, options ProcessListOptions) (ProcessList, error)

Expand Down
11 changes: 11 additions & 0 deletions virtcontainers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package virtcontainers
import (
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"syscall"
Expand Down Expand Up @@ -758,6 +759,16 @@ func (c *Container) winsizeProcess(processID string, height, width uint32) error
return c.sandbox.agent.winsizeProcess(c, processID, height, width)
}

func (c *Container) ioStream(processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
if c.state.State != StateReady && c.state.State != StateRunning {
return nil, nil, nil, fmt.Errorf("Container not ready or running, impossible to signal the container")
}

stream := newIOStream(c.sandbox, c, processID)

return stream.stdin(), stream.stdout(), stream.stderr(), nil
}

func (c *Container) processList(options ProcessListOptions) (ProcessList, error) {
if err := c.checkSandboxRunning("ps"); err != nil {
return nil, err
Expand Down
26 changes: 26 additions & 0 deletions virtcontainers/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,29 @@ func TestWinsizeProcessErrorState(t *testing.T) {
err = c.winsizeProcess(processID, 100, 200)
assert.Error(err)
}

func TestProcessIOStream(t *testing.T) {
assert := assert.New(t)
c := &Container{
sandbox: &Sandbox{
state: State{
State: StateRunning,
},
},
}
processID := "foobar"

// Container state undefined
_, _, _, err := c.ioStream(processID)
assert.Error(err)

// Container paused
c.state.State = StatePaused
_, _, _, err = c.ioStream(processID)
assert.Error(err)

// Container stopped
c.state.State = StateStopped
_, _, _, err = c.ioStream(processID)
assert.Error(err)
}
20 changes: 20 additions & 0 deletions virtcontainers/hyperstart_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,3 +816,23 @@ func (h *hyper) winsizeProcess(c *Container, processID string, height, width uin
// cc-agent does not support winsize process
return nil
}

func (h *hyper) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
// cc-agent does not support stdin write request
return 0, nil
}

func (h *hyper) closeProcessStdin(c *Container, ProcessID string) error {
// cc-agent does not support stdin close request
return nil
}

func (h *hyper) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
// cc-agent does not support stdout read request
return 0, nil
}

func (h *hyper) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
// cc-agent does not support stderr read request
return 0, nil
}
2 changes: 2 additions & 0 deletions virtcontainers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package virtcontainers

import (
"io"
"syscall"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -60,6 +61,7 @@ type VCSandbox interface {
WaitProcess(containerID, processID string) (int32, error)
SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error
WinsizeProcess(containerID, processID string, height, width uint32) error
IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error)
}

// VCContainer is the Container interface
Expand Down
91 changes: 91 additions & 0 deletions virtcontainers/iostream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"errors"
"io"
)

type iostream struct {
sandbox *Sandbox
container *Container
process string
closed bool
}

// io.WriteCloser
type stdinStream struct {
*iostream
}

// io.Reader
type stdoutStream struct {
*iostream
}

// io.Reader
type stderrStream struct {
*iostream
}

func newIOStream(s *Sandbox, c *Container, proc string) *iostream {
return &iostream{
sandbox: s,
container: c,
process: proc,
closed: false, // needed to workaround buggy structcheck
}
}

func (s *iostream) stdin() io.WriteCloser {
return &stdinStream{s}
}

func (s *iostream) stdout() io.Reader {
return &stdoutStream{s}
}

func (s *iostream) stderr() io.Reader {
return &stderrStream{s}
}

func (s *stdinStream) Write(data []byte) (n int, err error) {
if s.closed {
return 0, errors.New("stream closed")
}

return s.sandbox.agent.writeProcessStdin(s.container, s.process, data)
}

func (s *stdinStream) Close() error {
if s.closed {
return errors.New("stream closed")
}

err := s.sandbox.agent.closeProcessStdin(s.container, s.process)
if err == nil {
s.closed = true
}

return err
}

func (s *stdoutStream) Read(data []byte) (n int, err error) {
if s.closed {
return 0, errors.New("stream closed")
}

return s.sandbox.agent.readProcessStdout(s.container, s.process, data)
}

func (s *stderrStream) Read(data []byte) (n int, err error) {
if s.closed {
return 0, errors.New("stream closed")
}

return s.sandbox.agent.readProcessStderr(s.container, s.process, data)
}
59 changes: 59 additions & 0 deletions virtcontainers/iostream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIOStream(t *testing.T) {
hConfig := newHypervisorConfig(nil, nil)
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()

contID := "foo"
processID := "bar"
config := newTestContainerConfigNoop(contID)
c := &Container{
sandbox: s,
config: &config,
}

stream := newIOStream(s, c, processID)
stdin := stream.stdin()
stdout := stream.stdout()
stderr := stream.stderr()

buffer := []byte("randombufferdata")
_, err = stdin.Write(buffer)
assert.Nil(t, err, "stdin write failed: %s", err)

_, err = stdout.Read(buffer)
assert.Nil(t, err, "stdout read failed: %s", err)

_, err = stderr.Read(buffer)
assert.Nil(t, err, "stderr read failed: %s", err)

err = stdin.Close()
assert.Nil(t, err, "stream close failed: %s", err)

_, err = stdin.Write(buffer)
assert.NotNil(t, err, "stdin write closed should fail")

_, err = stdout.Read(buffer)
assert.NotNil(t, err, "stdout read closed should fail")

_, err = stderr.Read(buffer)
assert.NotNil(t, err, "stderr read closed should fail")

err = stdin.Close()
assert.NotNil(t, err, "stdin close closed should fail")
}
70 changes: 69 additions & 1 deletion virtcontainers/kata_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package virtcontainers

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -23,6 +22,7 @@ import (
vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
ns "github.com/kata-containers/runtime/virtcontainers/pkg/nsenter"
"github.com/kata-containers/runtime/virtcontainers/pkg/uuid"
"golang.org/x/net/context"

"github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -966,6 +966,29 @@ func (k *kataAgent) waitProcess(c *Container, processID string) (int32, error) {
return resp.(*grpc.WaitProcessResponse).Status, nil
}

func (k *kataAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
resp, err := k.sendReq(&grpc.WriteStreamRequest{
ContainerId: c.id,
ExecId: ProcessID,
Data: data,
})

if err != nil {
return 0, err
}

return int(resp.(*grpc.WriteStreamResponse).Len), nil
}

func (k *kataAgent) closeProcessStdin(c *Container, ProcessID string) error {
_, err := k.sendReq(&grpc.CloseStdinRequest{
ContainerId: c.id,
ExecId: ProcessID,
})

return err
}

type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error)

func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
Expand Down Expand Up @@ -1014,6 +1037,12 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
}
k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
}
k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
}
}

func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
Expand All @@ -1032,3 +1061,42 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {

return handler(context.Background(), request)
}

// readStdout and readStderr are special that we cannot differentiate them with the request types...
func (k *kataAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
if err := k.connect(); err != nil {
return 0, err
}
if !k.keepConn {
defer k.disconnect()
}

return k.readProcessStream(c.id, processID, data, k.client.ReadStdout)
}

// readStdout and readStderr are special that we cannot differentiate them with the request types...
func (k *kataAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
if err := k.connect(); err != nil {
return 0, err
}
if !k.keepConn {
defer k.disconnect()
}

return k.readProcessStream(c.id, processID, data, k.client.ReadStderr)
}

type readFn func(context.Context, *grpc.ReadStreamRequest, ...golangGrpc.CallOption) (*grpc.ReadStreamResponse, error)

func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) {
resp, err := read(context.Background(), &grpc.ReadStreamRequest{
ContainerId: containerID,
ExecId: processID,
Len: uint32(len(data))})
if err == nil {
copy(data, resp.Data)
return len(resp.Data), nil
}

return 0, err
}
20 changes: 20 additions & 0 deletions virtcontainers/noop_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,23 @@ func (n *noopAgent) waitProcess(c *Container, processID string) (int32, error) {
func (n *noopAgent) winsizeProcess(c *Container, processID string, height, width uint32) error {
return nil
}

// writeProcessStdin is the Noop agent process stdin writer. It does nothing.
func (n *noopAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
return 0, nil
}

// closeProcessStdin is the Noop agent process stdin closer. It does nothing.
func (n *noopAgent) closeProcessStdin(c *Container, ProcessID string) error {
return nil
}

// readProcessStdout is the Noop agent process stdout reader. It does nothing.
func (n *noopAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
return 0, nil
}

// readProcessStderr is the Noop agent process stderr reader. It does nothing.
func (n *noopAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
return 0, nil
}
Loading

0 comments on commit 1bb6ab9

Please sign in to comment.