-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Do not interrupt underlying Azure repository threads during errors #99320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 99320 | ||
summary: Do not interrupt underlying Azure repository threads during errors | ||
area: Snapshot/Restore | ||
type: bug | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
|
||
import fixture.azure.AzureHttpHandler; | ||
|
||
import com.sun.net.httpserver.HttpExchange; | ||
import com.sun.net.httpserver.HttpHandler; | ||
|
||
import org.elasticsearch.common.Strings; | ||
|
@@ -37,6 +38,7 @@ | |
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.stream.Collectors; | ||
|
@@ -472,4 +474,32 @@ public void testRetryFromSecondaryLocationPolicies() throws Exception { | |
assertThat(failedGetCalls.get(), equalTo(1)); | ||
} | ||
} | ||
|
||
public void testPrematureClosedConnectionDoesNotInterruptBackingThread() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you please point on the place where you simulate thread.interrupt()? is it missed or interruption is implicit somehow? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The interrupt comes from the future being cancelled due to the premature connection close set up in the diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java
index f621cfe3e979..4c2c378acb12 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/executors/ReactorScheduledExecutorService.java
@@ -202,7 +202,7 @@ public class ReactorScheduledExecutorService extends AbstractExecutorService imp
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// Ensure that the thread is never interrupted
- return delegate.cancel(false);
+ return delegate.cancel(mayInterruptIfRunning);
}
@Override |
||
final int maxRetries = 0; | ||
|
||
final byte[] data = randomBytes(ByteSizeUnit.KB.toIntBytes(512)); | ||
|
||
httpServer.createContext("/account/container/closed_connection_blob", HttpExchange::close); | ||
|
||
final BlobContainer blobContainer = createBlobContainer(maxRetries); | ||
|
||
var interruptedThread = new AtomicBoolean(); | ||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length) { | ||
@Override | ||
public int read(byte[] b, int off, int len) throws IOException { | ||
try { | ||
// Ensure that the thread where the stream is read is not interrupted | ||
Thread.sleep(250); | ||
} catch (InterruptedException e) { | ||
interruptedThread.set(true); | ||
} | ||
return super.read(b, off, len); | ||
} | ||
}) { | ||
expectThrows(IOException.class, () -> blobContainer.writeBlob("closed_connection_blob", stream, data.length, false)); | ||
assertFalse(interruptedThread.get()); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these creating a new task for each read operation? Just thinking about how promptly a read might be cancelled without the interrupt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, see
org.elasticsearch.repositories.azure.AzureBlobStore#convertStreamToByteBuffer
. We're reading from the input stream in 64Kb chunks, and we emit those in order; meaning that after the pipeline has been cancelled at most we'll need to wait until we're able to read 64Kb from disk.