Skip to content
This repository was archived by the owner on Oct 16, 2024. It is now read-only.

Issue 225: Create log should create missing path components #228

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
Expand Down Expand Up @@ -363,6 +363,7 @@ static void ensureMetadataExist(Versioned<byte[]> metadata) {
}

static void createMissingMetadata(final ZooKeeper zk,
final String basePath,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
Expand All @@ -374,10 +375,10 @@ static void createMissingMetadata(final ZooKeeper zk,
CreateMode createMode = CreateMode.PERSISTENT;

// log root parent path
String logRootParentPath = Utils.getParent(logRootPath);
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
String logRootParentPath = Utils.getParent(logRootPath);
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
}
Expand Down Expand Up @@ -425,7 +426,7 @@ static void createMissingMetadata(final ZooKeeper zk,
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
Expand All @@ -436,7 +437,7 @@ static void createMissingMetadata(final ZooKeeper zk,
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
EMPTY_BYTES, acl, createMode));
EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
Expand All @@ -449,6 +450,41 @@ static void createMissingMetadata(final ZooKeeper zk,
return;
}

getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
.whenComplete(new FutureEventListener<List<String>>() {
@Override
public void onSuccess(List<String> paths) {
for (String path : paths) {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(
0, Op.create(path, EMPTY_BYTES, acl, createMode));
}
executeCreateMissingPathTxn(
zk,
zkOps,
pathsToCreate,
metadatas,
logRootPath,
promise
);
}

@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
return;
}
});

}

private static void executeCreateMissingPathTxn(ZooKeeper zk,
List<Op> zkOps,
List<byte[]> pathsToCreate,
List<Versioned<byte[]>> metadatas,
String logRootPath,
CompletableFuture<List<Versioned<byte[]>>> promise) {

zk.multi(zkOps, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
Expand Down Expand Up @@ -549,29 +585,30 @@ static CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
try {
final ZooKeeper zk = zooKeeperClient.get();
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
.thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() {
@Override
public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
CompletableFuture<List<Versioned<byte[]>>> promise =
new CompletableFuture<List<Versioned<byte[]>>>();
createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
ownAllocator, createIfNotExists, promise);
return promise;
}
}).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() {
@Override
public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) {
try {
return FutureUtils.value(
processLogMetadatas(
uri,
logName,
logIdentifier,
metadatas,
ownAllocator));
} catch (UnexpectedException e) {
return FutureUtils.exception(e);
}
.thenCompose(metadatas -> {
CompletableFuture<List<Versioned<byte[]>>> promise =
new CompletableFuture<List<Versioned<byte[]>>>();
createMissingMetadata(
zk,
uri.getPath(),
logRootPath,
metadatas,
zooKeeperClient.getDefaultACL(),
ownAllocator,
createIfNotExists,
promise);
return promise;
}).thenCompose(metadatas -> {
try {
return FutureUtils.value(
processLogMetadatas(
uri,
logName,
logIdentifier,
metadatas,
ownAllocator));
} catch (UnexpectedException e) {
return FutureUtils.exception(e);
}
});
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
Expand Down Expand Up @@ -749,16 +786,22 @@ private CompletableFuture<Void> renameLogMetadata(URI uri,

@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperConnectionException | InterruptedException e) {
return FutureUtils.exception(e);
}
String basePath = uri.getPath();
String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
LinkedList<String> missingPaths = Lists.newLinkedList();
return getMissingPaths(zk, basePath, logStreamPath);
}

@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
LinkedList<String> missingPaths = Lists.newLinkedList();
CompletableFuture<List<String>> future = FutureUtils.createFuture();
try {
existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
} catch (ZooKeeperConnectionException | InterruptedException e) {
future.completeExceptionally(e);
}
existPath(zk, logStreamPath, basePath, missingPaths, future);
return future;
}

Expand Down