Skip to content

Commit 776f733

Browse files
authored
Asynchronous compaction. (#17544)
1 parent a8d7567 commit 776f733

File tree

3 files changed

+45
-16
lines changed

3 files changed

+45
-16
lines changed

ydb/apps/etcd_proxy/service/etcd_events.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ enum Ev : ui32 {
4242
};
4343

4444
struct TEvQueryResult : public NActors::TEventLocal<TEvQueryResult, Ev::QueryResult> {
45-
TEvQueryResult(const NYdb::TResultSets& result): Results(result) {}
45+
TEvQueryResult(const NYdb::TResultSets& result = {}): Results(result) {}
4646

4747
const NYdb::TResultSets Results;
4848
};

ydb/apps/etcd_proxy/service/etcd_impl.cpp

+43-15
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,7 @@ class TBaseEtcdRequest {
789789
virtual std::string ParseGrpcRequest() = 0;
790790
virtual void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params) = 0;
791791
virtual void ReplyWith(const NYdb::TResultSets& results, const TActorContext& ctx) = 0;
792+
virtual bool ExecuteAsync() const { return false; }
792793

793794
i64 Revision = 0LL;
794795
};
@@ -812,7 +813,7 @@ class TEtcdRequestGrpc
812813
this->Die(ctx);
813814
} else {
814815
this->Become(&TEtcdRequestGrpc::StateFunc);
815-
SendDatabaseRequest();
816+
SendDatabaseRequest(ctx);
816817
}
817818
}
818819
private:
@@ -822,7 +823,7 @@ class TEtcdRequestGrpc
822823

823824
virtual std::ostream& Dump(std::ostream& out) const = 0;
824825

825-
void SendDatabaseRequest() {
826+
void SendDatabaseRequest(const TActorContext& ctx) {
826827
std::ostringstream sql;
827828
NYdb::TParamsBuilder params;
828829
sql << "-- " << GetRequestName() << " >>>>" << std::endl;
@@ -834,14 +835,24 @@ class TEtcdRequestGrpc
834835
TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
835836
return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args);
836837
};
837-
Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my = this->SelfId(), stuff = TSharedStuff::TWeakPtr(Stuff)](const auto& future) {
838-
if (const auto lock = stuff.lock()) {
838+
if (this->ExecuteAsync()) {
839+
Stuff->Client->RetryQuery(std::move(callback)).Subscribe([name = GetRequestName()](const auto& future) {
839840
if (const auto res = future.GetValueSync(); res.IsSuccess())
840-
lock->ActorSystem->Send(my, new NEtcd::TEvQueryResult(res.GetResultSets()));
841+
std::cout << name << " finished succesfully." << std::endl;
841842
else
842-
lock->ActorSystem->Send(my, new NEtcd::TEvQueryError(res.GetIssues()));
843-
}
844-
});
843+
std::cout << name << " finished with errors: " << res.GetIssues().ToString() << std::endl;
844+
});
845+
ctx.Send(this->SelfId(), new NEtcd::TEvQueryResult);
846+
} else {
847+
Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my = this->SelfId(), stuff = TSharedStuff::TWeakPtr(Stuff)](const auto& future) {
848+
if (const auto lock = stuff.lock()) {
849+
if (const auto res = future.GetValueSync(); res.IsSuccess())
850+
lock->ActorSystem->Send(my, new NEtcd::TEvQueryResult(res.GetResultSets()));
851+
else
852+
lock->ActorSystem->Send(my, new NEtcd::TEvQueryError(res.GetIssues()));
853+
}
854+
});
855+
}
845856
}
846857

847858
STFUNC(StateFunc) {
@@ -1076,37 +1087,54 @@ class TCompactRequest
10761087

10771088
const auto &rec = *GetProtoRequest();
10781089
KeyRevision = rec.revision();
1090+
Physical = rec.physical();
10791091
if (KeyRevision <= 0LL | KeyRevision >= Revision)
10801092
return std::string("invalid revision:" ) += std::to_string(KeyRevision);
10811093
return {};
10821094
}
10831095

1096+
bool ExecuteAsync() const final {
1097+
return !Physical;
1098+
}
1099+
10841100
void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params) final {
10851101
sql << "$Trash = select c.key as key, c.modified as modified from `history` as c inner join (" << std::endl;
10861102
sql << "select max_by((`key`, `modified`), `modified`) as pair from `history`" << std::endl;
10871103
sql << "where `modified` < " << AddParam("Revision", params, KeyRevision) << " and 0L = `version` group by `key`" << std::endl;
10881104
sql << ") as keys on keys.pair.0 = c.key where c.modified <= keys.pair.1;" << std::endl;
1089-
sql << "select count(*) from $Trash;" << std::endl;
10901105
sql << "delete from `history` on select * from $Trash;" << std::endl;
1106+
if (Physical) {
1107+
sql << "select count(*) from $Trash;" << std::endl;
1108+
}
10911109
}
10921110

10931111
void ReplyWith(const NYdb::TResultSets& results, const TActorContext& ctx) final {
1094-
auto parser = NYdb::TResultSetParser(results.front());
1095-
const auto erased = parser.TryNextRow() ? NYdb::TValueParser(parser.GetValue(0)).GetUint64() : 0ULL;
1096-
if (!erased)
1097-
TryToRollbackRevision();
1112+
Dump(std::cout);
1113+
if (Physical) {
1114+
auto parser = NYdb::TResultSetParser(results.front());
1115+
const auto erased = parser.TryNextRow() ? NYdb::TValueParser(parser.GetValue(0)).GetUint64() : 0ULL;
1116+
if (!erased)
1117+
TryToRollbackRevision();
1118+
1119+
std::cout << '=' << erased << std::endl;
1120+
} else {
1121+
std::cout << " is executing asynchronously." << std::endl;
1122+
}
10981123

10991124
etcdserverpb::CompactionResponse response;
11001125
response.mutable_header()->set_revision(Revision);
1101-
Dump(std::cout) << '=' << erased << std::endl;
11021126
return Reply(response, ctx);
11031127
}
11041128

11051129
std::ostream& Dump(std::ostream& out) const final {
1106-
return out << "Compact(" << KeyRevision << ')';
1130+
out << "Compact(" << KeyRevision;
1131+
if (Physical)
1132+
out << ",physical";
1133+
return out << ')';
11071134
}
11081135

11091136
i64 KeyRevision;
1137+
bool Physical;
11101138
};
11111139

11121140
class TLeaseGrantRequest

ydb/apps/etcd_proxy/service/ut/etcd_service_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,7 @@ Y_UNIT_TEST_SUITE(Etcd_KV) {
10501050
grpc::ClientContext compactCtx;
10511051
etcdserverpb::CompactionRequest compactionRequest;
10521052
compactionRequest.set_revision(revForCompact);
1053+
compactionRequest.set_physical(true);
10531054
etcdserverpb::CompactionResponse compactionResponse;
10541055
UNIT_ASSERT(etcd->Compact(&compactCtx, compactionRequest, &compactionResponse).ok());
10551056
}

0 commit comments

Comments
 (0)