Skip to content

DRAFT: Vector index follower read test #17945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 118 additions & 15 deletions ydb/core/kqp/ut/indexes/kqp_indexes_vector_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ using namespace NYdb::NTable;

Y_UNIT_TEST_SUITE(KqpVectorIndexes) {

std::vector<i64> DoPositiveQueryVectorIndex(TSession& session, const TString& query) {
std::vector<i64> DoPositiveQueryVectorIndex(TSession& session, TTxSettings txSettings, const TString& query) {
{
auto result = session.ExplainDataQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(),
"Failed to explain: `" << query << "` with " << result.GetIssues().ToString());
}
{
auto result = session.ExecuteDataQuery(query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()
TTxControl::BeginTx(txSettings).CommitTx()
).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(),
"Failed to execute: `" << query << "` with " << result.GetIssues().ToString());
Expand All @@ -53,20 +53,20 @@ Y_UNIT_TEST_SUITE(KqpVectorIndexes) {
}
}

void DoPositiveQueriesVectorIndex(TSession& session, const TString& mainQuery, const TString& indexQuery) {
void DoPositiveQueriesVectorIndex(TSession& session, TTxSettings txSettings, const TString& mainQuery, const TString& indexQuery) {
auto toStr = [](const auto& rs) -> TString {
TStringBuilder b;
for (const auto& r : rs) {
b << r << ", ";
}
return b;
};
auto mainResults = DoPositiveQueryVectorIndex(session, mainQuery);
auto mainResults = DoPositiveQueryVectorIndex(session, txSettings, mainQuery);
absl::c_sort(mainResults);
UNIT_ASSERT_EQUAL_C(mainResults.size(), 3, toStr(mainResults));
UNIT_ASSERT_C(std::unique(mainResults.begin(), mainResults.end()) == mainResults.end(), toStr(mainResults));

auto indexResults = DoPositiveQueryVectorIndex(session, indexQuery);
auto indexResults = DoPositiveQueryVectorIndex(session, txSettings, indexQuery);
absl::c_sort(indexResults);
UNIT_ASSERT_EQUAL_C(indexResults.size(), 3, toStr(indexResults));
UNIT_ASSERT_C(std::unique(indexResults.begin(), indexResults.end()) == indexResults.end(), toStr(indexResults));
Expand All @@ -76,10 +76,12 @@ Y_UNIT_TEST_SUITE(KqpVectorIndexes) {

void DoPositiveQueriesVectorIndexOrderBy(
TSession& session,
TTxSettings txSettings,
std::string_view function,
std::string_view direction,
std::string_view left,
std::string_view right) {
std::string_view right
) {
constexpr std::string_view target = "$target = \"\x67\x71\x03\";";
std::string metric = std::format("Knn::{}({}, {})", function, left, right);
// no metric in result
Expand All @@ -96,7 +98,7 @@ Y_UNIT_TEST_SUITE(KqpVectorIndexes) {
ORDER BY {} {}
LIMIT 3;
)", target, metric, direction)));
DoPositiveQueriesVectorIndex(session, plainQuery, indexQuery);
DoPositiveQueriesVectorIndex(session, txSettings, plainQuery, indexQuery);
}
// metric in result
{
Expand All @@ -111,7 +113,7 @@ Y_UNIT_TEST_SUITE(KqpVectorIndexes) {
ORDER BY {} {}
LIMIT 3;
)", target, metric, metric, direction)));
DoPositiveQueriesVectorIndex(session, plainQuery, indexQuery);
DoPositiveQueriesVectorIndex(session, txSettings, plainQuery, indexQuery);
}
// metric as result
{
Expand All @@ -127,27 +129,31 @@ Y_UNIT_TEST_SUITE(KqpVectorIndexes) {
ORDER BY m {}
LIMIT 3;
)", target, metric, direction)));
DoPositiveQueriesVectorIndex(session, plainQuery, indexQuery);
DoPositiveQueriesVectorIndex(session, txSettings, plainQuery, indexQuery);
}
}

void DoPositiveQueriesVectorIndexOrderBy(
TSession& session,
TTxSettings txSettings,
std::string_view function,
std::string_view direction) {
// target is left, member is right
DoPositiveQueriesVectorIndexOrderBy(session, function, direction, "$target", "emb");
DoPositiveQueriesVectorIndexOrderBy(session, txSettings, function, direction, "$target", "emb");
// target is right, member is left
DoPositiveQueriesVectorIndexOrderBy(session, function, direction, "emb", "$target");
DoPositiveQueriesVectorIndexOrderBy(session, txSettings, function, direction, "emb", "$target");
}

void DoPositiveQueriesVectorIndexOrderByCosine(TSession& session) {
void DoPositiveQueriesVectorIndexOrderByCosine(
TSession& session,
TTxSettings txSettings = TTxSettings::SerializableRW()
) {
// distance, default direction
DoPositiveQueriesVectorIndexOrderBy(session, "CosineDistance", "");
DoPositiveQueriesVectorIndexOrderBy(session, txSettings, "CosineDistance", "");
// distance, asc direction
DoPositiveQueriesVectorIndexOrderBy(session, "CosineDistance", "ASC");
DoPositiveQueriesVectorIndexOrderBy(session, txSettings, "CosineDistance", "ASC");
// similarity, desc direction
DoPositiveQueriesVectorIndexOrderBy(session, "CosineSimilarity", "DESC");
DoPositiveQueriesVectorIndexOrderBy(session, txSettings, "CosineSimilarity", "DESC");
}

TSession DoCreateTableForVectorIndex(TTableClient& db, bool nullable) {
Expand Down Expand Up @@ -609,6 +615,103 @@ Y_UNIT_TEST_SUITE(KqpVectorIndexes) {
// DoPositiveQueriesVectorIndexOrderByCosine(session);
}

// Checks which replicas serve vector-index queries.
//
// The test builds a vector_kmeans_tree index on /Root/TestTable, enables
// read replicas ("PER_AZ:3") on the table, runs the cosine ORDER BY queries
// with either StaleRO or SerializableRW transaction settings (twin parameter
// StaleRO), and then inspects `/Root/.sys/partition_stats`:
//   * StaleRO == true  -> reads are expected on rows with FollowerId != 0
//                         and not on rows with FollowerId = 0;
//   * StaleRO == false -> the opposite.
// The check is applied to the main table and to both index impl tables.
Y_UNIT_TEST_TWIN(Followers, StaleRO) {
    const std::vector<TString> tableNames = {
        "/Root/TestTable",
        "/Root/TestTable/index/indexImplLevelTable",
        "/Root/TestTable/index/indexImplPostingTable"
    };

    NKikimrConfig::TFeatureFlags featureFlags;
    featureFlags.SetEnableVectorIndex(true);
    auto setting = NKikimrKqp::TKqpSetting();
    auto serverSettings = TKikimrSettings()
        .SetFeatureFlags(featureFlags)
        .SetEnableForceFollowers(true)
        .SetKqpSettings({setting});

    TKikimrRunner kikimr(serverSettings);
    //kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
    //kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);

    auto db = kikimr.GetTableClient();
    auto session = DoCreateTableForVectorIndex(db, false);

    // Build the vector index on the freshly created table.
    {
        const TString createIndex(Q_(R"(
            ALTER TABLE `/Root/TestTable`
            ADD INDEX index
            GLOBAL USING vector_kmeans_tree
            ON (emb)
            WITH (similarity=cosine, vector_type="uint8", vector_dimension=2, levels=1, clusters=2);
        )"));

        auto result = session.ExecuteSchemeQuery(createIndex)
            .ExtractValueSync();

        UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
    }

    // Enable read replicas on the main table.
    {
        const TString alterTable(Q_(R"(
            ALTER TABLE `/Root/TestTable` SET (READ_REPLICAS_SETTINGS = "PER_AZ:3");
        )"));

        auto result = session.ExecuteSchemeQuery(alterTable).ExtractValueSync();
        UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
    }

    // Asserts that DescribeTable reports PER_AZ read replicas with count 3.
    auto checkFollowerDescription = [&session](const TString& tableName) {
        auto result = session.DescribeTable(tableName).ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

        const auto& table = result.GetTableDescription();
        const auto& replicas = table.GetReadReplicasSettings();
        // Fail with a readable message instead of dereferencing an empty value
        // when the table carries no read replicas settings at all.
        UNIT_ASSERT_C(replicas, "No read replicas settings reported for " << tableName);
        UNIT_ASSERT(replicas->GetMode() == NYdb::NTable::TReadReplicasSettings::EMode::PerAz);
        UNIT_ASSERT_VALUES_EQUAL(replicas->GetReadReplicasCount(), 3);
    };

    for (const TString& tableName : tableNames) {
        checkFollowerDescription(tableName);
    }

    DoPositiveQueriesVectorIndexOrderByCosine(session,
        StaleRO ? TTxSettings::StaleRO() : TTxSettings::SerializableRW());

    // Polls partition_stats for rows of `tableName` with non-zero read counters.
    //   follower      - select rows with FollowerId != 0 (true) or = 0 (false).
    //   readsExpected - true: retry (up to 30 attempts, 5 s apart) until at
    //                   least one such row appears, fail on timeout;
    //                   false: fail as soon as such a row is seen, succeed
    //                   on the first empty result.
    // Failures are reported through the unit-test framework rather than by
    // aborting the whole test process.
    auto checkReads = [&session](const TString& tableName, bool follower, bool readsExpected) {
        constexpr size_t maxAttempts = 30;
        for (size_t attempt = 0; attempt < maxAttempts; ++attempt) {
            Cerr << "... SELECT from partition_stats for " << tableName << " , attempt " << attempt << Endl;
            const TString selectPartitionStats(Q_(Sprintf(R"(
                SELECT *
                FROM `/Root/.sys/partition_stats`
                WHERE FollowerId %s 0 AND (RowReads != 0 OR RangeReadRows != 0) AND Path = '%s'
            )", (follower ? "!=" : "="), tableName.c_str())));
            Cerr << selectPartitionStats << Endl;
            auto result = session.ExecuteDataQuery(selectPartitionStats, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
            AssertSuccessResult(result);

            auto rs = result.GetResultSet(0);
            if (!readsExpected) {
                UNIT_ASSERT_C(rs.RowsCount() == 0, "Unexpected read stats for " << tableName);
                return;
            }
            if (rs.RowsCount() != 0) {
                return;
            }
            Sleep(TDuration::Seconds(5));
        }
        UNIT_FAIL("Timeout waiting for read stats from " << tableName);
    };

    for (const TString& tableName : tableNames) {
        if (StaleRO) {
            checkReads(tableName, true, true);
            checkReads(tableName, false, false);
        } else {
            checkReads(tableName, false, true);
            checkReads(tableName, true, false);
        }
    }
}

Y_UNIT_TEST(VectorIndexIsNotUpdatable) {
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableVectorIndex(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ ISubOperation::TPtr FinalizeIndexImplTable(TOperationContext& context, const TPa
operation->MutablePartitionConfig()->MutableCompactionPolicy()->CopyFrom(table->PartitionConfig().GetCompactionPolicy());
operation->MutablePartitionConfig()->MutableCompactionPolicy()->SetKeepEraseMarkers(false);
operation->MutablePartitionConfig()->SetShadowData(false);

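// FIXME(draft): temporary hack, do NOT merge — unconditionally adds a follower group (3 followers per data center, all data centers required) to the finalized index impl table.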
auto* followerGroup = operation->MutablePartitionConfig()->MutableFollowerGroups()->Add();
followerGroup->SetFollowerCount(3);
followerGroup->SetRequireAllDataCenters(true);
followerGroup->SetFollowerCountPerDataCenter(true);

return CreateFinalizeBuildIndexImplTable(partId, transaction);
}

Expand Down
Loading