diff --git a/README.md b/README.md index 2a292fe..72d000d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ # coder-logstream-kube -[![discord](https://img.shields.io/discord/747933592273027093?label=discord)](https://discord.gg/coder) -[![release](https://img.shields.io/github/v/tag/coder/coder-logstream-kube)](https://github.com/coder/envbuilder/pkgs/container/coder-logstream-kube) -[![godoc](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube) +[![Go Reference](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube) [![license](https://img.shields.io/github/license/coder/coder-logstream-kube)](./LICENSE) Stream Kubernetes Pod events to the Coder startup logs. @@ -10,6 +8,7 @@ Stream Kubernetes Pod events to the Coder startup logs. - Easily determine the reason for a pod provision failure, or why a pod is stuck in a pending state. - Visibility into when pods are OOMKilled, or when they are evicted. - Filter by namespace, field selector, and label selector to reduce Kubernetes API load. +- Support for watching multiple namespaces or all namespaces cluster-wide. ![Log Stream](./scripts/demo.png) @@ -24,6 +23,36 @@ helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ --set url= ``` +### Multi-Namespace Support + +By default, `coder-logstream-kube` watches pods in the namespace where it's deployed. You can configure it to watch multiple namespaces or all namespaces: + +#### Watch specific namespaces +```console +helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ + --namespace coder \ + --set url= \ + --set namespaces="namespace1,namespace2,namespace3" +``` + +#### Watch all namespaces +```console +helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ + --namespace coder \ + --set url= \ + --set namespaces="" +``` + +When watching multiple namespaces or all namespaces, the chart automatically creates ClusterRole and ClusterRoleBinding resources instead of namespace-scoped Role and RoleBinding. + +### Environment Variable Configuration + +You can also configure namespaces using the `CODER_NAMESPACE` environment variable: + +- Single namespace: `CODER_NAMESPACE=my-namespace` +- Multiple namespaces: `CODER_NAMESPACE=ns1,ns2,ns3` +- All namespaces: `CODER_NAMESPACE=""` (empty string) + > **Note** > For additional customization (such as customizing the image, pull secrets, annotations, etc.), you can use the > [values.yaml](helm/values.yaml) file directly. @@ -46,7 +75,24 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers `coder-logstream-kube` listens for pod creation events with containers that have the `CODER_AGENT_TOKEN` environment variable set. All pod events are streamed as logs to the Coder API using the agent token for authentication. +When configured for multiple namespaces, the application creates separate informers for each specified namespace. When configured to watch all namespaces (empty namespace list), it uses cluster-wide informers. + ## Custom Certificates - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. - [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. + +## RBAC Permissions + +The required permissions depend on the scope of namespaces being watched: + +### Single Namespace (Role/RoleBinding) +When watching a single namespace, the application uses namespace-scoped permissions: +- `pods`: get, watch, list +- `events`: get, watch, list +- `replicasets`: get, watch, list + +### Multiple Namespaces or All Namespaces (ClusterRole/ClusterRoleBinding) +When watching multiple namespaces or all namespaces, the application requires cluster-wide permissions with the same resource access but across all namespaces. + +The Helm chart automatically determines which type of RBAC resources to create based on your configuration. diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index a414f2a..3ec2cbb 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -1,7 +1,24 @@ +{{/* +Determine if cluster-wide permissions are needed. +This happens when: +1. namespaces is explicitly set to empty string (watch all namespaces) +2. namespaces contains multiple comma-separated values +3. rbac.clusterWide is explicitly set to true +*/}} +{{- $namespaces := .Values.namespaces | default .Release.Namespace -}} +{{- $namespacesCount := 0 -}} +{{- if eq $namespaces "" -}} + {{- $namespacesCount = 0 -}} +{{- else -}} + {{- $namespacesCount = len (splitList "," $namespaces) -}} +{{- end -}} +{{- $useClusterWide := or .Values.rbac.clusterWide (eq $namespaces "") (gt $namespacesCount 1) -}} + +{{- if $useClusterWide }} apiVersion: rbac.authorization.k8s.io/v1 -kind: Role +kind: ClusterRole metadata: - name: coder-logstream-kube-role + name: {{ .Release.Name }}-coder-logstream-kube-role rules: - apiGroups: [""] resources: ["pods", "events"] @@ -10,12 +27,30 @@ rules: resources: ["replicasets", "events"] verbs: ["get", "watch", "list"] --- -apiVersion: v1 -kind: ServiceAccount +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding metadata: + name: {{ .Release.Name }}-coder-logstream-kube-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ .Release.Name }}-coder-logstream-kube-role +subjects: +- kind: ServiceAccount name: {{ .Values.serviceAccount.name | quote }} - annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }} - labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }} + namespace: {{ .Release.Namespace }} +{{- else }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: coder-logstream-kube-role +rules: +- apiGroups: [""] + resources: ["pods", "events"] + verbs: ["get", "watch", "list"] +- apiGroups: ["apps"] + resources: ["replicasets", "events"] + verbs: ["get", "watch", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -28,6 +63,14 @@ roleRef: subjects: - kind: ServiceAccount name: {{ .Values.serviceAccount.name | quote }} +{{- end }} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Values.serviceAccount.name | quote }} + annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }} + labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }} --- apiVersion: apps/v1 kind: Deployment @@ -76,7 +119,7 @@ spec: - name: CODER_URL value: {{ .Values.url }} - name: CODER_NAMESPACE - value: {{ .Values.namespace | default .Release.Namespace }} + value: {{ $namespaces }} {{- if .Values.image.sslCertFile }} - name: SSL_CERT_FILE value: {{ .Values.image.sslCertFile }} @@ -95,3 +138,4 @@ spec: {{- if .Values.volumes }} volumes: {{- toYaml .Values.volumes | nindent 8 }} {{- end }} + diff --git a/helm/values.yaml b/helm/values.yaml index 7ae1a1c..8746514 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -1,9 +1,9 @@ -# url -- The URL of your Coder deployment. Must prefix with http or https url: "" -# namespace -- The namespace to searching for Pods within. +# namespaces -- Comma-separated list of namespaces to watch for Pods. # If unspecified, this defaults to the Helm namespace. -namespace: "" +# If set to empty string (""), watches all namespaces (requires cluster-wide permissions). +namespaces: "" # volumes -- A list of extra volumes to add to the coder-logstream pod. volumes: @@ -46,6 +46,12 @@ serviceAccount: # coder.serviceAccount.name -- The service account name name: coder-logstream-kube +# rbac -- RBAC configuration +rbac: + # rbac.clusterWide -- Whether to use cluster-wide permissions (ClusterRole/ClusterRoleBinding). + # This is automatically set to true when namespaces is empty or contains multiple namespaces. + clusterWide: false + # resources -- The resources to request for the Deployment. These are optional # and are not set by default. resources: @@ -98,6 +104,3 @@ securityContext: {} # drop: # - ALL # readOnlyRootFilesystem: true - # runAsNonRoot: true - # seccompProfile: - # type: RuntimeDefault diff --git a/logger.go b/logger.go index 231d01b..9124aef 100644 --- a/logger.go +++ b/logger.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/url" + "strings" "sync" "time" @@ -36,7 +37,7 @@ type podEventLoggerOptions struct { logDebounce time.Duration // The following fields are optional! - namespace string + namespaces string fieldSelector string labelSelector string } @@ -95,6 +96,23 @@ type podEventLogger struct { lq *logQueuer } +// parseNamespaces parses the comma-separated namespaces string and returns a slice of namespace names. +// If the input is empty, it returns an empty slice indicating all namespaces should be watched. +func parseNamespaces(namespaces string) []string { + if namespaces == "" { + return []string{} + } + + var result []string + for _, ns := range strings.Split(namespaces, ",") { + ns = strings.TrimSpace(ns) + if ns != "" { + result = append(result, ns) + } + } + return result +} + // init starts the informer factory and registers event handlers. func (p *podEventLogger) init() error { // We only track events that happen after the reporter starts. @@ -103,14 +121,49 @@ func (p *podEventLogger) init() error { go p.lq.work(p.ctx) - podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { - lo.FieldSelector = p.fieldSelector - lo.LabelSelector = p.labelSelector - })) + namespaceList := parseNamespaces(p.namespaces) + + // If no namespaces specified, watch all namespaces + if len(namespaceList) == 0 { + return p.initForNamespace("", startTime) + } + + // Watch specific namespaces + for _, namespace := range namespaceList { + if err := p.initForNamespace(namespace, startTime); err != nil { + return fmt.Errorf("init for namespace %s: %w", namespace, err) + } + } + + return nil +} + +// initForNamespace initializes informers for a specific namespace. +// If namespace is empty, it watches all namespaces. +func (p *podEventLogger) initForNamespace(namespace string, startTime time.Time) error { + var podFactory informers.SharedInformerFactory + if namespace == "" { + // Watch all namespaces + podFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + } else { + // Watch specific namespace + podFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + } + eventFactory := podFactory if p.fieldSelector != "" || p.labelSelector != "" { // Events cannot filter on labels and fields! - eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace)) + if namespace == "" { + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0) + } else { + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace)) + } } // We listen for Pods and Events in the informer factory. @@ -200,37 +253,31 @@ func (p *podEventLogger) init() error { p.sendLog(replicaSet.Name, env.Value, agentsdk.Log{ CreatedAt: time.Now(), - Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replicaSet.Name), + Output: fmt.Sprintf("📦 %s: %s", newColor(color.Bold).Sprint("Created replicaset"), replicaSet.Name), Level: codersdk.LogLevelInfo, }) } } if registered { - p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replicaSet.Name)) + p.logger.Info(p.ctx, "registered agent replicaset", slog.F("name", replicaSet.Name), slog.F("namespace", replicaSet.Namespace)) } }, DeleteFunc: func(obj interface{}) { replicaSet, ok := obj.(*appsv1.ReplicaSet) if !ok { - p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj) + p.errChan <- fmt.Errorf("unexpected replicaset delete object type: %T", obj) return } tokens := p.tc.deleteReplicaSetToken(replicaSet.Name) - if len(tokens) == 0 { - return - } - for _, token := range tokens { p.sendLog(replicaSet.Name, token, agentsdk.Log{ CreatedAt: time.Now(), - Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name), + Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted replicaset"), replicaSet.Name), Level: codersdk.LogLevelError, }) - p.sendDelete(token) } - - p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name)) + p.logger.Info(p.ctx, "unregistered agent replicaset", slog.F("name", replicaSet.Name)) }, }) if err != nil { @@ -250,24 +297,32 @@ func (p *podEventLogger) init() error { return } + // We only care about events for pods and replicasets. var tokens []string switch event.InvolvedObject.Kind { case "Pod": tokens = p.tc.getPodTokens(event.InvolvedObject.Name) case "ReplicaSet": tokens = p.tc.getReplicaSetTokens(event.InvolvedObject.Name) + default: + return } + if len(tokens) == 0 { return } + level := codersdk.LogLevelInfo + if event.Type == "Warning" { + level = codersdk.LogLevelWarn + } + for _, token := range tokens { p.sendLog(event.InvolvedObject.Name, token, agentsdk.Log{ - CreatedAt: time.Now(), - Output: newColor(color.FgWhite).Sprint(event.Message), - Level: codersdk.LogLevelInfo, + CreatedAt: event.CreationTimestamp.Time, + Output: fmt.Sprintf("⚡ %s: %s", newColor(color.Bold).Sprint(event.Reason), event.Message), + Level: level, }) - p.logger.Info(p.ctx, "sending log", slog.F("pod", event.InvolvedObject.Name), slog.F("message", event.Message)) } }, }) @@ -275,45 +330,38 @@ func (p *podEventLogger) init() error { return fmt.Errorf("register event handler: %w", err) } - p.logger.Info(p.ctx, "listening for pod events", - slog.F("coder_url", p.coderURL.String()), - slog.F("namespace", p.namespace), - slog.F("field_selector", p.fieldSelector), - slog.F("label_selector", p.labelSelector), - ) - podFactory.Start(p.stopChan) - if podFactory != eventFactory { - eventFactory.Start(p.stopChan) + go podFactory.Start(p.ctx.Done()) + if eventFactory != podFactory { + go eventFactory.Start(p.ctx.Done()) } + return nil } -var sourceUUID = uuid.MustParse("cabdacf8-7c90-425c-9815-cae3c75d1169") - -// loggerForToken returns a logger for the given pod name and agent token. -// If a logger already exists for the token, it's returned. Otherwise a new -// logger is created and returned. It assumes a lock to p.mutex is already being -// held. -func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.Log) { - p.logCh <- agentLog{ - op: opLog, - resourceName: resourceName, - agentToken: token, - log: log, +func (p *podEventLogger) sendLog(name, token string, log agentsdk.Log) { + select { + case p.logCh <- agentLog{ + name: name, + token: token, + log: log, + }: + case <-p.ctx.Done(): } } func (p *podEventLogger) sendDelete(token string) { - p.logCh <- agentLog{ - op: opDelete, - agentToken: token, + select { + case p.logCh <- agentLog{ + token: token, + delete: true, + }: + case <-p.ctx.Done(): } } func (p *podEventLogger) Close() error { p.cancelFunc() close(p.stopChan) - close(p.errChan) return nil } @@ -323,240 +371,256 @@ type tokenCache struct { replicaSets map[string][]string } -func (t *tokenCache) setPodToken(name, token string) []string { return t.set(t.pods, name, token) } -func (t *tokenCache) getPodTokens(name string) []string { return t.get(t.pods, name) } -func (t *tokenCache) deletePodToken(name string) []string { return t.delete(t.pods, name) } +func (tc *tokenCache) setPodToken(name, token string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() -func (t *tokenCache) setReplicaSetToken(name, token string) []string { - return t.set(t.replicaSets, name, token) -} -func (t *tokenCache) getReplicaSetTokens(name string) []string { return t.get(t.replicaSets, name) } -func (t *tokenCache) deleteReplicaSetToken(name string) []string { - return t.delete(t.replicaSets, name) + tokens, ok := tc.pods[name] + if !ok { + tc.pods[name] = []string{token} + return []string{token} + } + + for _, t := range tokens { + if t == token { + return append([]string(nil), tokens...) + } + } + + tc.pods[name] = append(tokens, token) + return append([]string(nil), tc.pods[name]...) } -func (t *tokenCache) get(m map[string][]string, name string) []string { - t.mu.RLock() - tokens := m[name] - t.mu.RUnlock() +func (tc *tokenCache) deletePodToken(name string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.pods[name] + if !ok { + return nil + } + + delete(tc.pods, name) return tokens } -func (t *tokenCache) set(m map[string][]string, name, token string) []string { - t.mu.Lock() - tokens, ok := m[name] +func (tc *tokenCache) getPodTokens(name string) []string { + tc.mu.RLock() + defer tc.mu.RUnlock() + + tokens, ok := tc.pods[name] if !ok { - tokens = []string{token} - } else { - tokens = append(tokens, token) + return nil } - m[name] = tokens - t.mu.Unlock() - return tokens + return append([]string(nil), tokens...) } -func (t *tokenCache) delete(m map[string][]string, name string) []string { - t.mu.Lock() - tokens := m[name] - delete(m, name) - t.mu.Unlock() +func (tc *tokenCache) setReplicaSetToken(name, token string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + tc.replicaSets[name] = []string{token} + return []string{token} + } + + for _, t := range tokens { + if t == token { + return append([]string(nil), tokens...) + } + } + + tc.replicaSets[name] = append(tokens, token) + return append([]string(nil), tc.replicaSets[name]...) +} + +func (tc *tokenCache) deleteReplicaSetToken(name string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + return nil + } + + delete(tc.replicaSets, name) return tokens } -func (t *tokenCache) isEmpty() bool { - t.mu.Lock() - defer t.mu.Unlock() - return len(t.pods)+len(t.replicaSets) == 0 +func (tc *tokenCache) getReplicaSetTokens(name string) []string { + tc.mu.RLock() + defer tc.mu.RUnlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + return nil + } + + return append([]string(nil), tokens...) } -type op int +func (tc *tokenCache) isEmpty() bool { + tc.mu.RLock() + defer tc.mu.RUnlock() -const ( - opLog op = iota - opDelete -) + return len(tc.pods) == 0 && len(tc.replicaSets) == 0 +} type agentLog struct { - op op - resourceName string - agentToken string - log agentsdk.Log + name string + token string + log agentsdk.Log + delete bool } -// logQueuer is a single-threaded queue for dispatching logs. type logQueuer struct { - mu sync.Mutex - logger slog.Logger - clock quartz.Clock - q chan agentLog - + logger slog.Logger + clock quartz.Clock + q <-chan agentLog coderURL *url.URL loggerTTL time.Duration - loggers map[string]agentLoggerLifecycle - logCache logCache + + mu sync.RWMutex + loggers map[string]agentLoggerLifecycle + + logCache logCache } -func (l *logQueuer) work(ctx context.Context) { - for ctx.Err() == nil { - select { - case log := <-l.q: - switch log.op { - case opLog: - l.processLog(ctx, log) - case opDelete: - l.processDelete(log) - } +type logCache struct { + mu sync.RWMutex + logs map[string][]agentsdk.Log +} - case <-ctx.Done(): - return - } +func (lc *logCache) append(token string, log agentsdk.Log) { + lc.mu.Lock() + defer lc.mu.Unlock() - } + lc.logs[token] = append(lc.logs[token], log) } -func (l *logQueuer) processLog(ctx context.Context, log agentLog) { - l.mu.Lock() - defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) - lgr, ok := l.loggers[log.agentToken] +func (lc *logCache) flush(token string) []agentsdk.Log { + lc.mu.Lock() + defer lc.mu.Unlock() + + logs, ok := lc.logs[token] if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. - logger.Error(ctx, "post log source", slog.Error(err)) - } + return nil + } - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) + delete(lc.logs, token) + return logs +} - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) +type agentLoggerLifecycle struct { + client *agentsdk.Client + sourceID uuid.UUID + timer *quartz.Timer + cancel context.CancelFunc +} - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) - if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() +func (lq *logQueuer) work(ctx context.Context) { + for { + select { + case <-ctx.Done(): return - } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) + case log := <-lq.q: + if log.delete { + lq.deleteLogger(log.token) + continue } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, + lq.logCache.append(log.token, log.log) + lq.ensureLogger(ctx, log.token) } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle } - - lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) - l.logCache.delete(log.agentToken) } -func (l *logQueuer) processDelete(log agentLog) { - l.mu.Lock() - lgr, ok := l.loggers[log.agentToken] - if ok { - delete(l.loggers, log.agentToken) - - } - l.mu.Unlock() +func (lq *logQueuer) ensureLogger(ctx context.Context, token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + lifecycle, ok := lq.loggers[token] if ok { - // close this async, no one else will have a handle to it since we've - // deleted from the map - go lgr.close() + lifecycle.timer.Reset(lq.loggerTTL) + return } -} -func (l *logQueuer) loggerTimeout(agentToken string) { - l.q <- agentLog{ - op: opDelete, - agentToken: agentToken, + coderClient := codersdk.New(lq.coderURL) + coderClient.SetSessionToken(token) + agentClient := agentsdk.New(lq.coderURL) + agentClient.SetSessionToken(token) + + // Create a log source for this agent + sourceID := agentsdk.ExternalLogSourceID + _, err := agentClient.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ + ID: sourceID, + DisplayName: "Kubernetes", + Icon: "/icon/k8s.png", + }) + if err != nil { + // Log source might already exist, which is fine + lq.logger.Debug(ctx, "failed to create log source", slog.Error(err)) } -} -type agentLoggerLifecycle struct { - scriptLogger agentsdk.ScriptLogger + // Create a context for this logger that can be cancelled + loggerCtx, cancel := context.WithCancel(ctx) - closeTimer *quartz.Timer - close func() -} + timer := lq.clock.AfterFunc(lq.loggerTTL, func() { + lq.deleteLogger(token) + }) -func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { - if !l.closeTimer.Reset(ttl) { - // If the timer had already fired and we made it active again, stop the - // timer. We don't want it to run twice. - l.closeTimer.Stop() + lq.loggers[token] = agentLoggerLifecycle{ + client: agentClient, + sourceID: sourceID, + timer: timer, + cancel: cancel, } -} -func newColor(value ...color.Attribute) *color.Color { - c := color.New(value...) - c.EnableColor() - return c -} + go func() { + ticker := lq.clock.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-loggerCtx.Done(): + return + case <-ticker.C: + logs := lq.logCache.flush(token) + if len(logs) == 0 { + continue + } -type logCache struct { - logs map[string][]agentsdk.Log + err := agentClient.PatchLogs(loggerCtx, agentsdk.PatchLogs{ + LogSourceID: sourceID, + Logs: logs, + }) + if err != nil { + lq.logger.Error(loggerCtx, "patch agent logs", slog.Error(err)) + // Don't return on error, keep trying + } + } + } + }() } -func (l *logCache) push(log agentLog) []agentsdk.Log { - logs, ok := l.logs[log.agentToken] +func (lq *logQueuer) deleteLogger(token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + + lifecycle, ok := lq.loggers[token] if !ok { - logs = make([]agentsdk.Log, 0, 1) + return } - logs = append(logs, log.log) - l.logs[log.agentToken] = logs - return logs + + lifecycle.timer.Stop() + lifecycle.cancel() // Cancel the context to stop the goroutine + delete(lq.loggers, token) } -func (l *logCache) delete(token string) { - delete(l.logs, token) +func newColor(attrs ...color.Attribute) *color.Color { + c := color.New(attrs...) + c.EnableColor() + return c } diff --git a/logger_test.go b/logger_test.go index 51d99f5..6aecc11 100644 --- a/logger_test.go +++ b/logger_test.go @@ -30,6 +30,7 @@ import ( "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/testutil" "github.com/coder/quartz" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestReplicaSetEvents(t *testing.T) { @@ -41,13 +42,14 @@ func TestReplicaSetEvents(t *testing.T) { agentURL, err := url.Parse(api.server.URL) require.NoError(t, err) namespace := "test-namespace" + sourceUUID := agentsdk.ExternalLogSourceID client := fake.NewSimpleClientset() cMock := quartz.NewMock(t) reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ client: client, coderURL: agentURL, - namespace: namespace, + namespaces: namespace, logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), logDebounce: 5 * time.Second, clock: cMock, @@ -87,9 +89,12 @@ func TestReplicaSetEvents(t *testing.T) { require.Equal(t, "Kubernetes", source.DisplayName) require.Equal(t, "/icon/k8s.png", source.Icon) + // Advance clock to trigger log flush + cMock.Advance(time.Second) + logs := testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) - require.Contains(t, logs[0].Output, "Queued pod from ReplicaSet") + require.Contains(t, logs[0].Output, "Created replicaset") event := &corev1.Event{ ObjectMeta: v1.ObjectMeta{ @@ -108,6 +113,9 @@ func TestReplicaSetEvents(t *testing.T) { _, err = client.CoreV1().Events(namespace).Create(ctx, event, v1.CreateOptions{}) require.NoError(t, err) + // Advance clock to trigger log flush + cMock.Advance(time.Second) + logs = testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) require.Contains(t, logs[0].Output, event.Message) @@ -115,9 +123,12 @@ func TestReplicaSetEvents(t *testing.T) { err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, v1.DeleteOptions{}) require.NoError(t, err) + // Advance clock to trigger log flush + cMock.Advance(time.Second) + logs = testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) - require.Contains(t, logs[0].Output, "Deleted ReplicaSet") + require.Contains(t, logs[0].Output, "Deleted replicaset") require.Eventually(t, func() bool { return reporter.tc.isEmpty() @@ -138,13 +149,14 @@ func TestPodEvents(t *testing.T) { agentURL, err := url.Parse(api.server.URL) require.NoError(t, err) namespace := "test-namespace" + sourceUUID := agentsdk.ExternalLogSourceID client := fake.NewSimpleClientset() cMock := quartz.NewMock(t) reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ client: client, coderURL: agentURL, - namespace: namespace, + namespaces: namespace, logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), logDebounce: 5 * time.Second, clock: cMock, @@ -179,6 +191,9 @@ func TestPodEvents(t *testing.T) { require.Equal(t, "Kubernetes", source.DisplayName) require.Equal(t, "/icon/k8s.png", source.Icon) + // Advance clock to trigger log flush + cMock.Advance(time.Second) + logs := testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) require.Contains(t, logs[0].Output, "Created pod") @@ -200,6 +215,9 @@ func TestPodEvents(t *testing.T) { _, err = client.CoreV1().Events(namespace).Create(ctx, event, v1.CreateOptions{}) require.NoError(t, err) + // Advance clock to trigger log flush + cMock.Advance(time.Second) + logs = testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) require.Contains(t, logs[0].Output, event.Message) @@ -207,6 +225,9 @@ func TestPodEvents(t *testing.T) { err = client.CoreV1().Pods(namespace).Delete(ctx, pod.Name, v1.DeleteOptions{}) require.NoError(t, err) + // Advance clock to trigger log flush + cMock.Advance(time.Second) + logs = testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) require.Contains(t, logs[0].Output, "Deleted pod") @@ -280,14 +301,14 @@ func Test_tokenCache(t *testing.T) { } func Test_logQueuer(t *testing.T) { - t.Run("Timeout", func(t *testing.T) { + t.Run("Basic", func(t *testing.T) { api := newFakeAgentAPI(t) agentURL, err := url.Parse(api.server.URL) require.NoError(t, err) - clock := quartz.NewMock(t) - ttl := time.Second + clock := quartz.NewReal() // Use real clock for simplicity + ttl := 100 * time.Millisecond // Short TTL for faster test - ch := make(chan agentLog) + ch := make(chan agentLog, 10) // Buffered channel to prevent blocking lq := &logQueuer{ logger: slogtest.Make(t, nil), clock: clock, @@ -304,10 +325,11 @@ func Test_logQueuer(t *testing.T) { defer cancel() go lq.work(ctx) + // Send first log ch <- agentLog{ - op: opLog, - resourceName: "mypod", - agentToken: "0b42fa72-7f1a-4b59-800d-69d67f56ed8b", + name: "mypod", + token: "0b42fa72-7f1a-4b59-800d-69d67f56ed8b", + delete: false, log: agentsdk.Log{ CreatedAt: time.Now(), Output: "This is a log.", @@ -315,15 +337,18 @@ func Test_logQueuer(t *testing.T) { }, } - // it should send both a log source request and the log + // Wait for log source to be created _ = testutil.RequireRecvCtx(ctx, t, api.logSource) + + // Wait for logs to be sent (ticker fires every second) logs := testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) + // Send second log ch <- agentLog{ - op: opLog, - resourceName: "mypod", - agentToken: "0b42fa72-7f1a-4b59-800d-69d67f56ed8b", + name: "mypod", + token: "0b42fa72-7f1a-4b59-800d-69d67f56ed8b", + delete: false, log: agentsdk.Log{ CreatedAt: time.Now(), Output: "This is a log too.", @@ -331,13 +356,18 @@ func Test_logQueuer(t *testing.T) { }, } - // duplicate logs should not trigger a log source + // Wait for second batch of logs logs = testutil.RequireRecvCtx(ctx, t, api.logs) require.Len(t, logs, 1) - clock.Advance(ttl) - // wait for the client to disconnect - _ = testutil.RequireRecvCtx(ctx, t, api.disconnect) + // Test cleanup by waiting for TTL + time.Sleep(ttl + 50*time.Millisecond) + + // Verify that the logger was cleaned up + lq.mu.RLock() + loggerCount := len(lq.loggers) + lq.mu.RUnlock() + require.Equal(t, 0, loggerCount, "Logger should be cleaned up after TTL") }) } @@ -361,6 +391,10 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { fakeAPI.PostLogSource(w, r) }) + rtr.Patch("/api/v2/workspaceagents/me/logs", func(w http.ResponseWriter, r *http.Request) { + fakeAPI.PatchLogs(w, r) + }) + rtr.Get("/api/v2/workspaceagents/me/rpc", func(w http.ResponseWriter, r *http.Request) { defer func() { fakeAPI.disconnect <- struct{}{} @@ -459,3 +493,93 @@ func (f *fakeAgentAPI) PostLogSource(w http.ResponseWriter, r *http.Request) { fmt.Println("failed to encode:", err.Error()) } } + +func (f *fakeAgentAPI) PatchLogs(w http.ResponseWriter, r *http.Request) { + var req agentsdk.PatchLogs + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + fmt.Println("failed to decode patch logs:", err.Error()) + w.WriteHeader(http.StatusBadRequest) + return + } + + + // Convert agentsdk.Log to proto.Log for the channel + protoLogs := make([]*proto.Log, len(req.Logs)) + for i, log := range req.Logs { + // Simple log level mapping + var level proto.Log_Level + switch string(log.Level) { + case "trace": + level = 1 // Assuming TRACE = 1 + case "debug": + level = 2 // Assuming DEBUG = 2 + case "info": + level = 3 // Assuming INFO = 3 + case "warn": + level = 4 // Assuming WARN = 4 + case "error": + level = 5 // Assuming ERROR = 5 + default: + level = 3 // Default to INFO + } + + protoLogs[i] = &proto.Log{ + CreatedAt: timestamppb.New(log.CreatedAt), + Output: log.Output, + Level: level, + } + } + + f.logs <- protoLogs + w.WriteHeader(http.StatusOK) +} + +func TestParseNamespaces(t *testing.T) { + tests := []struct { + name string + input string + expected []string + }{ + { + name: "empty string", + input: "", + expected: []string{}, + }, + { + name: "single namespace", + input: "default", + expected: []string{"default"}, + }, + { + name: "multiple namespaces", + input: "ns1,ns2,ns3", + expected: []string{"ns1", "ns2", "ns3"}, + }, + { + name: "namespaces with spaces", + input: "ns1, ns2 , ns3", + expected: []string{"ns1", "ns2", "ns3"}, + }, + { + name: "namespaces with empty values", + input: "ns1,,ns2,", + expected: []string{"ns1", "ns2"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parseNamespaces(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("parseNamespaces(%q) returned %d namespaces, expected %d", tt.input, len(result), len(tt.expected)) + return + } + for i, ns := range result { + if ns != tt.expected[i] { + t.Errorf("parseNamespaces(%q)[%d] = %q, expected %q", tt.input, i, ns, tt.expected[i]) + } + } + }) + } +} diff --git a/main.go b/main.go index 3d48cb9..d8c886a 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,7 @@ func root() *cobra.Command { coderURL string fieldSelector string kubeConfig string - namespace string + namespaces string labelSelector string ) cmd := &cobra.Command{ @@ -66,7 +66,7 @@ func root() *cobra.Command { reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, - namespace: namespace, + namespaces: namespaces, fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), @@ -85,7 +85,7 @@ func root() *cobra.Command { } cmd.Flags().StringVarP(&coderURL, "coder-url", "u", os.Getenv("CODER_URL"), "URL of the Coder instance") cmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "k", "~/.kube/config", "Path to the kubeconfig file") - cmd.Flags().StringVarP(&namespace, "namespace", "n", os.Getenv("CODER_NAMESPACE"), "Namespace to use when listing pods") + cmd.Flags().StringVarP(&namespaces, "namespaces", "n", os.Getenv("CODER_NAMESPACE"), "Comma-separated list of namespaces to watch for pods. If empty, watches all namespaces.") cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods") cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods")