Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24289: Sql. SQL-schemas. Extend test coverage. #5203

Merged
merged 5 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.Arrays;
import java.util.List;
import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.DropSchemaCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.sql.SqlCommon;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -119,19 +125,6 @@ public void testDropIfExists() {
assertThat(latestCatalog().schema(TEST_SCHEMA), nullValue());
}

@Test
public void testDropDefaultSchemaIsAllowed() {
CatalogCommand cmd = DropSchemaCommand.builder().name(SqlCommon.DEFAULT_SCHEMA_NAME).build();

tryApplyAndExpectApplied(cmd);
assertThat(latestCatalog().schema(SqlCommon.DEFAULT_SCHEMA_NAME), nullValue());

assertThat(
manager.execute(simpleTable("test")),
willThrowFast(CatalogValidationException.class, "Schema with name 'PUBLIC' not found.")
);
}

@Test
public void testDropNonEmpty() {
CatalogCommand newSchemaCmd = CreateSchemaCommand.builder().name(TEST_SCHEMA).build();
Expand Down Expand Up @@ -170,6 +163,111 @@ public void testDropNonEmpty() {
}
}

@Test
public void testSameTableNameInDifferentSchemas() {
// Create table1 in schema1
{
CatalogCommand newSchemaCmd = CreateSchemaCommand.builder().name("S1").build();
CatalogCommand newTableCmd = newTableCommand("S1", "T1");

tryApplyAndCheckExpect(List.of(newSchemaCmd, newTableCmd), true, true);
}

// Create table1 in schema2
{
CatalogCommand newSchemaCmd = CreateSchemaCommand.builder().name("S2").build();
CatalogCommand newTableCmd = newTableCommand("S2", "T1");

tryApplyAndCheckExpect(List.of(newSchemaCmd, newTableCmd), true, true);
}

{
CatalogSchemaDescriptor schema1 = latestCatalog().schema("S1");
assertNotNull(schema1);

CatalogSchemaDescriptor schema2 = latestCatalog().schema("S2");
assertNotNull(schema2);

CatalogTableDescriptor schema1table1 = schema1.table("T1");
assertNotNull(schema1table1);

CatalogTableDescriptor schema2table1 = schema2.table("T1");
assertNotNull(schema2table1);
assertNotEquals(schema1table1.id(), schema2table1.id(), "Table ids should differ");
}
}

@Test
public void testSameIndexNameInDifferentSchemas() {
// Create table1 in schema1
{
CatalogCommand newSchemaCmd = CreateSchemaCommand.builder().name("S1").build();
CatalogCommand newTableCmd = newTableCommand("S1", "T1");
CatalogCommand newIndexCmd = CreateHashIndexCommand.builder()
.schemaName("S1")
.tableName("T1")
.indexName("MY_INDEX")
.columns(List.of("key2"))
.build();

tryApplyAndCheckExpect(List.of(newSchemaCmd, newTableCmd, newIndexCmd), true, true, true);
}

// Create table1 in schema2
{
CatalogCommand newSchemaCmd = CreateSchemaCommand.builder().name("S2").build();
CatalogCommand newTableCmd = newTableCommand("S2", "T2");
CatalogCommand newIndexCmd = CreateHashIndexCommand.builder()
.schemaName("S2")
.tableName("T2")
.indexName("MY_INDEX")
.columns(List.of("key2"))
.build();

tryApplyAndCheckExpect(List.of(newSchemaCmd, newTableCmd, newIndexCmd), true, true, true);
}

{
CatalogSchemaDescriptor schema1 = latestCatalog().schema("S1");
assertNotNull(schema1);

CatalogSchemaDescriptor schema2 = latestCatalog().schema("S2");
assertNotNull(schema2);

CatalogIndexDescriptor schema1index = Arrays.stream(schema1.indexes())
.filter(i -> "MY_INDEX".equals(i.name()))
.findAny()
.orElseThrow();

CatalogIndexDescriptor schema2index = Arrays.stream(schema2.indexes())
.filter(i -> "MY_INDEX".equals(i.name()))
.findAny()
.orElseThrow();

assertNotEquals(schema1index.id(), schema2index.id(), "Index ids should differ");
}
}

@Test
public void testDropCreateEmptyDefaultSchema() {
CatalogSchemaDescriptor publicSchema1 = latestCatalog().schema(SqlCommon.DEFAULT_SCHEMA_NAME);
assertNotNull(publicSchema1);

// Drop public schema
CatalogCommand dropSchemaCmd = DropSchemaCommand.builder().name(SqlCommon.DEFAULT_SCHEMA_NAME).build();
tryApplyAndExpectApplied(dropSchemaCmd);
assertNull(latestCatalog().schema(SqlCommon.DEFAULT_SCHEMA_NAME));

// Create public schema
CatalogCommand newSchemaCmd = CreateSchemaCommand.builder().name(SqlCommon.DEFAULT_SCHEMA_NAME).build();
tryApplyAndExpectApplied(newSchemaCmd);

CatalogSchemaDescriptor publicSchema2 = latestCatalog().schema(SqlCommon.DEFAULT_SCHEMA_NAME);
assertNotNull(publicSchema2);
assertNotEquals(publicSchema1.id(), publicSchema2.id());
}


private Catalog latestCatalog() {
Catalog latestCatalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong()));

Expand All @@ -178,21 +276,27 @@ private Catalog latestCatalog() {
return latestCatalog;
}

@SuppressWarnings("SameParameterValue")
private static CatalogCommand newTableCommand(String tableName) {
return newTableCommand(TEST_SCHEMA, tableName);
}

private static CatalogCommand newTableCommand(String schemaName, String tableName) {
return createTableCommand(
TEST_SCHEMA,
schemaName,
tableName,
List.of(columnParams("key1", INT32), columnParams("key2", INT32), columnParams("val", INT32, true)),
List.of("key1", "key2"),
List.of("key2")
);
}

@SuppressWarnings("SameParameterValue")
private static CatalogCommand newIndexCommand(String tableName, String indexName) {
return newIndexCommand(TEST_SCHEMA, tableName, indexName);
}

private static CatalogCommand newIndexCommand(String schemaName, String tableName, String indexName) {
return createSortedIndexCommand(
TEST_SCHEMA,
schemaName,
tableName,
indexName,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.ignite.compute.JobStatus.FAILED;
import static org.apache.ignite.compute.JobStatus.QUEUED;
import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -83,10 +85,12 @@
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -105,6 +109,14 @@ protected IgniteCompute compute() {
return node(0).compute();
}

@BeforeEach
public void initCleanState() {
dropAllSchemas();
dropAllTables();

sql("CREATE SCHEMA IF NOT EXISTS PUBLIC");
}

/**
* Submits the job for execution, verifies that the execution future completes successfully and returns an execution object.
*
Expand Down Expand Up @@ -448,6 +460,43 @@ void executeColocatedThrowsTableNotFoundExceptionWhenTableDoesNotExist() {
assertThat(ex.getCause().getMessage(), containsString("The table does not exist [name=PUBLIC.BAD_TABLE]"));
}

@ParameterizedTest
@ValueSource(strings = {"WRONG_SCHEMA", "PUBLIC"})
void submitColocatedThrowsTableNotFoundExceptionWhenSchemaDoesNotExist(String schemaName) {
sql("DROP SCHEMA IF EXISTS " + schemaName);

var ex = assertThrows(CompletionException.class,
() -> compute().submitAsync(
JobTarget.colocated(schemaName + ".test", Tuple.create(Map.of("k", 1))),
JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(),
null
).join()
);

assertInstanceOf(TableNotFoundException.class, ex.getCause());

String errorMessage = format("The table does not exist [name={}]", QualifiedName.of(schemaName, "TEST").toCanonicalForm());
assertThat(ex.getCause().getMessage(), containsString(errorMessage));
}

@ParameterizedTest
@ValueSource(strings = {"WRONG_SCHEMA", "PUBLIC"})
void submitBroadcastThrowsTableNotFoundExceptionWhenSchemaDoesNotExist(String schemaName) {
sql("DROP SCHEMA IF EXISTS " + schemaName);

var ex = assertThrows(CompletionException.class,
() -> {
JobDescriptor<Void, Integer> job = JobDescriptor.builder(GetPartitionJob.class).units(units()).build();
compute().submitAsync(BroadcastJobTarget.table(schemaName + ".test"), job, null).join();
}
);

assertInstanceOf(TableNotFoundException.class, ex.getCause());

String errorMessage = format("The table does not exist [name={}]", QualifiedName.of(schemaName, "TEST").toCanonicalForm());
assertThat(ex.getCause().getMessage(), containsString(errorMessage));
}

@ParameterizedTest(name = "local: {0}")
@ValueSource(booleans = {true, false})
void cancelComputeExecuteAsyncWithCancelHandle(boolean local) {
Expand Down Expand Up @@ -785,6 +834,74 @@ void partitionedBroadcast() {
});
}

@Test
public void colocatedJobTargetDifferentSchemas() {
// s1.test
sql("CREATE SCHEMA s1");
sql("CREATE TABLE s1.test (k int, v varchar, CONSTRAINT PK PRIMARY KEY (k))");
sql("INSERT INTO s1.test(k, v) VALUES (1, 'a')");

// s2.test
sql("CREATE SCHEMA s2");
sql("CREATE TABLE s2.test (k int, v varchar, CONSTRAINT PK PRIMARY KEY (k))");
sql("INSERT INTO s2.test(k, v) VALUES (1, 'b')");

{
String actualNodeName = compute().execute(
JobTarget.colocated("s1.test", Tuple.create(Map.of("k", 1))),
JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(), null);
assertThat(actualNodeName, in(allNodeNames()));
}

{
String actualNodeName = compute().execute(
JobTarget.colocated("s2.test", Tuple.create(Map.of("k", 1))),
JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(), null);
assertThat(actualNodeName, in(allNodeNames()));
}
}

@Test
public void broadcastJobTargetDifferentSchemas() {
// Both S1 and S2 has tables named test

// S1 schema
sql("CREATE ZONE zone1 WITH PARTITIONS=5, STORAGE_PROFILES='default'");
sql("CREATE SCHEMA s1");
sql("CREATE TABLE s1.test (k int, v varchar, CONSTRAINT PK PRIMARY KEY (k)) ZONE zone1");
sql("INSERT INTO s1.test(k, v) VALUES (1, 'a')");

// S2 schema
sql("CREATE ZONE zone2 WITH PARTITIONS=7, STORAGE_PROFILES='default'");
sql("CREATE SCHEMA s2");
sql("CREATE TABLE s2.test (k int, v varchar, CONSTRAINT PK PRIMARY KEY (k)) ZONE zone2");
sql("INSERT INTO s2.test(k, v) VALUES (1, 'b')");

// S1 schema
{
JobDescriptor<Void, Integer> job = JobDescriptor.builder(GetPartitionJob.class).units(units()).build();
CompletableFuture<BroadcastExecution<Integer>> future = compute()
.submitAsync(BroadcastJobTarget.table("s1.test"), job, null);
assertThat(future, willCompleteSuccessfully());

CompletableFuture<Collection<Integer>> resultFuture = future.join().resultsAsync();
assertThat(resultFuture, willCompleteSuccessfully());
assertEquals(5, future.join().resultsAsync().join().size());
}

// S2 schema
{
JobDescriptor<Void, Integer> job = JobDescriptor.builder(GetPartitionJob.class).units(units()).build();
CompletableFuture<BroadcastExecution<Integer>> future = compute()
.submitAsync(BroadcastJobTarget.table("s2.test"), job, null);
assertThat(future, willCompleteSuccessfully());

CompletableFuture<Collection<Integer>> resultFuture = future.join().resultsAsync();
assertThat(resultFuture, willCompleteSuccessfully());
assertEquals(7, future.join().resultsAsync().join().size());
}
}

static Class<ToStringJob> toStringJobClass() {
return ToStringJob.class;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ public void accessTheSameObject() {
.returns(0L)
.check();

// Should work as wee, because we do not access the MISSING schema,
// Works fine because we do not access the MISSING schema.
assertQuery("SELECT count(*) FROM s1.t1")
.withDefaultSchema("MISSING")
Expand Down