Skip to content

Commit 36ebeb1

Browse files
loadbalancer-experimental: add CatchAllLoadBalancerObserver (#3147)
Motivation: We generally promise that if observers throw it won't mess up the greater state of the system. However, we don't currently do that in DefaultLoadBalancer. Modifications: - Add the CatchAllLoadBalancerObserver to catch exceptions thrown by the underlying underlying observers. - Install it when using anything other than the NoopLoadBalancerObserver.
1 parent 79b24c2 commit 36ebeb1

File tree

3 files changed

+173
-4
lines changed

3 files changed

+173
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.loadbalancer;
17+
18+
import io.servicetalk.client.api.NoActiveHostException;
19+
import io.servicetalk.client.api.NoAvailableHostException;
20+
import io.servicetalk.client.api.ServiceDiscovererEvent;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.util.Collection;
26+
import javax.annotation.Nullable;
27+
28+
import static java.util.Objects.requireNonNull;
29+
30+
final class CatchAllLoadBalancerObserver implements LoadBalancerObserver {
31+
32+
private static final Logger LOGGER = LoggerFactory.getLogger(CatchAllLoadBalancerObserver.class);
33+
34+
private final LoadBalancerObserver delegate;
35+
36+
private CatchAllLoadBalancerObserver(LoadBalancerObserver delegate) {
37+
this.delegate = delegate;
38+
}
39+
40+
@Override
41+
public HostObserver hostObserver(Object resolvedAddress) {
42+
try {
43+
return new CatchAllHostObserver(delegate.hostObserver(resolvedAddress));
44+
} catch (Throwable ex) {
45+
LOGGER.warn("Unexpected exception from {} while getting a HostObserver", delegate, ex);
46+
return NoopLoadBalancerObserver.NoopHostObserver.INSTANCE;
47+
}
48+
}
49+
50+
@Override
51+
public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events) {
52+
try {
53+
delegate.onServiceDiscoveryEvent(events);
54+
} catch (Throwable unexpected) {
55+
LOGGER.warn("Unexpected exception from {} while reporting an onServiceDiscoveryEvent event",
56+
delegate, unexpected);
57+
}
58+
}
59+
60+
@Override
61+
public void onHostsUpdate(Collection<? extends Host> oldHosts, Collection<? extends Host> newHosts) {
62+
try {
63+
delegate.onHostsUpdate(oldHosts, newHosts);
64+
} catch (Throwable unexpected) {
65+
LOGGER.warn("Unexpected exception from {} while reporting an onHostsUpdate event", delegate, unexpected);
66+
}
67+
}
68+
69+
@Override
70+
public void onNoAvailableHostException(NoAvailableHostException exception) {
71+
try {
72+
delegate.onNoAvailableHostException(exception);
73+
} catch (Throwable unexpected) {
74+
LOGGER.warn("Unexpected exception from {} while reporting an onNoAvailableHostException event",
75+
delegate, unexpected);
76+
}
77+
}
78+
79+
@Override
80+
public void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exception) {
81+
try {
82+
delegate.onNoActiveHostException(hosts, exception);
83+
} catch (Throwable unexpected) {
84+
LOGGER.warn("Unexpected exception from {} while reporting an onNoActiveHostException event",
85+
delegate, unexpected);
86+
}
87+
}
88+
89+
private static final class CatchAllHostObserver implements HostObserver {
90+
91+
private final HostObserver delegate;
92+
93+
CatchAllHostObserver(HostObserver delegate) {
94+
this.delegate = requireNonNull(delegate, "delegate");
95+
}
96+
97+
@Override
98+
public void onHostMarkedExpired(int connectionCount) {
99+
try {
100+
delegate.onHostMarkedExpired(connectionCount);
101+
} catch (Throwable unexpected) {
102+
LOGGER.warn("Unexpected exception from {} while reporting an onHostMarkedExpired event",
103+
delegate, unexpected);
104+
}
105+
}
106+
107+
@Override
108+
public void onActiveHostRemoved(int connectionCount) {
109+
try {
110+
delegate.onActiveHostRemoved(connectionCount);
111+
} catch (Throwable unexpected) {
112+
LOGGER.warn("Unexpected exception from {} while reporting an onActiveHostRemoved event",
113+
delegate, unexpected);
114+
}
115+
}
116+
117+
@Override
118+
public void onExpiredHostRevived(int connectionCount) {
119+
try {
120+
delegate.onExpiredHostRevived(connectionCount);
121+
} catch (Throwable unexpected) {
122+
LOGGER.warn("Unexpected exception from {} while reporting an onExpiredHostRevived event",
123+
delegate, unexpected);
124+
}
125+
}
126+
127+
@Override
128+
public void onExpiredHostRemoved(int connectionCount) {
129+
try {
130+
delegate.onExpiredHostRemoved(connectionCount);
131+
} catch (Throwable unexpected) {
132+
LOGGER.warn("Unexpected exception from {} while reporting an onExpiredHostRemoved event",
133+
delegate, unexpected);
134+
}
135+
}
136+
137+
@Override
138+
public void onHostMarkedUnhealthy(@Nullable Throwable cause) {
139+
try {
140+
delegate.onHostMarkedUnhealthy(cause);
141+
} catch (Throwable unexpected) {
142+
LOGGER.warn("Unexpected exception from {} while reporting an onHostMarkedUnhealthy event",
143+
delegate, unexpected);
144+
}
145+
}
146+
147+
@Override
148+
public void onHostRevived() {
149+
try {
150+
delegate.onHostRevived();
151+
} catch (Throwable unexpected) {
152+
LOGGER.warn("Unexpected exception from {} while reporting an onHostRevived event",
153+
delegate, unexpected);
154+
}
155+
}
156+
}
157+
158+
static LoadBalancerObserver wrap(LoadBalancerObserver observer) {
159+
requireNonNull(observer, "observer");
160+
if (observer instanceof CatchAllLoadBalancerObserver) {
161+
return observer;
162+
}
163+
if (observer instanceof NoopLoadBalancerObserver) {
164+
return observer;
165+
}
166+
return new CatchAllLoadBalancerObserver(observer);
167+
}
168+
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,9 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
162162
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
163163
this.connectionFactory = requireNonNull(connectionFactory);
164164
this.subsetter = requireNonNull(subsetter, "subsetter");
165-
this.loadBalancerObserver = requireNonNull(loadBalancerObserverFactory, "loadBalancerObserverFactory")
166-
.newObserver(lbDescription);
165+
this.loadBalancerObserver = CatchAllLoadBalancerObserver.wrap(
166+
requireNonNull(loadBalancerObserverFactory, "loadBalancerObserverFactory")
167+
.newObserver(lbDescription));
167168
this.healthCheckConfig = healthCheckConfig;
168169
this.sequentialExecutor = new SequentialExecutor((uncaughtException) ->
169170
LOGGER.error("{}: Uncaught exception in {}", this, this.getClass().getSimpleName(), uncaughtException));

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ public void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHo
5656
// noop
5757
}
5858

59-
private static final class NoopHostObserver implements LoadBalancerObserver.HostObserver {
59+
static final class NoopHostObserver implements LoadBalancerObserver.HostObserver {
6060

61-
private static final HostObserver INSTANCE = new NoopHostObserver();
61+
static final HostObserver INSTANCE = new NoopHostObserver();
6262

6363
private NoopHostObserver() {
6464
}

0 commit comments

Comments
 (0)