diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java index 753da2324e2..55ec350d3fb 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.Ignite; @@ -476,11 +477,22 @@ static List insertValues(Table table, int offset) { void assertValuesPresentOnNodes(HybridTimestamp ts, Table table, Integer... indexes) { for (Integer index : indexes) { - assertValuesPresentOnNode(table, ts, index); + assertValuesOnNode(table, ts, index, fut -> fut.join() != null); } } - private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, int targetNodeIndex) { + void assertValuesNotPresentOnNodes(HybridTimestamp ts, Table table, Integer... indexes) { + for (Integer index : indexes) { + assertValuesOnNode(table, ts, index, rowFut -> rowFut.join() == null); + } + } + + private void assertValuesOnNode( + Table table, + HybridTimestamp ts, + int targetNodeIndex, + Predicate> dataCondition + ) { IgniteImpl targetNode = unwrapIgniteImpl(node(targetNodeIndex)); TableImpl tableImpl = unwrapTableImpl(table); @@ -491,7 +503,7 @@ private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, int targ internalTable.get(marshalKey(tableImpl, Tuple.create(of("id", i))), ts, targetNode.node()); assertThat(fut, willCompleteSuccessfully()); - assertNotNull(fut.join()); + assertTrue(dataCondition.test(fut)); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionSequentialRecoveriesTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionSequentialRecoveriesTest.java index afeede74197..8ad9db355c0 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionSequentialRecoveriesTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionSequentialRecoveriesTest.java @@ -17,7 +17,13 @@ package org.apache.ignite.internal.table.distributed.disaster; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import java.util.List; import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.table.Table; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -40,6 +46,14 @@ void testTwoSequentialResets() throws InterruptedException { IgniteImpl node = igniteImpl(0); + Table table = node.tables().table(HA_TABLE_NAME); + + List errors = insertValues(table, 0); + + assertThat(errors, is(empty())); + + assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2, 3, 4); + assertRecoveryKeyIsEmpty(node); stopNodes(2, 3, 4); @@ -49,5 +63,7 @@ void testTwoSequentialResets() throws InterruptedException { stopNode(1); waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0)); + + assertValuesPresentOnNodes(node.clock().now(), table, 0); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java index 18d1a24ed7c..24a7712c44f 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java @@ -17,10 +17,17 @@ package org.apache.ignite.internal.table.distributed.disaster; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import java.util.List; import java.util.Set; import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.table.Table; import org.intellij.lang.annotations.Language; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** Test suite for the cases with a recovery of the group replication factor after reset by zone filter update. */ @@ -32,6 +39,8 @@ public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}", null); + private static final String CUSTOM_NODES_CONFIG = nodeConfig("{zone = custom}", null); + private static final String ROCKS_NODES_CONFIG = nodeConfig(null, "{lru_rocks.engine = rocksdb}"); private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null, "{segmented_aipersist.engine = aipersist}"); @@ -122,6 +131,190 @@ void testThatPartitionResetZoneStorageProfileFilterAware() throws InterruptedExc waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0)); } + /** + * Test scenario. + *
    + *
  1. Create a zone in HA mode (7 nodes, A, B, C, D, E, F, G) - phase 1
  2. + *
  3. Insert data and wait for replication to all nodes.
  4. + *
  5. Stop a majority of nodes (4 nodes A, B, C, D)
  6. + *
  7. Wait for the partition to become available (E, F, G), no new writes - phase 2
  8. + *
  9. Stop a majority of nodes once again (E, F)
  10. + *
  11. Wait for the partition to become available (G), no new writes - phase 3
  12. + *
  13. Stop the last node G
  14. + *
  15. Start one node from phase 1, A
  16. + *
  17. Start one node from phase 3, G
  18. + *
  19. Start one node from phase 2, E
  20. + *
  21. No data should be lost (reads from partition on A and E must be consistent with G)
  22. + *
+ * + * @throws Exception If failed. + */ + @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-24513") + void testSeveralHaResetsAndSomeNodeRestart() throws Exception { + for (int i = 1; i < 8; i++) { + startNode(i, CUSTOM_NODES_CONFIG); + } + + String globalFilter = "$[?(@.zone == \"custom\")]"; + createHaZoneWithTable(7, globalFilter, nodeNames(1, 2, 3, 4, 5, 6, 7)); + + IgniteImpl node0 = igniteImpl(0); + Table table = node0.tables().table(HA_TABLE_NAME); + + List errors = insertValues(table, 0); + assertThat(errors, is(empty())); + assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 3, 4, 5, 6, 7); + + // Stop 4 nodes (A, B, C, D) + stopNodes(4, 5, 6, 7); + + // Wait for the partition to become available on the remaining nodes (E, F, G) + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 3)); + + // Stop 2 more nodes (E, F) + stopNodes(2, 3); + + // Wait for the partition to become available on the last node (G) + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1)); + + // Stop the last node (G) + stopNode(1); + + // Start one node from phase 1 (A) + startNode(4); + + // Start one node from phase 3 (G) + startNode(1); + + // Start one node from phase 2 (E) + startNode(2); + + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 4)); + + // Verify that no data is lost and reads from partition on nodes A and E are consistent with node G + assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 4); + } + + /** + * Test scenario, where we start nodes from the previous assignments chain, with new writes. + * The whole scenario will be possible when second phase of HA feature will be implemented. + * + *
    + *
  1. Create a zone in HA mode with 7 nodes (A, B, C, D, E, F, G).
  2. + *
  3. Stop a majority of nodes (4 nodes A, B, C, D).
  4. + *
  5. Wait for the partition to become available on the remaining nodes (E, F, G).
  6. + *
  7. Stop a majority of nodes (E, F).
  8. + *
  9. Write data to node G.
  10. + *
  11. Stop node G.
  12. + *
  13. Start nodes E and F.
  14. + *
  15. Nodes should wait for node G to come back online.
  16. + *
+ * + * @throws Exception If failed. + */ + @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-24509") + void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() throws Exception { + for (int i = 1; i < 8; i++) { + startNode(i, CUSTOM_NODES_CONFIG); + } + + String globalFilter = "$[?(@.zone == \"custom\")]"; + createHaZoneWithTable(7, globalFilter, nodeNames(1, 2, 3, 4, 5, 6, 7)); + + IgniteImpl node0 = igniteImpl(0); + Table table = node0.tables().table(HA_TABLE_NAME); + + List errors = insertValues(table, 0); + assertThat(errors, is(empty())); + assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 3, 4, 5, 6, 7); + + // Stop 4 nodes (A, B, C, D) + stopNodes(4, 5, 6, 7); + + // Wait for the partition to become available on the remaining nodes (E, F, G) + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 3)); + + // Stop 2 more nodes (E, F) + stopNodes(2, 3); + + // Wait for the partition to become available on the last node (G) + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1)); + + errors = insertValues(table, 1000); + assertThat(errors, is(empty())); + + assertValuesPresentOnNodes(node0.clock().now(), table, 1); + + // Stop the last node (G) + stopNode(1); + + // Start one node from phase 3 (E) + startNode(2); + + // Start one node from phase 2 (F) + startNode(3); + + assertValuesNotPresentOnNodes(node0.clock().now(), table, 2, 3); + } + + /** + * Test scenario, where we start nodes from the previous assignments chain, without new writes. + * The whole scenario will be possible when second phase of HA feature will be implemented. + *
    + *
  1. Create a zone in HA mode with 7 nodes (A, B, C, D, E, F, G).
  2. + *
  3. Stop a majority of nodes (4 nodes A, B, C, D).
  4. + *
  5. Wait for the partition to become available on the remaining nodes (E, F, G).
  6. + *
  7. Stop a majority of nodes (E, F).
  8. + *
  9. Stop node G.
  10. + *
  11. Start nodes E and F.
  12. + *
  13. Nodes should wait for nodes A, B, C, D, E, F, G to come back online.
  14. + *
+ * + * @throws Exception If failed. + */ + @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-24509") + void testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops() throws Exception { + for (int i = 1; i < 8; i++) { + startNode(i, CUSTOM_NODES_CONFIG); + } + + String globalFilter = "$[?(@.zone == \"custom\")]"; + createHaZoneWithTable(7, globalFilter, nodeNames(1, 2, 3, 4, 5, 6, 7)); + + IgniteImpl node0 = igniteImpl(0); + Table table = node0.tables().table(HA_TABLE_NAME); + + List errors = insertValues(table, 0); + assertThat(errors, is(empty())); + assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 3, 4, 5, 6, 7); + + // Stop 4 nodes (A, B, C, D) + stopNodes(4, 5, 6, 7); + + // Wait for the partition to become available on the remaining nodes (E, F, G) + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 3)); + + // Stop 2 more nodes (E, F) + stopNodes(2, 3); + + // Wait for the partition to become available on the last node (G) + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1)); + + // Stop the last node (G) + stopNode(1); + + // Start one node from phase 3 (E) + startNode(2); + + // Start one node from phase 2 (F) + startNode(3); + + assertValuesNotPresentOnNodes(node0.clock().now(), table, 2, 3); + } + private void alterZoneSql(String filter, String zoneName) { executeSql(String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", zoneName, filter)); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java index 5b2424b24a5..3e8f78565e8 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; +import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.table.Table; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** Test for the HA zones recovery. */ @@ -257,6 +259,12 @@ void testScaleUpAfterHaRecoveryWhenMajorityLoss() throws Exception { stopNodes(1, 2, 3, 4); + Table table = node.tables().table(HA_TABLE_NAME); + + List errors = insertValues(table, 0); + + assertThat(errors, is(empty())); + waitAndAssertRecoveryKeyIsNotEmpty(node); assertRecoveryRequestForHaZoneTable(node); @@ -280,6 +288,8 @@ void testScaleUpAfterHaRecoveryWhenMajorityLoss() throws Exception { PARTITION_IDS, Set.of(node.name(), node1.name(), node2.name()) ); + + assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2); } @Test @@ -397,4 +407,45 @@ void testScaleDownTimerIsWorkingForHaZone() throws InterruptedException { keyValueStorage ); } + + /** + * Test scenario. + *
    + *
  1. Create a zone in HA mode with 7 nodes (A, B, C, D, E, F, G).
  2. + *
  3. Insert data and wait for replication to all nodes.
  4. + *
  5. Stop a majority of nodes (A, B, C, D).
  6. + *
  7. Wait for the partition to become available on the remaining nodes (E, F, G).
  8. + *
  9. Start node A.
  10. + *
  11. Verify that node A cleans up its state.
  12. + *
+ * + * @throws Exception If failed. + */ + @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-24509") + void testNodeStateCleanupAfterRestartInHaMode() throws Exception { + startNode(3); + startNode(4); + startNode(5); + startNode(6); + + createHaZoneWithTable(); + + IgniteImpl node0 = igniteImpl(0); + Table table = node0.tables().table(HA_TABLE_NAME); + + List errors = insertValues(table, 0); + assertThat(errors, is(empty())); + assertValuesPresentOnNodes(node0.clock().now(), table, 0, 1, 2, 3, 4, 5, 6); + + alterZone(node0.catalogManager(), HA_ZONE_NAME, INFINITE_TIMER_VALUE, null, null); + + stopNodes(3, 4, 5, 6); + + waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0, 1, 2)); + + startNode(6); + + assertValuesNotPresentOnNodes(node0.clock().now(), table, 6); + } }