Skip to content

Commit 5d44879

Browse files
authored
feat!: provide key columns as scalars (vs. vectors) to RollingFormula (#6375)
### Example: NOTE: `Sym` is a key column and is constant for each bucket. It is presented to the UDF as a string (not a vector). `intCol` / `longCol` are vectors containing the window data. ``` t_out = t.updateBy(UpdateByOperation.RollingFormula(prevTicks, postTicks, "out_val=sum(intCol) - max(longCol) + (Sym == null ? 0 : Sym.length())"), "Sym"); ```
1 parent 60a2948 commit 5d44879

12 files changed

+266
-92
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/BucketedPartitionedUpdateByManager.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
//
44
package io.deephaven.engine.table.impl.updateby;
55

6-
import io.deephaven.UncheckedDeephavenException;
76
import io.deephaven.api.updateby.UpdateByControl;
87
import io.deephaven.base.verify.Assert;
98
import io.deephaven.engine.exceptions.CancellationException;
@@ -83,9 +82,12 @@ class BucketedPartitionedUpdateByManager extends UpdateBy {
8382
final PartitionedTable partitioned = source.partitionedAggBy(List.of(), true, null, byColumnNames);
8483
final PartitionedTable transformed = partitioned.transform(t -> {
8584
final long firstSourceRowKey = t.getRowSet().firstRowKey();
85+
final Object[] bucketKeyValues = Arrays.stream(byColumnNames)
86+
.map(colName -> t.getColumnSource(colName).get(firstSourceRowKey))
87+
.toArray();
8688
final String bucketDescription = BucketedPartitionedUpdateByManager.this + "-bucket-" +
87-
Arrays.stream(byColumnNames)
88-
.map(bcn -> Objects.toString(t.getColumnSource(bcn).get(firstSourceRowKey)))
89+
Arrays.stream(bucketKeyValues)
90+
.map(Objects::toString)
8991
.collect(Collectors.joining(", ", "[", "]"));
9092
UpdateByBucketHelper bucket = new UpdateByBucketHelper(
9193
bucketDescription,
@@ -94,7 +96,8 @@ class BucketedPartitionedUpdateByManager extends UpdateBy {
9496
resultSources,
9597
timestampColumnName,
9698
control,
97-
this::onBucketFailure);
99+
this::onBucketFailure,
100+
bucketKeyValues);
98101
// add this to the bucket list
99102
synchronized (buckets) {
100103
buckets.offer(bucket);

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByBucketHelper.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class UpdateByBucketHelper extends IntrusiveDoublyLinkedNode.Impl<UpdateByBucket
5050
private final ColumnSource<?> timestampColumnSource;
5151
private final ModifiedColumnSet timestampColumnSet;
5252

53+
/** Store boxed key values for this bucket */
54+
private final Object[] bucketKeyValues;
55+
5356
/** Indicates this bucket needs to be processed (at least one window and operator are dirty) */
5457
private boolean isDirty;
5558
/** This rowset will store row keys where the timestamp is not null (will mirror the SSA contents) */
@@ -65,22 +68,25 @@ class UpdateByBucketHelper extends IntrusiveDoublyLinkedNode.Impl<UpdateByBucket
6568
* @param resultSources the result sources
6669
* @param timestampColumnName the timestamp column used for time-based operations
6770
* @param control the control object.
71+
* @param failureNotifier a consumer to notify of any failures
72+
* @param bucketKeyValues the key values for this bucket (empty for zero-key)
6873
*/
69-
7074
protected UpdateByBucketHelper(
7175
@NotNull final String description,
7276
@NotNull final QueryTable source,
7377
@NotNull final UpdateByWindow[] windows,
7478
@NotNull final Map<String, ? extends ColumnSource<?>> resultSources,
7579
@Nullable final String timestampColumnName,
7680
@NotNull final UpdateByControl control,
77-
@NotNull final BiConsumer<Throwable, TableListener.Entry> failureNotifier) {
81+
@NotNull final BiConsumer<Throwable, TableListener.Entry> failureNotifier,
82+
@NotNull final Object[] bucketKeyValues) {
7883
this.description = description;
7984
this.source = source;
8085
// some columns will have multiple inputs, such as time-based and Weighted computations
8186
this.windows = windows;
8287
this.control = control;
8388
this.failureNotifier = failureNotifier;
89+
this.bucketKeyValues = bucketKeyValues;
8490

8591
result = new QueryTable(source.getRowSet(), resultSources);
8692

@@ -331,7 +337,8 @@ public void prepareForUpdate(final TableUpdate upstream, final boolean initialSt
331337
timestampValidRowSet,
332338
timestampsModified,
333339
control.chunkCapacityOrDefault(),
334-
initialStep);
340+
initialStep,
341+
bucketKeyValues);
335342

336343
// compute the affected/influenced operators and rowsets within this window
337344
windows[winIdx].computeAffectedRowsAndOperators(windowContexts[winIdx], upstream);

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByOperator.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,46 @@ protected UpdateByOperator(
159159
*/
160160
public abstract void initializeSources(@NotNull Table source, @Nullable RowRedirection rowRedirection);
161161

162+
/**
163+
* Initialize the bucket context for a cumulative operator and pass in the bucket key values. Most operators will
164+
* not need the key values, but those that do can override this method.
165+
*/
166+
public void initializeCumulativeWithKeyValues(
167+
@NotNull final Context context,
168+
final long firstUnmodifiedKey,
169+
final long firstUnmodifiedTimestamp,
170+
@NotNull final RowSet bucketRowSet,
171+
@NotNull Object[] bucketKeyValues) {
172+
initializeCumulative(context, firstUnmodifiedKey, firstUnmodifiedTimestamp, bucketRowSet);
173+
}
174+
162175
/**
163176
* Initialize the bucket context for a cumulative operator
164177
*/
165-
public void initializeCumulative(@NotNull final Context context, final long firstUnmodifiedKey,
178+
public void initializeCumulative(
179+
@NotNull final Context context,
180+
final long firstUnmodifiedKey,
166181
final long firstUnmodifiedTimestamp,
167182
@NotNull final RowSet bucketRowSet) {
168183
context.reset();
169184
}
170185

186+
/**
187+
* Initialize the bucket context for a windowed operator and pass in the bucket key values. Most operators will not
188+
* need the key values, but those that do can override this method.
189+
*/
190+
public void initializeRollingWithKeyValues(
191+
@NotNull final Context context,
192+
@NotNull final RowSet bucketRowSet,
193+
@NotNull Object[] bucketKeyValues) {
194+
initializeRolling(context, bucketRowSet);
195+
}
196+
171197
/**
172198
* Initialize the bucket context for a windowed operator
173199
*/
174-
public void initializeRolling(@NotNull final Context context,
200+
public void initializeRolling(
201+
@NotNull final Context context,
175202
@NotNull final RowSet bucketRowSet) {
176203
context.reset();
177204
}

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByOperatorFactory.java

+44-26
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class UpdateByOperatorFactory {
5959
private final MatchPair[] groupByColumns;
6060
@NotNull
6161
private final UpdateByControl control;
62-
private Map<String, ColumnDefinition<?>> vectorColumnNameMap;
62+
private Map<String, ColumnDefinition<?>> vectorColumnDefinitions;
6363

6464
public UpdateByOperatorFactory(
6565
@NotNull final TableDefinition tableDef,
@@ -1437,7 +1437,6 @@ private UpdateByOperator makeRollingFormulaOperator(@NotNull final MatchPair pai
14371437
private UpdateByOperator makeRollingFormulaMultiColumnOperator(
14381438
@NotNull final TableDefinition tableDef,
14391439
@NotNull final RollingFormulaSpec rs) {
1440-
14411440
final long prevWindowScaleUnits = rs.revWindowScale().getTimeScaleUnits();
14421441
final long fwdWindowScaleUnits = rs.fwdWindowScale().getTimeScaleUnits();
14431442

@@ -1446,42 +1445,58 @@ private UpdateByOperator makeRollingFormulaMultiColumnOperator(
14461445
// Create the colum
14471446
final SelectColumn selectColumn = SelectColumn.of(Selectable.parse(rs.formula()));
14481447

1449-
// Get or create a column definition map where the definitions are vectors of the original column types.
1450-
if (vectorColumnNameMap == null) {
1451-
vectorColumnNameMap = new HashMap<>();
1452-
columnDefinitionMap.forEach((key, value) -> {
1453-
final ColumnDefinition<?> columnDef = ColumnDefinition.fromGenericType(
1454-
key,
1455-
VectorFactory.forElementType(value.getDataType()).vectorType(),
1456-
value.getDataType());
1457-
vectorColumnNameMap.put(key, columnDef);
1458-
});
1448+
// Get or create a column definition map composed of vectors of the original column types (or scalars when
1449+
// part of the group_by columns).
1450+
final Set<String> groupByColumnSet =
1451+
Arrays.stream(groupByColumns).map(MatchPair::rightColumn).collect(Collectors.toSet());
1452+
if (vectorColumnDefinitions == null) {
1453+
vectorColumnDefinitions = tableDef.getColumnStream().collect(Collectors.toMap(
1454+
ColumnDefinition::getName,
1455+
(final ColumnDefinition<?> cd) -> groupByColumnSet.contains(cd.getName())
1456+
? cd
1457+
: ColumnDefinition.fromGenericType(
1458+
cd.getName(),
1459+
VectorFactory.forElementType(cd.getDataType()).vectorType(),
1460+
cd.getDataType())));
14591461
}
14601462

1461-
// Get the input column names and data types from the formula.
1462-
final String[] inputColumnNames =
1463-
selectColumn.initDef(vectorColumnNameMap, compilationProcessor).toArray(String[]::new);
1463+
// Get the input column names from the formula and provide them to the rolling formula operator
1464+
final String[] allInputColumns =
1465+
selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);
14641466
if (!selectColumn.getColumnArrays().isEmpty()) {
14651467
throw new IllegalArgumentException("RollingFormulaMultiColumnOperator does not support column arrays ("
14661468
+ selectColumn.getColumnArrays() + ")");
14671469
}
14681470
if (selectColumn.hasVirtualRowVariables()) {
14691471
throw new IllegalArgumentException("RollingFormula does not support virtual row variables");
14701472
}
1471-
final Class<?>[] inputColumnTypes = new Class[inputColumnNames.length];
1472-
final Class<?>[] inputVectorTypes = new Class[inputColumnNames.length];
14731473

1474-
for (int i = 0; i < inputColumnNames.length; i++) {
1475-
final ColumnDefinition<?> columnDef = columnDefinitionMap.get(inputColumnNames[i]);
1476-
inputColumnTypes[i] = columnDef.getDataType();
1477-
inputVectorTypes[i] = vectorColumnNameMap.get(inputColumnNames[i]).getDataType();
1474+
final Map<Boolean, List<String>> partitioned = Arrays.stream(allInputColumns)
1475+
.collect(Collectors.partitioningBy(groupByColumnSet::contains));
1476+
final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new);
1477+
final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new);
1478+
1479+
final Class<?>[] inputKeyColumnTypes = new Class[inputKeyColumns.length];
1480+
final Class<?>[] inputKeyComponentTypes = new Class[inputKeyColumns.length];
1481+
for (int i = 0; i < inputKeyColumns.length; i++) {
1482+
final ColumnDefinition<?> columnDef = columnDefinitionMap.get(inputKeyColumns[i]);
1483+
inputKeyColumnTypes[i] = columnDef.getDataType();
1484+
inputKeyComponentTypes[i] = columnDef.getComponentType();
1485+
}
1486+
1487+
final Class<?>[] inputNonKeyColumnTypes = new Class[inputNonKeyColumns.length];
1488+
final Class<?>[] inputNonKeyVectorTypes = new Class[inputNonKeyColumns.length];
1489+
for (int i = 0; i < inputNonKeyColumns.length; i++) {
1490+
final ColumnDefinition<?> columnDef = columnDefinitionMap.get(inputNonKeyColumns[i]);
1491+
inputNonKeyColumnTypes[i] = columnDef.getDataType();
1492+
inputNonKeyVectorTypes[i] = vectorColumnDefinitions.get(inputNonKeyColumns[i]).getDataType();
14781493
}
14791494

14801495
final String[] affectingColumns;
14811496
if (rs.revWindowScale().timestampCol() == null) {
1482-
affectingColumns = inputColumnNames;
1497+
affectingColumns = inputNonKeyColumns;
14831498
} else {
1484-
affectingColumns = ArrayUtils.add(inputColumnNames, rs.revWindowScale().timestampCol());
1499+
affectingColumns = ArrayUtils.add(inputNonKeyColumns, rs.revWindowScale().timestampCol());
14851500
}
14861501

14871502
// Create a new column pair with the same name for the left and right columns
@@ -1494,9 +1509,12 @@ private UpdateByOperator makeRollingFormulaMultiColumnOperator(
14941509
prevWindowScaleUnits,
14951510
fwdWindowScaleUnits,
14961511
selectColumn,
1497-
inputColumnNames,
1498-
inputColumnTypes,
1499-
inputVectorTypes);
1512+
inputKeyColumns,
1513+
inputKeyColumnTypes,
1514+
inputKeyComponentTypes,
1515+
inputNonKeyColumns,
1516+
inputNonKeyColumnTypes,
1517+
inputNonKeyVectorTypes);
15001518
}
15011519
}
15021520
}

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindow.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ static class UpdateByWindowBucketContext implements SafeCloseable {
4747
protected final boolean timestampsModified;
4848
/** Whether this is the creation phase of this window */
4949
protected final boolean initialStep;
50-
50+
/** Store the key values for this bucket */
51+
protected final Object[] bucketKeyValues;
5152

5253
/** An array of ColumnSources for each underlying operator */
5354
protected ColumnSource<?>[] inputSources;
@@ -71,12 +72,14 @@ static class UpdateByWindowBucketContext implements SafeCloseable {
7172
final TrackingRowSet timestampValidRowSet,
7273
final boolean timestampsModified,
7374
final int chunkSize,
74-
final boolean initialStep) {
75+
final boolean initialStep,
76+
@NotNull final Object[] bucketKeyValues) {
7577
this.sourceRowSet = sourceRowSet;
7678
this.timestampColumnSource = timestampColumnSource;
7779
this.timestampSsa = timestampSsa;
7880
this.timestampValidRowSet = timestampValidRowSet;
7981
this.timestampsModified = timestampsModified;
82+
this.bucketKeyValues = bucketKeyValues;
8083

8184
workingChunkSize = chunkSize;
8285
this.initialStep = initialStep;
@@ -91,13 +94,15 @@ public void close() {
9194
}
9295
}
9396

94-
abstract UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
97+
abstract UpdateByWindowBucketContext makeWindowContext(
98+
final TrackingRowSet sourceRowSet,
9599
final ColumnSource<?> timestampColumnSource,
96100
final LongSegmentedSortedArray timestampSsa,
97101
final TrackingRowSet timestampValidRowSet,
98102
final boolean timestampsModified,
99103
final int chunkSize,
100-
final boolean isInitializeStep);
104+
final boolean isInitializeStep,
105+
final Object[] bucketKeyValues);
101106

102107
UpdateByWindow(final UpdateByOperator[] operators,
103108
final int[][] operatorInputSourceSlots,

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowCumulative.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,17 @@ UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
5555
final TrackingRowSet timestampValidRowSet,
5656
final boolean timestampsModified,
5757
final int chunkSize,
58-
final boolean isInitializeStep) {
59-
return new UpdateByWindowBucketContext(sourceRowSet, timestampColumnSource, timestampSsa, timestampValidRowSet,
60-
timestampsModified, chunkSize, isInitializeStep);
58+
final boolean isInitializeStep,
59+
final Object[] bucketKeyValues) {
60+
return new UpdateByWindowBucketContext(
61+
sourceRowSet,
62+
timestampColumnSource,
63+
timestampSsa,
64+
timestampValidRowSet,
65+
timestampsModified,
66+
chunkSize,
67+
isInitializeStep,
68+
bucketKeyValues);
6169
}
6270

6371
@Override
@@ -192,7 +200,8 @@ void processWindowBucketOperatorSet(final UpdateByWindowBucketContext context,
192200
continue;
193201
}
194202
UpdateByOperator cumOp = operators[opIdx];
195-
cumOp.initializeCumulative(winOpContexts[ii], rowKey, timestamp, context.sourceRowSet);
203+
cumOp.initializeCumulativeWithKeyValues(winOpContexts[ii], rowKey, timestamp, context.sourceRowSet,
204+
context.bucketKeyValues);
196205
}
197206

198207
while (affectedIt.hasMore()) {

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowRollingBase.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@ static class UpdateByWindowRollingBucketContext extends UpdateByWindowBucketCont
4141
final TrackingRowSet timestampValidRowSet,
4242
final boolean timestampsModified,
4343
final int chunkSize,
44-
final boolean initialStep) {
44+
final boolean initialStep,
45+
final Object[] bucketKeyValues) {
4546
super(sourceRowSet,
4647
timestampColumnSource,
4748
timestampSsa,
4849
timestampValidRowSet,
4950
timestampsModified,
5051
chunkSize,
51-
initialStep);
52+
initialStep,
53+
bucketKeyValues);
5254
}
5355

5456
@Override
@@ -60,7 +62,7 @@ public void close() {
6062
}
6163

6264
UpdateByWindowRollingBase(@NotNull final UpdateByOperator[] operators,
63-
@NotNull final int[][] operatorSourceSlots,
65+
final int[][] operatorSourceSlots,
6466
final long prevUnits,
6567
final long fwdUnits,
6668
@Nullable final String timestampColumnName) {
@@ -152,7 +154,7 @@ void processWindowBucketOperatorSet(final UpdateByWindowBucketContext context,
152154
continue;
153155
}
154156
UpdateByOperator rollingOp = operators[opIdx];
155-
rollingOp.initializeRolling(winOpContexts[ii], bucketRowSet);
157+
rollingOp.initializeRollingWithKeyValues(winOpContexts[ii], bucketRowSet, context.bucketKeyValues);
156158
}
157159

158160
int affectedChunkOffset = 0;

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowRollingTicks.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ static class UpdateByWindowTicksBucketContext extends UpdateByWindowRollingBucke
3030
private RowSet affectedRowPositions;
3131
private RowSet influencerPositions;
3232

33-
UpdateByWindowTicksBucketContext(final TrackingRowSet sourceRowSet,
34-
final int chunkSize, final boolean initialStep) {
35-
super(sourceRowSet, null, null, null, false, chunkSize, initialStep);
33+
UpdateByWindowTicksBucketContext(
34+
final TrackingRowSet sourceRowSet,
35+
final int chunkSize,
36+
final boolean initialStep,
37+
final Object[] bucketKeyValues) {
38+
super(sourceRowSet, null, null, null, false, chunkSize, initialStep, bucketKeyValues);
3639
}
3740

3841
@Override
@@ -77,14 +80,16 @@ void finalizeWindowBucket(UpdateByWindowBucketContext context) {
7780
}
7881

7982
@Override
80-
UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
83+
UpdateByWindowBucketContext makeWindowContext(
84+
final TrackingRowSet sourceRowSet,
8185
final ColumnSource<?> timestampColumnSource,
8286
final LongSegmentedSortedArray timestampSsa,
8387
final TrackingRowSet timestampValidRowSet,
8488
final boolean timestampsModified,
8589
final int chunkSize,
86-
final boolean isInitializeStep) {
87-
return new UpdateByWindowTicksBucketContext(sourceRowSet, chunkSize, isInitializeStep);
90+
final boolean isInitializeStep,
91+
final Object[] bucketKeyValues) {
92+
return new UpdateByWindowTicksBucketContext(sourceRowSet, chunkSize, isInitializeStep, bucketKeyValues);
8893
}
8994

9095
private static WritableRowSet computeAffectedRowsTicks(final RowSet sourceSet, final RowSet invertedSubSet,

0 commit comments

Comments
 (0)