Skip to content

Commit a6ad4c9

Browse files
committed
Barrage: Refactor Read/Write Chunk Factories
1 parent 564b146 commit a6ad4c9

File tree

112 files changed

+6687
-4296
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

112 files changed

+6687
-4296
lines changed

engine/chunk/src/main/java/io/deephaven/chunk/BooleanChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -74,6 +75,10 @@ public final boolean get(int index) {
7475
return data[offset + index];
7576
}
7677

78+
public final boolean isNullAt(int index) {
79+
return data[offset + index] == QueryConstants.NULL_BOOLEAN;
80+
}
81+
7782
@Override
7883
public BooleanChunk<ATTR> slice(int offset, int capacity) {
7984
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/ByteChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -78,6 +79,10 @@ public final byte get(int index) {
7879
return data[offset + index];
7980
}
8081

82+
public final boolean isNullAt(int index) {
83+
return data[offset + index] == QueryConstants.NULL_BYTE;
84+
}
85+
8186
@Override
8287
public ByteChunk<ATTR> slice(int offset, int capacity) {
8388
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/CharChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//
44
package io.deephaven.chunk;
55

6+
import io.deephaven.util.QueryConstants;
67
import io.deephaven.util.type.ArrayTypeUtils;
78
import io.deephaven.chunk.attributes.Any;
89

@@ -73,6 +74,10 @@ public final char get(int index) {
7374
return data[offset + index];
7475
}
7576

77+
public final boolean isNullAt(int index) {
78+
return data[offset + index] == QueryConstants.NULL_CHAR;
79+
}
80+
7681
@Override
7782
public CharChunk<ATTR> slice(int offset, int capacity) {
7883
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/Chunk.java

+6
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ default void copyToBuffer(int srcOffset, @NotNull Buffer destBuffer, int destOff
109109
*/
110110
int size();
111111

112+
/**
113+
* @return whether The value offset is null
114+
* @param index The index to check
115+
*/
116+
boolean isNullAt(int index);
117+
112118
/**
113119
* @return The underlying chunk type
114120
*/

engine/chunk/src/main/java/io/deephaven/chunk/DoubleChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -77,6 +78,10 @@ public final double get(int index) {
7778
return data[offset + index];
7879
}
7980

81+
public final boolean isNullAt(int index) {
82+
return data[offset + index] == QueryConstants.NULL_DOUBLE;
83+
}
84+
8085
@Override
8186
public DoubleChunk<ATTR> slice(int offset, int capacity) {
8287
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/FloatChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -77,6 +78,10 @@ public final float get(int index) {
7778
return data[offset + index];
7879
}
7980

81+
public final boolean isNullAt(int index) {
82+
return data[offset + index] == QueryConstants.NULL_FLOAT;
83+
}
84+
8085
@Override
8186
public FloatChunk<ATTR> slice(int offset, int capacity) {
8287
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/IntChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -77,6 +78,10 @@ public final int get(int index) {
7778
return data[offset + index];
7879
}
7980

81+
public final boolean isNullAt(int index) {
82+
return data[offset + index] == QueryConstants.NULL_INT;
83+
}
84+
8085
@Override
8186
public IntChunk<ATTR> slice(int offset, int capacity) {
8287
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/LongChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -77,6 +78,10 @@ public final long get(int index) {
7778
return data[offset + index];
7879
}
7980

81+
public final boolean isNullAt(int index) {
82+
return data[offset + index] == QueryConstants.NULL_LONG;
83+
}
84+
8085
@Override
8186
public LongChunk<ATTR> slice(int offset, int capacity) {
8287
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/ObjectChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -77,6 +78,10 @@ public final T get(int index) {
7778
return data[offset + index];
7879
}
7980

81+
public final boolean isNullAt(int index) {
82+
return data[offset + index] == null;
83+
}
84+
8085
@Override
8186
public ObjectChunk<T, ATTR> slice(int offset, int capacity) {
8287
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/chunk/src/main/java/io/deephaven/chunk/ShortChunk.java

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// @formatter:off
88
package io.deephaven.chunk;
99

10+
import io.deephaven.util.QueryConstants;
1011
import io.deephaven.util.type.ArrayTypeUtils;
1112
import io.deephaven.chunk.attributes.Any;
1213

@@ -77,6 +78,10 @@ public final short get(int index) {
7778
return data[offset + index];
7879
}
7980

81+
public final boolean isNullAt(int index) {
82+
return data[offset + index] == QueryConstants.NULL_SHORT;
83+
}
84+
8085
@Override
8186
public ShortChunk<ATTR> slice(int offset, int capacity) {
8287
ChunkHelpers.checkSliceArgs(size, offset, capacity);

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

+1
Original file line numberDiff line numberDiff line change
@@ -2613,6 +2613,7 @@ private Table snapshotIncrementalInternal(final Table base, final boolean doInit
26132613
new ListenerRecorder("snapshotIncremental (triggerTable)", this, resultTable);
26142614
addUpdateListener(triggerListenerRecorder);
26152615

2616+
dropColumns(getColumnSourceMap().keySet());
26162617
final SnapshotIncrementalListener listener =
26172618
new SnapshotIncrementalListener(this, resultTable, resultColumns,
26182619
baseListenerRecorder, triggerListenerRecorder, baseTable, triggerColumns);

engine/table/src/main/java/io/deephaven/engine/table/impl/preview/ArrayPreview.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//
44
package io.deephaven.engine.table.impl.preview;
55

6+
import io.deephaven.util.type.TypeUtils;
67
import io.deephaven.vector.Vector;
78
import io.deephaven.vector.VectorFactory;
89
import org.jetbrains.annotations.NotNull;
@@ -34,7 +35,9 @@ public static ArrayPreview fromArray(final Object array) {
3435
if (componentType == boolean.class) {
3536
return new ArrayPreview(convertToString((boolean[]) array));
3637
}
37-
return new ArrayPreview(VectorFactory.forElementType(componentType)
38+
// Boxed primitives need the Object wrapper.
39+
final Class<?> elementType = TypeUtils.isBoxedType(componentType) ? Object.class : componentType;
40+
return new ArrayPreview(VectorFactory.forElementType(elementType)
3841
.vectorWrap(array)
3942
.toString(ARRAY_SIZE_CUTOFF));
4043
}

extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageMessageWriter.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
package io.deephaven.extensions.barrage;
55

66
import com.google.flatbuffers.FlatBufferBuilder;
7+
import io.deephaven.chunk.Chunk;
8+
import io.deephaven.chunk.attributes.Values;
79
import io.deephaven.engine.rowset.RowSet;
810
import io.deephaven.engine.table.impl.util.BarrageMessage;
11+
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
912
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
1013
import io.deephaven.util.SafeCloseable;
1114
import org.jetbrains.annotations.NotNull;
@@ -17,10 +20,10 @@
1720
import java.util.function.ToIntFunction;
1821

1922
/**
20-
* A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers
21-
* that may subscribe to different viewports and columns.
23+
* A {@code BarrageMessageWriter} takes a {@link BarrageMessage} and re-uses portions of the serialized payload across
24+
* different subscribers that may subscribe to different viewports and columns.
2225
*/
23-
public interface BarrageStreamGenerator extends SafeCloseable {
26+
public interface BarrageMessageWriter extends SafeCloseable {
2427

2528
/**
2629
* Represents a single update, which might be sent as multiple distinct payloads as necessary based in the
@@ -32,16 +35,18 @@ interface MessageView {
3235

3336
interface Factory {
3437
/**
35-
* Create a StreamGenerator that now owns the BarrageMessage.
38+
* Create a {@code BarrageMessageWriter} that now owns the {@link BarrageMessage}.
3639
*
3740
* @param message the message that contains the update that we would like to propagate
3841
* @param metricsConsumer a method that can be used to record write metrics
3942
*/
40-
BarrageStreamGenerator newGenerator(
41-
BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);
43+
BarrageMessageWriter newMessageWriter(
44+
@NotNull BarrageMessage message,
45+
@NotNull ChunkWriter<Chunk<Values>>[] chunkWriters,
46+
@NotNull BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);
4247

4348
/**
44-
* Create a MessageView of the Schema to send as the initial message to a new subscriber.
49+
* Create a {@link MessageView} of the Schema to send as the initial message to a new subscriber.
4550
*
4651
* @param schemaPayloadWriter a function that writes schema data to a {@link FlatBufferBuilder} and returns the
4752
* schema offset
@@ -51,21 +56,22 @@ BarrageStreamGenerator newGenerator(
5156
}
5257

5358
/**
54-
* @return the BarrageMessage that this generator is operating on
59+
* @return the {@link BarrageMessage} that this writer is operating on
5560
*/
5661
BarrageMessage getMessage();
5762

5863
/**
59-
* Obtain a Full-Subscription View of this StreamGenerator that can be sent to a single subscriber.
64+
* Obtain a Full-Subscription {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single
65+
* subscriber.
6066
*
6167
* @param options serialization options for this specific view
62-
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
68+
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
6369
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
6470
*/
6571
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot);
6672

6773
/**
68-
* Obtain a View of this StreamGenerator that can be sent to a single subscriber.
74+
* Obtain a {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single subscriber.
6975
*
7076
* @param options serialization options for this specific view
7177
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
@@ -79,15 +85,16 @@ MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnap
7985
boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet subscribedColumns);
8086

8187
/**
82-
* Obtain a Full-Snapshot View of this StreamGenerator that can be sent to a single requestor.
88+
* Obtain a Full-Snapshot {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single
89+
* requestor.
8390
*
8491
* @param options serialization options for this specific view
8592
* @return a MessageView filtered by the snapshot properties that can be sent to that requestor
8693
*/
8794
MessageView getSnapshotView(BarrageSnapshotOptions options);
8895

8996
/**
90-
* Obtain a View of this StreamGenerator that can be sent to a single requestor.
97+
* Obtain a {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single requestor.
9198
*
9299
* @param options serialization options for this specific view
93100
* @param viewport is the position-space viewport

0 commit comments

Comments
 (0)