From 24ed3ea1a85172d263673e4501e8f808a633f2a5 Mon Sep 17 00:00:00 2001 From: Jonathan Knight Date: Mon, 10 Feb 2025 08:01:25 -0500 Subject: [PATCH] Bug 37568366 - [37201943->25.03] Topics: receive() after seek() may throw class cast exception (merge main -> ce/main 114256) [git-p4: depot-paths = "//dev/coherence-ce/main/": change = 114257] --- .../net/topic/NamedTopicSubscriber.java | 152 ++++++++++++------ .../impl/paged/BatchingOperationsQueue.java | 21 +++ 2 files changed, 121 insertions(+), 52 deletions(-) diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java index 789535957ec94..4bdd380e3dec6 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java @@ -47,6 +47,8 @@ import com.tangosol.util.Gate; import com.tangosol.util.Listeners; import com.tangosol.util.LongArray; +import com.tangosol.util.ServiceEvent; +import com.tangosol.util.ServiceListener; import com.tangosol.util.SparseArray; import com.tangosol.util.TaskDaemon; import com.tangosol.util.ThreadGateLite; @@ -63,7 +65,6 @@ import java.util.EventListener; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -492,10 +493,7 @@ public void updateChannel(int nChannel, Consumer fn) @SuppressWarnings("UnusedReturnValue") public R applyToChannel(int nChannel, Function fn) { -// try(Sentry ignored = f_gate.close()) -// { - return fn.apply(m_aChannel[nChannel]); -// } + return fn.apply(m_aChannel[nChannel]); } /** @@ -985,12 +983,12 @@ public long getNotify() /** * Initialise the subscriber. */ - public TopicChannel[] initialise() + private void initialise() { ensureActive(); if (m_nState == STATE_CONNECTED) { - return m_aChannel; + return; } // We must do initialisation under the gate lock @@ -1066,7 +1064,6 @@ public TopicChannel[] initialise() } } m_cSubscribe.mark(); - return m_aChannel; } } @@ -1077,7 +1074,14 @@ public TopicChannel[] initialise() */ private void trigger(int cBatch) { - receiveInternal(f_queueReceiveOrders, cBatch); + if (isConnected()) + { + receiveInternal(f_queueReceiveOrders, cBatch); + } + else + { + f_queueReceiveOrders.resetTrigger(); + } } /** @@ -1209,6 +1213,7 @@ protected void complete(BatchingOperationsQueue queueRequest, Linked Request firstRequest = queueBatch.peek(); if (firstRequest instanceof FunctionalRequest) { + queueRequest.pause(); try (Sentry ignored = gate.close()) { while (firstRequest instanceof FunctionalRequest) @@ -1221,9 +1226,12 @@ protected void complete(BatchingOperationsQueue queueRequest, Linked firstRequest = queueBatch.peek(); } } + finally + { + queueRequest.resume(); + } } - int cValues = 0; int cRequest = queueBatch.size(); @@ -2521,6 +2529,7 @@ public void closeInternal(boolean fDestroyed) // ignore } f_connector.closeSubscription(this, fDestroyed); + f_topic.getService().removeServiceListener(f_serviceStartListener); } finally { @@ -3678,7 +3687,7 @@ private SeekRequest(SeekType type, Map mapPosition, Map subscriber, BatchingOperationsQueue queueBatch) { Map map = subscriber.seekInternal(this); - queueBatch.completeElement(map, this::onRequestComplete); + queueBatch.completeElement(this, map, this::onRequestComplete); } /** @@ -3846,53 +3855,87 @@ private class Listener @Override public void onEvent(SubscriberConnector.SubscriberEvent evt) { - switch (evt.getType()) + if (isActive()) { - case GroupDestroyed: - if (isActive()) - { - Logger.finest("Detected removal of subscriber group " - + f_subscriberGroupId.getGroupName() - + ", closing subscriber " - + this); + switch (evt.getType()) + { + case GroupDestroyed: + if (isActive()) + { + Logger.finest("Detected removal of subscriber group " + + f_subscriberGroupId.getGroupName() + + ", closing subscriber " + + this); + CompletableFuture.runAsync(() -> closeInternal(true), f_executor); + } + break; + case ChannelAllocation: + onChannelAllocation(evt.getAllocatedChannels(), false); + break; + case ChannelsLost: + onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true); + break; + case Unsubscribed: + onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true); + disconnectInternal(false); + break; + case ChannelPopulated: + // must use the channel executor + CompletableFuture.runAsync(() -> onChannelPopulatedNotification(evt.getPopulatedChannels()), f_executorChannels); + break; + case Destroyed: + Logger.finest("Detected destroy of topic " + + f_sTopicName + ", closing subscriber " + + this); CompletableFuture.runAsync(() -> closeInternal(true), f_executor); - } - break; - case ChannelAllocation: - onChannelAllocation(evt.getAllocatedChannels(), false); - break; - case ChannelsLost: - onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true); - break; - case Unsubscribed: - onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true); - disconnectInternal(false); - break; - case ChannelPopulated: - // must use the channel executor - CompletableFuture.runAsync(() -> onChannelPopulatedNotification(evt.getPopulatedChannels()), f_executorChannels); - break; - case Destroyed: - Logger.finest("Detected destroy of topic " - + f_sTopicName + ", closing subscriber " - + this); - CompletableFuture.runAsync(() -> closeInternal(true), f_executor); - break; - case Released: - Logger.finest("Detected release of topic " - + f_sTopicName + ", closing subscriber " - + this); - CompletableFuture.runAsync(() -> closeInternal(true), f_executor); - break; - case Disconnected: - disconnectInternal(false); - break; - default: - throw new IllegalStateException("Unexpected event type: " + evt.getType()); + break; + case Released: + Logger.finest("Detected release of topic " + + f_sTopicName + ", closing subscriber " + + this); + CompletableFuture.runAsync(() -> closeInternal(true), f_executor); + break; + case Disconnected: + disconnectInternal(false); + break; + default: + throw new IllegalStateException("Unexpected event type: " + evt.getType()); + } } } } + // ----- inner class: ServiceListener ----------------------------------- + + private class ServiceStartListener + implements ServiceListener + { + @Override + public void serviceStarting(ServiceEvent evt) + { + } + + @Override + public void serviceStarted(ServiceEvent evt) + { + if (isActive()) + { + ensureConnected(); + f_queueReceiveOrders.triggerOperations(); + } + } + + @Override + public void serviceStopping(ServiceEvent evt) + { + } + + @Override + public void serviceStopped(ServiceEvent evt) + { + } + } + // ----- inner class: ReconnectTask ------------------------------------- /** @@ -4339,4 +4382,9 @@ public ValueConverter getConverter() * The array of channels. */ protected TopicChannel[] m_aChannel; + + /** + * The service start listener. + */ + private final ServiceStartListener f_serviceStartListener = new ServiceStartListener(); } diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java index b462f44ef307f..98f2afb53dda4 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java @@ -537,6 +537,27 @@ public boolean completeElement(Object oValue, Consumer onComplete) return fCompleted; } + @SuppressWarnings("unchecked") + public void completeElement(V value, Object oValue, Consumer onComplete) + { + Queue queueCurrent = getCurrentBatch(); + Iterator iterator = queueCurrent.iterator(); + while (iterator.hasNext()) + { + Element element = iterator.next(); + if (element != null && element.getValue().equals(value)) + { + m_cbCurrentBatch -= f_backlogCalculator.applyAsLong(value); + if (!element.isDone()) + { + element.completeSynchronous((R) oValue, onComplete); + } + iterator.remove(); + break; + } + } + } + /** * Complete the first n {@link Element Elements} in the current batch. *