Skip to content

Commit 1ae0f77

Browse files
authored
feat!: support refreshing Iceberg tables (deephaven#5707)
Add two methods of refreshing tables: - Manual refreshing - user specifies which snapshot to load and the engine will parse the snapshot to add/remove Iceberg data files as needed and notify downstream tables of the changes - Auto refreshing - at regular intervals (user configurable) the engine will query Iceberg for the latest snapshot and then parse and load # Example code: ## Java automatic and manually refreshing tables ``` import io.deephaven.iceberg.util.*; import org.apache.iceberg.catalog.*; // Create a map to hold the Iceberg Catalog properties def properties = [ "type": "rest", "uri": "http://rest:8181", "client.region": "us-east-1", "s3.access-key-id": "admin", "s3.secret-access-key": "password", "s3.endpoint": "http://minio:9000", ] adapter = IcebergTools.createAdapter("generic-adapter", properties); ////////////////////////////////////////////////////////////////////// tableAdapter = adapter.loadTable("sales.sales_multi") snapshots = tableAdapter.snapshots() definition = tableAdapter.definitionTable() ////////////////////////////////////////////////////////////////////// // Load the latest snapshot as a static table sales_multi_static = tableAdapter.table() // Load a specific snapshot as a static table sales_multi_static = tableAdapter.table(6119130922262455673L) ////////////////////////////////////////////////////////////////////// // Manual refreshing refreshing_instructions = IcebergInstructions.builder() .updateMode(IcebergUpdateMode.manualRefreshingMode()) .build() // Load a table with a specific snapshot sales_multi = adapter.readTable( "sales.sales_multi", 5120804857276751995L, refreshing_instructions) // Update the table to a specific snapshot sales_multi.update(848129305390678414L) // Update to the latest snapshot sales_multi.update() ////////////////////////////////////////////////////////////////////// import io.deephaven.iceberg.util.IcebergUpdateMode; // Automatic refreshing every 1 second iceberg_instructions = IcebergInstructions.builder() 
.updateMode(IcebergUpdateMode.autoRefreshingMode(1_000L)) .build() sales_multi = tableAdapter.table(iceberg_instructions) // Load the table using the default refresh of 60 seconds iceberg_instructions = IcebergInstructions.builder() .updateMode(IcebergUpdateMode.autoRefreshingMode()) .build() sales_multi = tableAdapter.table(iceberg_instructions) ``` ## Python automatic and manually refreshing tables ``` from deephaven.experimental import s3, iceberg local_adapter = iceberg.adapter(name="generic-adapter", properties={ "type" : "rest", "uri" : "http://rest:8181", "client.region" : "us-east-1", "s3.access-key-id" : "admin", "s3.secret-access-key" : "password", "s3.endpoint" : "http://minio:9000" }); t_namespaces = local_adapter.namespaces() t_tables = local_adapter.tables("sales") ## Create a table adapter for table_adapter = local_adapter.load_table("sales.sales_multi") t_snapshots = table_adapter.snapshots() t_definition = table_adapter.definition() ################################################# # Get the latest snapshot as a static table sales_multi_static_latest = table_adapter.table() # Get a specific snapshot as a static table sales_multi_static_snap = table_adapter.table(snapshot_id=6119130922262455673) ################################################# iceberg_instructions = iceberg.IcebergInstructions( update_mode=iceberg.IcebergUpdateMode.manual_refresh()) # Get the latest snapshot as a manual refreshing table sales_multi_refreshing = table_adapter.table(instructions=iceberg_instructions) # Get a specific snapshot as a manual refreshing table sales_multi_refreshing = table_adapter.table( snapshot_id=6119130922262455673, instructions=iceberg_instructions) # Update to a specific snapshot sales_multi_refreshing.update(861950607215619880) # Update to a specific snapshot sales_multi_refreshing.update(4720492918960789101) # Update to the latest snapshot sales_multi_refreshing.update() ################################################# # Get an auto refreshing 
table that updates each second (1000 ms) iceberg_instructions = iceberg.IcebergInstructions( update_mode=iceberg.IcebergUpdateMode.auto_refresh(1000)) sales_multi = table_adapter.table(instructions=iceberg_instructions) # Get an auto refreshing table that updates at the default interval of 60 seconds iceberg_instructions = iceberg.IcebergInstructions( update_mode=iceberg.IcebergUpdateMode.auto_refresh()) sales_multi = table_adapter.table(instructions=iceberg_instructions) ```
1 parent d338c7d commit 1ae0f77

File tree

88 files changed

+4898
-1572
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

88 files changed

+4898
-1572
lines changed

Util/src/main/java/io/deephaven/util/datastructures/SubscriptionSet.java

+31
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.deephaven.base.reference.WeakReferenceWrapper;
88
import io.deephaven.base.verify.Assert;
99
import io.deephaven.base.verify.Require;
10+
import org.apache.commons.lang3.function.TriConsumer;
1011
import org.jetbrains.annotations.NotNull;
1112
import org.jetbrains.annotations.Nullable;
1213

@@ -219,6 +220,36 @@ public final <NOTIFICATION_TYPE> boolean deliverNotification(
219220
return initialSize > 0 && size == 0;
220221
}
221222

223+
/**
224+
* Dispatch a notification to all subscribers. Clean up any GC'd subscriptions.
225+
*
226+
* @param procedure The notification procedure to invoke
227+
* @param firstNotification The first item to deliver
228+
* @param secondNotification The second item to deliver (must be of the same type as {@code firstNotification})
229+
* @param activeOnly Whether to restrict this notification to active subscriptions only
230+
* @return Whether this operation caused the set to become <b>empty</b>
231+
*/
232+
public final <NOTIFICATION_TYPE> boolean deliverNotification(
233+
@NotNull final TriConsumer<LISTENER_TYPE, NOTIFICATION_TYPE, NOTIFICATION_TYPE> procedure,
234+
@Nullable final NOTIFICATION_TYPE firstNotification,
235+
@Nullable final NOTIFICATION_TYPE secondNotification,
236+
final boolean activeOnly) {
237+
final int initialSize = size;
238+
for (int si = 0; si < size;) {
239+
final Entry currentEntry = subscriptions[si];
240+
final LISTENER_TYPE currentListener = currentEntry.getListener();
241+
if (currentListener == null) {
242+
removeAt(si);
243+
continue; // si is not incremented in this case - we'll reconsider the same slot if necessary.
244+
}
245+
if (!activeOnly || currentEntry.isActive()) {
246+
procedure.accept(currentListener, firstNotification, secondNotification);
247+
}
248+
++si;
249+
}
250+
return initialSize > 0 && size == 0;
251+
}
252+
222253
private void removeAt(final int subscriptionIndex) {
223254
final int lastSubscriptionIndex = --size;
224255
subscriptions[subscriptionIndex] = subscriptions[lastSubscriptionIndex];

engine/table/src/main/java/io/deephaven/engine/table/impl/ColumnSourceManager.java

+16-10
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,10 @@
33
//
44
package io.deephaven.engine.table.impl;
55

6-
import io.deephaven.engine.liveness.LivenessReferent;
6+
import io.deephaven.engine.liveness.LivenessNode;
77
import io.deephaven.engine.rowset.RowSet;
88
import io.deephaven.engine.rowset.TrackingWritableRowSet;
9-
import io.deephaven.engine.rowset.WritableRowSet;
10-
import io.deephaven.engine.table.ColumnSource;
11-
import io.deephaven.engine.table.DataIndex;
12-
import io.deephaven.engine.table.Table;
13-
import io.deephaven.engine.table.TableListener;
9+
import io.deephaven.engine.table.*;
1410
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
1511
import io.deephaven.engine.table.impl.locations.TableLocation;
1612
import org.jetbrains.annotations.NotNull;
@@ -22,7 +18,7 @@
2218
/**
2319
* Manager for ColumnSources in a Table.
2420
*/
25-
public interface ColumnSourceManager extends LivenessReferent {
21+
public interface ColumnSourceManager extends LivenessNode {
2622

2723
/**
2824
* Get a map of name to {@link ColumnSource} for the column sources maintained by this manager.
@@ -53,7 +49,7 @@ public interface ColumnSourceManager extends LivenessReferent {
5349
*
5450
* @return The set of added row keys, to be owned by the caller
5551
*/
56-
WritableRowSet refresh();
52+
TableUpdate refresh();
5753

5854
/**
5955
* Advise this ColumnSourceManager that an error has occurred, and that it will no longer be {@link #refresh()
@@ -116,8 +112,18 @@ public interface ColumnSourceManager extends LivenessReferent {
116112
/**
117113
* Remove a table location key from the sources.
118114
*
119-
* @return true if the location key was actually removed
120115
* @param tableLocationKey the location key being removed
121116
*/
122-
boolean removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
117+
void removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
118+
119+
/**
120+
* Get a map of Table attributes that can be applied to the output source table, given the update modes of the
121+
* underlying table location provider.
122+
*
123+
* @param tableUpdateMode The update mode of the table location set
124+
* @param tableLocationUpdateMode The update mode of the table location rows
125+
*/
126+
Map<String, Object> getTableAttributes(
127+
@NotNull TableUpdateMode tableUpdateMode,
128+
@NotNull TableUpdateMode tableLocationUpdateMode);
123129
}

engine/table/src/main/java/io/deephaven/engine/table/impl/PartitionAwareSourceTable.java

+22-9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.deephaven.api.Selectable;
77
import io.deephaven.api.filter.Filter;
88
import io.deephaven.base.verify.Assert;
9+
import io.deephaven.engine.liveness.LiveSupplier;
910
import io.deephaven.engine.table.*;
1011
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
1112
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
@@ -204,7 +205,7 @@ protected final Table redefine(TableDefinition newDefinitionExternal, TableDefin
204205
reference, null, viewColumns, null);
205206
}
206207

207-
private static final String LOCATION_KEY_COLUMN_NAME = "__PartitionAwareSourceTable_TableLocationKey__";
208+
private static final String KEY_SUPPLIER_COLUMN_NAME = "__PartitionAwareSourceTable_KeySupplier__";
208209

209210
private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final ColumnDefinition<T> columnDefinition,
210211
@NotNull final Collection<ImmutableTableLocationKey> locationKeys) {
@@ -221,30 +222,42 @@ private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final Co
221222
}
222223

223224
@Override
224-
protected final Collection<ImmutableTableLocationKey> filterLocationKeys(
225-
@NotNull final Collection<ImmutableTableLocationKey> foundLocationKeys) {
225+
protected final Collection<LiveSupplier<ImmutableTableLocationKey>> filterLocationKeys(
226+
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> foundLocationKeys) {
226227
if (partitioningColumnFilters.length == 0) {
227228
return foundLocationKeys;
228229
}
230+
231+
final Collection<ImmutableTableLocationKey> immutableTableLocationKeys = foundLocationKeys.stream()
232+
.map(LiveSupplier::get)
233+
.collect(Collectors.toList());
234+
229235
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
230236
final List<String> partitionTableColumnNames = Stream.concat(
231237
partitioningColumnDefinitions.keySet().stream(),
232-
Stream.of(LOCATION_KEY_COLUMN_NAME)).collect(Collectors.toList());
238+
Stream.of(KEY_SUPPLIER_COLUMN_NAME)).collect(Collectors.toList());
233239
final List<ColumnSource<?>> partitionTableColumnSources =
234240
new ArrayList<>(partitioningColumnDefinitions.size() + 1);
235241
for (final ColumnDefinition<?> columnDefinition : partitioningColumnDefinitions.values()) {
236-
partitionTableColumnSources.add(makePartitionSource(columnDefinition, foundLocationKeys));
242+
partitionTableColumnSources.add(makePartitionSource(columnDefinition, immutableTableLocationKeys));
237243
}
238-
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(foundLocationKeys,
239-
ImmutableTableLocationKey.class, null));
244+
// Add the key suppliers to the table
245+
// noinspection unchecked,rawtypes
246+
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(
247+
(Collection<LiveSupplier>) (Collection) foundLocationKeys,
248+
LiveSupplier.class,
249+
null));
250+
240251
final Table filteredColumnPartitionTable = TableTools
241252
.newTable(foundLocationKeys.size(), partitionTableColumnNames, partitionTableColumnSources)
242253
.where(Filter.and(partitioningColumnFilters));
243254
if (filteredColumnPartitionTable.size() == foundLocationKeys.size()) {
244255
return foundLocationKeys;
245256
}
246-
final Iterable<ImmutableTableLocationKey> iterable =
247-
() -> filteredColumnPartitionTable.columnIterator(LOCATION_KEY_COLUMN_NAME);
257+
258+
// Return the filtered keys
259+
final Iterable<LiveSupplier<ImmutableTableLocationKey>> iterable =
260+
() -> filteredColumnPartitionTable.columnIterator(KEY_SUPPLIER_COLUMN_NAME);
248261
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
249262
}
250263

engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java

+65-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.deephaven.engine.table.impl;
55

66
import io.deephaven.base.verify.Assert;
7+
import io.deephaven.engine.liveness.*;
78
import io.deephaven.engine.primitive.iterator.CloseableIterator;
89
import io.deephaven.engine.rowset.*;
910
import io.deephaven.engine.table.*;
@@ -15,7 +16,9 @@
1516
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
1617
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
1718
import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator;
19+
import io.deephaven.engine.updategraph.NotificationQueue;
1820
import io.deephaven.engine.updategraph.UpdateCommitter;
21+
import io.deephaven.engine.updategraph.UpdateGraph;
1922
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
2023
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
2124
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
@@ -73,7 +76,8 @@ public SourcePartitionedTable(
7376
false);
7477
}
7578

76-
private static final class UnderlyingTableMaintainer {
79+
private static final class UnderlyingTableMaintainer extends ReferenceCountedLivenessNode
80+
implements NotificationQueue.Dependency {
7781

7882
private final TableDefinition constituentDefinition;
7983
private final UnaryOperator<Table> applyTablePermissions;
@@ -103,13 +107,14 @@ private UnderlyingTableMaintainer(
103107
final boolean refreshLocations,
104108
final boolean refreshSizes,
105109
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
110+
super(false);
111+
106112
this.constituentDefinition = constituentDefinition;
107113
this.applyTablePermissions = applyTablePermissions;
108114
this.tableLocationProvider = tableLocationProvider;
109115
this.refreshSizes = refreshSizes;
110116
this.locationKeyMatcher = locationKeyMatcher;
111117

112-
// noinspection resource
113118
resultRows = RowSetFactory.empty().toTracking();
114119
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
115120
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);
@@ -123,13 +128,19 @@ private UnderlyingTableMaintainer(
123128
if (needToRefreshLocations || refreshSizes) {
124129
result.setRefreshing(true);
125130
refreshCombiner = new UpdateSourceCombiner(result.getUpdateGraph());
126-
result.addParentReference(refreshCombiner);
131+
result.addParentReference(this);
132+
manage(refreshCombiner);
127133
} else {
128134
refreshCombiner = null;
129135
}
130136

131137
if (needToRefreshLocations) {
138+
resultTableLocationKeys.startTrackingPrevValues();
139+
resultLocationTables.startTrackingPrevValues();
140+
132141
subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider);
142+
manage(subscriptionBuffer);
143+
133144
pendingLocationStates = new IntrusiveDoublyLinkedQueue<>(
134145
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
135146
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
@@ -143,7 +154,6 @@ protected void instrumentedRefresh() {
143154
processPendingLocations(true);
144155
}
145156
};
146-
result.addParentReference(processNewLocationsUpdateRoot);
147157
refreshCombiner.addSource(processNewLocationsUpdateRoot);
148158

149159
this.removedLocationsComitter = new UpdateCommitter<>(
@@ -154,7 +164,6 @@ protected void instrumentedRefresh() {
154164
removedConstituents.forEach(result::unmanage);
155165
removedConstituents = null;
156166
});
157-
158167
processPendingLocations(false);
159168
} else {
160169
subscriptionBuffer = null;
@@ -163,9 +172,12 @@ protected void instrumentedRefresh() {
163172
processNewLocationsUpdateRoot = null;
164173
removedLocationsComitter = null;
165174
tableLocationProvider.refresh();
166-
try (final RowSet added = sortAndAddLocations(tableLocationProvider.getTableLocationKeys().stream()
167-
.filter(locationKeyMatcher)
168-
.map(tableLocationProvider::getTableLocation))) {
175+
176+
final Collection<TableLocation> locations = new ArrayList<>();
177+
tableLocationProvider.getTableLocationKeys(
178+
tlk -> locations.add(tableLocationProvider.getTableLocation(tlk.get())),
179+
locationKeyMatcher);
180+
try (final RowSet added = sortAndAddLocations(locations.stream())) {
169181
resultRows.insert(added);
170182
}
171183
}
@@ -204,18 +216,30 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
204216
constituentDefinition,
205217
"SingleLocationSourceTable-" + tableLocation,
206218
RegionedTableComponentFactoryImpl.INSTANCE,
207-
new SingleTableLocationProvider(tableLocation),
219+
new SingleTableLocationProvider(tableLocation, refreshSizes
220+
? tableLocationProvider.getLocationUpdateMode()
221+
: TableUpdateMode.STATIC),
208222
refreshSizes ? refreshCombiner : null);
209223

224+
// Transfer management to the constituent CSM. NOTE: this is likely to end up double-managed
225+
// after the CSM adds the location to the table, but that's acceptable.
226+
constituent.columnSourceManager.manage(tableLocation);
227+
unmanage(tableLocation);
228+
210229
// Be careful to propagate the systemic attribute properly to child tables
211230
constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject());
212231
return applyTablePermissions.apply(constituent);
213232
}
214233

215234
private void processPendingLocations(final boolean notifyListeners) {
216-
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending();
217-
final RowSet removed = processRemovals(locationUpdate);
218-
final RowSet added = processAdditions(locationUpdate);
235+
final RowSet removed;
236+
final RowSet added;
237+
238+
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
239+
subscriptionBuffer.processPending()) {
240+
removed = processRemovals(locationUpdate);
241+
added = processAdditions(locationUpdate);
242+
}
219243

220244
resultRows.update(added, removed);
221245
if (notifyListeners) {
@@ -242,8 +266,10 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
242266
*/
243267
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
244268
locationUpdate.getPendingAddedLocationKeys().stream()
269+
.map(LiveSupplier::get)
245270
.filter(locationKeyMatcher)
246271
.map(tableLocationProvider::getTableLocation)
272+
.peek(this::manage)
247273
.map(PendingLocationState::new)
248274
.forEach(pendingLocationStates::offer);
249275
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
@@ -263,13 +289,24 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
263289
final Set<ImmutableTableLocationKey> relevantRemovedLocations =
264290
locationUpdate.getPendingRemovedLocationKeys()
265291
.stream()
292+
.map(LiveSupplier::get)
266293
.filter(locationKeyMatcher)
267294
.collect(Collectors.toSet());
268295

269296
if (relevantRemovedLocations.isEmpty()) {
270297
return RowSetFactory.empty();
271298
}
272299

300+
// Iterate through the pending locations and remove any that are in the removed set.
301+
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
302+
final PendingLocationState pendingLocationState = iter.next();
303+
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
304+
iter.remove();
305+
// Release the state and unmanage the location
306+
unmanage(pendingLocationState.release());
307+
}
308+
}
309+
273310
// At the end of the cycle we need to make sure we unmanage any removed constituents.
274311
this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size());
275312
final RowSetBuilderSequential deleteBuilder = RowSetFactory.builderSequential();
@@ -306,6 +343,22 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
306343
resultLocationTables.setNull(deletedRows);
307344
return deletedRows;
308345
}
346+
347+
@Override
348+
public boolean satisfied(final long step) {
349+
if (refreshCombiner == null) {
350+
throw new UnsupportedOperationException("This method should not be called when result is static");
351+
}
352+
return refreshCombiner.satisfied(step);
353+
}
354+
355+
@Override
356+
public UpdateGraph getUpdateGraph() {
357+
if (refreshCombiner == null) {
358+
throw new UnsupportedOperationException("This method should not be called when result is static");
359+
}
360+
return refreshCombiner.getUpdateGraph();
361+
}
309362
}
310363

311364
private static final class PendingLocationState extends IntrusiveDoublyLinkedNode.Impl<PendingLocationState> {

0 commit comments

Comments
 (0)