From 458611838c0d4a9a9d640c6cef30e5159be0fd61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 7 Sep 2023 19:41:47 +0200 Subject: [PATCH 1/2] Do not interrupt underlying Azure repository threads during errors We subscribe on a different thread to read from the blocking input stream in order to avoid blocking the Azure client event loop. When the connection is dropped, Reactor interrupts the thread where the input stream is read to cancel the task promptly; this causes issues and adds confusing error messages to the exception chain, hiding important details. --- .../ReactorScheduledExecutorService.java | 54 +++++++++++++++++++ .../azure/AzureBlobContainerRetriesTests.java | 30 +++++++++++ 2 files changed, 84 insertions(+) diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java index a3359e07119b5..f621cfe3e979e 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java @@ -21,10 +21,13 @@ import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.elasticsearch.core.Strings.format; @@ -122,6 +125,16 @@ public void execute(Runnable command) { delegate.execute(decorateRunnable(command)); } + @Override + protected 
RunnableFuture newTaskFor(Runnable runnable, T value) { + return new UninterruptibleFuture<>(super.newTaskFor(runnable, value)); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return new UninterruptibleFuture<>(super.newTaskFor(callable)); + } + protected Runnable decorateRunnable(Runnable command) { return command; } @@ -172,4 +185,45 @@ public V get(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } } + + @SuppressForbidden(reason = "It wraps a Future to avoid interrupting threads") + private static final class UninterruptibleFuture implements RunnableFuture { + private final RunnableFuture delegate; + + UninterruptibleFuture(RunnableFuture delegate) { + this.delegate = delegate; + } + + @Override + public void run() { + delegate.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // Ensure that the thread is never interrupted + return delegate.cancel(false); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + } } diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 4c9a3b1e69919..841a6e66e0c00 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -9,6 +9,7 @@ import fixture.azure.AzureHttpHandler; +import 
com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import org.elasticsearch.common.Strings; @@ -37,6 +38,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -472,4 +474,32 @@ public void testRetryFromSecondaryLocationPolicies() throws Exception { assertThat(failedGetCalls.get(), equalTo(1)); } } + + public void testPrematureClosedConnectionDoesNotInterruptBackingThread() throws Exception { + final int maxRetries = 0; + + final byte[] data = randomBytes(ByteSizeUnit.KB.toIntBytes(512)); + + httpServer.createContext("/account/container/closed_connection_blob", HttpExchange::close); + + final BlobContainer blobContainer = createBlobContainer(maxRetries); + + var interruptedThread = new AtomicBoolean(); + try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length) { + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + // Ensure that the thread where the stream is read is not interrupted + Thread.sleep(250); + } catch (InterruptedException e) { + interruptedThread.set(true); + } + return super.read(b, off, len); + } + }) { + expectThrows(IOException.class, () -> blobContainer.writeBlob("closed_connection_blob", stream, data.length, false)); + assertFalse(interruptedThread.get()); + } + } + } From 3727c5f9a83405835338723772347b5172b66373 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 7 Sep 2023 19:45:29 +0200 Subject: [PATCH 2/2] Update docs/changelog/99320.yaml --- docs/changelog/99320.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/99320.yaml diff --git a/docs/changelog/99320.yaml b/docs/changelog/99320.yaml new file mode 100644 index 
0000000000000..e06d5f995d1f4 --- /dev/null +++ b/docs/changelog/99320.yaml @@ -0,0 +1,5 @@ +pr: 99320 +summary: Do not interrupt underlying Azure repository threads during errors +area: Snapshot/Restore +type: bug +issues: []