Skip to content

Commit

Permalink
refine docker pull output and request context
Browse files Browse the repository at this point in the history
  • Loading branch information
aylei committed Dec 22, 2018
1 parent f6c8e68 commit e00f740
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 31 deletions.
102 changes: 74 additions & 28 deletions pkg/agent/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"context"
"fmt"
"github.com/aylei/kubectl-debug/pkg/util"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/strslice"
Expand Down Expand Up @@ -41,24 +42,64 @@ type DebugAttacher struct {
runtime *RuntimeManager
image string
command []string
client *dockerclient.Client

// control the preparing of debug container
stopListenEOF chan struct{}
context context.Context
cancel context.CancelFunc
}

func (a *DebugAttacher) AttachContainer(name string, uid kubetype.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
return a.runtime.DebugContainer(container, a.image, a.command, in, out, err, tty, resize)
return a.DebugContainer(container, a.image, a.command, in, out, err, tty, resize)
}

// GetAttacher returns an implementation of Attacher
func (m *RuntimeManager) GetAttacher(image string, command []string) kubeletremote.Attacher {
return &DebugAttacher{runtime: m, image: image, command: command}
func (m *RuntimeManager) GetAttacher(image string, command []string, context context.Context, cancel context.CancelFunc) kubeletremote.Attacher {
return &DebugAttacher{
runtime: m,
image: image,
command: command,
context: context,
client: m.client,
cancel: cancel,
stopListenEOF: make(chan struct{}),
}
}

// DebugContainer executes the main debug flow
func (m *RuntimeManager) DebugContainer(container, image string, command []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
func (m *DebugAttacher) DebugContainer(container, image string, command []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {

log.Printf("Accept new debug reqeust:\n\t target container: %s \n\t image: %s \n\t command: %v \n", container, image, command)

// the following steps may takes much time,
// so we listen to EOF from stdin
// which helps user to terminate the procedure proactively

// FIXME: the following logic will 'eat' a character
//var buf bytes.Buffer
//tee := io.TeeReader(stdin, &buf)
//go func() {
// p := make([]byte, 4)
// OUTER:
// for {
// select {
// case <- m.stopListenEOF:
// break OUTER
// default:
// n, err := tee.Read(p)
// // 4 -> EOT
// if (n > 0 && binary.LittleEndian.Uint32(p) == 4) || err == io.EOF {
// log.Println("receive ctrl-d or EOF when preparing debug container, cancel session")
// m.cancel()
// break OUTER
// }
// }
// }
//} ()

// step 1: pull image
stdout.Write([]byte(fmt.Sprintf("pulling image %s ...\n\r", image)))
stdout.Write([]byte(fmt.Sprintf("pulling image %s... \n\r", image)))
err := m.PullImage(image, stdout)
if err != nil {
return err
Expand All @@ -74,6 +115,10 @@ func (m *RuntimeManager) DebugContainer(container, image string, command []strin

// step 3: attach tty
stdout.Write([]byte("container created, open tty...\n\r"))

// from now on, should pipe stdin to the container and no long read stdin
// close(m.stopListenEOF)

if err := m.AttachToContainer(id, stdin, stdout, stderr, tty, resize); err != nil {
return err
}
Expand All @@ -82,7 +127,7 @@ func (m *RuntimeManager) DebugContainer(container, image string, command []strin

// Run a new container, this container will join the network,
// mount, and pid namespace of the given container
func (m *RuntimeManager) RunDebugContainer(targetId string, image string, command []string) (string, error) {
func (m *DebugAttacher) RunDebugContainer(targetId string, image string, command []string) (string, error) {

createdBody, err := m.CreateContainer(targetId, image, command)
if err != nil {
Expand All @@ -94,8 +139,8 @@ func (m *RuntimeManager) RunDebugContainer(targetId string, image string, comman
return createdBody.ID, nil
}

func (m *RuntimeManager) StartContainer(id string) error {
ctx, cancel := m.getTimeoutContext()
func (m *DebugAttacher) StartContainer(id string) error {
ctx, cancel := m.getContextWithTimeout()
defer cancel()
err := m.client.ContainerStart(ctx, id, types.ContainerStartOptions{})
if err != nil {
Expand All @@ -104,7 +149,7 @@ func (m *RuntimeManager) StartContainer(id string) error {
return nil
}

func (m *RuntimeManager) CreateContainer(targetId string, image string, command []string) (*container.ContainerCreateCreatedBody, error) {
func (m *DebugAttacher) CreateContainer(targetId string, image string, command []string) (*container.ContainerCreateCreatedBody, error) {

config := &container.Config{
Entrypoint: strslice.StrSlice(command),
Expand All @@ -119,7 +164,7 @@ func (m *RuntimeManager) CreateContainer(targetId string, image string, command
IpcMode: container.IpcMode(m.containerMode(targetId)),
PidMode: container.PidMode(m.containerMode(targetId)),
}
ctx, cancel := m.getTimeoutContext()
ctx, cancel := m.getContextWithTimeout()
defer cancel()
body, err := m.client.ContainerCreate(ctx, config, hostConfig, nil, "")
if err != nil {
Expand All @@ -128,21 +173,21 @@ func (m *RuntimeManager) CreateContainer(targetId string, image string, command
return &body, nil
}

func (m *RuntimeManager) PullImage(image string, stdout io.WriteCloser) error {
ctx, cancel := m.getTimeoutContext()
defer cancel()
out, err := m.client.ImagePull(ctx, image, types.ImagePullOptions{})
func (m *DebugAttacher) PullImage(image string, stdout io.WriteCloser) error {
// image pull can be time consuming, just pass the request context
out, err := m.client.ImagePull(m.context, image, types.ImagePullOptions{})
if err != nil {
return err
}
defer out.Close()
// write pull progress to user
io.Copy(stdout, out)
term.DisplayDockerJsonStream(out, stdout)
return nil
}

func (m *RuntimeManager) CleanContainer(id string) {
ctx, cancel := m.getTimeoutContext()
func (m *DebugAttacher) CleanContainer(id string) {
// cleanup procedure should use background context
ctx, cancel := context.WithTimeout(context.Background(), m.runtime.timeout)
defer cancel()
// wait the container gracefully exit
statusCh, errCh := m.client.ContainerWait(ctx, id, container.WaitConditionNotRunning)
Expand All @@ -164,8 +209,9 @@ func (m *RuntimeManager) CleanContainer(id string) {
}
}

func (m *RuntimeManager) RmContainer(id string, force bool) error {
ctx, cancel := m.getTimeoutContext()
func (m *DebugAttacher) RmContainer(id string, force bool) error {
// cleanup procedure should use background context
ctx, cancel := context.WithTimeout(context.Background(), m.runtime.timeout)
defer cancel()
err := m.client.ContainerRemove(ctx, id,
types.ContainerRemoveOptions{
Expand All @@ -178,7 +224,7 @@ func (m *RuntimeManager) RmContainer(id string, force bool) error {
}

// AttachToContainer do `docker attach`
func (m *RuntimeManager) AttachToContainer(container string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
func (m *DebugAttacher) AttachToContainer(container string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
HandleResizing(resize, func(size remotecommand.TerminalSize) {
m.resizeContainerTTY(container, uint(size.Height), uint(size.Width))
})
Expand All @@ -195,7 +241,7 @@ func (m *RuntimeManager) AttachToContainer(container string, stdin io.Reader, st
ErrorStream: stderr,
RawTerminal: tty,
}
ctx, cancel := m.getTimeoutContext()
ctx, cancel := m.getContextWithTimeout()
defer cancel()
resp, err := m.client.ContainerAttach(ctx, container, opts)
if err != nil {
Expand All @@ -208,7 +254,7 @@ func (m *RuntimeManager) AttachToContainer(container string, stdin io.Reader, st

// holdHijackedConnection hold the HijackedResponse, redirect the inputStream to the connection, and redirect the response
// stream to stdout and stderr. NOTE: If needed, we could also add context in this function.
func (m *RuntimeManager) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp types.HijackedResponse) error {
func (m *DebugAttacher) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp types.HijackedResponse) error {
receiveStdout := make(chan error)
if outputStream != nil || errorStream != nil {
go func() {
Expand Down Expand Up @@ -236,7 +282,7 @@ func (m *RuntimeManager) holdHijackedConnection(tty bool, inputStream io.Reader,
return nil
}

func (m *RuntimeManager) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {
func (m *DebugAttacher) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {
if outputStream == nil {
outputStream = ioutil.Discard
}
Expand All @@ -252,19 +298,19 @@ func (m *RuntimeManager) redirectResponseToOutputStream(tty bool, outputStream,
return err
}

func (m *RuntimeManager) resizeContainerTTY(id string, height, width uint) error {
ctx, cancel := m.getTimeoutContext()
func (m *DebugAttacher) resizeContainerTTY(id string, height, width uint) error {
ctx, cancel := m.getContextWithTimeout()
defer cancel()
return m.client.ContainerResize(ctx, id, types.ResizeOptions{
Height: height,
Width: width,
})
}

func (m *RuntimeManager) getTimeoutContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), m.timeout)
func (m *DebugAttacher) getContextWithTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(m.context, m.runtime.timeout)
}

func (m *RuntimeManager) containerMode(id string) string {
func (m *DebugAttacher) containerMode(id string) string {
return fmt.Sprintf("container:%s", id)
}
5 changes: 4 additions & 1 deletion pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,14 @@ func (s *Server) ServeDebug(w http.ResponseWriter, req *http.Request) {
TTY: true,
}

context, cancel := context.WithCancel(req.Context())
defer cancel()

// replace Attacher implementation to hook the ServeAttach procedure
kubeletremote.ServeAttach(
w,
req,
s.runtimeApi.GetAttacher(image, commandSlice),
s.runtimeApi.GetAttacher(image, commandSlice, context, cancel),
"",
"",
dockerContainerId,
Expand Down
5 changes: 3 additions & 2 deletions pkg/plugin/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,13 @@ func (o *DebugOptions) Validate() error {
}

func (o *DebugOptions) Run() error {
fmt.Printf("Run command, namespace: %s, podName: %s, command: %v \n\r", o.Namespace, o.PodName, o.Command)

pod, err := o.PodClient.Pods(o.Namespace).Get(o.PodName, v1.GetOptions{})
if err != nil {
return err
}
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
return fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase)
return fmt.Errorf("cannot debug in a completed pod; current phase is %s", pod.Status.Phase)
}
hostIP := pod.Status.HostIP

Expand Down Expand Up @@ -175,6 +174,8 @@ func (o *DebugOptions) Run() error {
o.ErrOut = nil
}

hostIP = "localhost"
containerId = "docker://2a69a97f73720793a14a5e4bef3480a43c7c656310a100ef6260820bd872bb08"
fn := func() error {

// TODO: refactor as kubernetes api style, reuse rbac mechanism of kubernetes
Expand Down
61 changes: 61 additions & 0 deletions pkg/util/jsonstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package term

import (
"encoding/json"
"fmt"
"io"
)

type JSONError struct {
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
}

type JSONMessage struct {
Status string `json:"status,omitempty"`
ID string `json:"id,omitempty"`
From string `json:"from,omitempty"`
Time int64 `json:"time,omitempty"`
TimeNano int64 `json:"timeNano,omitempty"`
ProgressMessage string `json:"progress,omitempty"`
Error *JSONError `json:"errorDetail,omitempty"`
}

func (jm *JSONMessage) Display(out io.Writer) error {

if jm.Error != nil {
fmt.Fprintf(out, "error pulling image, %s\n\r", jm.Error.Message)
return nil
}
// do not display progress bar
if len(jm.ProgressMessage) > 0 {
return nil
}
fmt.Fprintf(out, "\t%s %s \n\r", jm.Status, jm.ID)

return nil
}

// DisplayJsonStream parse the json input from `in` and pipe the necessary message to `out`
func DisplayDockerJsonStream(in io.Reader, out io.Writer) error {

var (
dec = json.NewDecoder(in)
)

for {
var jm JSONMessage
if err := dec.Decode(&jm); err != nil {
if err == io.EOF {
break
}
return err
}
err := jm.Display(out)
if err != nil {
return err
}
}

return nil
}

0 comments on commit e00f740

Please sign in to comment.