6
6
import io .deephaven .chunk .attributes .Values ;
7
7
import io .deephaven .engine .rowset .RowSequence ;
8
8
import io .deephaven .engine .table .impl .MatchPair ;
9
+ import io .deephaven .engine .table .impl .locations .TableDataException ;
9
10
import io .deephaven .engine .table .impl .updateby .UpdateByOperator ;
10
11
import org .jetbrains .annotations .NotNull ;
11
12
import org .jetbrains .annotations .Nullable ;
@@ -22,10 +23,11 @@ protected Context(final int chunkSize) {
22
23
}
23
24
24
25
@ Override
25
- public void accumulateCumulative (@ NotNull RowSequence inputKeys ,
26
- Chunk <? extends Values >[] valueChunkArr ,
27
- LongChunk <? extends Values > tsChunk ,
28
- int len ) {
26
+ public void accumulateCumulative (
27
+ @ NotNull final RowSequence inputKeys ,
28
+ @ NotNull final Chunk <? extends Values >[] valueChunkArr ,
29
+ @ Nullable final LongChunk <? extends Values > tsChunk ,
30
+ final int len ) {
29
31
setValueChunks (valueChunkArr );
30
32
31
33
// chunk processing
@@ -58,22 +60,25 @@ public void accumulateCumulative(@NotNull RowSequence inputKeys,
58
60
handleBadData (this , true );
59
61
} else if (isNullTime ) {
60
62
// no change to curVal and lastStamp
63
+ } else if (curVal == null ) {
64
+ // We have a valid input value, we can initialize the output value with it.
65
+ curVal = new BigDecimal (input , control .bigValueContextOrDefault ());
66
+ lastStamp = timestamp ;
61
67
} else {
62
- final BigDecimal decimalInput = new BigDecimal (input , control .bigValueContextOrDefault ());
63
- if (curVal == null ) {
64
- curVal = decimalInput ;
65
- lastStamp = timestamp ;
66
- } else {
67
- final long dt = timestamp - lastStamp ;
68
- // Alpha is dynamic based on time, but only recalculated when needed
69
- if (dt != lastDt ) {
70
- alpha = computeAlpha (-dt , reverseWindowScaleUnits );
71
- oneMinusAlpha = computeOneMinusAlpha (alpha );
72
- lastDt = dt ;
73
- }
74
- curVal = aggFunction .apply (curVal , decimalInput , alpha , oneMinusAlpha );
75
- lastStamp = timestamp ;
68
+ final long dt = timestamp - lastStamp ;
69
+ if (dt < 0 ) {
70
+ // negative time deltas are not allowed, throw an exception
71
+ throw new TableDataException ("Timestamp values in UpdateBy operators must not decrease" );
72
+ }
73
+ // Alpha is dynamic based on time, but only recalculated when needed
74
+ if (dt != lastDt ) {
75
+ alpha = computeAlpha (-dt , reverseWindowScaleUnits );
76
+ oneMinusAlpha = computeOneMinusAlpha (alpha );
77
+ lastDt = dt ;
76
78
}
79
+ final BigDecimal decimalInput = new BigDecimal (input , control .bigValueContextOrDefault ());
80
+ curVal = aggFunction .apply (curVal , decimalInput , alpha , oneMinusAlpha );
81
+ lastStamp = timestamp ;
77
82
}
78
83
outputValues .set (ii , curVal );
79
84
}
0 commit comments