Skip to content

Commit 8a49797

Browse files
committed
Stream changes to gui
1 parent 74ae908 commit 8a49797

File tree

6 files changed

+207
-0
lines changed

6 files changed

+207
-0
lines changed

Diff for: cmd/capacitor/main.go

+8
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ func main() {
5757
runController(err, helmReleaseController, stopCh)
5858
eventController, err := controllers.EventController(client, dynamicClient, clientHub)
5959
runController(err, eventController, stopCh)
60+
deploymentController, err := controllers.DeploymentController(client, dynamicClient, clientHub)
61+
runController(err, deploymentController, stopCh)
62+
podController, err := controllers.PodController(client, dynamicClient, clientHub)
63+
runController(err, podController, stopCh)
64+
serviceController, err := controllers.ServiceController(client, dynamicClient, clientHub)
65+
runController(err, serviceController, stopCh)
66+
ingressController, err := controllers.IngressController(client, dynamicClient, clientHub)
67+
runController(err, ingressController, stopCh)
6068

6169
r := api.SetupRouter(client, dynamicClient, config, clientHub, runningLogStreams)
6270
go func() {

Diff for: pkg/controllers/deploymentController.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/gimlet-io/capacitor/pkg/flux"
7+
"github.com/gimlet-io/capacitor/pkg/streaming"
8+
apps_v1 "k8s.io/api/apps/v1"
9+
v1 "k8s.io/api/core/v1"
10+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/fields"
12+
"k8s.io/client-go/dynamic"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/client-go/tools/cache"
15+
)
16+
17+
func DeploymentController(
18+
client *kubernetes.Clientset,
19+
dynamicClient *dynamic.DynamicClient,
20+
clientHub *streaming.ClientHub,
21+
) (*Controller, error) {
22+
deploymentListWatcher := cache.NewListWatchFromClient(client.AppsV1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything())
23+
deploymentController := NewController(
24+
"deployment",
25+
deploymentListWatcher,
26+
&apps_v1.Deployment{},
27+
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
28+
switch informerEvent.eventType {
29+
case "create":
30+
fallthrough
31+
case "update":
32+
fallthrough
33+
case "delete":
34+
services, err := flux.Services(client, dynamicClient)
35+
if err != nil {
36+
panic(err.Error())
37+
}
38+
servicesBytes, err := json.Marshal(streaming.Envelope{
39+
Type: streaming.SERVICES_RECEIVED,
40+
Payload: services,
41+
})
42+
if err != nil {
43+
panic(err.Error())
44+
}
45+
clientHub.Broadcast <- servicesBytes
46+
}
47+
return nil
48+
})
49+
return deploymentController, nil
50+
}

Diff for: pkg/controllers/ingressController.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/gimlet-io/capacitor/pkg/flux"
7+
"github.com/gimlet-io/capacitor/pkg/streaming"
8+
v1 "k8s.io/api/core/v1"
9+
networking_v1 "k8s.io/api/networking/v1"
10+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/fields"
12+
"k8s.io/client-go/dynamic"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/client-go/tools/cache"
15+
)
16+
17+
func IngressController(
18+
client *kubernetes.Clientset,
19+
dynamicClient *dynamic.DynamicClient,
20+
clientHub *streaming.ClientHub,
21+
) (*Controller, error) {
22+
ingressListWatcher := cache.NewListWatchFromClient(client.NetworkingV1().RESTClient(), "ingresses", v1.NamespaceAll, fields.Everything())
23+
ingressController := NewController(
24+
"ingress",
25+
ingressListWatcher,
26+
&networking_v1.Ingress{},
27+
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
28+
switch informerEvent.eventType {
29+
case "create":
30+
fallthrough
31+
case "update":
32+
fallthrough
33+
case "delete":
34+
services, err := flux.Services(client, dynamicClient)
35+
if err != nil {
36+
panic(err.Error())
37+
}
38+
servicesBytes, err := json.Marshal(streaming.Envelope{
39+
Type: streaming.SERVICES_RECEIVED,
40+
Payload: services,
41+
})
42+
if err != nil {
43+
panic(err.Error())
44+
}
45+
clientHub.Broadcast <- servicesBytes
46+
}
47+
return nil
48+
})
49+
return ingressController, nil
50+
}

Diff for: pkg/controllers/podController.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/gimlet-io/capacitor/pkg/flux"
7+
"github.com/gimlet-io/capacitor/pkg/streaming"
8+
v1 "k8s.io/api/core/v1"
9+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/fields"
11+
"k8s.io/client-go/dynamic"
12+
"k8s.io/client-go/kubernetes"
13+
"k8s.io/client-go/tools/cache"
14+
)
15+
16+
func PodController(
17+
client *kubernetes.Clientset,
18+
dynamicClient *dynamic.DynamicClient,
19+
clientHub *streaming.ClientHub,
20+
) (*Controller, error) {
21+
podListWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
22+
podController := NewController(
23+
"pod",
24+
podListWatcher,
25+
&v1.Pod{},
26+
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
27+
switch informerEvent.eventType {
28+
case "create":
29+
fallthrough
30+
case "update":
31+
fallthrough
32+
case "delete":
33+
services, err := flux.Services(client, dynamicClient)
34+
if err != nil {
35+
panic(err.Error())
36+
}
37+
servicesBytes, err := json.Marshal(streaming.Envelope{
38+
Type: streaming.SERVICES_RECEIVED,
39+
Payload: services,
40+
})
41+
if err != nil {
42+
panic(err.Error())
43+
}
44+
clientHub.Broadcast <- servicesBytes
45+
}
46+
return nil
47+
})
48+
return podController, nil
49+
}

Diff for: pkg/controllers/serviceController.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/gimlet-io/capacitor/pkg/flux"
7+
"github.com/gimlet-io/capacitor/pkg/streaming"
8+
v1 "k8s.io/api/core/v1"
9+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/fields"
11+
"k8s.io/client-go/dynamic"
12+
"k8s.io/client-go/kubernetes"
13+
"k8s.io/client-go/tools/cache"
14+
)
15+
16+
func ServiceController(
17+
client *kubernetes.Clientset,
18+
dynamicClient *dynamic.DynamicClient,
19+
clientHub *streaming.ClientHub,
20+
) (*Controller, error) {
21+
serviceListWatcher := cache.NewListWatchFromClient(client.AppsV1().RESTClient(), "services", v1.NamespaceAll, fields.Everything())
22+
serviceController := NewController(
23+
"service",
24+
serviceListWatcher,
25+
&v1.Service{},
26+
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
27+
switch informerEvent.eventType {
28+
case "create":
29+
fallthrough
30+
case "update":
31+
fallthrough
32+
case "delete":
33+
services, err := flux.Services(client, dynamicClient)
34+
if err != nil {
35+
panic(err.Error())
36+
}
37+
servicesBytes, err := json.Marshal(streaming.Envelope{
38+
Type: streaming.SERVICES_RECEIVED,
39+
Payload: services,
40+
})
41+
if err != nil {
42+
panic(err.Error())
43+
}
44+
clientHub.Broadcast <- servicesBytes
45+
}
46+
return nil
47+
})
48+
return serviceController, nil
49+
}

Diff for: pkg/streaming/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package streaming
33
const (
44
FLUX_STATE_RECEIVED string = "FLUX_STATE_RECEIVED"
55
FLUX_EVENTS_RECEIVED string = "FLUX_EVENTS_RECEIVED"
6+
SERVICES_RECEIVED string = "SERVICES_RECEIVED"
67
POD_LOGS_RECEIVED string = "POD_LOGS_RECEIVED"
78
)
89

0 commit comments

Comments
 (0)