Skip to content

Commit 41f8fb6

Browse files
committed
node watcher
1 parent e169cc5 commit 41f8fb6

File tree

3 files changed

+86
-24
lines changed

3 files changed

+86
-24
lines changed

Diff for: example-lb-svc.yaml

-15
This file was deleted.

Diff for: internal/controller/service_controller.go

+43-3
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@ import (
3131
apierrors "k8s.io/apimachinery/pkg/api/errors"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/apimachinery/pkg/types"
3435
ctrl "sigs.k8s.io/controller-runtime"
36+
"sigs.k8s.io/controller-runtime/pkg/builder"
3537
"sigs.k8s.io/controller-runtime/pkg/client"
3638
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
39+
"sigs.k8s.io/controller-runtime/pkg/handler"
3740
"sigs.k8s.io/controller-runtime/pkg/log"
3841
"sigs.k8s.io/controller-runtime/pkg/predicate"
42+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3943
)
4044

4145
// ServiceReconciler reconciles a Service object
@@ -365,9 +369,45 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
365369
return ok && svc.Spec.Type == corev1.ServiceTypeLoadBalancer
366370
})
367371

372+
mapNodeToLBServices := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
373+
374+
node, ok := obj.(*corev1.Node)
375+
if !ok {
376+
return nil
377+
}
378+
log.Log.Info("Node event received", "node", node.Name)
379+
// List all LB Services and enqueue each
380+
svcList := &corev1.ServiceList{}
381+
if err := r.Client.List(ctx, svcList); err != nil {
382+
// On error, return no requests
383+
return nil
384+
}
385+
386+
var reqs []reconcile.Request
387+
for _, svc := range svcList.Items {
388+
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
389+
reqs = append(reqs, reconcile.Request{
390+
NamespacedName: types.NamespacedName{
391+
Namespace: svc.Namespace,
392+
Name: svc.Name,
393+
},
394+
})
395+
}
396+
}
397+
return reqs
398+
},
399+
)
368400
return ctrl.NewControllerManagedBy(mgr).
369-
For(&corev1.Service{}).
370-
WithEventFilter(loadBalancerPredicate).
371-
Named("service").
401+
// Watch only Service objects of type=LoadBalancer
402+
For(
403+
&corev1.Service{},
404+
builder.WithPredicates(loadBalancerPredicate),
405+
).
406+
// Also watch Node events, with no filter so they pass
407+
Watches(
408+
&corev1.Node{},
409+
mapNodeToLBServices,
410+
).
372411
Complete(r)
412+
373413
}

Diff for: internal/controller/util.go

+43-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/ory/viper"
1717
corev1 "k8s.io/api/core/v1"
1818
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/apimachinery/pkg/types"
1920
"k8s.io/client-go/kubernetes"
2021
"k8s.io/client-go/rest"
2122
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -93,6 +94,15 @@ func ParseHealthCheckOptionsFromAnnotations(annotations map[string]string) *crus
9394
return healthCheckOptions
9495
}
9596

97+
func isNodeReady(node *corev1.Node) bool {
98+
for _, condition := range node.Status.Conditions {
99+
if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
100+
return true
101+
}
102+
}
103+
return false
104+
}
105+
96106
func (r *ServiceReconciler) parseListenPortsAndBackends(ctx context.Context, svc *corev1.Service, logger logr.Logger) []crusoeapi.ListenPortAndBackend {
97107
listenPortsAndBackends := []crusoeapi.ListenPortAndBackend{}
98108

@@ -103,9 +113,17 @@ func (r *ServiceReconciler) parseListenPortsAndBackends(ctx context.Context, svc
103113
return listenPortsAndBackends
104114
}
105115

116+
// Filter out only Ready nodes
117+
var readyNodes []corev1.Node
118+
for _, node := range nodeList.Items {
119+
if isNodeReady(&node) {
120+
readyNodes = append(readyNodes, node)
121+
}
122+
}
123+
106124
// Extract internal IPs of all nodes
107125
internalIPs := []string{}
108-
for _, node := range nodeList.Items {
126+
for _, node := range readyNodes {
109127
for _, address := range node.Status.Addresses {
110128
if address.Type == corev1.NodeInternalIP {
111129
internalIPs = append(internalIPs, address.Address)
@@ -114,18 +132,37 @@ func (r *ServiceReconciler) parseListenPortsAndBackends(ctx context.Context, svc
114132
}
115133
logger.Info("Retrieved internal IPs of nodes", "internalIPs", internalIPs)
116134

135+
// 1. Retrieve the NodePort Service from the cluster
136+
nodePortSvc := &corev1.Service{}
137+
err := r.Client.Get(ctx, types.NamespacedName{Name: utils.GenerateNodePortServiceName(svc.Name), Namespace: svc.Namespace}, nodePortSvc)
138+
if err != nil {
139+
logger.Error(err, "Failed to get NodePort service")
140+
}
141+
142+
// 2. Extract the NodePort from its Spec
143+
var nodePorts []int32
144+
for _, port := range nodePortSvc.Spec.Ports {
145+
if port.NodePort != 0 {
146+
nodePorts = append(nodePorts, port.NodePort)
147+
}
148+
}
149+
logger.Info("Discovered NodePorts", "nodePorts", nodePorts)
150+
117151
// Map backends using the retrieved internal IPs
118152
for _, port := range svc.Spec.Ports {
119153
backends := []crusoeapi.Backend{}
120154

121-
for _, ip := range internalIPs {
122-
backend := crusoeapi.Backend{
123-
Ip: ip,
124-
Port: int64(port.TargetPort.IntVal), // Use the TargetPort from service spec
155+
for _, nodePortVal := range nodePorts {
156+
for _, ip := range internalIPs {
157+
backends = append(backends, crusoeapi.Backend{
158+
Ip: ip,
159+
Port: int64(nodePortVal),
160+
})
125161
}
126-
backends = append(backends, backend)
127162
}
128163

164+
logger.Info("Discovered backends", "backends", backends, "listenport", port.Port)
165+
129166
// Append to the list of ListenPortAndBackend
130167
listenPortsAndBackends = append(listenPortsAndBackends, crusoeapi.ListenPortAndBackend{
131168
ListenPort: int64(port.Port), // Map the port exposed by the service

0 commit comments

Comments
 (0)