From 7ec3bfa5e93d539682de4ac41ad843fba0939e7f Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 26 Nov 2024 14:36:32 -0500 Subject: [PATCH 1/2] feat(k8s): implement wildcard All Namespaces discovery --- .../cryostat/discovery/KubeApiDiscovery.java | 52 +++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 685dbd92c..10134cf91 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.management.remote.JMXServiceURL; @@ -70,6 +71,7 @@ @ApplicationScoped public class KubeApiDiscovery implements ResourceEventHandler { + private static final String ALL_NAMESPACES = "*"; private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY"; private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC"; @@ -116,13 +118,25 @@ protected HashMap> initialize() .getWatchNamespaces() .forEach( ns -> { - result.put( - ns, - client.endpoints() - .inNamespace(ns) - .inform( - KubeApiDiscovery.this, - informerResyncPeriod.toMillis())); + SharedIndexInformer informer; + if (ALL_NAMESPACES.equals(ns)) { + informer = + client.endpoints() + .inAnyNamespace() + .inform( + KubeApiDiscovery.this, + informerResyncPeriod + .toMillis()); + } else { + informer = + client.endpoints() + .inNamespace(ns) + .inform( + KubeApiDiscovery.this, + informerResyncPeriod + .toMillis()); + } + result.put(ns, informer); logger.debugv( "Started Endpoints SharedInformer for namespace" + " \"{0}\" with resync period {1}", @@ -148,7 +162,11 @@ void onStart(@Observes StartupEvent evt) { () -> { try { logger.debug("Resyncing"); - notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces())); + notify( + NamespaceQueryEvent.from( + kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !ALL_NAMESPACES.equals(ns)) + .toList())); } catch (Exception e) { logger.warn(e); } @@ -226,6 +244,15 @@ List tuplesFromEndpoints(Endpoints endpoints) { for (EndpointPort port : subset.getPorts()) { for (EndpointAddress addr : subset.getAddresses()) { var ref = addr.getTargetRef(); + if (ref == null) { + logger.debugv( + "Endpoints object {0} in {1} with address {2} had a null" + + " targetRef", + endpoints.getMetadata().getName(), + endpoints.getMetadata().getNamespace(), + addr.getIp()); + continue; + } tts.add( new TargetTuple( ref, @@ -295,8 +322,15 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { persistedTargets.add(node.target); } + Stream endpoints; + if (safeGetInformers().containsKey(namespace)) { + endpoints = safeGetInformers().get(namespace).getStore().list().stream(); + } else { + endpoints = + client.endpoints().inNamespace(namespace).list().getItems().stream(); + } Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() + endpoints .map((endpoint) -> getTargetTuplesFrom(endpoint)) .flatMap(List::stream) .filter((tuple) -> Objects.nonNull(tuple.objRef)) From 0bc8a526063490c139015d2fb33dc2576c7bb492 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 26 Nov 2024 14:51:37 -0500 Subject: [PATCH 2/2] refactor --- .../cryostat/discovery/KubeApiDiscovery.java | 101 ++++++++++-------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 10134cf91..d5cd02f2e 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -110,42 +110,42 @@ public class KubeApiDiscovery implements ResourceEventHandler { @Override protected HashMap> initialize() throws ConcurrentException { - // TODO: add support for some wildcard indicating a single Informer for any - // namespace that Cryostat has permissions to. This will need some restructuring - // of how the namespaces within the discovery tree are mapped. var result = new HashMap>(); - kubeConfig - .getWatchNamespaces() - .forEach( - ns -> { - SharedIndexInformer informer; - if (ALL_NAMESPACES.equals(ns)) { - informer = - client.endpoints() - .inAnyNamespace() - .inform( - KubeApiDiscovery.this, - informerResyncPeriod - .toMillis()); - } else { - informer = + if (watchAllNamespaces()) { + result.put( + ALL_NAMESPACES, + client.endpoints() + .inAnyNamespace() + .inform( + KubeApiDiscovery.this, + informerResyncPeriod.toMillis())); + } else { + kubeConfig + .getWatchNamespaces() + .forEach( + ns -> { + result.put( + ns, client.endpoints() .inNamespace(ns) .inform( KubeApiDiscovery.this, informerResyncPeriod - .toMillis()); - } - result.put(ns, informer); - logger.debugv( - "Started Endpoints SharedInformer for namespace" - + " \"{0}\" with resync period {1}", - ns, informerResyncPeriod); - }); + .toMillis())); + logger.debugv( + "Started Endpoints SharedInformer for namespace" + + " \"{0}\" with resync period {1}", + ns, informerResyncPeriod); + }); + } return result; } }; + private boolean watchAllNamespaces() { + return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns)); + } + void onStart(@Observes StartupEvent evt) { if (!enabled()) { return; @@ -158,22 +158,26 @@ void onStart(@Observes StartupEvent evt) { logger.debugv("Starting {0} client", REALM); safeGetInformers(); - resyncWorker.scheduleAtFixedRate( - () -> { - try { - logger.debug("Resyncing"); - notify( - NamespaceQueryEvent.from( - kubeConfig.getWatchNamespaces().stream() - .filter(ns -> !ALL_NAMESPACES.equals(ns)) - .toList())); - } catch (Exception e) { - logger.warn(e); - } - }, - 0, - informerResyncPeriod.toMillis(), - TimeUnit.MILLISECONDS); + // TODO we should not need to force manual re-syncs this way - the Informer is already + // supposed to resync itself. + if (!watchAllNamespaces()) { + resyncWorker.scheduleAtFixedRate( + () -> { + try { + logger.debug("Resyncing"); + notify( + NamespaceQueryEvent.from( + kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !ALL_NAMESPACES.equals(ns)) + .toList())); + } catch (Exception e) { + logger.warn(e); + } + }, + 0, + informerResyncPeriod.toMillis(), + TimeUnit.MILLISECONDS); + } } void onStop(@Observes ShutdownEvent evt) { @@ -323,11 +327,16 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { } Stream endpoints; - if (safeGetInformers().containsKey(namespace)) { - endpoints = safeGetInformers().get(namespace).getStore().list().stream(); - } else { + if (watchAllNamespaces()) { endpoints = - client.endpoints().inNamespace(namespace).list().getItems().stream(); + safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream() + .filter( + ep -> + Objects.equals( + ep.getMetadata().getNamespace(), + namespace)); + } else { + endpoints = safeGetInformers().get(namespace).getStore().list().stream(); } Set observedTargets = endpoints