Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Agent HTTP client improperly closes HTTP response body streams #481

Closed
andrewazores opened this issue Sep 12, 2024 · 0 comments · Fixed by #482
Closed

[Bug] Agent HTTP client improperly closes HTTP response body streams #481

andrewazores opened this issue Sep 12, 2024 · 0 comments · Fixed by #482
Assignees
Labels
bug Something isn't working

Comments

@andrewazores
Copy link
Member

Applying this Agent patch seems to solve the problem, implying that the explanation above (in the original discussion, cryostatio/cryostat#609) is correct:
diff --git a/src/main/java/io/cryostat/agent/CryostatClient.java b/src/main/java/io/cryostat/agent/CryostatClient.java
index 4692bd6..5556e71 100644
--- a/src/main/java/io/cryostat/agent/CryostatClient.java
+++ b/src/main/java/io/cryostat/agent/CryostatClient.java
@@ -18,6 +18,7 @@ package io.cryostat.agent;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
@@ -53,6 +54,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import jdk.jfr.Recording;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.input.CountingInputStream;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -125,7 +127,9 @@ public class CryostatClient {
                                         + "?token="
                                         + pluginInfo.getToken()));
         log.trace("{}", req);
-        return supply(req, (res) -> logResponse(req, res)).thenApply(this::isOkStatus);
+        return supply(req, (res) -> logResponse(req, res))
+                .thenApply(this::isOkStatus)
+                .whenComplete((v, t) -> req.reset());
     }
 
     public CompletableFuture<PluginInfo> register(
@@ -174,7 +178,8 @@ public class CryostatClient {
                                     log.error("Unable to parse response as JSON", e);
                                     throw new RegistrationException(e);
                                 }
-                            });
+                            })
+                    .whenComplete((v, t) -> req.reset());
         } catch (JsonProcessingException e) {
             return CompletableFuture.failedFuture(e);
         }
@@ -209,7 +214,8 @@ public class CryostatClient {
                                 return CompletableFuture.completedFuture(prevId);
                             }
                             return submitCredentials(prevId, credentials, callback);
-                        });
+                        })
+                .whenComplete((v, t) -> req.reset());
     }
 
     private CompletableFuture<Integer> queryExistingCredentials(URI callback) {
@@ -226,7 +232,15 @@ public class CryostatClient {
                         })
                 .thenApply(
                         res -> {
-                            try (InputStream is = res.getEntity().getContent()) {
+                            try (InputStream is =
+                                    new TeeInputStream(
+                                            res.getEntity().getContent(),
+                                            new OutputStream() {
+                                                @Override
+                                                public void write(int b) throws IOException {
+                                                    log.info(new String(new char[] {(char) b}));
+                                                }
+                                            })) {
                                 return mapper.readValue(is, ObjectNode.class);
                             } catch (IOException e) {
                                 log.error("Unable to parse response as JSON", e);
@@ -254,7 +268,8 @@ public class CryostatClient {
                                                                 selfMatchExpression(callback)))
                                         .map(sc -> sc.id)
                                         .findFirst()
-                                        .orElse(-1));
+                                        .orElse(-1))
+                .whenComplete((v, t) -> req.reset());
     }
 
     private CompletableFuture<Integer> submitCredentials(
@@ -317,7 +332,8 @@ public class CryostatClient {
                                     location.substring(
                                             location.lastIndexOf('/') + 1, location.length());
                             return Integer.valueOf(id);
-                        });
+                        })
+                .whenComplete((v, t) -> req.reset());
     }
 
     public CompletableFuture<Void> deleteCredentials(int id) {
@@ -326,7 +342,9 @@ public class CryostatClient {
         }
         HttpDelete req = new HttpDelete(baseUri.resolve(CREDENTIALS_API_PATH + "/" + id));
         log.trace("{}", req);
-        return supply(req, (res) -> logResponse(req, res)).thenApply(res -> null);
+        return supply(req, (res) -> logResponse(req, res))
+                .whenComplete((v, t) -> req.reset())
+                .thenApply(res -> null);
     }
 
     public CompletableFuture<Void> deregister(PluginInfo pluginInfo) {
@@ -341,6 +359,7 @@ public class CryostatClient {
         log.trace("{}", req);
         return supply(req, (res) -> logResponse(req, res))
                 .thenApply(res -> assertOkStatus(req, res))
+                .whenComplete((v, t) -> req.reset())
                 .thenApply(res -> null);
     }
 
@@ -362,6 +381,7 @@ public class CryostatClient {
             log.trace("{}", req);
             return supply(req, (res) -> logResponse(req, res))
                     .thenApply(res -> assertOkStatus(req, res))
+                    .whenComplete((v, t) -> req.reset())
                     .thenApply(res -> null);
         } catch (JsonProcessingException e) {
             return CompletableFuture.failedFuture(e);
@@ -432,20 +452,21 @@ public class CryostatClient {
                                         .build());
         req.setEntity(entityBuilder.build());
         return supply(
-                req,
-                (res) -> {
-                    Instant finish = Instant.now();
-                    log.trace(
-                            "{} {} ({} -> {}): {}/{}",
-                            req.getMethod(),
-                            res.getStatusLine().getStatusCode(),
-                            fileName,
-                            req.getURI(),
-                            FileUtils.byteCountToDisplaySize(is.getByteCount()),
-                            Duration.between(start, finish));
-                    assertOkStatus(req, res);
-                    return (Void) null;
-                });
+                        req,
+                        (res) -> {
+                            Instant finish = Instant.now();
+                            log.trace(
+                                    "{} {} ({} -> {}): {}/{}",
+                                    req.getMethod(),
+                                    res.getStatusLine().getStatusCode(),
+                                    fileName,
+                                    req.getURI(),
+                                    FileUtils.byteCountToDisplaySize(is.getByteCount()),
+                                    Duration.between(start, finish));
+                            assertOkStatus(req, res);
+                            return (Void) null;
+                        })
+                .whenComplete((v, t) -> req.reset());
     }
 
     private HttpResponse logResponse(HttpRequestBase req, HttpResponse res) {
@@ -460,8 +481,7 @@ public class CryostatClient {
         // it responds with an auth challenge, and then send the auth information we have, and use
         // the client auth cache. This flow is supported for Bearer tokens in httpclient 5.
         authorizationSupplier.get().ifPresent(v -> req.addHeader(HttpHeaders.AUTHORIZATION, v));
-        return CompletableFuture.supplyAsync(() -> fn.apply(executeQuiet(req)), executor)
-                .whenComplete((v, t) -> req.reset());
+        return CompletableFuture.supplyAsync(() -> fn.apply(executeQuiet(req)), executor);
     }
 
     private HttpResponse executeQuiet(HttpUriRequest req) {

Originally posted by @andrewazores in cryostatio/cryostat#609 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Status: Done
Development

Successfully merging a pull request may close this issue.

1 participant