Skip to content

Commit

Permalink
feat: add LastBy / FirstBy features to NaturalJoin (deephaven#6604
Browse files Browse the repository at this point in the history
)

Allows the user to specify which RHS row to use when more than one RHS
row matches the same key.

Choices are:
* ERROR_ON_DUPLICATE - original NaturalJoin behavior
* FIRST_MATCH - equivalent to running FirstBy + NaturalJoin
* LAST_MATCH - equivalent to running LastBy + NaturalJoin
* EXACTLY_ONE_MATCH - equivalent to `exactJoin`; requires exactly one RHS row
for every LHS row
  • Loading branch information
lbooker42 authored Feb 5, 2025
1 parent e1dff59 commit 46579d3
Show file tree
Hide file tree
Showing 66 changed files with 6,126 additions and 2,962 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public interface BothIncrementalNaturalJoinStateManager extends IncrementalNatur

void compactAll();

WritableRowRedirection buildIndexedRowRedirection(QueryTable leftTable, boolean exactMatch, InitialBuildContext ibc,
WritableRowRedirection buildIndexedRowRedirection(QueryTable leftTable, InitialBuildContext ibc,
ColumnSource<RowSet> indexRowSets, JoinControl.RedirectionType redirectionType);

WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable leftTable, boolean exactMatch,
InitialBuildContext ibc, JoinControl.RedirectionType redirectionType);
WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable leftTable, InitialBuildContext ibc,
JoinControl.RedirectionType redirectionType);

Context makeProbeContext(ColumnSource<?>[] probeSources, long maxSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,58 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.NaturalJoinType;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;

public interface IncrementalNaturalJoinStateManager {
long getRightIndex(int slot);
long getRightRowKey(int slot);

RowSet getLeftIndex(int slot);
RowSet getRightRowSet(int slot);

RowSet getLeftRowSet(int slot);

String keyString(int slot);

void checkExactMatch(boolean exactMatch, long leftKeyIndex, long rightSide);
void checkExactMatch(long leftKeyIndex, long rightSide);

/**
 * Select the row key to use from the set of RHS duplicates, according to the join type.
 *
 * @param duplicates the set of RHS duplicates to choose from
 * @param joinType the natural join type in effect
 * @return the last row key of {@code duplicates} when {@code joinType} is {@link NaturalJoinType#LAST_MATCH};
 *         the first row key of {@code duplicates} for every other join type
 */
default long getRightRowKeyFromDuplicates(final WritableRowSet duplicates, final NaturalJoinType joinType) {
    // Only LAST_MATCH selects the tail of the set; all remaining join types resolve to the head.
    return joinType == NaturalJoinType.LAST_MATCH ? duplicates.lastRowKey() : duplicates.firstRowKey();
}

/**
 * Add a key to the RHS duplicates and return the appropriate row key from this set <em>after</em> the addition.
 *
 * @param duplicates the set of RHS duplicates; mutated by this call
 * @param keyToAdd the row key to insert into {@code duplicates}
 * @param joinType the natural join type used to choose the returned row key
 * @return the row key chosen by {@link #getRightRowKeyFromDuplicates} once {@code keyToAdd} has been inserted
 */
default long addRightRowKeyToDuplicates(final WritableRowSet duplicates, final long keyToAdd,
        final NaturalJoinType joinType) {
    // Insert first so that the selection below sees the newly added key.
    duplicates.insert(keyToAdd);
    return getRightRowKeyFromDuplicates(duplicates, joinType);
}

/**
 * Remove a key from the RHS duplicates and return the appropriate row key from this set as it was
 * <em>before</em> the removal.
 *
 * @param duplicates the set of RHS duplicates; mutated by this call
 * @param keyToRemove the row key to remove from {@code duplicates}
 * @param joinType the natural join type used to choose the returned row key
 * @return the row key chosen by {@link #getRightRowKeyFromDuplicates} prior to removing {@code keyToRemove}
 */
default long removeRightRowKeyFromDuplicates(final WritableRowSet duplicates, final long keyToRemove,
        final NaturalJoinType joinType) {
    // Capture the selection while the key is still present; the removal below may change it.
    final long keyBeforeRemoval = getRightRowKeyFromDuplicates(duplicates, joinType);
    duplicates.remove(keyToRemove);
    return keyBeforeRemoval;
}

/**
 * Shift a single key within the RHS duplicate row set.
 * <p>
 * The key at its pre-shift position ({@code shiftedKey - shiftDelta}) is removed and {@code shiftedKey} is
 * inserted in its place. The size of the set is asserted to be the same before and after the operation.
 *
 * @param duplicates the set of RHS duplicates; mutated by this call
 * @param shiftedKey the row key after the shift has been applied
 * @param shiftDelta the amount by which the key was shifted
 */
default void shiftOneKey(WritableRowSet duplicates, long shiftedKey, long shiftDelta) {
    final long preShiftKey = shiftedKey - shiftDelta;
    final long sizeBefore = duplicates.size();

    duplicates.remove(preShiftKey);
    duplicates.insert(shiftedKey);

    // A changed size means the remove/insert pair did not swap exactly one key for another.
    Assert.eq(duplicates.size(), "duplicates.size()", sizeBefore, "sizeBefore");
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ private Table exactJoinImpl(Table table, MatchPair[] columnsToMatch, MatchPair[]
"exactJoin(" + table + "," + Arrays.toString(columnsToMatch) + "," + Arrays.toString(columnsToMatch)
+ ")",
sizeForInstrumentation(),
() -> naturalJoinInternal(table, columnsToMatch, columnsToAdd, true));
() -> naturalJoinInternal(table, columnsToMatch, columnsToAdd, NaturalJoinType.EXACTLY_ONE_MATCH));
}
}

Expand Down Expand Up @@ -2263,29 +2263,38 @@ private Table ajInternal(Table rightTable, MatchPair[] columnsToMatch, MatchPair
public Table naturalJoin(
Table rightTable,
Collection<? extends JoinMatch> columnsToMatch,
Collection<? extends JoinAddition> columnsToAdd) {
Collection<? extends JoinAddition> columnsToAdd,
NaturalJoinType joinType) {
return naturalJoinImpl(
rightTable,
MatchPair.fromMatches(columnsToMatch),
MatchPair.fromAddition(columnsToAdd));
MatchPair.fromAddition(columnsToAdd),
joinType);
}

private Table naturalJoinImpl(final Table rightTable, final MatchPair[] columnsToMatch, MatchPair[] columnsToAdd) {
private Table naturalJoinImpl(
final Table rightTable,
final MatchPair[] columnsToMatch,
final MatchPair[] columnsToAdd,
final NaturalJoinType joinType) {
final UpdateGraph updateGraph = getUpdateGraph(rightTable);
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return QueryPerformanceRecorder.withNugget(
"naturalJoin(" + matchString(columnsToMatch) + ", " + matchString(columnsToAdd) + ")",
() -> naturalJoinInternal(rightTable, columnsToMatch, columnsToAdd, false));
() -> naturalJoinInternal(rightTable, columnsToMatch, columnsToAdd, joinType));
}
}

private Table naturalJoinInternal(final Table rightTable, final MatchPair[] columnsToMatch,
MatchPair[] columnsToAdd, boolean exactMatch) {
private Table naturalJoinInternal(
final Table rightTable,
final MatchPair[] columnsToMatch,
MatchPair[] columnsToAdd,
final NaturalJoinType joinType) {
columnsToAdd = createColumnsToAddIfMissing(rightTable, columnsToMatch, columnsToAdd);

final QueryTable rightTableCoalesced = (QueryTable) rightTable.coalesce();

return NaturalJoinHelper.naturalJoin(this, rightTableCoalesced, columnsToMatch, columnsToAdd, exactMatch);
return NaturalJoinHelper.naturalJoin(this, rightTableCoalesced, columnsToMatch, columnsToAdd, joinType);
}

private MatchPair[] createColumnsToAddIfMissing(Table rightTable, MatchPair[] columnsToMatch,
Expand Down Expand Up @@ -2393,7 +2402,8 @@ private Table joinNoMemo(
final Table rightGrouped = rightTable.groupBy(rightColumnsToMatch)
.view(columnsToAddSelectColumns.values());
final Table naturalJoinResult = naturalJoinImpl(rightGrouped, columnsToMatch,
columnsToAddAfterRename.toArray(MatchPair.ZERO_LENGTH_MATCH_PAIR_ARRAY));
columnsToAddAfterRename.toArray(MatchPair.ZERO_LENGTH_MATCH_PAIR_ARRAY),
NaturalJoinType.ERROR_ON_DUPLICATE);
final QueryTable ungroupedResult = (QueryTable) naturalJoinResult
.ungroup(columnsToUngroupBy.toArray(String[]::new));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.NaturalJoinType;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ColumnSource;
Expand All @@ -15,8 +16,11 @@
public abstract class RightIncrementalNaturalJoinStateManager extends StaticNaturalJoinStateManager
implements IncrementalNaturalJoinStateManager {

protected RightIncrementalNaturalJoinStateManager(ColumnSource<?>[] keySourcesForErrorMessages) {
super(keySourcesForErrorMessages);
protected RightIncrementalNaturalJoinStateManager(
ColumnSource<?>[] keySourcesForErrorMessages,
NaturalJoinType joinType,
boolean addOnly) {
super(keySourcesForErrorMessages, joinType, addOnly);
}

public abstract void buildFromLeftSide(final Table leftTable, ColumnSource<?>[] leftSources,
Expand All @@ -27,12 +31,12 @@ public abstract void convertLeftDataIndex(int groupingSize, InitialBuildContext

public abstract void addRightSide(RowSequence rightIndex, ColumnSource<?>[] rightSources);

public abstract WritableRowRedirection buildRowRedirectionFromHashSlot(QueryTable leftTable, boolean exactMatch,
public abstract WritableRowRedirection buildRowRedirectionFromHashSlot(QueryTable leftTable,
InitialBuildContext initialBuildContext, JoinControl.RedirectionType redirectionType);

public abstract WritableRowRedirection buildRowRedirectionFromHashSlotIndexed(QueryTable leftTable,
ColumnSource<RowSet> rowSetSource, int groupingSize, boolean exactMatch,
InitialBuildContext initialBuildContext, JoinControl.RedirectionType redirectionType);
ColumnSource<RowSet> rowSetSource, int groupingSize, InitialBuildContext initialBuildContext,
JoinControl.RedirectionType redirectionType);

// modification probes
public abstract void applyRightShift(Context pc, ColumnSource<?>[] rightSources, RowSet shiftedRowSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.NaturalJoinType;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.chunk.util.hashing.ToIntFunctor;
Expand Down Expand Up @@ -32,9 +33,13 @@ class SimpleUniqueStaticNaturalJoinStateManager extends StaticNaturalJoinStateMa

private final LongArraySource rightRowSetSource = new LongArraySource();

SimpleUniqueStaticNaturalJoinStateManager(ColumnSource<?>[] tableKeySources, int tableSize,
ToIntFunctor<Values> transform) {
super(tableKeySources);
SimpleUniqueStaticNaturalJoinStateManager(
ColumnSource<?>[] tableKeySources,
int tableSize,
ToIntFunctor<Values> transform,
NaturalJoinType joinType,
boolean addOnly) {
super(tableKeySources, joinType, addOnly);
this.tableSize = Require.gtZero(tableSize, "tableSize");
this.transform = transform;
rightRowSetSource.ensureCapacity(tableSize);
Expand All @@ -61,10 +66,14 @@ void setRightSide(RowSet rightRowSet, ColumnSource<?> valueSource) {
return true;
}
final long existingRight = rightRowSetSource.getLong(tableLocation);
if (existingRight == RowSequence.NULL_ROW_KEY) {
if (existingRight == RowSequence.NULL_ROW_KEY || joinType == NaturalJoinType.LAST_MATCH) {
rightRowSetSource.set(tableLocation, keyIndex);
} else {
rightRowSetSource.set(tableLocation, DUPLICATE_RIGHT_VALUE);
if (joinType == NaturalJoinType.FIRST_MATCH) {
// no-op, already have the first match
} else {
rightRowSetSource.set(tableLocation, DUPLICATE_RIGHT_VALUE);
}
}
return true;
});
Expand Down Expand Up @@ -112,8 +121,8 @@ protected void decorateLeftSide(RowSet leftRowSet, ColumnSource<?>[] valueSource
}

@NotNull
WritableRowRedirection buildRowRedirection(QueryTable leftTable, boolean exactMatch,
WritableRowRedirection buildRowRedirection(QueryTable leftTable,
LongArraySource leftRedirections, JoinControl.RedirectionType redirectionType) {
return buildRowRedirection(leftTable, exactMatch, leftRedirections::getLong, redirectionType);
return buildRowRedirection(leftTable, leftRedirections::getLong, redirectionType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.NaturalJoinType;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ColumnSource;
Expand All @@ -20,14 +21,21 @@ public abstract class StaticNaturalJoinStateManager {
public static final long NO_RIGHT_ENTRY_VALUE = RowSequence.NULL_ROW_KEY;

protected final ColumnSource<?>[] keySourcesForErrorMessages;
protected final NaturalJoinType joinType;
protected final boolean addOnly;

protected StaticNaturalJoinStateManager(ColumnSource<?>[] keySourcesForErrorMessages) {
protected StaticNaturalJoinStateManager(
ColumnSource<?>[] keySourcesForErrorMessages,
NaturalJoinType joinType,
boolean addOnly) {
this.keySourcesForErrorMessages = keySourcesForErrorMessages;
this.joinType = joinType;
this.addOnly = addOnly;
}

@SuppressWarnings("WeakerAccess")
public void checkExactMatch(boolean exactMatch, long leftKeyIndex, long rightSide) {
if (exactMatch && rightSide == NO_RIGHT_ENTRY_VALUE) {
public void checkExactMatch(long leftKeyIndex, long rightSide) {
if (joinType == NaturalJoinType.EXACTLY_ONE_MATCH && rightSide == NO_RIGHT_ENTRY_VALUE) {
throw new RuntimeException("Tables don't have one-to-one mapping - no mappings for key "
+ extractKeyStringFromSourceTable(leftKeyIndex) + ".");
}
Expand All @@ -42,8 +50,8 @@ protected String extractKeyStringFromSourceTable(long leftKey) {
.collect(Collectors.joining(", ")) + "]";
}

public WritableRowRedirection buildRowRedirection(QueryTable leftTable, boolean exactMatch,
LongUnaryOperator rightSideFromSlot, JoinControl.RedirectionType redirectionType) {
public WritableRowRedirection buildRowRedirection(QueryTable leftTable, LongUnaryOperator rightSideFromSlot,
JoinControl.RedirectionType redirectionType) {
switch (redirectionType) {
case Contiguous: {
if (!leftTable.isFlat()) {
Expand All @@ -53,7 +61,7 @@ public WritableRowRedirection buildRowRedirection(QueryTable leftTable, boolean
final long[] innerIndex = new long[leftTable.intSize("contiguous redirection build")];
for (int ii = 0; ii < innerIndex.length; ++ii) {
final long rightSide = rightSideFromSlot.applyAsLong(ii);
checkExactMatch(exactMatch, leftTable.getRowSet().get(ii), rightSide);
checkExactMatch(leftTable.getRowSet().get(ii), rightSide);
innerIndex[ii] = rightSide;
}
return new ContiguousWritableRowRedirection(innerIndex);
Expand All @@ -65,7 +73,7 @@ public WritableRowRedirection buildRowRedirection(QueryTable leftTable, boolean
for (final RowSet.Iterator it = leftTable.getRowSet().iterator(); it.hasNext();) {
final long next = it.nextLong();
final long rightSide = rightSideFromSlot.applyAsLong(leftPosition++);
checkExactMatch(exactMatch, leftTable.getRowSet().get(next), rightSide);
checkExactMatch(leftTable.getRowSet().get(next), rightSide);
if (rightSide != NO_RIGHT_ENTRY_VALUE) {
sparseRedirections.set(next, rightSide);
}
Expand All @@ -80,7 +88,7 @@ public WritableRowRedirection buildRowRedirection(QueryTable leftTable, boolean
for (final RowSet.Iterator it = leftTable.getRowSet().iterator(); it.hasNext();) {
final long next = it.nextLong();
final long rightSide = rightSideFromSlot.applyAsLong(leftPosition++);
checkExactMatch(exactMatch, leftTable.getRowSet().get(next), rightSide);
checkExactMatch(leftTable.getRowSet().get(next), rightSide);
if (rightSide != NO_RIGHT_ENTRY_VALUE) {
rowRedirection.put(next, rightSide);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import io.deephaven.api.ColumnName;
import io.deephaven.api.JoinAddition;
import io.deephaven.api.JoinMatch;
import io.deephaven.api.NaturalJoinType;
import io.deephaven.api.Pair;
import io.deephaven.api.RangeJoinMatch;
import io.deephaven.api.Selectable;
import io.deephaven.api.SortColumn;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.Pair;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.api.filter.Filter;
import io.deephaven.api.snapshot.SnapshotWhenOptions;
Expand Down Expand Up @@ -266,7 +267,7 @@ default Table asOfJoin(Table rightTable, Collection<? extends JoinMatch> exactMa

@Override
default Table naturalJoin(Table rightTable, Collection<? extends JoinMatch> columnsToMatch,
Collection<? extends JoinAddition> columnsToAdd) {
Collection<? extends JoinAddition> columnsToAdd, NaturalJoinType joinType) {
return throwUnsupported();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import io.deephaven.api.ColumnName;
import io.deephaven.api.JoinAddition;
import io.deephaven.api.JoinMatch;
import io.deephaven.api.NaturalJoinType;
import io.deephaven.api.Pair;
import io.deephaven.api.RangeJoinMatch;
import io.deephaven.api.Selectable;
import io.deephaven.api.SortColumn;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.Pair;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.api.filter.Filter;
import io.deephaven.api.snapshot.SnapshotWhenOptions;
Expand Down Expand Up @@ -335,8 +336,9 @@ public Table asOfJoin(Table rightTable, Collection<? extends JoinMatch> exactMat
public Table naturalJoin(
Table rightTable,
Collection<? extends JoinMatch> columnsToMatch,
Collection<? extends JoinAddition> columnsToAdd) {
return coalesce().naturalJoin(rightTable, columnsToMatch, columnsToAdd);
Collection<? extends JoinAddition> columnsToAdd,
NaturalJoinType joinType) {
return coalesce().naturalJoin(rightTable, columnsToMatch, columnsToAdd, joinType);
}

@Override
Expand Down
Loading

0 comments on commit 46579d3

Please sign in to comment.