Skip to content

Commit ed88251

Browse files
committed
enable configuring even more flexibly indexes to hint for in Cleanup aggregations
* also create indexes speeding up the aggregation on snapshot collection Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
1 parent 80b89f3 commit ed88251

File tree

11 files changed

+306
-96
lines changed

11 files changed

+306
-96
lines changed

connectivity/service/src/main/resources/connectivity.conf

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,26 @@ ditto {
66
database = ${?MONGO_DB_DATABASE}
77

88
read-journal {
9+
should-create-additional-snapshot-aggregation-index-pid-id = false
10+
should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID}
11+
12+
should-create-additional-snapshot-aggregation-index-pid = false
13+
should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID}
14+
915
hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
1016
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}
1117

1218
hint-name-listLatestJournalEntries = null
1319
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}
1420

15-
hint-name-listNewestActiveSnapshotsByBatch = "_id_"
16-
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
21+
hint-name-listNewestActiveSnapshotsByBatchPidId = null
22+
hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID}
23+
24+
hint-name-listNewestActiveSnapshotsByBatchPid = null
25+
hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}
26+
27+
hint-name-listNewestActiveSnapshotsByBatchId = null
28+
hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}
1729
}
1830
}
1931

internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,31 @@ public final class DefaultMongoReadJournalConfig implements MongoReadJournalConf
3232

3333
private static final String CONFIG_PATH = "read-journal";
3434

35+
private final boolean createAdditionalSnapshotAggregationIndexPidId;
36+
private final boolean createAdditionalSnapshotAggregationIndexPid;
3537
@Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry;
3638
@Nullable private final String hintNameListLatestJournalEntries;
37-
@Nullable private final String listNewestActiveSnapshotsByBatch;
39+
@Nullable private final String listNewestActiveSnapshotsByBatchPidId;
40+
@Nullable private final String listNewestActiveSnapshotsByBatchPid;
41+
@Nullable private final String listNewestActiveSnapshotsByBatchId;
3842

3943
private DefaultMongoReadJournalConfig(final ScopedConfig config) {
44+
createAdditionalSnapshotAggregationIndexPidId = config.getBoolean(
45+
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID.getConfigPath()
46+
);
47+
createAdditionalSnapshotAggregationIndexPid = config.getBoolean(
48+
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID.getConfigPath()
49+
);
4050
hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config,
4151
MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY);
4252
hintNameListLatestJournalEntries = getNullableString(config,
4353
MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES);
44-
listNewestActiveSnapshotsByBatch = getNullableString(config,
45-
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH);
54+
listNewestActiveSnapshotsByBatchPidId = getNullableString(config,
55+
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID);
56+
listNewestActiveSnapshotsByBatchPid = getNullableString(config,
57+
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID);
58+
listNewestActiveSnapshotsByBatchId = getNullableString(config,
59+
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID);
4660
}
4761

4862
/**
@@ -59,7 +73,20 @@ public static DefaultMongoReadJournalConfig of(final Config config) {
5973

6074
@Nullable
6175
private static String getNullableString(final Config config, final KnownConfigValue configValue) {
62-
return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath());
76+
return config.getIsNull(configValue.getConfigPath()) ? null :
77+
Optional.of(config.getString(configValue.getConfigPath()))
78+
.filter(s -> !s.equals("null"))
79+
.orElse(null);
80+
}
81+
82+
@Override
83+
public boolean shouldCreateAdditionalSnapshotAggregationIndexPidId() {
84+
return createAdditionalSnapshotAggregationIndexPidId;
85+
}
86+
87+
@Override
88+
public boolean shouldCreateAdditionalSnapshotAggregationIndexPid() {
89+
return createAdditionalSnapshotAggregationIndexPid;
6390
}
6491

6592
@Override
@@ -73,8 +100,18 @@ public Optional<String> getIndexNameHintForListLatestJournalEntries() {
73100
}
74101

75102
@Override
76-
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch() {
77-
return Optional.ofNullable(listNewestActiveSnapshotsByBatch);
103+
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId() {
104+
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPidId);
105+
}
106+
107+
@Override
108+
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid() {
109+
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPid);
110+
}
111+
112+
@Override
113+
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId() {
114+
return Optional.ofNullable(listNewestActiveSnapshotsByBatchId);
78115
}
79116

80117
@Override
@@ -86,25 +123,33 @@ public boolean equals(final Object o) {
86123
return false;
87124
}
88125
final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o;
89-
return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
90-
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
126+
return createAdditionalSnapshotAggregationIndexPidId == that.createAdditionalSnapshotAggregationIndexPidId &&
127+
createAdditionalSnapshotAggregationIndexPid == that.createAdditionalSnapshotAggregationIndexPid &&
128+
Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
129+
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
91130
Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) &&
92-
Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch);
131+
Objects.equals(listNewestActiveSnapshotsByBatchPidId, that.listNewestActiveSnapshotsByBatchPidId);
93132
}
94133

95134
@Override
96135
public int hashCode() {
97-
return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
98-
listNewestActiveSnapshotsByBatch);
136+
return Objects.hash(createAdditionalSnapshotAggregationIndexPidId, createAdditionalSnapshotAggregationIndexPid,
137+
hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
138+
listNewestActiveSnapshotsByBatchPidId, listNewestActiveSnapshotsByBatchPid,
139+
listNewestActiveSnapshotsByBatchId);
99140
}
100141

101142
@Override
102143
public String toString() {
103144
return getClass().getSimpleName() + " [" +
104-
"hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
145+
"createAdditionalSnapshotAggregationIndexPidId=" + createAdditionalSnapshotAggregationIndexPidId +
146+
", createAdditionalSnapshotAggregationIndexPid=" + createAdditionalSnapshotAggregationIndexPid +
147+
", hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
105148
hintNameFilterPidsThatDoesntContainTagInNewestEntry +
106149
", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries +
107-
", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch +
150+
", listNewestActiveSnapshotsByBatchPidId=" + listNewestActiveSnapshotsByBatchPidId +
151+
", listNewestActiveSnapshotsByBatchPid=" + listNewestActiveSnapshotsByBatchPid +
152+
", listNewestActiveSnapshotsByBatchId=" + listNewestActiveSnapshotsByBatchId +
108153
"]";
109154
}
110155

internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@
2525
@Immutable
2626
public interface MongoReadJournalConfig {
2727

28+
/**
29+
* @return whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal
30+
* aggregation queries on the snapshot collection.
31+
*/
32+
boolean shouldCreateAdditionalSnapshotAggregationIndexPidId();
33+
34+
/**
35+
* @return whether additional index for "pid" should be created in order to speed up MongoReadJournal
36+
* aggregation queries on the snapshot collection.
37+
*/
38+
boolean shouldCreateAdditionalSnapshotAggregationIndexPid();
39+
2840
/**
2941
* @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
3042
*/
@@ -36,17 +48,41 @@ public interface MongoReadJournalConfig {
3648
Optional<String> getIndexNameHintForListLatestJournalEntries();
3749

3850
/**
39-
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
51+
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} containing both
52+
* "pid" and "_id" fields in first "$match".
4053
*/
41-
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch();
54+
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();
4255

56+
/**
57+
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
58+
* "pid" field in first "$match".
59+
*/
60+
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid();
61+
62+
/**
63+
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
64+
* "_id" field in first "$match".
65+
*/
66+
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId();
4367

4468
/**
4569
* An enumeration of the known config path expressions and their associated default values for
4670
* {@code MongoReadJournalConfig}.
4771
*/
4872
enum MongoReadJournalConfigValue implements KnownConfigValue {
4973

74+
/**
75+
* Whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal
76+
* aggregation queries on the snapshot collection.
77+
*/
78+
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID("should-create-additional-snapshot-aggregation-index-pid-id", false),
79+
80+
/**
81+
* Whether additional index for "pid" should be created in order to speed up MongoReadJournal aggregation
82+
* queries on the snapshot collection.
83+
*/
84+
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID("should-create-additional-snapshot-aggregation-index-pid", false),
85+
5086
/**
5187
* Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
5288
*/
@@ -58,9 +94,19 @@ enum MongoReadJournalConfigValue implements KnownConfigValue {
5894
HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null),
5995

6096
/**
61-
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
97+
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPidId}.
98+
*/
99+
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID("hint-name-listNewestActiveSnapshotsByBatchPidId", null),
100+
101+
/**
102+
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPid}.
103+
*/
104+
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID("hint-name-listNewestActiveSnapshotsByBatchPid", null),
105+
106+
/**
107+
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchId}.
62108
*/
63-
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null);
109+
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID("hint-name-listNewestActiveSnapshotsByBatchId", null);
64110

65111
private final String path;
66112
private final Object defaultValue;

internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Optional;
2222
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.Function;
2526
import java.util.stream.Collectors;
@@ -166,6 +167,12 @@ public final class MongoReadJournal implements CurrentEventsByPersistenceIdQuery
166167
private static final Index TAG_PID_INDEX =
167168
IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true);
168169

170+
private static final Index SNAPS_PID_ID_INDEX =
171+
IndexFactory.newInstance("snaps_pid_id_index", List.of(S_PROCESSOR_ID, S_ID), false, false);
172+
173+
private static final Index SNAPS_PID_INDEX =
174+
IndexFactory.newInstance("snaps_pid_index", List.of(S_PROCESSOR_ID), false, false);
175+
169176
private final String journalCollection;
170177
private final String snapsCollection;
171178
private final DittoMongoClient mongoClient;
@@ -201,7 +208,8 @@ public static MongoReadJournal newInstance(final ActorSystem system) {
201208
final Config config = system.settings().config();
202209
final MongoDbConfig mongoDbConfig =
203210
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
204-
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), system);
211+
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(),
212+
system);
205213
}
206214

207215
/**
@@ -240,6 +248,32 @@ public CompletionStage<Done> ensureTagPidIndex() {
240248
return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX));
241249
}
242250

251+
/**
252+
* Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" and "_id".
253+
*
254+
* @return a future that completes after index creation completes or fails when index creation fails.
255+
*/
256+
public CompletionStage<Done> ensureSnapshotCollectionPidIdIndex() {
257+
if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPidId()) {
258+
return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_ID_INDEX));
259+
} else {
260+
return CompletableFuture.completedFuture(Done.getInstance());
261+
}
262+
}
263+
264+
/**
265+
* Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" .
266+
*
267+
* @return a future that completes after index creation completes or fails when index creation fails.
268+
*/
269+
public CompletionStage<Done> ensureSnapshotCollectionPidIndex() {
270+
if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPid()) {
271+
return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_INDEX));
272+
} else {
273+
return CompletableFuture.completedFuture(Done.getInstance());
274+
}
275+
}
276+
243277
/**
244278
* Retrieve all unique PIDs in journals. Does its best not to create long-living cursors on the database by reading
245279
* {@code batchSize} events per query.
@@ -349,6 +383,7 @@ private Source<String, NotUsed> filterPidsThatDoesntContainTagInNewestEntry(fina
349383
));
350384
final AggregatePublisher<Document> hintedAggregate =
351385
readJournalConfig.getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry()
386+
.filter(hint -> !hint.equals("null"))
352387
.map(aggregate::hintString)
353388
.orElse(aggregate);
354389
return Source.fromPublisher(hintedAggregate)
@@ -928,7 +963,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
928963

929964
final List<Bson> pipeline = new ArrayList<>(5);
930965
// match stage
931-
pipeline.add(Aggregates.match(snapshotFilter.toMongoFilter()));
966+
final Bson matchFilter = snapshotFilter.toMongoFilter();
967+
pipeline.add(Aggregates.match(matchFilter));
932968

933969
// sort stage
934970
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN))));
@@ -947,8 +983,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
947983
final String items = "i";
948984
pipeline.add(Aggregates.group(
949985
new Document("_id", new BsonNull()),
950-
Accumulators.max(maxPid, "$"+ S_ID),
951-
Accumulators.push(items,"$$ROOT")));
986+
Accumulators.max(maxPid, "$" + S_ID),
987+
Accumulators.push(items, "$$ROOT")));
952988

953989
// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
954990
// if includeDeleted=true keeps them using "$$DESCEND"
@@ -963,13 +999,15 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
963999
)));
9641000

9651001
final AggregatePublisher<Document> aggregate = snapshotStore.aggregate(pipeline);
966-
final AggregatePublisher<Document> hintedAggregate =
967-
readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatch()
968-
.map(aggregate::hintString)
969-
.orElse(aggregate);
1002+
final Optional<String> indexHint = calculateIndexHint(matchFilter);
1003+
final AggregatePublisher<Document> hintedAggregate = indexHint
1004+
.filter(hint -> !hint.equals("null"))
1005+
.map(aggregate::hintString)
1006+
.orElse(aggregate);
9701007
return Source.fromPublisher(
971-
hintedAggregate
972-
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
1008+
hintedAggregate
1009+
.batchSize(batchSize)
1010+
// use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
9731011
)
9741012
.flatMapConcat(document -> {
9751013
final String theMaxPid = document.getString(maxPid);
@@ -983,6 +1021,23 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
9831021
});
9841022
}
9851023

1024+
private Optional<String> calculateIndexHint(final Bson matchFilter) {
1025+
final String matchJson = matchFilter.toBsonDocument().toJson();
1026+
final boolean matchContainsPid = matchJson.contains("\"pid\":");
1027+
final boolean matchContainsId = matchJson.contains("\"_id\":");
1028+
final Optional<String> indexHint;
1029+
if (matchContainsPid && matchContainsId) {
1030+
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();
1031+
} else if (matchContainsPid) {
1032+
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPid();
1033+
} else if (matchContainsId) {
1034+
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchId();
1035+
} else {
1036+
indexHint = Optional.empty();
1037+
}
1038+
return indexHint;
1039+
}
1040+
9861041
private static Source<List<String>, NotUsed> listJournalEntryTags(final MongoCollection<Document> journal,
9871042
final String pid) {
9881043

0 commit comments

Comments
 (0)