|
76 | 76 | import java.util.Optional;
|
77 | 77 | import java.util.Set;
|
78 | 78 | import java.util.concurrent.ThreadLocalRandom;
|
| 79 | +import java.util.concurrent.TimeoutException; |
79 | 80 | import java.util.concurrent.atomic.AtomicReference;
|
80 | 81 | import java.util.concurrent.locks.Lock;
|
81 | 82 | import java.util.concurrent.locks.ReadWriteLock;
|
@@ -362,14 +363,15 @@ private DistributedQueryRunner(
|
362 | 363 | }
|
363 | 364 | prestoClients = prestoClientsBuilder.build();
|
364 | 365 |
|
365 |
| - long start = nanoTime(); |
366 |
| - while (!allNodesGloballyVisible()) { |
367 |
| - Assertions.assertLessThan(nanosSince(start), new Duration(100, SECONDS)); |
368 |
| - MILLISECONDS.sleep(10); |
| 366 | + try { |
| 367 | + waitForAllNodesGloballyVisible(); |
| 368 | + } |
| 369 | + catch (TimeoutException e) { |
| 370 | + closer.close(); |
| 371 | + throw e; |
369 | 372 | }
|
370 |
| - log.info("Announced servers in %s", nanosSince(start).convertToMostSuccinctTimeUnit()); |
371 | 373 |
|
372 |
| - start = nanoTime(); |
| 374 | + long start = nanoTime(); |
373 | 375 | for (TestingPrestoServer server : servers) {
|
374 | 376 | server.getMetadata().registerBuiltInFunctions(AbstractTestQueries.CUSTOM_FUNCTIONS);
|
375 | 377 | }
|
@@ -517,22 +519,49 @@ else if (coordinatorSidecar) {
|
517 | 519 | return server;
|
518 | 520 | }
|
519 | 521 |
|
520 |
| - private boolean allNodesGloballyVisible() |
| 522 | + private void waitForAllNodesGloballyVisible() |
| 523 | + throws Exception |
521 | 524 | {
|
522 |
| - int expectedActiveNodesForRm = externalWorkers.size() + servers.size(); |
523 |
| - int expectedActiveNodesForCoordinator = externalWorkers.size() + servers.size(); |
| 525 | + long startTimeInMs = nanoTime(); |
| 526 | + int expectedActiveNodes = externalWorkers.size() + servers.size(); |
| 527 | + Duration timeout = new Duration(100, SECONDS); |
524 | 528 |
|
525 |
| - for (TestingPrestoServer server : servers) { |
| 529 | + for (int serverIndex = 0; serverIndex < servers.size(); ) { |
| 530 | + TestingPrestoServer server = servers.get(serverIndex); |
526 | 531 | AllNodes allNodes = server.refreshNodes();
|
527 | 532 | int activeNodeCount = allNodes.getActiveNodes().size();
|
528 | 533 |
|
529 |
| - if (!allNodes.getInactiveNodes().isEmpty() || |
530 |
| - (server.isCoordinator() && activeNodeCount != expectedActiveNodesForCoordinator) || |
531 |
| - (server.isResourceManager() && activeNodeCount != expectedActiveNodesForRm)) { |
532 |
| - return false; |
| 534 | + if (!allNodes.getInactiveNodes().isEmpty()) { |
| 535 | + throwTimeoutIfNotReady( |
| 536 | + startTimeInMs, |
| 537 | + timeout, |
| 538 | + format("Timed out waiting for all nodes to be globally visible. Inactive nodes: %s", allNodes.getInactiveNodes())); |
| 539 | + MILLISECONDS.sleep(10); |
| 540 | + serverIndex = 0; |
| 541 | + } |
| 542 | + else if ((server.isCoordinator() || server.isResourceManager()) && activeNodeCount != expectedActiveNodes) { |
| 543 | + throwTimeoutIfNotReady( |
| 544 | + startTimeInMs, |
| 545 | + timeout, |
| 546 | + format("Timed out waiting for all nodes to be globally visible. Node count: %s, expected: %s", activeNodeCount, expectedActiveNodes)); |
| 547 | + MILLISECONDS.sleep(10); |
| 548 | + serverIndex = 0; |
| 549 | + } |
| 550 | + else { |
| 551 | + log.info("Server %s has %s active nodes", server.getBaseUrl(), activeNodeCount); |
| 552 | + serverIndex++; |
533 | 553 | }
|
534 | 554 | }
|
535 |
| - return true; |
| 555 | + |
| 556 | + log.info("Announced servers in %s", nanosSince(startTimeInMs).convertToMostSuccinctTimeUnit()); |
| 557 | + } |
| 558 | + |
| 559 | + private static void throwTimeoutIfNotReady(long startTimeInMs, Duration timeout, String message) |
| 560 | + throws TimeoutException |
| 561 | + { |
| 562 | + if (nanosSince(startTimeInMs).compareTo(timeout) >= 0) { |
| 563 | + throw new TimeoutException(message); |
| 564 | + } |
536 | 565 | }
|
537 | 566 |
|
538 | 567 | public TestingPrestoClient getRandomClient()
|
|
0 commit comments