|
28 | 28 | import javax.annotation.OverridingMethodsMustInvokeSuper;
|
29 | 29 | import java.util.ArrayList;
|
30 | 30 | import java.util.Collection;
|
| 31 | +import java.util.stream.Stream; |
31 | 32 |
|
32 | 33 | /**
|
33 | 34 | * Basic uncoalesced table that only adds keys.
|
@@ -153,7 +154,6 @@ private void initializeAvailableLocations() {
|
153 | 154 | try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
|
154 | 155 | locationBuffer.processPending()) {
|
155 | 156 | if (locationUpdate != null) {
|
156 |
| - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); |
157 | 157 | maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
|
158 | 158 | }
|
159 | 159 | }
|
@@ -188,14 +188,26 @@ private void maybeAddLocations(@NotNull final Collection<LiveSupplier<ImmutableT
|
188 | 188 | .forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.get())));
|
189 | 189 | }
|
190 | 190 |
|
191 |
| - private void maybeRemoveLocations(@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) { |
| 191 | + private void maybeRemoveLocations(@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys, |
| 192 | + final boolean removedAllowed) { |
192 | 193 | if (removedKeys.isEmpty()) {
|
193 | 194 | return;
|
194 | 195 | }
|
195 | 196 |
|
196 |
| - filterLocationKeys(removedKeys).stream() |
| 197 | + final Collection<LiveSupplier<ImmutableTableLocationKey>> filteredSuppliers = filterLocationKeys(removedKeys); |
| 198 | + if (filteredSuppliers.isEmpty()) { |
| 199 | + return; |
| 200 | + } |
| 201 | + |
| 202 | + if (removedAllowed) { |
| 203 | + filteredSuppliers.stream().map(LiveSupplier::get).forEach(columnSourceManager::removeLocationKey); |
| 204 | + return; |
| 205 | + } |
| 206 | + |
| 207 | + final ImmutableTableLocationKey[] keys = filteredSuppliers.stream() |
197 | 208 | .map(LiveSupplier::get)
|
198 |
| - .forEach(columnSourceManager::removeLocationKey); |
| 209 | + .toArray(ImmutableTableLocationKey[]::new); |
| 210 | + throw new TableLocationRemovedException("Source table does not support removed locations", keys); |
199 | 211 | }
|
200 | 212 |
|
201 | 213 | private void initializeLocationSizes() {
|
@@ -238,16 +250,8 @@ protected void instrumentedRefresh() {
|
238 | 250 | try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
|
239 | 251 | locationBuffer.processPending()) {
|
240 | 252 | if (locationUpdate != null) {
|
241 |
| - if (!locationProvider.getUpdateMode().removeAllowed() |
242 |
| - && !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) { |
243 |
| - // This TLP doesn't support removed locations, we need to throw an exception. |
244 |
| - final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream() |
245 |
| - .map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new); |
246 |
| - throw new TableLocationRemovedException( |
247 |
| - "Source table does not support removed locations", keys); |
248 |
| - } |
249 |
| - |
250 |
| - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); |
| 253 | + maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys(), |
| 254 | + locationProvider.getUpdateMode().removeAllowed()); |
251 | 255 | maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
|
252 | 256 | }
|
253 | 257 | }
|
|
0 commit comments