diff --git a/docs/changelog/99320.yaml b/docs/changelog/99320.yaml new file mode 100644 index 0000000000000..e06d5f995d1f4 --- /dev/null +++ b/docs/changelog/99320.yaml @@ -0,0 +1,5 @@ +pr: 99320 +summary: Do not interrupt underlying Azure repository threads during errors +area: Snapshot/Restore +type: bug +issues: [] diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java index a3359e07119b5..f621cfe3e979e 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java @@ -21,10 +21,13 @@ import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.elasticsearch.core.Strings.format; @@ -122,6 +125,16 @@ public void execute(Runnable command) { delegate.execute(decorateRunnable(command)); } + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new UninterruptibleFuture<>(super.newTaskFor(runnable, value)); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return new UninterruptibleFuture<>(super.newTaskFor(callable)); + } + protected Runnable decorateRunnable(Runnable command) { return command; } @@ -172,4 +185,45 @@ public V get(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } } + + @SuppressForbidden(reason = "It wraps a Future to avoid interrupting threads") + private static final class UninterruptibleFuture implements RunnableFuture { + private final RunnableFuture delegate; + + UninterruptibleFuture(RunnableFuture delegate) { + this.delegate = delegate; + } + + @Override + public void run() { + delegate.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // Ensure that the thread is never interrupted + return delegate.cancel(false); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + } } diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 4c9a3b1e69919..841a6e66e0c00 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -9,6 +9,7 @@ import fixture.azure.AzureHttpHandler; +import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import org.elasticsearch.common.Strings; @@ -37,6 +38,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -472,4 +474,32 @@ public void testRetryFromSecondaryLocationPolicies() throws Exception { assertThat(failedGetCalls.get(), equalTo(1)); } } + + public void testPrematureClosedConnectionDoesNotInterruptBackingThread() throws Exception { + final int maxRetries = 0; + + final byte[] data = randomBytes(ByteSizeUnit.KB.toIntBytes(512)); + + httpServer.createContext("/account/container/closed_connection_blob", HttpExchange::close); + + final BlobContainer blobContainer = createBlobContainer(maxRetries); + + var interruptedThread = new AtomicBoolean(); + try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length) { + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + // Ensure that the thread where the stream is read is not interrupted + Thread.sleep(250); + } catch (InterruptedException e) { + interruptedThread.set(true); + } + return super.read(b, off, len); + } + }) { + expectThrows(IOException.class, () -> blobContainer.writeBlob("closed_connection_blob", stream, data.length, false)); + assertFalse(interruptedThread.get()); + } + } + }