Skip to content

Commit

Permalink
fix(connections): use Unis to enable internal connection retry behavi…
Browse files Browse the repository at this point in the history
…our (#244)

* refactor to avoid deprecated method, use Uni to enable retry behaviours

* refactor snapshot endpoints to use Unis

* configurable connection backoff and timeout

* configurable max open connections

* configurable connection TTL

* use connection failure timeout for datasource uploads
  • Loading branch information
andrewazores authored Jan 19, 2024
1 parent 0e3fedb commit 34c7a6b
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 100 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/cryostat/ConfigProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public class ConfigProperties {
public static final String AWS_OBJECT_EXPIRATION_LABELS =
"storage.buckets.archives.expiration-label";

public static final String CONNECTIONS_MAX_OPEN = "cryostat.connections.max-open";
public static final String CONNECTIONS_TTL = "cryostat.connections.ttl";
public static final String CONNECTIONS_FAILED_BACKOFF = "cryostat.connections.failed-backoff";
public static final String CONNECTIONS_FAILED_TIMEOUT = "cryostat.connections.failed-timeout";

public static final String REPORTS_SIDECAR_URL = "cryostat.services.reports.url";
public static final String MEMORY_CACHE_ENABLED =
"cryostat.services.reports.memory-cache.enabled";
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/cryostat/ExceptionMappers.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cryostat.targets.TargetConnectionManager;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.smallrye.mutiny.TimeoutException;
import jakarta.inject.Inject;
import jakarta.persistence.NoResultException;
import org.hibernate.exception.ConstraintViolationException;
Expand Down Expand Up @@ -83,4 +84,10 @@ public RestResponse<Void> mapFlightRecorderException(
}
return RestResponse.status(HttpResponseStatus.BAD_GATEWAY.code());
}

@ServerExceptionMapper
public RestResponse<Void> mapMutinyTimeoutException(TimeoutException ex) {
logger.warn(ex);
return RestResponse.status(HttpResponseStatus.GATEWAY_TIMEOUT.code());
}
}
10 changes: 8 additions & 2 deletions src/main/java/io/cryostat/discovery/CustomDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -133,8 +134,13 @@ Response doV2Create(

try {
target.jvmId =
connectionManager.executeDirect(
target, credential, conn -> conn.getJvmId());
connectionManager
.executeDirect(
target,
credential,
conn -> conn.getJvmIdentifier().getHash())
.await()
.atMost(Duration.ofSeconds(10));
} catch (Exception e) {
logger.error("Target connection failed", e);
return Response.status(Response.Status.BAD_REQUEST.getStatusCode())
Expand Down
15 changes: 5 additions & 10 deletions src/main/java/io/cryostat/recordings/RecordingHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
Expand All @@ -34,8 +35,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -70,7 +69,6 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.persistence.EntityManager;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.ServerErrorException;
import jakarta.ws.rs.core.Response;
Expand Down Expand Up @@ -99,7 +97,6 @@
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;

@ApplicationScoped
public class RecordingHelper {
Expand All @@ -110,18 +107,13 @@ public class RecordingHelper {
Pattern.compile("^template=([\\w]+)(?:,type=([\\w]+))?$");
public static final String DATASOURCE_FILENAME = "cryostat-analysis.jfr";

private static final long httpTimeoutSeconds = 5; // TODO: configurable client timeout

@Inject Logger logger;
@Inject EntityManager entityManager;
@Inject TargetConnectionManager connectionManager;
@Inject RecordingOptionsBuilderFactory recordingOptionsBuilderFactory;
@Inject EventOptionsBuilder.Factory eventOptionsBuilderFactory;
@Inject ScheduledExecutorService scheduler;
@Inject EventBus bus;

@Inject Clock clock;
@Inject S3Presigner presigner;

@Inject
@Named(Producers.BASE64_URL)
Expand All @@ -139,6 +131,9 @@ public class RecordingHelper {
@ConfigProperty(name = ConfigProperties.AWS_OBJECT_EXPIRATION_LABELS)
String objectExpirationLabel;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Duration connectionFailedTimeout;

public ActiveRecording startRecording(
Target target,
IConstrainedMap<String> recordingOptions,
Expand Down Expand Up @@ -787,7 +782,7 @@ public Response uploadToJFRDatasource(long targetEntityId, long remoteId, URL up
webClient
.postAbs(uploadUrl.toURI().resolve("/load").normalize().toString())
.addQueryParam("overwrite", "true")
.timeout(TimeUnit.SECONDS.toMillis(httpTimeoutSeconds))
.timeout(connectionFailedTimeout.toMillis())
.sendMultipartForm(form);
return asyncRequest
.onItem()
Expand Down
83 changes: 41 additions & 42 deletions src/main/java/io/cryostat/recordings/Recordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.mutiny.core.eventbus.EventBus;
Expand Down Expand Up @@ -507,65 +508,63 @@ public Response patchV1(@RestPath URI connectUrl, @RestPath String recordingName

@POST
@Transactional
@Blocking
@Path("/api/v1/targets/{connectUrl}/snapshot")
@RolesAllowed("write")
public Response createSnapshotV1(@RestPath URI connectUrl) throws Exception {
public Uni<Response> createSnapshotV1(@RestPath URI connectUrl) throws Exception {
Target target = Target.getTargetByConnectUrl(connectUrl);
try {
ActiveRecording recording =
connectionManager.executeConnectedTask(
target,
connection -> recordingHelper.createSnapshot(target, connection));
return Response.status(Response.Status.OK).entity(recording.name).build();
} catch (SnapshotCreationException sce) {
return Response.status(Response.Status.ACCEPTED).build();
}
return connectionManager
.executeConnectedTaskUni(
target, connection -> recordingHelper.createSnapshot(target, connection))
.onItem()
.transform(
recording ->
Response.status(Response.Status.OK).entity(recording.name).build())
.onFailure(SnapshotCreationException.class)
.recoverWithItem(Response.status(Response.Status.ACCEPTED).build());
}

@POST
@Transactional
@Blocking
@Path("/api/v2/targets/{connectUrl}/snapshot")
@RolesAllowed("write")
public Response createSnapshotV2(@RestPath URI connectUrl) throws Exception {
public Uni<Response> createSnapshotV2(@RestPath URI connectUrl) throws Exception {
Target target = Target.getTargetByConnectUrl(connectUrl);
try {
ActiveRecording recording =
connectionManager.executeConnectedTask(
target,
connection -> recordingHelper.createSnapshot(target, connection));
return Response.status(Response.Status.CREATED)
.entity(
V2Response.json(
Response.Status.CREATED,
recordingHelper.toExternalForm(recording)))
.build();
} catch (SnapshotCreationException sce) {
return Response.status(Response.Status.ACCEPTED)
.entity(V2Response.json(Response.Status.ACCEPTED, null))
.build();
}
return connectionManager
.executeConnectedTaskUni(
target, connection -> recordingHelper.createSnapshot(target, connection))
.onItem()
.transform(
recording ->
Response.status(Response.Status.CREATED)
.entity(
V2Response.json(
Response.Status.CREATED,
recordingHelper.toExternalForm(recording)))
.build())
.onFailure(SnapshotCreationException.class)
.recoverWithItem(
Response.status(Response.Status.ACCEPTED)
.entity(V2Response.json(Response.Status.ACCEPTED, null))
.build());
}

@POST
@Transactional
@Blocking
@Path("/api/v3/targets/{id}/snapshot")
@RolesAllowed("write")
public Response createSnapshot(@RestPath long id) throws Exception {
public Uni<Response> createSnapshot(@RestPath long id) throws Exception {
Target target = Target.find("id", id).singleResult();
try {
ActiveRecording recording =
connectionManager.executeConnectedTask(
target,
connection -> recordingHelper.createSnapshot(target, connection));
return Response.status(Response.Status.OK)
.entity(recordingHelper.toExternalForm(recording))
.build();
} catch (SnapshotCreationException sce) {
return Response.status(Response.Status.ACCEPTED).build();
}
return connectionManager
.executeConnectedTaskUni(
target, connection -> recordingHelper.createSnapshot(target, connection))
.onItem()
.transform(
recording ->
Response.status(Response.Status.OK)
.entity(recordingHelper.toExternalForm(recording))
.build())
.onFailure(SnapshotCreationException.class)
.recoverWithItem(Response.status(Response.Status.ACCEPTED).build());
}

@POST
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/cryostat/targets/AgentConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public MBeanMetrics getMBeanMetrics()
MemoryMetrics memory = new MemoryMetrics(Map.of());
ThreadMetrics thread = new ThreadMetrics(Map.of());
OperatingSystemMetrics operatingSystem = new OperatingSystemMetrics(Map.of());
return new MBeanMetrics(runtime, memory, thread, operatingSystem, getJvmId());
return new MBeanMetrics(
runtime, memory, thread, operatingSystem, getJvmIdentifier().getHash());
}
}
9 changes: 8 additions & 1 deletion src/main/java/io/cryostat/targets/Target.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -228,7 +229,13 @@ void prePersist(Target target) throws JvmIdException {
}
try {
target.jvmId =
connectionManager.executeConnectedTask(target, conn -> conn.getJvmId());
connectionManager
.executeDirect(
target,
Optional.empty(),
conn -> conn.getJvmIdentifier().getHash())
.await()
.atMost(Duration.ofSeconds(10));
} catch (Exception e) {
// TODO tolerate this in the condition that the connection failed because of JMX
// auth. In that instance then persist the entity with a null jvmId, but listen for
Expand Down
Loading

0 comments on commit 34c7a6b

Please sign in to comment.