Skip to content

Commit eaa7af4

Browse files
rickymaHeng Qin
authored and
Heng Qin
committed
[apache#1675][FOLLOWUP] fix(test): Fix various flaky tests (apache#1730)
### What changes were proposed in this pull request? Fix various flaky tests: 1. After using `SimpleClusterManager`, release resources at once, which might cause tests to fail sometimes. 2. The compiler machine may not necessarily have a valid broadcast address (maybe in a pod created by K8S, and the broadcast address could be 0.0.0.0). Unit tests should running successfully on such machines. We need to support this scenario. 3. Refactor the `URI.create` logic in `SimpleClusterManagerTest` to support cross-platform operation, such as running tests on Windows. 4. Use `IntegrationTestBase#shutdownServers` to reduce duplicated codes. ### Why are the changes needed? For apache#1675. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unnecessary.
1 parent 601119d commit eaa7af4

File tree

14 files changed

+236
-264
lines changed

14 files changed

+236
-264
lines changed

common/src/main/java/org/apache/uniffle/common/util/RssUtils.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ public static String getHostIp() throws Exception {
123123
}
124124
return ip;
125125
}
126+
// Unit tests are executed on a single machine and do not interact with other machines.
127+
// Therefore, unit tests should not require a valid broadcast address.
128+
// When running UTs, we will still return the IP address whose broadcast address is invalid.
129+
boolean isTestMode = Boolean.parseBoolean(System.getProperty("test.mode", "false"));
126130
Enumeration<NetworkInterface> nif = NetworkInterface.getNetworkInterfaces();
127131
String siteLocalAddress = null;
128132
while (nif.hasMoreElements()) {
@@ -133,7 +137,7 @@ public static String getHostIp() throws Exception {
133137
for (InterfaceAddress ifa : ni.getInterfaceAddresses()) {
134138
InetAddress ia = ifa.getAddress();
135139
InetAddress brd = ifa.getBroadcast();
136-
if (brd == null || brd.isAnyLocalAddress()) {
140+
if ((brd == null || brd.isAnyLocalAddress()) && !isTestMode) {
137141
LOGGER.info(
138142
"ip {} was filtered, because it don't have effective broadcast address",
139143
ia.getHostAddress());

coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java

+89-84
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.FileWriter;
2222
import java.io.PrintWriter;
2323
import java.net.URI;
24+
import java.nio.file.Paths;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Set;
@@ -132,91 +133,93 @@ public void getLostServerListTest() throws Exception {
132133
CoordinatorConf coordinatorConf = new CoordinatorConf();
133134
// Shorten the heartbeat time
134135
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
135-
SimpleClusterManager clusterManager =
136-
new SimpleClusterManager(coordinatorConf, new Configuration());
137-
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, grpcTags);
138-
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, grpcTags);
139-
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
140-
clusterManager.add(sn1);
141-
clusterManager.add(sn2);
142-
clusterManager.add(sn3);
143-
Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
144-
await()
145-
.atMost(1, TimeUnit.SECONDS)
146-
.until(
147-
() -> {
148-
Set<String> lostServerList =
149-
clusterManager.getLostServerList().stream()
150-
.map(ServerNode::getId)
151-
.collect(Collectors.toSet());
152-
return CollectionUtils.isEqualCollection(lostServerList, expectedIds);
153-
});
154-
// re-register sn3
155-
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
156-
clusterManager.add(sn3);
157-
Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
158-
await()
159-
.atMost(1, TimeUnit.SECONDS)
160-
.until(
161-
() -> {
162-
// Retrieve listed ServerNode List
163-
Set<String> lostServerListre =
164-
clusterManager.getLostServerList().stream()
165-
.map(ServerNode::getId)
166-
.collect(Collectors.toSet());
167-
return CollectionUtils.isEqualCollection(lostServerListre, expectedIdsre);
168-
});
136+
try (SimpleClusterManager clusterManager =
137+
new SimpleClusterManager(coordinatorConf, new Configuration())) {
138+
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, grpcTags);
139+
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, grpcTags);
140+
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
141+
clusterManager.add(sn1);
142+
clusterManager.add(sn2);
143+
clusterManager.add(sn3);
144+
Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
145+
await()
146+
.atMost(1, TimeUnit.SECONDS)
147+
.until(
148+
() -> {
149+
Set<String> lostServerList =
150+
clusterManager.getLostServerList().stream()
151+
.map(ServerNode::getId)
152+
.collect(Collectors.toSet());
153+
return CollectionUtils.isEqualCollection(lostServerList, expectedIds);
154+
});
155+
// re-register sn3
156+
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
157+
clusterManager.add(sn3);
158+
Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
159+
await()
160+
.atMost(1, TimeUnit.SECONDS)
161+
.until(
162+
() -> {
163+
// Retrieve listed ServerNode List
164+
Set<String> lostServerListre =
165+
clusterManager.getLostServerList().stream()
166+
.map(ServerNode::getId)
167+
.collect(Collectors.toSet());
168+
return CollectionUtils.isEqualCollection(lostServerListre, expectedIdsre);
169+
});
170+
}
169171
}
170172

171173
@Test
172174
public void getUnhealthyServerList() throws Exception {
173175
CoordinatorConf coordinatorConf = new CoordinatorConf();
174176
// Shorten the heartbeat time
175177
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
176-
SimpleClusterManager clusterManager =
177-
new SimpleClusterManager(coordinatorConf, new Configuration());
178-
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, grpcTags);
179-
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, grpcTags);
180-
ServerNode sn3 =
181-
new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, ServerStatus.UNHEALTHY);
182-
ServerNode sn4 =
183-
new ServerNode("sn4", "ip", 0, 100L, 50L, 20, 11, grpcTags, ServerStatus.UNHEALTHY);
184-
clusterManager.add(sn1);
185-
clusterManager.add(sn2);
186-
clusterManager.add(sn3);
187-
clusterManager.add(sn4);
188-
// Analog timeout registration
189-
Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
190-
await()
191-
.atMost(1, TimeUnit.SECONDS)
192-
.until(
193-
() -> {
194-
Set<String> unhealthyServerList =
195-
clusterManager.getUnhealthyServerList().stream()
196-
.map(ServerNode::getId)
197-
.collect(Collectors.toSet());
198-
return CollectionUtils.isEqualCollection(unhealthyServerList, expectedIds);
199-
});
200-
// Register unhealthy node sn3 again
201-
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, ServerStatus.UNHEALTHY);
202-
clusterManager.add(sn3);
203-
Set<String> expectedIdsre = Sets.newHashSet("sn3");
204-
await()
205-
.atMost(1, TimeUnit.SECONDS)
206-
.until(
207-
() -> {
208-
Set<String> unhealthyServerListre =
209-
clusterManager.getUnhealthyServerList().stream()
210-
.map(ServerNode::getId)
211-
.collect(Collectors.toSet());
212-
return CollectionUtils.isEqualCollection(unhealthyServerListre, expectedIdsre);
213-
});
214-
// At this point verify that sn4 is in the lost list
215-
List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
216-
Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2", "sn4");
217-
assertEquals(
218-
expectedIdlostremoveunhealthy,
219-
lostremoveunhealthy.stream().map(ServerNode::getId).collect(Collectors.toSet()));
178+
try (SimpleClusterManager clusterManager =
179+
new SimpleClusterManager(coordinatorConf, new Configuration())) {
180+
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, grpcTags);
181+
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, grpcTags);
182+
ServerNode sn3 =
183+
new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, ServerStatus.UNHEALTHY);
184+
ServerNode sn4 =
185+
new ServerNode("sn4", "ip", 0, 100L, 50L, 20, 11, grpcTags, ServerStatus.UNHEALTHY);
186+
clusterManager.add(sn1);
187+
clusterManager.add(sn2);
188+
clusterManager.add(sn3);
189+
clusterManager.add(sn4);
190+
// Analog timeout registration
191+
Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
192+
await()
193+
.atMost(1, TimeUnit.SECONDS)
194+
.until(
195+
() -> {
196+
Set<String> unhealthyServerList =
197+
clusterManager.getUnhealthyServerList().stream()
198+
.map(ServerNode::getId)
199+
.collect(Collectors.toSet());
200+
return CollectionUtils.isEqualCollection(unhealthyServerList, expectedIds);
201+
});
202+
// Register unhealthy node sn3 again
203+
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, ServerStatus.UNHEALTHY);
204+
clusterManager.add(sn3);
205+
Set<String> expectedIdsre = Sets.newHashSet("sn3");
206+
await()
207+
.atMost(1, TimeUnit.SECONDS)
208+
.until(
209+
() -> {
210+
Set<String> unhealthyServerListre =
211+
clusterManager.getUnhealthyServerList().stream()
212+
.map(ServerNode::getId)
213+
.collect(Collectors.toSet());
214+
return CollectionUtils.isEqualCollection(unhealthyServerListre, expectedIdsre);
215+
});
216+
// At this point verify that sn4 is in the lost list
217+
List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
218+
Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2", "sn4");
219+
assertEquals(
220+
expectedIdlostremoveunhealthy,
221+
lostremoveunhealthy.stream().map(ServerNode::getId).collect(Collectors.toSet()));
222+
}
220223
}
221224

222225
@Test
@@ -423,11 +426,12 @@ public void testGetCorrectServerNodesWhenOneNodeRemoved() throws Exception {
423426
public void updateExcludeNodesTest() throws Exception {
424427
String excludeNodesFolder =
425428
(new File(ClassLoader.getSystemResource("empty").getFile())).getParent();
426-
String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
429+
String excludeNodesPath = Paths.get(excludeNodesFolder, "excludeNodes").toString();
427430
CoordinatorConf ssc = new CoordinatorConf();
431+
File excludeNodesFile = new File(excludeNodesPath);
432+
URI excludeNodesUri = excludeNodesFile.toURI();
428433
ssc.setString(
429-
CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
430-
URI.create(excludeNodesPath).toString());
434+
CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, excludeNodesUri.toURL().toString());
431435
ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 1000);
432436

433437
try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration())) {
@@ -492,11 +496,12 @@ public void updateExcludeNodesTest() throws Exception {
492496
public void excludeNodesNoDelayTest() throws Exception {
493497
String excludeNodesFolder =
494498
(new File(ClassLoader.getSystemResource("empty").getFile())).getParent();
495-
String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
499+
String excludeNodesPath = Paths.get(excludeNodesFolder, "excludeNodes").toString();
496500
CoordinatorConf ssc = new CoordinatorConf();
501+
File excludeNodesFile = new File(excludeNodesPath);
502+
URI excludeNodesUri = excludeNodesFile.toURI();
497503
ssc.setString(
498-
CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
499-
URI.create(excludeNodesPath).toString());
504+
CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, excludeNodesUri.toURL().toString());
500505
ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 5000);
501506

502507
final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");

0 commit comments

Comments
 (0)