Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24412 Extend test coverage of basic HA functionality #5220

Merged
merged 5 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
Expand Down Expand Up @@ -476,11 +477,22 @@ static List<Throwable> insertValues(Table table, int offset) {

/**
 * Asserts that, as of the given timestamp, the inserted rows are readable on each of the listed nodes.
 *
 * @param ts Timestamp to read at.
 * @param table Table to read from.
 * @param indexes Indexes of the nodes that must serve the rows.
 */
void assertValuesPresentOnNodes(HybridTimestamp ts, Table table, Integer... indexes) {
    // A row counts as present when the read future resolves to a non-null binary row.
    Predicate<CompletableFuture<BinaryRow>> rowIsPresent = rowFut -> rowFut.join() != null;

    for (Integer nodeIndex : indexes) {
        assertValuesOnNode(table, ts, nodeIndex, rowIsPresent);
    }
}

private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, int targetNodeIndex) {
/**
 * Asserts that, as of the given timestamp, none of the inserted rows are readable on any of the listed nodes.
 *
 * @param ts Timestamp to read at.
 * @param table Table to read from.
 * @param indexes Indexes of the nodes that must not serve the rows.
 */
void assertValuesNotPresentOnNodes(HybridTimestamp ts, Table table, Integer... indexes) {
    // A row counts as absent when the read future resolves to null.
    Predicate<CompletableFuture<BinaryRow>> rowIsAbsent = rowFut -> rowFut.join() == null;

    for (Integer nodeIndex : indexes) {
        assertValuesOnNode(table, ts, nodeIndex, rowIsAbsent);
    }
}

private void assertValuesOnNode(
Table table,
HybridTimestamp ts,
int targetNodeIndex,
Predicate<CompletableFuture<BinaryRow>> dataCondition
) {
IgniteImpl targetNode = unwrapIgniteImpl(node(targetNodeIndex));

TableImpl tableImpl = unwrapTableImpl(table);
Expand All @@ -491,7 +503,7 @@ private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, int targ
internalTable.get(marshalKey(tableImpl, Tuple.create(of("id", i))), ts, targetNode.node());
assertThat(fut, willCompleteSuccessfully());

assertNotNull(fut.join());
assertTrue(dataCondition.test(fut));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

package org.apache.ignite.internal.table.distributed.disaster;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

import java.util.List;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

Expand All @@ -40,6 +46,14 @@ void testTwoSequentialResets() throws InterruptedException {

IgniteImpl node = igniteImpl(0);

Table table = node.tables().table(HA_TABLE_NAME);

List<Throwable> errors = insertValues(table, 0);

assertThat(errors, is(empty()));

assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2, 3, 4);

assertRecoveryKeyIsEmpty(node);

stopNodes(2, 3, 4);
Expand All @@ -49,5 +63,7 @@ void testTwoSequentialResets() throws InterruptedException {
stopNode(1);

waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));

assertValuesPresentOnNodes(node.clock().now(), table, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@

package org.apache.ignite.internal.table.distributed.disaster;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.table.Table;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/** Test suite for the cases with a recovery of the group replication factor after reset by zone filter update. */
Expand All @@ -32,6 +39,8 @@ public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac

private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}", null);

private static final String CUSTOM_NODES_CONFIG = nodeConfig("{zone = custom}", null);

private static final String ROCKS_NODES_CONFIG = nodeConfig(null, "{lru_rocks.engine = rocksdb}");

private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null, "{segmented_aipersist.engine = aipersist}");
Expand Down Expand Up @@ -122,6 +131,190 @@ void testThatPartitionResetZoneStorageProfileFilterAware() throws InterruptedExc
waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
}

/**
 * Test scenario.
 * <ol>
 *     <li>Create a zone in HA mode spanning 7 nodes (1..7) - phase 1.</li>
 *     <li>Insert data and wait for it to be replicated to all zone nodes.</li>
 *     <li>Stop a majority of nodes (4..7).</li>
 *     <li>Wait for the partition to become available on nodes 1..3, no new writes - phase 2.</li>
 *     <li>Stop a majority once again (nodes 2 and 3).</li>
 *     <li>Wait for the partition to become available on node 1, no new writes - phase 3.</li>
 *     <li>Stop the last node (1).</li>
 *     <li>Start one node from phase 1 (node 4), one from phase 3 (node 1) and one from phase 2 (node 2).</li>
 *     <li>No data should be lost: reads on nodes 4 and 2 must be consistent with node 1.</li>
 * </ol>
 *
 * @throws Exception If failed.
 */
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-24513")
void testSeveralHaResetsAndSomeNodeRestart() throws Exception {
    // Bring up nodes 1..7; all of them match the "custom" zone filter below.
    for (int nodeIdx = 1; nodeIdx < 8; nodeIdx++) {
        startNode(nodeIdx, CUSTOM_NODES_CONFIG);
    }

    String customZoneFilter = "$[?(@.zone == \"custom\")]";
    createHaZoneWithTable(7, customZoneFilter, nodeNames(1, 2, 3, 4, 5, 6, 7));

    IgniteImpl entryNode = igniteImpl(0);
    Table haTable = entryNode.tables().table(HA_TABLE_NAME);

    // Seed the table and make sure the rows are readable on every zone node.
    List<Throwable> insertErrors = insertValues(haTable, 0);
    assertThat(insertErrors, is(empty()));
    assertValuesPresentOnNodes(entryNode.clock().now(), haTable, 1, 2, 3, 4, 5, 6, 7);

    // Phase 1 -> 2: stop the first majority (nodes 4..7).
    stopNodes(4, 5, 6, 7);

    // The partition must become available on the surviving nodes 1..3.
    waitAndAssertStableAssignmentsOfPartitionEqualTo(entryNode, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 3));

    // Phase 2 -> 3: stop the majority once more (nodes 2 and 3).
    stopNodes(2, 3);

    // The partition must become available on the single remaining node 1.
    waitAndAssertStableAssignmentsOfPartitionEqualTo(entryNode, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1));

    // Finally stop the last node as well.
    stopNode(1);

    // Restart one node from phase 1 (4), then one from phase 3 (1), then one from phase 2 (2).
    startNode(4);

    startNode(1);

    startNode(2);

    waitAndAssertStableAssignmentsOfPartitionEqualTo(entryNode, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 4));

    // No data should be lost: reads on the restarted nodes must be consistent with node 1.
    assertValuesPresentOnNodes(entryNode.clock().now(), haTable, 1, 2, 4);
}

/**
 * Test scenario, where we start nodes from the previous assignments chain, with new writes.
 * The whole scenario will be possible when second phase of HA feature will be implemented.
 *
 * <ol>
 *     <li>Create a zone in HA mode with 7 nodes (A, B, C, D, E, F, G).</li>
 *     <li>Stop a majority of nodes (4 nodes A, B, C, D).</li>
 *     <li>Wait for the partition to become available on the remaining nodes (E, F, G).</li>
 *     <li>Stop a majority of nodes (E, F).</li>
 *     <li>Write data to node G.</li>
 *     <li>Stop node G.</li>
 *     <li>Start nodes E and F.</li>
 *     <li>Nodes should wait for node G to come back online.</li>
 * </ol>
 *
 * <p>Node letters map to node indexes as follows: A-D are nodes 4-7 (stopped first),
 * E and F are nodes 2 and 3, and G is node 1 (the last node standing).</p>
 *
 * @throws Exception If failed.
 */
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-24509")
void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() throws Exception {
    // Bring up nodes 1..7; all of them match the "custom" zone filter below.
    for (int i = 1; i < 8; i++) {
        startNode(i, CUSTOM_NODES_CONFIG);
    }

    String globalFilter = "$[?(@.zone == \"custom\")]";
    createHaZoneWithTable(7, globalFilter, nodeNames(1, 2, 3, 4, 5, 6, 7));

    IgniteImpl node0 = igniteImpl(0);
    Table table = node0.tables().table(HA_TABLE_NAME);

    // Seed the table and make sure the rows are readable on every zone node.
    List<Throwable> errors = insertValues(table, 0);
    assertThat(errors, is(empty()));
    assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 3, 4, 5, 6, 7);

    // Stop the first majority (A, B, C, D = nodes 4..7).
    stopNodes(4, 5, 6, 7);

    // Wait for the partition to become available on the remaining nodes (E, F, G = nodes 2, 3, 1).
    waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 3));

    // Stop the next majority (E, F = nodes 2 and 3).
    stopNodes(2, 3);

    // Wait for the partition to become available on the last node (G = node 1).
    waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1));

    // New writes land on G only; offset 1000 keeps them distinct from the initial batch.
    errors = insertValues(table, 1000);
    assertThat(errors, is(empty()));

    assertValuesPresentOnNodes(node0.clock().now(), table, 1);

    // Stop the last node (G), which now holds the newest writes.
    stopNode(1);

    // Restart node E (2), stopped when phase 2 ended, while G stays down.
    startNode(2);

    // Restart node F (3), stopped at the same time as E.
    startNode(3);

    // E and F must not serve data on their own: they have to wait for G to come back online.
    assertValuesNotPresentOnNodes(node0.clock().now(), table, 2, 3);
}

/**
 * Test scenario, where we start nodes from the previous assignments chain, without new writes.
 * The whole scenario will be possible when second phase of HA feature will be implemented.
 * <ol>
 *     <li>Create a zone in HA mode with 7 nodes (A, B, C, D, E, F, G).</li>
 *     <li>Stop a majority of nodes (4 nodes A, B, C, D).</li>
 *     <li>Wait for the partition to become available on the remaining nodes (E, F, G).</li>
 *     <li>Stop a majority of nodes (E, F).</li>
 *     <li>Stop node G.</li>
 *     <li>Start nodes E and F.</li>
 *     <li>Nodes should wait for nodes A, B, C, D, E, F, G to come back online.</li>
 * </ol>
 *
 * <p>Node letters map to node indexes as follows: A-D are nodes 4-7 (stopped first),
 * E and F are nodes 2 and 3, and G is node 1 (stopped last).</p>
 *
 * @throws Exception If failed.
 */
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-24509")
void testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops() throws Exception {
    // Bring up nodes 1..7; all of them match the "custom" zone filter below.
    for (int i = 1; i < 8; i++) {
        startNode(i, CUSTOM_NODES_CONFIG);
    }

    String globalFilter = "$[?(@.zone == \"custom\")]";
    createHaZoneWithTable(7, globalFilter, nodeNames(1, 2, 3, 4, 5, 6, 7));

    IgniteImpl node0 = igniteImpl(0);
    Table table = node0.tables().table(HA_TABLE_NAME);

    // Seed the table and make sure the rows are readable on every zone node.
    List<Throwable> errors = insertValues(table, 0);
    assertThat(errors, is(empty()));
    assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 3, 4, 5, 6, 7);

    // Stop the first majority (A, B, C, D = nodes 4..7).
    stopNodes(4, 5, 6, 7);

    // Wait for the partition to become available on the remaining nodes (E, F, G = nodes 2, 3, 1).
    waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 3));

    // Stop the next majority (E, F = nodes 2 and 3).
    stopNodes(2, 3);

    // Wait for the partition to become available on the last node (G = node 1).
    waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, nodeNames(1));

    // Stop the last node (G). Unlike the sibling test, no new writes happened after the majority losses.
    stopNode(1);

    // Restart node E (2), stopped when phase 2 ended, while the rest of the chain stays down.
    startNode(2);

    // Restart node F (3), stopped at the same time as E.
    startNode(3);

    // E and F must not serve data on their own: they have to wait for the assignments chain to return.
    assertValuesNotPresentOnNodes(node0.clock().now(), table, 2, 3);
}

/**
 * Updates the data nodes filter of the given distribution zone via an SQL {@code ALTER ZONE} statement.
 *
 * @param filter New data nodes filter (a JSONPath expression over node attributes).
 * @param zoneName Name of the zone to alter.
 */
private void alterZoneSql(String filter, String zoneName) {
    String query = String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", zoneName, filter);

    executeSql(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
Expand All @@ -47,6 +48,7 @@
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/** Test for the HA zones recovery. */
Expand Down Expand Up @@ -257,6 +259,12 @@ void testScaleUpAfterHaRecoveryWhenMajorityLoss() throws Exception {

stopNodes(1, 2, 3, 4);

Table table = node.tables().table(HA_TABLE_NAME);

List<Throwable> errors = insertValues(table, 0);

assertThat(errors, is(empty()));

waitAndAssertRecoveryKeyIsNotEmpty(node);

assertRecoveryRequestForHaZoneTable(node);
Expand All @@ -280,6 +288,8 @@ void testScaleUpAfterHaRecoveryWhenMajorityLoss() throws Exception {
PARTITION_IDS,
Set.of(node.name(), node1.name(), node2.name())
);

assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2);
}

@Test
Expand Down Expand Up @@ -397,4 +407,45 @@ void testScaleDownTimerIsWorkingForHaZone() throws InterruptedException {
keyValueStorage
);
}

/**
 * Test scenario.
 * <ol>
 *     <li>Create a zone in HA mode spanning 7 nodes (0..6).</li>
 *     <li>Insert data and wait for it to be replicated to all nodes.</li>
 *     <li>Stop a majority of nodes (3, 4, 5, 6).</li>
 *     <li>Wait for the partition to become available on the remaining nodes (0, 1, 2).</li>
 *     <li>Restart node 6.</li>
 *     <li>Verify that the restarted node cleans up its local state.</li>
 * </ol>
 *
 * @throws Exception If failed.
 */
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-24509")
void testNodeStateCleanupAfterRestartInHaMode() throws Exception {
    // Grow the cluster to seven nodes; nodes 0..2 are assumed already running — TODO confirm in base class.
    for (int nodeIdx = 3; nodeIdx <= 6; nodeIdx++) {
        startNode(nodeIdx);
    }

    createHaZoneWithTable();

    IgniteImpl entryNode = igniteImpl(0);
    Table haTable = entryNode.tables().table(HA_TABLE_NAME);

    // Seed the table and make sure the rows are readable on every node.
    List<Throwable> insertErrors = insertValues(haTable, 0);
    assertThat(insertErrors, is(empty()));
    assertValuesPresentOnNodes(entryNode.clock().now(), haTable, 0, 1, 2, 3, 4, 5, 6);

    // NOTE(review): presumably switches the zone timer to "never fire" so that the upcoming stop
    // is handled by the HA reset rather than a graceful data-nodes recalculation — confirm parameter order.
    alterZone(entryNode.catalogManager(), HA_ZONE_NAME, INFINITE_TIMER_VALUE, null, null);

    // Stop the majority (nodes 3..6).
    stopNodes(3, 4, 5, 6);

    // The partition must become available on the surviving nodes 0..2.
    waitAndAssertStableAssignmentsOfPartitionEqualTo(entryNode, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0, 1, 2));

    startNode(6);

    // The restarted node must have cleaned up its state: the previously replicated rows are gone from it.
    assertValuesNotPresentOnNodes(entryNode.clock().now(), haTable, 6);
}
}