Skip to content

Commit 8e21042

Browse files
Fix for nar unpacking
1 parent ca0fb44 commit 8e21042

File tree

2 files changed

+46
-7
lines changed

2 files changed

+46
-7
lines changed

pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java

+22-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import java.io.RandomAccessFile;
3333
import java.nio.channels.FileChannel;
3434
import java.nio.channels.FileLock;
35+
import java.nio.file.Files;
3536
import java.nio.file.Path;
37+
import java.nio.file.StandardCopyOption;
3638
import java.security.MessageDigest;
3739
import java.security.NoSuchAlgorithmException;
3840
import java.util.Base64;
@@ -86,19 +88,32 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
8688
try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
8789
FileLock lock = channel.lock()) {
8890
File narWorkingDirectory = new File(parentDirectory, md5Sum);
89-
if (narWorkingDirectory.mkdir()) {
91+
if (!narWorkingDirectory.exists()) {
92+
File narExtractionTempDirectory = new File(parentDirectory, md5Sum + ".tmp");
93+
if (narExtractionTempDirectory.exists()) {
94+
FileUtils.deleteFile(narExtractionTempDirectory, true);
95+
}
96+
if (!narExtractionTempDirectory.mkdir()) {
97+
throw new IOException("Cannot create " + narExtractionTempDirectory);
98+
}
9099
try {
91-
log.info("Extracting {} to {}", nar, narWorkingDirectory);
100+
log.info("Extracting {} to {}", nar, narExtractionTempDirectory);
92101
if (extractCallback != null) {
93102
extractCallback.run();
94103
}
95-
unpack(nar, narWorkingDirectory);
104+
unpack(nar, narExtractionTempDirectory);
96105
} catch (IOException e) {
97106
log.error("There was a problem extracting the nar file. Deleting {} to clean up state.",
98-
narWorkingDirectory, e);
99-
FileUtils.deleteFile(narWorkingDirectory, true);
107+
narExtractionTempDirectory, e);
108+
try {
109+
FileUtils.deleteFile(narExtractionTempDirectory, true);
110+
} catch (IOException e2) {
111+
log.error("Failed to delete temporary directory {}", narExtractionTempDirectory, e2);
112+
}
100113
throw e;
101114
}
115+
Files.move(narExtractionTempDirectory.toPath(), narWorkingDirectory.toPath(),
116+
StandardCopyOption.ATOMIC_MOVE);
102117
}
103118
return narWorkingDirectory;
104119
}
@@ -166,7 +181,7 @@ private static void makeFile(final InputStream inputStream, final File file) thr
166181
* @throws IOException
167182
* if cannot read file
168183
*/
169-
private static byte[] calculateMd5sum(final File file) throws IOException {
184+
protected static byte[] calculateMd5sum(final File file) throws IOException {
170185
try (final FileInputStream inputStream = new FileInputStream(file)) {
171186
// codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case
172187
final MessageDigest md5 = MessageDigest.getInstance("md5");
@@ -184,4 +199,4 @@ private static byte[] calculateMd5sum(final File file) throws IOException {
184199
throw new IllegalArgumentException(nsae);
185200
}
186201
}
187-
}
202+
}

pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,30 @@ public static void main(String[] args) {
118118
}
119119
}
120120

121+
@Test
122+
void shouldReExtractWhenUnpackedDirectoryIsMissing() throws InterruptedException {
123+
CountDownLatch countDownLatch = new CountDownLatch(1);
124+
AtomicInteger exceptionCounter = new AtomicInteger();
125+
AtomicInteger extractCounter = new AtomicInteger();
126+
127+
new Thread(() -> {
128+
try {
129+
File narWorkingDirectory = NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet);
130+
FileUtils.deleteFile(narWorkingDirectory, true);
131+
NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet);
132+
} catch (Exception e) {
133+
log.error("Unpacking failed", e);
134+
exceptionCounter.incrementAndGet();
135+
} finally {
136+
countDownLatch.countDown();
137+
}
138+
}).start();
139+
140+
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
141+
assertEquals(exceptionCounter.get(), 0);
142+
assertEquals(extractCounter.get(), 2);
143+
}
144+
121145
@Test
122146
void shouldExtractFilesOnceInDifferentProcess() throws InterruptedException {
123147
int processes = 5;

0 commit comments

Comments
 (0)