Skip to content

Commit

Permalink
fix: convert primitives to Objects inside of WebChunkReaderFactory (#…
Browse files Browse the repository at this point in the history
…6669)

We were already converting primitives to boxed Objects within each
sub-typed `WebColumnData`. This change moves the conversion to objects
earlier in the chain, to the WebChunkReaderFactory layer, so that we can
recursively accumulate objects into Arrays for array like columns. The
jsapi provides js native null values instead of dh internal
QueryConstant null values, and therefore data must be stored as boxed
objects. To reuse expansion kernel code the incoming Chunk must be an
ObjectChunk to generate values into object arrays.

Moving the conversion earlier guarantees that we can use the
ObjectExpansionKernel and provide downstream users with WebColumnData
that JS supports.

Fixes #6201.
  • Loading branch information
nbauernfeind authored Mar 6, 2025
1 parent 375695f commit 7093839
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 864 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ public static void main(final String[] args) throws IOException {
fixupVectorExpansionKernel(CHUNK_PACKAGE + "/vector/IntVectorExpansionKernel.java", "Int");
fixupVectorExpansionKernel(CHUNK_PACKAGE + "/vector/LongVectorExpansionKernel.java", "Long");
fixupVectorExpansionKernel(CHUNK_PACKAGE + "/vector/DoubleVectorExpansionKernel.java", "Double");

ReplicatePrimitiveCode.charToAllButBoolean("replicateBarrageUtils",
"web/client-api/src/main/java/io/deephaven/web/client/api/barrage/data/WebCharColumnData.java");
}

private static void fixupDoubleChunkReader(final @NotNull String path) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import io.deephaven.barrage.flatbuf.BarrageModColumnMetadata;
import io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.BarrageOptions;
import io.deephaven.extensions.barrage.BarrageTypeInfo;
Expand Down Expand Up @@ -59,7 +61,6 @@ public class WebBarrageMessageReader {

public WebBarrageMessage parseFrom(
final BarrageOptions options,
ChunkType[] columnChunkTypes,
Class<?>[] columnTypes,
Class<?>[] componentTypes,
FlightData flightData) throws IOException {
Expand Down Expand Up @@ -121,7 +122,7 @@ public WebBarrageMessage parseFrom(

// create an initial chunk of the correct size
final int chunkSize = (int) (Math.min(msg.rowsIncluded.size(), MAX_CHUNK_SIZE));
final WritableChunk<Values> chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
final WritableChunk<Values> chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
chunk.setSize(0);
msg.addColumnData[ci].data.add(chunk);
}
Expand All @@ -142,7 +143,7 @@ public WebBarrageMessage parseFrom(
// create an initial chunk of the correct size
final int chunkSize = (int) (Math.min(msg.modColumnData[ci].rowsModified.size(),
MAX_CHUNK_SIZE));
final WritableChunk<Values> chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
final WritableChunk<Values> chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
chunk.setSize(0);
msg.modColumnData[ci].data.add(chunk);

Expand Down Expand Up @@ -217,7 +218,7 @@ public WebBarrageMessage parseFrom(
if (batch.length() > chunk.capacity() - chunk.size()) {
// reading the rows from this batch will overflow the existing chunk; create a new one
final int chunkSize = (int) (Math.min(remaining, MAX_CHUNK_SIZE));
chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
acd.data.add(chunk);

chunk.setSize(0);
Expand Down Expand Up @@ -248,7 +249,7 @@ public WebBarrageMessage parseFrom(
if (numRowsToRead > chunk.capacity() - chunk.size()) {
// reading the rows from this batch will overflow the existing chunk; create a new one
final int chunkSize = (int) (Math.min(remaining, MAX_CHUNK_SIZE));
chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
mcd.data.add(chunk);

chunk.setSize(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@

import elemental2.core.JsDate;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.DoubleChunk;
import io.deephaven.chunk.FloatChunk;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.ShortChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
Expand Down Expand Up @@ -38,6 +44,7 @@
import io.deephaven.web.client.api.LocalDateWrapper;
import io.deephaven.web.client.api.LocalTimeWrapper;
import io.deephaven.web.client.api.LongWrapper;
import jsinterop.base.Js;
import org.apache.arrow.flatbuf.Date;
import org.apache.arrow.flatbuf.DateUnit;
import org.apache.arrow.flatbuf.Field;
Expand Down Expand Up @@ -75,50 +82,10 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
@NotNull final BarrageOptions options) {
switch (typeInfo.arrowField().typeType()) {
case Type.Int: {
Int t = new Int();
typeInfo.arrowField().type(t);
switch (t.bitWidth()) {
case 8: {
return (ChunkReader<T>) new ByteChunkReader(options);
}
case 16: {
if (t.isSigned()) {
return (ChunkReader<T>) new ShortChunkReader(options);
}
return (ChunkReader<T>) new CharChunkReader(options);
}
case 32: {
return (ChunkReader<T>) new IntChunkReader(options);
}
case 64: {
if (t.isSigned()) {
return (ChunkReader<T>) transformToObject(new LongChunkReader(options),
(src, dst, dstOffset) -> {
for (int ii = 0; ii < src.size(); ++ii) {
dst.set(dstOffset + ii, LongWrapper.of(src.get(ii)));
}
});
}
throw new IllegalArgumentException("Unsigned 64bit integers not supported");
}
default:
throw new IllegalArgumentException("Unsupported Int bitwidth: " + t.bitWidth());
}
return newIntReader(typeInfo, options);
}
case Type.FloatingPoint: {
FloatingPoint t = new FloatingPoint();
typeInfo.arrowField().type(t);
switch (t.precision()) {
case Precision.SINGLE: {
return (ChunkReader<T>) new FloatChunkReader(options);
}
case Precision.DOUBLE: {
return (ChunkReader<T>) new DoubleChunkReader(options);
}
default:
throw new IllegalArgumentException(
"Unsupported FloatingPoint precision " + Precision.name(t.precision()));
}
return newFloatReader(typeInfo, options);
}
case Type.Binary: {
if (typeInfo.type() == BigIntegerWrapper.class) {
Expand Down Expand Up @@ -160,7 +127,7 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
case Type.Bool: {
BooleanChunkReader subReader = new BooleanChunkReader();
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableByteChunk<Values> inner = (WritableByteChunk<Values>) subReader.readChunk(
try (final WritableByteChunk<Values> inner = subReader.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<Boolean, Values> chunk;
Expand All @@ -184,7 +151,6 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(

return (T) chunk;
}

};
}
case Type.Date: {
Expand Down Expand Up @@ -331,6 +297,102 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
}
}

@SuppressWarnings("unchecked")
private static <T extends WritableChunk<Values>> @NotNull ChunkReader<T> newFloatReader(
@NotNull BarrageTypeInfo<Field> typeInfo, @NotNull BarrageOptions options) {
FloatingPoint t = new FloatingPoint();
typeInfo.arrowField().type(t);
switch (t.precision()) {
case Precision.SINGLE: {
return (ChunkReader<T>) transformToObject(new FloatChunkReader(options),
(src, dst, dstOffset) -> {
final FloatChunk<?> floatChunk = src.asFloatChunk();
for (int ii = 0; ii < src.size(); ++ii) {
float value = floatChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_FLOAT ? null : Js.asAny(value));
}
});
}
case Precision.DOUBLE: {
return (ChunkReader<T>) transformToObject(new DoubleChunkReader(options),
(src, dst, dstOffset) -> {
final DoubleChunk<?> floatChunk = src.asDoubleChunk();
for (int ii = 0; ii < src.size(); ++ii) {
double value = floatChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_DOUBLE ? null : Js.asAny(value));
}
});
}
default:
throw new IllegalArgumentException(
"Unsupported FloatingPoint precision " + Precision.name(t.precision()));
}
}

@SuppressWarnings("unchecked")
private static <T extends WritableChunk<Values>> ChunkReader<T> newIntReader(
@NotNull final BarrageTypeInfo<Field> typeInfo,
@NotNull final BarrageOptions options) {
final Int t = new Int();
typeInfo.arrowField().type(t);
switch (t.bitWidth()) {
case 8: {
return (ChunkReader<T>) transformToObject(new ByteChunkReader(options),
(src, dst, dstOffset) -> {
final ByteChunk<?> byteChunk = src.asByteChunk();
for (int ii = 0; ii < src.size(); ++ii) {
byte value = byteChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_BYTE ? null : Js.asAny(value));
}
});
}
case 16: {
if (t.isSigned()) {
return (ChunkReader<T>) transformToObject(new ShortChunkReader(options),
(src, dst, dstOffset) -> {
final ShortChunk<?> shortChunk = src.asShortChunk();
for (int ii = 0; ii < src.size(); ++ii) {
short value = shortChunk.get(ii);
dst.set(dstOffset + ii,
value == QueryConstants.NULL_SHORT ? null : Js.asAny(value));
}
});
}
return (ChunkReader<T>) transformToObject(new CharChunkReader(options),
(src, dst, dstOffset) -> {
final CharChunk<?> charChunk = src.asCharChunk();
for (int ii = 0; ii < src.size(); ++ii) {
char value = charChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_CHAR ? null : Js.asAny(value));
}
});
}
case 32: {
return (ChunkReader<T>) transformToObject(new IntChunkReader(options),
(src, dst, dstOffset) -> {
final IntChunk<?> intChunk = src.asIntChunk();
for (int ii = 0; ii < src.size(); ++ii) {
int value = intChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_INT ? null : Js.asAny(value));
}
});
}
case 64: {
if (t.isSigned()) {
return (ChunkReader<T>) transformToObject(new LongChunkReader(options),
(src, dst, dstOffset) -> {
for (int ii = 0; ii < src.size(); ++ii) {
dst.set(dstOffset + ii, LongWrapper.of(src.get(ii)));
}
});
}
throw new IllegalArgumentException("Unsigned 64bit integers not supported");
}
default:
throw new IllegalArgumentException("Unsupported Int bitwidth: " + t.bitWidth());
}
}

public interface Mapper<T> {
T constructFrom(byte[] buf, int offset, int length) throws IOException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.web.client.api.barrage.data;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.web.client.api.barrage.WebBarrageMessage;
import io.deephaven.web.client.api.barrage.def.InitialTableDefinition;
Expand Down Expand Up @@ -45,36 +44,8 @@ public static WebBarrageSubscription subscribe(
final DataChangedHandler dataChangedHandler) {

WebColumnData[] dataSinks = new WebColumnData[cts.columnTypes().length];
ChunkType[] chunkTypes = cts.chunkTypes();
for (int i = 0; i < dataSinks.length; i++) {
switch (chunkTypes[i]) {
case Boolean:
throw new IllegalStateException("Boolean unsupported here");
case Char:
dataSinks[i] = new WebCharColumnData();
break;
case Byte:
dataSinks[i] = new WebByteColumnData();
break;
case Short:
dataSinks[i] = new WebShortColumnData();
break;
case Int:
dataSinks[i] = new WebIntColumnData();
break;
case Long:
dataSinks[i] = new WebLongColumnData();
break;
case Float:
dataSinks[i] = new WebFloatColumnData();
break;
case Double:
dataSinks[i] = new WebDoubleColumnData();
break;
case Object:
dataSinks[i] = new WebObjectColumnData();
break;
}
dataSinks[i] = new WebColumnData();
}

if (cts.getTableDef().getAttributes().isBlinkTable()) {
Expand Down Expand Up @@ -207,7 +178,7 @@ public void applyUpdates(WebBarrageMessage message) {
PrimitiveIterator.OfLong destIterator = destinationRowSet.indexIterator();
for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, destIterator);
destSources[ii].fillFromChunk(chunk, destIterator);
}
assert !destIterator.hasNext();
}
Expand Down Expand Up @@ -356,7 +327,7 @@ public void applyUpdates(WebBarrageMessage message) {

for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, destIterator);
destSources[ii].fillFromChunk(chunk, destIterator);
}
assert !destIterator.hasNext();
}
Expand Down Expand Up @@ -395,7 +366,7 @@ public boolean hasNext() {
};
for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, destIterator);
destSources[ii].fillFromChunk(chunk, destIterator);
}
assert !destIterator.hasNext();
}
Expand Down Expand Up @@ -538,7 +509,7 @@ public void applyUpdates(WebBarrageMessage message) {

for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, column.rowsModified.indexIterator());
destSources[ii].fillFromChunk(chunk, column.rowsModified.indexIterator());
}
}

Expand Down
Loading

0 comments on commit 7093839

Please sign in to comment.