Skip to content
This repository was archived by the owner on Mar 9, 2025. It is now read-only.

Statefulset support: displaying pods, logs, describe, restart #883

Merged
merged 1 commit into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,22 @@ func podLogs(
messages chan *streaming.WSMessage,
runningLogStreams *runningLogStreams,
) {
var matchLabels map[string]string
deployment, err := kubeEnv.Client.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, meta_v1.GetOptions{})
if err != nil {
logrus.Errorf("could not get deployments: %v", err)
return
if strings.Contains(err.Error(), "not found") {
statefulset, err := kubeEnv.Client.AppsV1().StatefulSets(namespace).Get(context.TODO(), deploymentName, meta_v1.GetOptions{})
if err != nil {
logrus.Errorf("could not get statefulset: %v", err)
return
}
matchLabels = statefulset.Spec.Selector.MatchLabels
} else {
logrus.Errorf("could not get deployments: %v", err)
return
}
} else {
matchLabels = deployment.Spec.Selector.MatchLabels
}

podsInNamespace, err := kubeEnv.Client.CoreV1().Pods(namespace).List(context.TODO(), meta_v1.ListOptions{})
Expand All @@ -415,7 +427,7 @@ func podLogs(
}

for _, pod := range podsInNamespace.Items {
if labelsMatchSelectors(pod.ObjectMeta.Labels, deployment.Spec.Selector.MatchLabels) {
if labelsMatchSelectors(pod.ObjectMeta.Labels, matchLabels) {
containers := agent.PodContainers(pod.Spec)
for _, container := range containers {
go streamPodLogs(kubeEnv, namespace, pod.Name, container.Name, deploymentName, messages, runningLogStreams)
Expand Down Expand Up @@ -568,8 +580,16 @@ func restartDeployment(kubeEnv *agent.KubeEnv, namespace, name string) {
data := fmt.Sprintf(`{"spec": {"template": {"metadata": {"annotations": {"kubectl.kubernetes.io/restartedAt": "%s"}}}}}`, time.Now().Format(time.RFC3339))
_, err := kubeEnv.Client.AppsV1().Deployments(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(data), meta_v1.PatchOptions{})
if err != nil {
logrus.Errorf("could not patch deployment %s in %s: %s", name, namespace, err)
return
if strings.Contains(err.Error(), "not found") {
_, err := kubeEnv.Client.AppsV1().StatefulSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(data), meta_v1.PatchOptions{})
if err != nil {
logrus.Errorf("could not patch statefulset: %v", err)
return
}
} else {
logrus.Errorf("could not patch deployment %s in %s: %s", name, namespace, err)
return
}
}
}

Expand Down
38 changes: 32 additions & 6 deletions pkg/agent/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (e *KubeEnv) Services(repo string) ([]*api.Stack, error) {
}
e.Perf.WithLabelValues("gimlet_agent_deployments").Observe(float64(time.Since(t0).Seconds()))

t0 = time.Now()
s, err := e.Client.AppsV1().StatefulSets(e.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("could not get statefulsets: %s", err)
}
e.Perf.WithLabelValues("gimlet_agent_statefulsets").Observe(float64(time.Since(t0).Seconds()))

t0 = time.Now()
i, err := e.Client.NetworkingV1().Ingresses(e.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
Expand All @@ -87,11 +94,10 @@ func (e *KubeEnv) Services(repo string) ([]*api.Stack, error) {
t0 = time.Now()
var stacks []*api.Stack
for _, service := range annotatedServices {
deployment, err := e.deploymentForService(service, d.Items)
if err != nil {
return nil, fmt.Errorf("could not get deployment for service: %s", err)
deployment := e.deploymentForService(service, d.Items)
if deployment == nil {
deployment = e.statefulsetForService(service, s.Items)
}

if deployment != nil {
deployment.Pods = []*api.Pod{}
for _, pod := range pods.Items {
Expand Down Expand Up @@ -272,7 +278,7 @@ func (e *KubeEnv) annotatedServices(repo string) ([]v1.Service, error) {
return services, nil
}

func (e *KubeEnv) deploymentForService(service v1.Service, deployments []appsv1.Deployment) (*api.Deployment, error) {
func (e *KubeEnv) deploymentForService(service v1.Service, deployments []appsv1.Deployment) *api.Deployment {
var deployment *api.Deployment

for _, d := range deployments {
Expand All @@ -289,7 +295,27 @@ func (e *KubeEnv) deploymentForService(service v1.Service, deployments []appsv1.
}
}

return deployment, nil
return deployment
}

func (e *KubeEnv) statefulsetForService(service v1.Service, statefulsets []appsv1.StatefulSet) *api.Deployment {
var statefulset *api.Deployment

for _, s := range statefulsets {
if SelectorsMatch(s.Spec.Selector.MatchLabels, service.Spec.Selector) {
var branch, sha string
if hash, ok := s.GetAnnotations()[AnnotationGitSha]; ok {
sha = hash
}
if b, ok := s.GetAnnotations()[AnnotationGitBranch]; ok {
branch = b
}

statefulset = &api.Deployment{Name: s.Name, Namespace: s.Namespace, Branch: branch, SHA: sha}
}
}

return statefulset
}

func logs(e *KubeEnv, pod v1.Pod) string {
Expand Down
149 changes: 107 additions & 42 deletions pkg/agent/podController.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/gimlet-io/gimlet/pkg/dashboard/api"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -33,26 +34,38 @@ func PodController(kubeEnv *KubeEnv, gimletHost string, agentKey string) *Contro
return err
}

allStatefulsets, err := kubeEnv.Client.AppsV1().StatefulSets(kubeEnv.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

createdPod := obj.(*v1.Pod)
for _, svc := range integratedServices {
for _, deployment := range allDeployments.Items {
if SelectorsMatch(deployment.Spec.Selector.MatchLabels, svc.Spec.Selector) {
if HasLabels(deployment.Spec.Selector.MatchLabels, createdPod.GetObjectMeta().GetLabels()) &&
createdPod.Namespace == deployment.Namespace {
update := &api.StackUpdate{
Event: EventPodCreated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: string(createdPod.Status.Phase),
Deployment: deployment.Namespace + "/" + deployment.Name,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
matchAndSendCreatedEvent(
deployment.Spec.Selector.MatchLabels,
deployment.Namespace,
deployment.Name,
svc,
createdPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
for _, statefulset := range allStatefulsets.Items {
matchAndSendCreatedEvent(
statefulset.Spec.Selector.MatchLabels,
statefulset.Namespace,
statefulset.Name,
svc,
createdPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
}
case "update":
Expand All @@ -66,38 +79,42 @@ func PodController(kubeEnv *KubeEnv, gimletHost string, agentKey string) *Contro
return err
}

allStatefulsets, err := kubeEnv.Client.AppsV1().StatefulSets(kubeEnv.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

if obj == nil {
return nil
}

updatedPod := obj.(*v1.Pod)
for _, svc := range integratedServices {
for _, deployment := range allDeployments.Items {
if SelectorsMatch(deployment.Spec.Selector.MatchLabels, svc.Spec.Selector) {
if HasLabels(deployment.Spec.Selector.MatchLabels, updatedPod.GetObjectMeta().GetLabels()) &&
updatedPod.Namespace == deployment.Namespace {
podStatus := podStatus(*updatedPod)
podLogs := ""
if "CrashLoopBackOff" == podStatus {
podLogs = logs(kubeEnv, *updatedPod)
}

update := &api.StackUpdate{
Event: EventPodUpdated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: podStatus,
Deployment: deployment.Namespace + "/" + deployment.Name,
ErrorCause: podErrorCause(*updatedPod),
Logs: podLogs,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
newFunction(
deployment.Spec.Selector.MatchLabels,
deployment.Namespace,
deployment.Name,
svc,
updatedPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
for _, statefulset := range allStatefulsets.Items {
newFunction(
statefulset.Spec.Selector.MatchLabels,
statefulset.Namespace,
statefulset.Name,
svc,
updatedPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
}
case "delete":
Expand All @@ -113,6 +130,54 @@ func PodController(kubeEnv *KubeEnv, gimletHost string, agentKey string) *Contro
return podController
}

func newFunction(matchLabels map[string]string, namespace string, name string, svc v1.Service, updatedPod *v1.Pod, kubeEnv *KubeEnv, objectMeta metav1.ObjectMeta, gimletHost string, agentKey string) {
if SelectorsMatch(matchLabels, svc.Spec.Selector) {
if HasLabels(matchLabels, updatedPod.GetObjectMeta().GetLabels()) &&
updatedPod.Namespace == namespace {
podStatus := podStatus(*updatedPod)
podLogs := ""
if "CrashLoopBackOff" == podStatus {
podLogs = logs(kubeEnv, *updatedPod)
}

update := &api.StackUpdate{
Event: EventPodUpdated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: podStatus,
Deployment: namespace + "/" + name,
ErrorCause: podErrorCause(*updatedPod),
Logs: podLogs,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
}

func matchAndSendCreatedEvent(matchLabels map[string]string, namespace string, name string, svc v1.Service, createdPod *v1.Pod, kubeEnv *KubeEnv, objectMeta metav1.ObjectMeta, gimletHost string, agentKey string) {
if SelectorsMatch(matchLabels, svc.Spec.Selector) {
if HasLabels(matchLabels, createdPod.GetObjectMeta().GetLabels()) &&
createdPod.Namespace == namespace {
update := &api.StackUpdate{
Event: EventPodCreated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: string(createdPod.Status.Phase),
Deployment: namespace + "/" + name,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
}

// hasLabels determines if all the selectors are present as labels
func HasLabels(selector map[string]string, labels map[string]string) bool {
for selectorLabel, selectorValue := range selector {
Expand Down
4 changes: 2 additions & 2 deletions web/src/components/serviceDetail/serviceDetail.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ function ServiceDetail(props) {

let hostPort = "<host-port>"
let appPort = "<app-port>"
if (config) {
if (config && config.values) {
appPort = config.values.containerPort ?? 80;

if (appPort < 99) {
Expand Down Expand Up @@ -258,7 +258,7 @@ function ServiceDetail(props) {
</Menu>
</div>
</h3>
{deployment && config && <DeployIndicator deploy={config.values.deploy} owner={owner} repo={repoName} branch={deployment.branch} />}
{deployment && config && <DeployIndicator deploy={config.values && config.values.deploy} owner={owner} repo={repoName} branch={deployment.branch} />}
{pullRequests && pullRequests.length !== 0 &&
<PullRequests items={pullRequests} />
}
Expand Down
2 changes: 1 addition & 1 deletion web/src/components/serviceDetail/simpleServiceDetail.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function SimpleServiceDetail(props) {

let hostPort = "<host-port>"
let appPort = "<app-port>"
if (config) {
if (config && config.values) {
appPort = config.values.containerPort ?? 80;

if (appPort < 99) {
Expand Down
Loading