Skip to content

Commit

Permalink
Bug 37568366 - [37201943->25.03] Topics: receive() after seek() may t…
Browse files Browse the repository at this point in the history
…hrow class cast exception

(merge main -> ce/main 114256)

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 114257]
  • Loading branch information
thegridman committed Feb 10, 2025
1 parent 602b627 commit 24ed3ea
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.tangosol.util.Gate;
import com.tangosol.util.Listeners;
import com.tangosol.util.LongArray;
import com.tangosol.util.ServiceEvent;
import com.tangosol.util.ServiceListener;
import com.tangosol.util.SparseArray;
import com.tangosol.util.TaskDaemon;
import com.tangosol.util.ThreadGateLite;
Expand All @@ -63,7 +65,6 @@
import java.util.EventListener;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -492,10 +493,7 @@ public void updateChannel(int nChannel, Consumer<TopicChannel> fn)
@SuppressWarnings("UnusedReturnValue")
public <R> R applyToChannel(int nChannel, Function<TopicChannel, R> fn)
{
// try(Sentry<?> ignored = f_gate.close())
// {
return fn.apply(m_aChannel[nChannel]);
// }
return fn.apply(m_aChannel[nChannel]);
}

/**
Expand Down Expand Up @@ -985,12 +983,12 @@ public long getNotify()
/**
* Initialise the subscriber.
*/
public TopicChannel[] initialise()
private void initialise()
{
ensureActive();
if (m_nState == STATE_CONNECTED)
{
return m_aChannel;
return;
}

// We must do initialisation under the gate lock
Expand Down Expand Up @@ -1066,7 +1064,6 @@ public TopicChannel[] initialise()
}
}
m_cSubscribe.mark();
return m_aChannel;
}
}

Expand All @@ -1077,7 +1074,14 @@ public TopicChannel[] initialise()
*/
private void trigger(int cBatch)
{
receiveInternal(f_queueReceiveOrders, cBatch);
if (isConnected())
{
receiveInternal(f_queueReceiveOrders, cBatch);
}
else
{
f_queueReceiveOrders.resetTrigger();
}
}

/**
Expand Down Expand Up @@ -1209,6 +1213,7 @@ protected void complete(BatchingOperationsQueue<Request, ?> queueRequest, Linked
Request firstRequest = queueBatch.peek();
if (firstRequest instanceof FunctionalRequest)
{
queueRequest.pause();
try (Sentry<?> ignored = gate.close())
{
while (firstRequest instanceof FunctionalRequest)
Expand All @@ -1221,9 +1226,12 @@ protected void complete(BatchingOperationsQueue<Request, ?> queueRequest, Linked
firstRequest = queueBatch.peek();
}
}
finally
{
queueRequest.resume();
}
}


int cValues = 0;
int cRequest = queueBatch.size();

Expand Down Expand Up @@ -2521,6 +2529,7 @@ public void closeInternal(boolean fDestroyed)
// ignore
}
f_connector.closeSubscription(this, fDestroyed);
f_topic.getService().removeServiceListener(f_serviceStartListener);
}
finally
{
Expand Down Expand Up @@ -3678,7 +3687,7 @@ private SeekRequest(SeekType type, Map<Integer, Position> mapPosition, Map<Integ
protected void execute(NamedTopicSubscriber<?> subscriber, BatchingOperationsQueue<Request, ?> queueBatch)
{
Map<Integer, Position> map = subscriber.seekInternal(this);
queueBatch.completeElement(map, this::onRequestComplete);
queueBatch.completeElement(this, map, this::onRequestComplete);
}

/**
Expand Down Expand Up @@ -3846,53 +3855,87 @@ private class Listener
@Override
public void onEvent(SubscriberConnector.SubscriberEvent evt)
{
switch (evt.getType())
if (isActive())
{
case GroupDestroyed:
if (isActive())
{
Logger.finest("Detected removal of subscriber group "
+ f_subscriberGroupId.getGroupName()
+ ", closing subscriber "
+ this);
switch (evt.getType())
{
case GroupDestroyed:
if (isActive())
{
Logger.finest("Detected removal of subscriber group "
+ f_subscriberGroupId.getGroupName()
+ ", closing subscriber "
+ this);
CompletableFuture.runAsync(() -> closeInternal(true), f_executor);
}
break;
case ChannelAllocation:
onChannelAllocation(evt.getAllocatedChannels(), false);
break;
case ChannelsLost:
onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true);
break;
case Unsubscribed:
onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true);
disconnectInternal(false);
break;
case ChannelPopulated:
// must use the channel executor
CompletableFuture.runAsync(() -> onChannelPopulatedNotification(evt.getPopulatedChannels()), f_executorChannels);
break;
case Destroyed:
Logger.finest("Detected destroy of topic "
+ f_sTopicName + ", closing subscriber "
+ this);
CompletableFuture.runAsync(() -> closeInternal(true), f_executor);
}
break;
case ChannelAllocation:
onChannelAllocation(evt.getAllocatedChannels(), false);
break;
case ChannelsLost:
onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true);
break;
case Unsubscribed:
onChannelAllocation(PagedTopicSubscription.NO_CHANNELS, true);
disconnectInternal(false);
break;
case ChannelPopulated:
// must use the channel executor
CompletableFuture.runAsync(() -> onChannelPopulatedNotification(evt.getPopulatedChannels()), f_executorChannels);
break;
case Destroyed:
Logger.finest("Detected destroy of topic "
+ f_sTopicName + ", closing subscriber "
+ this);
CompletableFuture.runAsync(() -> closeInternal(true), f_executor);
break;
case Released:
Logger.finest("Detected release of topic "
+ f_sTopicName + ", closing subscriber "
+ this);
CompletableFuture.runAsync(() -> closeInternal(true), f_executor);
break;
case Disconnected:
disconnectInternal(false);
break;
default:
throw new IllegalStateException("Unexpected event type: " + evt.getType());
break;
case Released:
Logger.finest("Detected release of topic "
+ f_sTopicName + ", closing subscriber "
+ this);
CompletableFuture.runAsync(() -> closeInternal(true), f_executor);
break;
case Disconnected:
disconnectInternal(false);
break;
default:
throw new IllegalStateException("Unexpected event type: " + evt.getType());
}
}
}
}

// ----- inner class: ServiceListener -----------------------------------

private class ServiceStartListener
implements ServiceListener
{
@Override
public void serviceStarting(ServiceEvent evt)
{
}

@Override
public void serviceStarted(ServiceEvent evt)
{
if (isActive())
{
ensureConnected();
f_queueReceiveOrders.triggerOperations();
}
}

@Override
public void serviceStopping(ServiceEvent evt)
{
}

@Override
public void serviceStopped(ServiceEvent evt)
{
}
}

// ----- inner class: ReconnectTask -------------------------------------

/**
Expand Down Expand Up @@ -4339,4 +4382,9 @@ public ValueConverter<U> getConverter()
* The array of channels.
*/
protected TopicChannel[] m_aChannel;

/**
* The service start listener.
*/
private final ServiceStartListener f_serviceStartListener = new ServiceStartListener();
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,27 @@ public boolean completeElement(Object oValue, Consumer<R> onComplete)
return fCompleted;
}

@SuppressWarnings("unchecked")
public void completeElement(V value, Object oValue, Consumer<R> onComplete)
{
Queue<Element> queueCurrent = getCurrentBatch();
Iterator<Element> iterator = queueCurrent.iterator();
while (iterator.hasNext())
{
Element element = iterator.next();
if (element != null && element.getValue().equals(value))
{
m_cbCurrentBatch -= f_backlogCalculator.applyAsLong(value);
if (!element.isDone())
{
element.completeSynchronous((R) oValue, onComplete);
}
iterator.remove();
break;
}
}
}

/**
* Complete the first n {@link Element Elements} in the current batch.
*
Expand Down

0 comments on commit 24ed3ea

Please sign in to comment.