diff --git a/src/main/java/io/cryostat/ConfigProperties.java b/src/main/java/io/cryostat/ConfigProperties.java index 7d42fecfd..3324d7202 100644 --- a/src/main/java/io/cryostat/ConfigProperties.java +++ b/src/main/java/io/cryostat/ConfigProperties.java @@ -20,6 +20,11 @@ public class ConfigProperties { public static final String AWS_OBJECT_EXPIRATION_LABELS = "storage.buckets.archives.expiration-label"; + public static final String CONNECTIONS_MAX_OPEN = "cryostat.connections.max-open"; + public static final String CONNECTIONS_TTL = "cryostat.connections.ttl"; + public static final String CONNECTIONS_FAILED_BACKOFF = "cryostat.connections.failed-backoff"; + public static final String CONNECTIONS_FAILED_TIMEOUT = "cryostat.connections.failed-timeout"; + public static final String REPORTS_SIDECAR_URL = "cryostat.services.reports.url"; public static final String MEMORY_CACHE_ENABLED = "cryostat.services.reports.memory-cache.enabled"; diff --git a/src/main/java/io/cryostat/ExceptionMappers.java b/src/main/java/io/cryostat/ExceptionMappers.java index 051feacc7..1362e09bc 100644 --- a/src/main/java/io/cryostat/ExceptionMappers.java +++ b/src/main/java/io/cryostat/ExceptionMappers.java @@ -20,6 +20,7 @@ import io.cryostat.targets.TargetConnectionManager; import io.netty.handler.codec.http.HttpResponseStatus; +import io.smallrye.mutiny.TimeoutException; import jakarta.inject.Inject; import jakarta.persistence.NoResultException; import org.hibernate.exception.ConstraintViolationException; @@ -83,4 +84,10 @@ public RestResponse mapFlightRecorderException( } return RestResponse.status(HttpResponseStatus.BAD_GATEWAY.code()); } + + @ServerExceptionMapper + public RestResponse mapMutinyTimeoutException(TimeoutException ex) { + logger.warn(ex); + return RestResponse.status(HttpResponseStatus.GATEWAY_TIMEOUT.code()); + } } diff --git a/src/main/java/io/cryostat/discovery/CustomDiscovery.java b/src/main/java/io/cryostat/discovery/CustomDiscovery.java index 12cb7bc4a..0d94ada2a 
100644 --- a/src/main/java/io/cryostat/discovery/CustomDiscovery.java +++ b/src/main/java/io/cryostat/discovery/CustomDiscovery.java @@ -18,6 +18,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.ArrayList; import java.util.Map; import java.util.Optional; @@ -133,8 +134,13 @@ Response doV2Create( try { target.jvmId = - connectionManager.executeDirect( - target, credential, conn -> conn.getJvmId()); + connectionManager + .executeDirect( + target, + credential, + conn -> conn.getJvmIdentifier().getHash()) + .await() + .atMost(Duration.ofSeconds(10)); } catch (Exception e) { logger.error("Target connection failed", e); return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index 272088cdb..eaf21026b 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; @@ -34,8 +35,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -70,7 +69,6 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.inject.Named; -import jakarta.persistence.EntityManager; import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.ServerErrorException; import jakarta.ws.rs.core.Response; @@ -99,7 +97,6 @@ import software.amazon.awssdk.services.s3.model.Tag; import 
software.amazon.awssdk.services.s3.model.Tagging; import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.services.s3.presigner.S3Presigner; @ApplicationScoped public class RecordingHelper { @@ -110,18 +107,13 @@ public class RecordingHelper { Pattern.compile("^template=([\\w]+)(?:,type=([\\w]+))?$"); public static final String DATASOURCE_FILENAME = "cryostat-analysis.jfr"; - private static final long httpTimeoutSeconds = 5; // TODO: configurable client timeout - @Inject Logger logger; - @Inject EntityManager entityManager; @Inject TargetConnectionManager connectionManager; @Inject RecordingOptionsBuilderFactory recordingOptionsBuilderFactory; @Inject EventOptionsBuilder.Factory eventOptionsBuilderFactory; - @Inject ScheduledExecutorService scheduler; @Inject EventBus bus; @Inject Clock clock; - @Inject S3Presigner presigner; @Inject @Named(Producers.BASE64_URL) @@ -139,6 +131,9 @@ public class RecordingHelper { @ConfigProperty(name = ConfigProperties.AWS_OBJECT_EXPIRATION_LABELS) String objectExpirationLabel; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionFailedTimeout; + public ActiveRecording startRecording( Target target, IConstrainedMap recordingOptions, @@ -787,7 +782,7 @@ public Response uploadToJFRDatasource(long targetEntityId, long remoteId, URL up webClient .postAbs(uploadUrl.toURI().resolve("/load").normalize().toString()) .addQueryParam("overwrite", "true") - .timeout(TimeUnit.SECONDS.toMillis(httpTimeoutSeconds)) + .timeout(connectionFailedTimeout.toMillis()) .sendMultipartForm(form); return asyncRequest .onItem() diff --git a/src/main/java/io/cryostat/recordings/Recordings.java b/src/main/java/io/cryostat/recordings/Recordings.java index 74cdd3df2..981033b72 100644 --- a/src/main/java/io/cryostat/recordings/Recordings.java +++ b/src/main/java/io/cryostat/recordings/Recordings.java @@ -64,6 +64,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import 
io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.handler.HttpException; import io.vertx.mutiny.core.eventbus.EventBus; @@ -507,65 +508,63 @@ public Response patchV1(@RestPath URI connectUrl, @RestPath String recordingName @POST @Transactional - @Blocking @Path("/api/v1/targets/{connectUrl}/snapshot") @RolesAllowed("write") - public Response createSnapshotV1(@RestPath URI connectUrl) throws Exception { + public Uni createSnapshotV1(@RestPath URI connectUrl) throws Exception { Target target = Target.getTargetByConnectUrl(connectUrl); - try { - ActiveRecording recording = - connectionManager.executeConnectedTask( - target, - connection -> recordingHelper.createSnapshot(target, connection)); - return Response.status(Response.Status.OK).entity(recording.name).build(); - } catch (SnapshotCreationException sce) { - return Response.status(Response.Status.ACCEPTED).build(); - } + return connectionManager + .executeConnectedTaskUni( + target, connection -> recordingHelper.createSnapshot(target, connection)) + .onItem() + .transform( + recording -> + Response.status(Response.Status.OK).entity(recording.name).build()) + .onFailure(SnapshotCreationException.class) + .recoverWithItem(Response.status(Response.Status.ACCEPTED).build()); } @POST @Transactional - @Blocking @Path("/api/v2/targets/{connectUrl}/snapshot") @RolesAllowed("write") - public Response createSnapshotV2(@RestPath URI connectUrl) throws Exception { + public Uni createSnapshotV2(@RestPath URI connectUrl) throws Exception { Target target = Target.getTargetByConnectUrl(connectUrl); - try { - ActiveRecording recording = - connectionManager.executeConnectedTask( - target, - connection -> recordingHelper.createSnapshot(target, connection)); - return Response.status(Response.Status.CREATED) - .entity( - V2Response.json( - Response.Status.CREATED, - recordingHelper.toExternalForm(recording))) - 
.build(); - } catch (SnapshotCreationException sce) { - return Response.status(Response.Status.ACCEPTED) - .entity(V2Response.json(Response.Status.ACCEPTED, null)) - .build(); - } + return connectionManager + .executeConnectedTaskUni( + target, connection -> recordingHelper.createSnapshot(target, connection)) + .onItem() + .transform( + recording -> + Response.status(Response.Status.CREATED) + .entity( + V2Response.json( + Response.Status.CREATED, + recordingHelper.toExternalForm(recording))) + .build()) + .onFailure(SnapshotCreationException.class) + .recoverWithItem( + Response.status(Response.Status.ACCEPTED) + .entity(V2Response.json(Response.Status.ACCEPTED, null)) + .build()); } @POST @Transactional - @Blocking @Path("/api/v3/targets/{id}/snapshot") @RolesAllowed("write") - public Response createSnapshot(@RestPath long id) throws Exception { + public Uni createSnapshot(@RestPath long id) throws Exception { Target target = Target.find("id", id).singleResult(); - try { - ActiveRecording recording = - connectionManager.executeConnectedTask( - target, - connection -> recordingHelper.createSnapshot(target, connection)); - return Response.status(Response.Status.OK) - .entity(recordingHelper.toExternalForm(recording)) - .build(); - } catch (SnapshotCreationException sce) { - return Response.status(Response.Status.ACCEPTED).build(); - } + return connectionManager + .executeConnectedTaskUni( + target, connection -> recordingHelper.createSnapshot(target, connection)) + .onItem() + .transform( + recording -> + Response.status(Response.Status.OK) + .entity(recordingHelper.toExternalForm(recording)) + .build()) + .onFailure(SnapshotCreationException.class) + .recoverWithItem(Response.status(Response.Status.ACCEPTED).build()); } @POST diff --git a/src/main/java/io/cryostat/targets/AgentConnection.java b/src/main/java/io/cryostat/targets/AgentConnection.java index cfe4efa2a..44218a4fe 100644 --- a/src/main/java/io/cryostat/targets/AgentConnection.java +++ 
b/src/main/java/io/cryostat/targets/AgentConnection.java @@ -164,6 +164,7 @@ public MBeanMetrics getMBeanMetrics() MemoryMetrics memory = new MemoryMetrics(Map.of()); ThreadMetrics thread = new ThreadMetrics(Map.of()); OperatingSystemMetrics operatingSystem = new OperatingSystemMetrics(Map.of()); - return new MBeanMetrics(runtime, memory, thread, operatingSystem, getJvmId()); + return new MBeanMetrics( + runtime, memory, thread, operatingSystem, getJvmIdentifier().getHash()); } } diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index f20c9be72..f96085453 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -18,6 +18,7 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -228,7 +229,13 @@ void prePersist(Target target) throws JvmIdException { } try { target.jvmId = - connectionManager.executeConnectedTask(target, conn -> conn.getJvmId()); + connectionManager + .executeDirect( + target, + Optional.empty(), + conn -> conn.getJvmIdentifier().getHash()) + .await() + .atMost(Duration.ofSeconds(10)); } catch (Exception e) { // TODO tolerate this in the condition that the connection failed because of JMX // auth. 
In that instance then persist the entity with a null jvmId, but listen for diff --git a/src/main/java/io/cryostat/targets/TargetConnectionManager.java b/src/main/java/io/cryostat/targets/TargetConnectionManager.java index 282d83694..e89211c95 100644 --- a/src/main/java/io/cryostat/targets/TargetConnectionManager.java +++ b/src/main/java/io/cryostat/targets/TargetConnectionManager.java @@ -37,6 +37,7 @@ import org.openjdk.jmc.rjmx.ConnectionException; import org.openjdk.jmc.rjmx.services.jfr.FlightRecorderException; +import io.cryostat.ConfigProperties; import io.cryostat.core.net.JFRConnection; import io.cryostat.core.net.JFRConnectionToolkit; import io.cryostat.credentials.Credential; @@ -53,6 +54,7 @@ import io.quarkus.vertx.ConsumeEvent; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.unchecked.Unchecked; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.transaction.Transactional; @@ -62,6 +64,7 @@ import jdk.jfr.Label; import jdk.jfr.Name; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.projectnessie.cel.tools.ScriptException; @@ -71,13 +74,15 @@ public class TargetConnectionManager { private final JFRConnectionToolkit jfrConnectionToolkit; private final MatchExpressionEvaluator matchExpressionEvaluator; private final AgentConnectionFactory agentConnectionFactory; - private final Executor executor; private final Logger logger; private final AsyncLoadingCache connections; private final Map targetLocks; private final Optional semaphore; + private final Duration failedBackoff; + private final Duration failedTimeout; + @Inject @SuppressFBWarnings( value = "CT_CONSTRUCTOR_THROW", @@ -88,6 +93,12 @@ public class TargetConnectionManager { JFRConnectionToolkit jfrConnectionToolkit, MatchExpressionEvaluator matchExpressionEvaluator, 
AgentConnectionFactory agentConnectionFactory, + @ConfigProperty(name = ConfigProperties.CONNECTIONS_MAX_OPEN) int maxOpen, + @ConfigProperty(name = ConfigProperties.CONNECTIONS_TTL) Duration ttl, + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_BACKOFF) + Duration failedBackoff, + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration failedTimeout, Executor executor, Logger logger) { FlightRecorder.register(TargetConnectionOpened.class); @@ -95,13 +106,12 @@ public class TargetConnectionManager { this.jfrConnectionToolkit = jfrConnectionToolkit; this.matchExpressionEvaluator = matchExpressionEvaluator; this.agentConnectionFactory = agentConnectionFactory; - this.executor = executor; - - int maxTargetConnections = 0; // TODO make configurable + this.failedBackoff = failedBackoff; + this.failedTimeout = failedTimeout; this.targetLocks = new ConcurrentHashMap<>(); - if (maxTargetConnections > 0) { - this.semaphore = Optional.of(new Semaphore(maxTargetConnections, true)); + if (maxOpen > 0) { + this.semaphore = Optional.of(new Semaphore(maxOpen, true)); } else { this.semaphore = Optional.empty(); } @@ -111,13 +121,16 @@ public class TargetConnectionManager { .executor(executor) .scheduler(Scheduler.systemScheduler()) .removalListener(this::closeConnection); - Duration ttl = Duration.ofSeconds(10); // TODO make configurable - if (ttl.isZero() || ttl.isNegative()) { + if (ttl.isNegative()) { logger.errorv( - "TTL must be a positive integer in seconds, was {0} - ignoring", + "TTL must be a non-negative integer in seconds, was {0} - ignoring", ttl.toSeconds()); - } else { + } else if (!ttl.isZero()) { cacheBuilder = cacheBuilder.expireAfterAccess(ttl); + } else { + logger.warn( + "TTL is set to 0 - target connections will be cached indefinitely, until closed" + + " by the remote end or the network drops"); } this.connections = cacheBuilder.buildAsync(new ConnectionLoader()); this.logger = logger; @@ -174,42 +187,54 @@ void 
handleCredentialChange(Credential credential) { } } - public Uni executeConnectedTaskAsync(Target target, ConnectedTask task) { + @Blocking + public Uni executeConnectedTaskUni(Target target, ConnectedTask task) { return Uni.createFrom() - .completionStage( - connections - .get(target.connectUrl) - .thenApplyAsync( - conn -> { - try { - synchronized ( - targetLocks.computeIfAbsent( - target.connectUrl, - k -> new Object())) { - return task.execute(conn); - } - } catch (Exception e) { - logger.error("Connection failure", e); - throw new CompletionException(e); - } - }, - executor)); + .completionStage(connections.get(target.connectUrl)) + .onItem() + .transform( + Unchecked.function( + conn -> { + synchronized ( + targetLocks.computeIfAbsent( + target.connectUrl, k -> new Object())) { + return task.execute(conn); + } + })) + .onFailure(RuntimeException.class) + .transform(this::unwrapRuntimeException) + .onFailure() + .invoke(logger::warn) + .onFailure(t -> isTargetConnectionFailure(t) || isUnknownTargetFailure(t)) + .retry() + .withBackOff(failedBackoff) + .expireIn(failedTimeout.toMillis() /* relative, not an epoch deadline */); } @Blocking - public T executeConnectedTask(Target target, ConnectedTask task) throws Exception { - synchronized (targetLocks.computeIfAbsent(target.connectUrl, k -> new Object())) { - return task.execute(connections.get(target.connectUrl).get()); - } + public T executeConnectedTask(Target target, ConnectedTask task) { + return executeConnectedTaskUni(target, task).await().atMost(failedTimeout); } @Blocking - public T executeDirect( - Target target, Optional credentials, ConnectedTask task) - throws Exception { - try (var conn = connect(target.connectUrl, credentials)) { - return task.execute(conn); - } + public Uni executeDirect( + Target target, Optional credentials, ConnectedTask task) { + return Uni.createFrom() + .item( + Unchecked.supplier( + () -> { + try (var conn = connect(target.connectUrl, credentials)) { + return
task.execute(conn); + } + })) + .onFailure(RuntimeException.class) + .transform(this::unwrapRuntimeException) + .onFailure() + .invoke(logger::warn) + .onFailure(t -> isTargetConnectionFailure(t) || isUnknownTargetFailure(t)) + .retry() + .withBackOff(failedBackoff) + .expireIn(failedTimeout.toMillis() /* relative, not an epoch deadline */); } /** @@ -358,28 +383,61 @@ public interface ConnectedTask { T execute(JFRConnection connection) throws Exception; } - public static boolean isTargetConnectionFailure(Exception e) { + /** Unwraps nested RuntimeExceptions; never returns null for a non-null input. */ + public Throwable unwrapRuntimeException(Throwable t) { + final int maxDepth = 10; + int depth = 0; + Throwable cause = t; + while (cause instanceof RuntimeException + && cause.getCause() != null + && depth++ < maxDepth) { + cause = cause.getCause(); + } + return cause; + } + + public static boolean isTargetConnectionFailure(Throwable t) { + if (!(t instanceof Exception)) { + return false; + } + Exception e = (Exception) t; return ExceptionUtils.indexOfType(e, ConnectionException.class) >= 0 || ExceptionUtils.indexOfType(e, FlightRecorderException.class) >= 0; } - public static boolean isJmxAuthFailure(Exception e) { + public static boolean isJmxAuthFailure(Throwable t) { + if (!(t instanceof Exception)) { + return false; + } + Exception e = (Exception) t; return ExceptionUtils.indexOfType(e, SecurityException.class) >= 0 || ExceptionUtils.indexOfType(e, SaslException.class) >= 0; } - public static boolean isJmxSslFailure(Exception e) { + public static boolean isJmxSslFailure(Throwable t) { + if (!(t instanceof Exception)) { + return false; + } + Exception e = (Exception) t; return ExceptionUtils.indexOfType(e, ConnectIOException.class) >= 0 && !isServiceTypeFailure(e); } /** Check if the exception happened because the port connected to a non-JMX service.
*/ - public static boolean isServiceTypeFailure(Exception e) { + public static boolean isServiceTypeFailure(Throwable t) { + if (!(t instanceof Exception)) { + return false; + } + Exception e = (Exception) t; return ExceptionUtils.indexOfType(e, ConnectIOException.class) >= 0 && ExceptionUtils.indexOfType(e, SocketTimeoutException.class) >= 0; } - public static boolean isUnknownTargetFailure(Exception e) { + public static boolean isUnknownTargetFailure(Throwable t) { + if (!(t instanceof Exception)) { + return false; + } + Exception e = (Exception) t; return ExceptionUtils.indexOfType(e, java.net.UnknownHostException.class) >= 0 || ExceptionUtils.indexOfType(e, java.rmi.UnknownHostException.class) >= 0 || ExceptionUtils.indexOfType(e, ServiceUnavailableException.class) >= 0; diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index b543446bb..e0652a7c6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -4,6 +4,10 @@ cryostat.discovery.podman.enabled=false cryostat.discovery.docker.enabled=false quarkus.test.integration-test-profile=test +cryostat.connections.max-open=0 +cryostat.connections.ttl=10s +cryostat.connections.failed-backoff=2s +cryostat.connections.failed-timeout=10s cryostat.messaging.queue.size=1024 cryostat.services.reports.url= quarkus.cache.enabled=true