diff --git a/pkg/agent/runtime.go b/pkg/agent/runtime.go index afd347e..6a5d424 100644 --- a/pkg/agent/runtime.go +++ b/pkg/agent/runtime.go @@ -3,6 +3,7 @@ package agent import ( "context" "fmt" + "github.com/aylei/kubectl-debug/pkg/util" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/strslice" @@ -41,24 +42,64 @@ type DebugAttacher struct { runtime *RuntimeManager image string command []string + client *dockerclient.Client + + // control the preparing of debug container + stopListenEOF chan struct{} + context context.Context + cancel context.CancelFunc } func (a *DebugAttacher) AttachContainer(name string, uid kubetype.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - return a.runtime.DebugContainer(container, a.image, a.command, in, out, err, tty, resize) + return a.DebugContainer(container, a.image, a.command, in, out, err, tty, resize) } // GetAttacher returns an implementation of Attacher -func (m *RuntimeManager) GetAttacher(image string, command []string) kubeletremote.Attacher { - return &DebugAttacher{runtime: m, image: image, command: command} +func (m *RuntimeManager) GetAttacher(image string, command []string, context context.Context, cancel context.CancelFunc) kubeletremote.Attacher { + return &DebugAttacher{ + runtime: m, + image: image, + command: command, + context: context, + client: m.client, + cancel: cancel, + stopListenEOF: make(chan struct{}), + } } // DebugContainer executes the main debug flow -func (m *RuntimeManager) DebugContainer(container, image string, command []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { +func (m *DebugAttacher) DebugContainer(container, image string, command []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { log.Printf("Accept new debug reqeust:\n\t target container: %s \n\t image: %s \n\t command: %v \n", container, image, command) + // the following steps may takes much time, + // so we listen to EOF from stdin + // which helps user to terminate the procedure proactively + + // FIXME: the following logic will 'eat' a character + //var buf bytes.Buffer + //tee := io.TeeReader(stdin, &buf) + //go func() { + // p := make([]byte, 4) + // OUTER: + // for { + // select { + // case <- m.stopListenEOF: + // break OUTER + // default: + // n, err := tee.Read(p) + // // 4 -> EOT + // if (n > 0 && binary.LittleEndian.Uint32(p) == 4) || err == io.EOF { + // log.Println("receive ctrl-d or EOF when preparing debug container, cancel session") + // m.cancel() + // break OUTER + // } + // } + // } + //} () + // step 1: pull image - stdout.Write([]byte(fmt.Sprintf("pulling image %s ...\n\r", image))) + stdout.Write([]byte(fmt.Sprintf("pulling image %s... \n\r", image))) err := m.PullImage(image, stdout) if err != nil { return err @@ -74,6 +115,10 @@ func (m *RuntimeManager) DebugContainer(container, image string, command []strin // step 3: attach tty stdout.Write([]byte("container created, open tty...\n\r")) + + // from now on, should pipe stdin to the container and no long read stdin + // close(m.stopListenEOF) + if err := m.AttachToContainer(id, stdin, stdout, stderr, tty, resize); err != nil { return err } @@ -82,7 +127,7 @@ func (m *RuntimeManager) DebugContainer(container, image string, command []strin // Run a new container, this container will join the network, // mount, and pid namespace of the given container -func (m *RuntimeManager) RunDebugContainer(targetId string, image string, command []string) (string, error) { +func (m *DebugAttacher) RunDebugContainer(targetId string, image string, command []string) (string, error) { createdBody, err := m.CreateContainer(targetId, image, command) if err != nil { @@ -94,8 +139,8 @@ func (m *RuntimeManager) RunDebugContainer(targetId string, image string, comman return createdBody.ID, nil } -func (m *RuntimeManager) StartContainer(id string) error { - ctx, cancel := m.getTimeoutContext() +func (m *DebugAttacher) StartContainer(id string) error { + ctx, cancel := m.getContextWithTimeout() defer cancel() err := m.client.ContainerStart(ctx, id, types.ContainerStartOptions{}) if err != nil { @@ -104,7 +149,7 @@ func (m *RuntimeManager) StartContainer(id string) error { return nil } -func (m *RuntimeManager) CreateContainer(targetId string, image string, command []string) (*container.ContainerCreateCreatedBody, error) { +func (m *DebugAttacher) CreateContainer(targetId string, image string, command []string) (*container.ContainerCreateCreatedBody, error) { config := &container.Config{ Entrypoint: strslice.StrSlice(command), @@ -119,7 +164,7 @@ func (m *RuntimeManager) CreateContainer(targetId string, image string, command IpcMode: container.IpcMode(m.containerMode(targetId)), PidMode: container.PidMode(m.containerMode(targetId)), } - ctx, cancel := m.getTimeoutContext() + ctx, cancel := m.getContextWithTimeout() defer cancel() body, err := m.client.ContainerCreate(ctx, config, hostConfig, nil, "") if err != nil { @@ -128,21 +173,21 @@ func (m *RuntimeManager) CreateContainer(targetId string, image string, command return &body, nil } -func (m *RuntimeManager) PullImage(image string, stdout io.WriteCloser) error { - ctx, cancel := m.getTimeoutContext() - defer cancel() - out, err := m.client.ImagePull(ctx, image, types.ImagePullOptions{}) +func (m *DebugAttacher) PullImage(image string, stdout io.WriteCloser) error { + // image pull can be time consuming, just pass the request context + out, err := m.client.ImagePull(m.context, image, types.ImagePullOptions{}) if err != nil { return err } defer out.Close() // write pull progress to user - io.Copy(stdout, out) + term.DisplayDockerJsonStream(out, stdout) return nil } -func (m *RuntimeManager) CleanContainer(id string) { - ctx, cancel := m.getTimeoutContext() +func (m *DebugAttacher) CleanContainer(id string) { + // cleanup procedure should use background context + ctx, cancel := context.WithTimeout(context.Background(), m.runtime.timeout) defer cancel() // wait the container gracefully exit statusCh, errCh := m.client.ContainerWait(ctx, id, container.WaitConditionNotRunning) @@ -164,8 +209,9 @@ func (m *RuntimeManager) CleanContainer(id string) { } } -func (m *RuntimeManager) RmContainer(id string, force bool) error { - ctx, cancel := m.getTimeoutContext() +func (m *DebugAttacher) RmContainer(id string, force bool) error { + // cleanup procedure should use background context + ctx, cancel := context.WithTimeout(context.Background(), m.runtime.timeout) defer cancel() err := m.client.ContainerRemove(ctx, id, types.ContainerRemoveOptions{ @@ -178,7 +224,7 @@ func (m *RuntimeManager) RmContainer(id string, force bool) error { } // AttachToContainer do `docker attach` -func (m *RuntimeManager) AttachToContainer(container string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { +func (m *DebugAttacher) AttachToContainer(container string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { HandleResizing(resize, func(size remotecommand.TerminalSize) { m.resizeContainerTTY(container, uint(size.Height), uint(size.Width)) }) @@ -195,7 +241,7 @@ func (m *RuntimeManager) AttachToContainer(container string, stdin io.Reader, st ErrorStream: stderr, RawTerminal: tty, } - ctx, cancel := m.getTimeoutContext() + ctx, cancel := m.getContextWithTimeout() defer cancel() resp, err := m.client.ContainerAttach(ctx, container, opts) if err != nil { @@ -208,7 +254,7 @@ func (m *RuntimeManager) AttachToContainer(container string, stdin io.Reader, st // holdHijackedConnection hold the HijackedResponse, redirect the inputStream to the connection, and redirect the response // stream to stdout and stderr. NOTE: If needed, we could also add context in this function. -func (m *RuntimeManager) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp types.HijackedResponse) error { +func (m *DebugAttacher) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp types.HijackedResponse) error { receiveStdout := make(chan error) if outputStream != nil || errorStream != nil { go func() { @@ -236,7 +282,7 @@ func (m *RuntimeManager) holdHijackedConnection(tty bool, inputStream io.Reader, return nil } -func (m *RuntimeManager) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error { +func (m *DebugAttacher) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error { if outputStream == nil { outputStream = ioutil.Discard } @@ -252,8 +298,8 @@ func (m *RuntimeManager) redirectResponseToOutputStream(tty bool, outputStream, return err } -func (m *RuntimeManager) resizeContainerTTY(id string, height, width uint) error { - ctx, cancel := m.getTimeoutContext() +func (m *DebugAttacher) resizeContainerTTY(id string, height, width uint) error { + ctx, cancel := m.getContextWithTimeout() defer cancel() return m.client.ContainerResize(ctx, id, types.ResizeOptions{ Height: height, @@ -261,10 +307,10 @@ func (m *RuntimeManager) resizeContainerTTY(id string, height, width uint) error }) } -func (m *RuntimeManager) getTimeoutContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), m.timeout) +func (m *DebugAttacher) getContextWithTimeout() (context.Context, context.CancelFunc) { + return context.WithTimeout(m.context, m.runtime.timeout) } -func (m *RuntimeManager) containerMode(id string) string { +func (m *DebugAttacher) containerMode(id string) string { return fmt.Sprintf("container:%s", id) } diff --git a/pkg/agent/server.go b/pkg/agent/server.go index 2acfb2f..fb39305 100644 --- a/pkg/agent/server.go +++ b/pkg/agent/server.go @@ -97,11 +97,14 @@ func (s *Server) ServeDebug(w http.ResponseWriter, req *http.Request) { TTY: true, } + context, cancel := context.WithCancel(req.Context()) + defer cancel() + // replace Attacher implementation to hook the ServeAttach procedure kubeletremote.ServeAttach( w, req, - s.runtimeApi.GetAttacher(image, commandSlice), + s.runtimeApi.GetAttacher(image, commandSlice, context, cancel), "", "", dockerContainerId, diff --git a/pkg/plugin/cmd.go b/pkg/plugin/cmd.go index 630efb8..1edc4d6 100644 --- a/pkg/plugin/cmd.go +++ b/pkg/plugin/cmd.go @@ -140,14 +140,13 @@ func (o *DebugOptions) Validate() error { } func (o *DebugOptions) Run() error { - fmt.Printf("Run command, namespace: %s, podName: %s, command: %v \n\r", o.Namespace, o.PodName, o.Command) pod, err := o.PodClient.Pods(o.Namespace).Get(o.PodName, v1.GetOptions{}) if err != nil { return err } if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - return fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase) + return fmt.Errorf("cannot debug in a completed pod; current phase is %s", pod.Status.Phase) } hostIP := pod.Status.HostIP @@ -175,6 +174,8 @@ func (o *DebugOptions) Run() error { o.ErrOut = nil } + hostIP = "localhost" + containerId = "docker://2a69a97f73720793a14a5e4bef3480a43c7c656310a100ef6260820bd872bb08" fn := func() error { // TODO: refactor as kubernetes api style, reuse rbac mechanism of kubernetes diff --git a/pkg/util/jsonstream.go b/pkg/util/jsonstream.go new file mode 100644 index 0000000..364cac2 --- /dev/null +++ b/pkg/util/jsonstream.go @@ -0,0 +1,61 @@ +package term + +import ( + "encoding/json" + "fmt" + "io" +) + +type JSONError struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` +} + +type JSONMessage struct { + Status string `json:"status,omitempty"` + ID string `json:"id,omitempty"` + From string `json:"from,omitempty"` + Time int64 `json:"time,omitempty"` + TimeNano int64 `json:"timeNano,omitempty"` + ProgressMessage string `json:"progress,omitempty"` + Error *JSONError `json:"errorDetail,omitempty"` +} + +func (jm *JSONMessage) Display(out io.Writer) error { + + if jm.Error != nil { + fmt.Fprintf(out, "error pulling image, %s\n\r", jm.Error.Message) + return nil + } + // do not display progress bar + if len(jm.ProgressMessage) > 0 { + return nil + } + fmt.Fprintf(out, "\t%s %s \n\r", jm.Status, jm.ID) + + return nil +} + +// DisplayJsonStream parse the json input from `in` and pipe the necessary message to `out` +func DisplayDockerJsonStream(in io.Reader, out io.Writer) error { + + var ( + dec = json.NewDecoder(in) + ) + + for { + var jm JSONMessage + if err := dec.Decode(&jm); err != nil { + if err == io.EOF { + break + } + return err + } + err := jm.Display(out) + if err != nil { + return err + } + } + + return nil +}