Skip to content

Commit 8e854a7

Browse files
authored
Merge pull request #10018 from neo-technology/aioob-degrees-louvain
Add validation during relationships scanning which excludes relationships to nodes that were not loaded during node scanning
2 parents 9da0e1d + 0d16f36 commit 8e854a7

File tree

15 files changed

+184
-41
lines changed

15 files changed

+184
-41
lines changed

core/src/integrationTest/java/org/neo4j/gds/core/GraphLoaderTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ void shouldLogProgressWithNativeLoading() {
127127
.build()
128128
.graph();
129129

130-
log.assertContainsMessage(TestLog.DEBUG, "Loading :: Nodes :: Store Scan :: Imported 3 records and 1 properties");
131-
log.assertContainsMessage(TestLog.DEBUG, "Loading :: Relationships :: Store Scan :: Imported 4 records and 0 properties");
130+
log.assertContainsMessage(TestLog.INFO, "Loading :: Nodes :: Store Scan :: Imported 3 records and 1 properties");
131+
log.assertContainsMessage(TestLog.INFO, "Loading :: Relationships :: Store Scan :: Imported 4 records and 0 properties");
132132
}
133133

134134
@Test
@@ -163,13 +163,13 @@ void shouldTrackProgressWithNativeLoading() {
163163
)
164164
.doesNotContain("Loading :: Nodes :: Property Index Scan :: Start");
165165

166-
assertThat(log.getMessages(TestLog.DEBUG))
166+
assertThat(log.getMessages(TestLog.INFO))
167167
.extracting(removingThreadId())
168168
.contains("Loading :: Nodes :: Store Scan :: Start using NodeCursorBasedScanner")
169169
.contains("Loading :: Relationships :: Store Scan :: Start using RelationshipScanCursorBasedScanner");
170170

171-
log.assertContainsMessage(TestLog.DEBUG, "Loading :: Nodes :: Store Scan :: Imported 3 records and 1 properties");
172-
log.assertContainsMessage(TestLog.DEBUG, "Loading :: Relationships :: Store Scan :: Imported 4 records and 0 properties");
171+
log.assertContainsMessage(TestLog.INFO, "Loading :: Nodes :: Store Scan :: Imported 3 records and 1 properties");
172+
log.assertContainsMessage(TestLog.INFO, "Loading :: Relationships :: Store Scan :: Imported 4 records and 0 properties");
173173
}
174174

175175
@Test

core/src/main/java/org/neo4j/gds/api/compress/AdjacencyCompressor.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,50 @@ public interface AdjacencyCompressor extends AutoCloseable {
5959
void close();
6060

6161
interface ValueMapper {
62+
int INVALID_ID = -1;
63+
6264
/**
6365
* A mapper to transform values before compressing them.
66+
* May return {@link #INVALID_ID} to indicate that the value should be skipped.
6467
* Implementations must be thread-safe
6568
*/
6669
long map(long value);
70+
71+
default ValueMapper andThen(ValueMapper other) {
72+
return new AndThen(this, other);
73+
}
74+
75+
enum Identity implements AdjacencyCompressor.ValueMapper {
76+
INSTANCE {
77+
@Override
78+
public long map(long value) {
79+
return value;
80+
}
81+
82+
@Override
83+
public ValueMapper andThen(ValueMapper other) {
84+
return other;
85+
}
86+
}
87+
}
88+
89+
final class AndThen implements ValueMapper {
90+
private final ValueMapper first;
91+
private final ValueMapper second;
92+
93+
AndThen(ValueMapper first, ValueMapper second) {
94+
this.first = first;
95+
this.second = second;
96+
}
97+
98+
@Override
99+
public long map(long value) {
100+
var firstMapped = this.first.map(value);
101+
if (firstMapped == INVALID_ID) {
102+
return INVALID_ID;
103+
}
104+
return this.second.map(firstMapped);
105+
}
106+
}
67107
}
68108
}

core/src/main/java/org/neo4j/gds/api/compress/AdjacencyCompressorFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,6 @@ public interface AdjacencyCompressorFactory {
4444
* @return the final adjacency list, together with any number of properties, if any.
4545
*/
4646
AdjacencyListsWithProperties build(boolean allowReordering);
47+
48+
boolean validateNode(long nodeId);
4749
}

core/src/main/java/org/neo4j/gds/core/compression/common/AbstractAdjacencyCompressorFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import org.neo4j.gds.api.compress.AdjacencyListBuilder;
2727
import org.neo4j.gds.api.compress.AdjacencyListsWithProperties;
2828
import org.neo4j.gds.api.compress.ImmutableAdjacencyListsWithProperties;
29-
import org.neo4j.gds.core.Aggregation;
3029
import org.neo4j.gds.collections.ha.HugeIntArray;
3130
import org.neo4j.gds.collections.ha.HugeLongArray;
31+
import org.neo4j.gds.core.Aggregation;
3232

3333
import java.util.concurrent.atomic.LongAdder;
3434
import java.util.function.LongSupplier;
@@ -42,6 +42,10 @@ public abstract class AbstractAdjacencyCompressorFactory<TARGET_PAGE, PROPERTY_P
4242
private final Aggregation[] aggregations;
4343
private final LongAdder relationshipCounter;
4444

45+
// set during `init`, used to validate nodes against this value since it is also
46+
// used to size the adjacency lists
47+
private long nodeCount = -1;
48+
4549
private HugeIntArray adjacencyDegrees;
4650
private HugeLongArray adjacencyOffsets;
4751
private HugeLongArray propertyOffsets;
@@ -67,6 +71,7 @@ public void init() {
6771
this.adjacencyDegrees = HugeIntArray.newArray(nodeCount);
6872
this.adjacencyOffsets = HugeLongArray.newArray(nodeCount);
6973
this.propertyOffsets = HugeLongArray.newArray(nodeCount);
74+
this.nodeCount = nodeCount;
7075
}
7176

7277
@Override
@@ -81,6 +86,12 @@ public LongAdder relationshipCounter() {
8186
return relationshipCounter;
8287
}
8388

89+
@Override
90+
public boolean validateNode(long nodeId) {
91+
assert this.nodeCount != -1 : "Called validateNode before init";
92+
return nodeId < this.nodeCount;
93+
}
94+
8495
@Override
8596
public AdjacencyListsWithProperties build(boolean allowReordering) {
8697
var builder = ImmutableAdjacencyListsWithProperties

core/src/main/java/org/neo4j/gds/core/compression/common/AdjacencyCompression.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,18 @@ public static void zigZagUncompressFrom(
5050
AdjacencyCompressor.ValueMapper mapper
5151
) {
5252
into.ensureCapacity(compressedValues);
53-
zigZagUncompressFrom(into.buffer, targets, compressedValues, limit, mapper);
54-
into.length = compressedValues;
53+
into.length = zigZagUncompressFrom(into.buffer, targets, compressedValues, limit, mapper);
5554
}
5655

57-
public static void zigZagUncompressFrom(
56+
public static int zigZagUncompressFrom(
5857
long[] into,
5958
byte[] targets,
6059
int compressedValues,
6160
int limit,
6261
AdjacencyCompressor.ValueMapper mapper
6362
) {
6463
assert into.length >= compressedValues;
65-
zigZagUncompress(targets, limit, into, mapper);
64+
return zigZagUncompress(targets, limit, into, mapper);
6665
}
6766

6867
public static int applyDeltaEncoding(LongArrayBuffer data, Aggregation aggregation) {

core/src/main/java/org/neo4j/gds/core/compression/common/ZigZagLongDecoding.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,11 @@
2020
package org.neo4j.gds.core.compression.common;
2121

2222
import org.neo4j.gds.api.compress.AdjacencyCompressor;
23+
import org.neo4j.gds.api.compress.AdjacencyCompressor.ValueMapper.Identity;
2324

24-
public final class ZigZagLongDecoding {
25+
import static org.neo4j.gds.api.compress.AdjacencyCompressor.ValueMapper.INVALID_ID;
2526

26-
public enum Identity implements AdjacencyCompressor.ValueMapper {
27-
INSTANCE {
28-
@Override
29-
public long map(long value) {
30-
return value;
31-
}
32-
}
33-
}
27+
public final class ZigZagLongDecoding {
3428

3529
public static int zigZagUncompress(byte[] compressedData, int length, long[] uncompressedData) {
3630
return zigZagUncompress(compressedData, length, uncompressedData, Identity.INSTANCE);
@@ -49,7 +43,11 @@ static int zigZagUncompress(
4943
value += (input & 127L) << shift;
5044
if ((input & 128L) == 128L) {
5145
startValue += ((value >>> 1L) ^ -(value & 1L));
52-
uncompressedData[into++] = mapper.map(startValue);
46+
var mappedId = mapper.map(startValue);
47+
if (mappedId != INVALID_ID) {
48+
uncompressedData[into++] = mappedId;
49+
}
50+
5351
value = 0L;
5452
shift = 0;
5553
} else {

core/src/main/java/org/neo4j/gds/core/compression/mixed/MixedCompressor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ static class Factory implements AdjacencyCompressorFactory {
120120
private final AdjacencyCompressorFactory packedCompressorFactory;
121121
private final AdjacencyCompressorFactory vlongCompressorFactory;
122122

123-
124123
Factory(
125124
LongSupplier nodeCountSupplier,
126125
LongAdder relationshipCounter,
@@ -194,6 +193,11 @@ public AdjacencyListsWithProperties build(boolean allowReordering) {
194193
.build();
195194
}
196195

196+
@Override
197+
public boolean validateNode(long nodeId) {
198+
return this.vlongCompressorFactory.validateNode(nodeId) && this.packedCompressorFactory.validateNode(nodeId);
199+
}
200+
197201
private static MemoryInfo mergeMemoryInfo(
198202
MemoryInfo packed,
199203
MemoryInfo vlong

core/src/main/java/org/neo4j/gds/core/compression/packed/PackedCompressor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929
import org.neo4j.gds.api.compress.AdjacencyListBuilder;
3030
import org.neo4j.gds.api.compress.AdjacencyListBuilderFactory;
3131
import org.neo4j.gds.api.compress.ModifiableSlice;
32+
import org.neo4j.gds.collections.ha.HugeIntArray;
33+
import org.neo4j.gds.collections.ha.HugeLongArray;
3234
import org.neo4j.gds.core.Aggregation;
3335
import org.neo4j.gds.core.compression.common.AbstractAdjacencyCompressorFactory;
3436
import org.neo4j.gds.core.compression.common.AdjacencyCompression;
3537
import org.neo4j.gds.core.compression.common.MemoryTracker;
36-
import org.neo4j.gds.collections.ha.HugeIntArray;
37-
import org.neo4j.gds.collections.ha.HugeLongArray;
3838
import org.neo4j.gds.utils.GdsFeatureToggles;
3939

4040
import java.util.Arrays;

core/src/main/java/org/neo4j/gds/core/compression/uncompressed/RawCompressor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929
import org.neo4j.gds.api.compress.AdjacencyListBuilder;
3030
import org.neo4j.gds.api.compress.AdjacencyListBuilderFactory;
3131
import org.neo4j.gds.api.compress.ModifiableSlice;
32+
import org.neo4j.gds.collections.ha.HugeIntArray;
33+
import org.neo4j.gds.collections.ha.HugeLongArray;
3234
import org.neo4j.gds.core.Aggregation;
3335
import org.neo4j.gds.core.compression.common.AbstractAdjacencyCompressorFactory;
3436
import org.neo4j.gds.core.compression.common.MemoryTracker;
3537
import org.neo4j.gds.core.utils.AscendingLongComparator;
36-
import org.neo4j.gds.collections.ha.HugeIntArray;
37-
import org.neo4j.gds.collections.ha.HugeLongArray;
3838

3939
import java.util.Arrays;
4040
import java.util.function.LongSupplier;

core/src/main/java/org/neo4j/gds/core/compression/varlong/DeltaVarLongCompressor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import org.neo4j.gds.api.compress.AdjacencyListBuilder;
2929
import org.neo4j.gds.api.compress.AdjacencyListBuilderFactory;
3030
import org.neo4j.gds.api.compress.ModifiableSlice;
31+
import org.neo4j.gds.collections.ha.HugeIntArray;
32+
import org.neo4j.gds.collections.ha.HugeLongArray;
3133
import org.neo4j.gds.core.Aggregation;
3234
import org.neo4j.gds.core.compression.common.AbstractAdjacencyCompressorFactory;
3335
import org.neo4j.gds.core.compression.common.AdjacencyCompression;
3436
import org.neo4j.gds.core.compression.common.MemoryTracker;
3537
import org.neo4j.gds.core.compression.common.VarLongEncoding;
36-
import org.neo4j.gds.collections.ha.HugeIntArray;
37-
import org.neo4j.gds.collections.ha.HugeLongArray;
3838

3939
import java.util.Arrays;
4040
import java.util.function.LongSupplier;

core/src/main/java/org/neo4j/gds/core/loading/AdjacencyBuffer.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.jetbrains.annotations.Nullable;
2727
import org.neo4j.gds.RelationshipType;
2828
import org.neo4j.gds.api.compress.AdjacencyCompressor;
29+
import org.neo4j.gds.api.compress.AdjacencyCompressor.ValueMapper.Identity;
2930
import org.neo4j.gds.api.compress.AdjacencyCompressorFactory;
3031
import org.neo4j.gds.api.compress.LongArrayBuffer;
3132
import org.neo4j.gds.core.Aggregation;
3233
import org.neo4j.gds.core.compression.common.AdjacencyCompression;
33-
import org.neo4j.gds.core.compression.common.ZigZagLongDecoding;
3434
import org.neo4j.gds.core.concurrency.Concurrency;
3535
import org.neo4j.gds.mem.MemoryEstimation;
3636
import org.neo4j.gds.mem.MemoryEstimations;
@@ -248,7 +248,7 @@ Collection<AdjacencyListBuilderTask> adjacencyListBuilderTasks(
248248
adjacencyCompressorFactory,
249249
chunkedAdjacencyLists[page],
250250
relationshipCounter,
251-
mapper.orElse(ZigZagLongDecoding.Identity.INSTANCE),
251+
mapper.orElse(Identity.INSTANCE),
252252
drainCountConsumer.orElse(n -> {})
253253
));
254254
}
@@ -305,32 +305,48 @@ public static final class AdjacencyListBuilderTask implements Runnable {
305305

306306
@Override
307307
public void run() {
308+
var mapper = this.valueMapper.andThen(this::validateNode);
308309
try (var compressor = adjacencyCompressorFactory.createCompressor()) {
309310
var buffer = new LongArrayBuffer();
310311
var importedRelationships = new MutableLong(0L);
311312
chunkedAdjacencyLists.consume((localId, targets, properties, compressedByteSize, numberOfCompressedTargets) -> {
312313
var sourceNodeId = this.paging.sourceNodeId(localId, this.page);
313-
var nodeId = valueMapper.map(sourceNodeId);
314+
var nodeId = mapper.map(sourceNodeId);
315+
if (nodeId == AdjacencyCompressor.ValueMapper.INVALID_ID) {
316+
return;
317+
}
314318

315319
AdjacencyCompression.zigZagUncompressFrom(
316320
buffer,
317321
targets,
318322
numberOfCompressedTargets,
319323
compressedByteSize,
320-
valueMapper
324+
mapper
321325
);
322326

327+
if (buffer.length == 0) {
328+
// special case: we skipped a relationship because it pointed to or from a node that we didn't load
329+
// this can result in some nodes that should not get an adjacencylist at all
330+
return;
331+
}
332+
323333
importedRelationships.add(compressor.compress(
324334
nodeId,
325335
buffer.buffer,
326336
properties,
327-
numberOfCompressedTargets
337+
buffer.length
328338
));
329339
});
330340
relationshipCounter.add(importedRelationships.longValue());
331341
drainCountConsumer.accept(importedRelationships.longValue());
332342
}
333343
}
344+
345+
long validateNode(long nodeId) {
346+
return (this.adjacencyCompressorFactory.validateNode(nodeId))
347+
? nodeId
348+
: AdjacencyCompressor.ValueMapper.INVALID_ID;
349+
}
334350
}
335351

336352
private static final class PagingWithKnownPageSize implements AdjacencyBufferPaging {

0 commit comments

Comments
 (0)