Skip to content

Commit 01a0ea3

Browse files
committed
Ydb stable 23-4-4
x-stable-origin-commit: d7fd6fddf6a14d5723ab64f6b8d512a7f239bc88
1 parent 58c0590 commit 01a0ea3

File tree

63 files changed

+1957
-626
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1957
-626
lines changed

library/cpp/actors/testlib/test_runtime.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,11 +1202,21 @@ namespace NActors {
12021202
isEdgeMailbox = true;
12031203
TEventsList events;
12041204
mbox.second->Capture(events);
1205+
1206+
TEventsList eventsToPush;
12051207
for (auto& ev : events) {
12061208
TInverseGuard<TMutex> inverseGuard(Mutex);
1207-
ObserverFunc(*this, ev);
1209+
1210+
for (auto& observer : ObserverFuncs) {
1211+
observer(ev);
1212+
if (!ev) break;
1213+
}
1214+
1215+
if (ev && ObserverFunc(*this, ev) != EEventAction::DROP) {
1216+
eventsToPush.push_back(ev);
1217+
}
12081218
}
1209-
mbox.second->PushFront(events);
1219+
mbox.second->PushFront(eventsToPush);
12101220
}
12111221

12121222
if (!isEdgeMailbox) {
@@ -1232,7 +1242,17 @@ namespace NActors {
12321242
EEventAction action;
12331243
{
12341244
TInverseGuard<TMutex> inverseGuard(Mutex);
1235-
action = ObserverFunc(*this, ev);
1245+
1246+
for (auto& observer : ObserverFuncs) {
1247+
observer(ev);
1248+
if (!ev) break;
1249+
}
1250+
1251+
if (ev) {
1252+
action = ObserverFunc(*this, ev);
1253+
} else {
1254+
action = EEventAction::DROP;
1255+
}
12361256
}
12371257

12381258
switch (action) {

library/cpp/actors/testlib/test_runtime.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,52 @@ namespace NActors {
296296
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
297297
void SetupMonitoring();
298298

299+
using TEventObserverCollection = std::list<std::function<void(TAutoPtr<IEventHandle>& event)>>;
300+
class TEventObserverHolder {
301+
public:
302+
TEventObserverHolder(TEventObserverCollection& list, TEventObserverCollection::iterator&& iter)
303+
: List(list)
304+
, Iter(iter)
305+
{
306+
}
307+
308+
~TEventObserverHolder()
309+
{
310+
Remove();
311+
}
312+
313+
void Remove()
314+
{
315+
if (Iter == List.end()) {
316+
return;
317+
}
318+
319+
List.erase(Iter);
320+
Iter = List.end();
321+
}
322+
private:
323+
TEventObserverCollection& List;
324+
TEventObserverCollection::iterator Iter;
325+
};
326+
327+
template <typename TEvType>
328+
TEventObserverHolder AddObserver(std::function<void(typename TEvType::TPtr&)> observerFunc)
329+
{
330+
auto baseFunc = [observerFunc](TAutoPtr<IEventHandle>& event) {
331+
if (event && event->GetTypeRewrite() == TEvType::EventType)
332+
observerFunc(*(reinterpret_cast<typename TEvType::TPtr*>(&event)));
333+
};
334+
335+
auto iter = ObserverFuncs.insert(ObserverFuncs.end(), baseFunc);
336+
return TEventObserverHolder(ObserverFuncs, std::move(iter));
337+
}
338+
339+
TEventObserverHolder AddObserver(std::function<void(TAutoPtr<IEventHandle>&)> observerFunc)
340+
{
341+
auto iter = ObserverFuncs.insert(ObserverFuncs.end(), observerFunc);
342+
return TEventObserverHolder(ObserverFuncs, std::move(iter));
343+
}
344+
299345
template<typename T>
300346
void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) {
301347
Y_VERIFY(!IsInitialized);
@@ -653,6 +699,7 @@ namespace NActors {
653699
TDuration DispatchTimeout;
654700
TDuration ReschedulingDelay;
655701
TEventObserver ObserverFunc;
702+
TEventObserverCollection ObserverFuncs;
656703
TScheduledEventsSelector ScheduledEventsSelectorFunc;
657704
TEventFilter EventFilterFunc;
658705
TScheduledEventFilter ScheduledEventFilterFunc;

library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class TDisjointIntervalTree {
132132
}
133133

134134
TIterator completelyRemoveEnd = completelyRemoveBegin != Tree.end() ? Tree.lower_bound(end) : Tree.end();
135-
if (completelyRemoveEnd != Tree.end() && completelyRemoveEnd != Tree.begin() && completelyRemoveEnd->first != end) {
135+
if (completelyRemoveEnd != Tree.begin() && (completelyRemoveEnd == Tree.end() || completelyRemoveEnd->first != end)) {
136136
TIterator containingEnd = completelyRemoveEnd;
137137
--containingEnd;
138138
if (containingEnd->second > end) {

library/cpp/containers/disjoint_interval_tree/ut/disjoint_interval_tree_ut.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,18 @@ Y_UNIT_TEST_SUITE(DisjointIntervalTreeTest) {
232232
UNIT_ASSERT_VALUES_EQUAL(tree.GetNumIntervals(), 2);
233233
UNIT_ASSERT_VALUES_EQUAL(tree.GetNumElements(), 8);
234234
}
235+
236+
// 12. The only one interval
237+
{
238+
TDisjointIntervalTree<ui64> tree;
239+
tree.InsertInterval(1, 10);
240+
UNIT_ASSERT_VALUES_EQUAL(tree.GetNumIntervals(), 1);
241+
UNIT_ASSERT_VALUES_EQUAL(tree.GetNumElements(), 9);
242+
UNIT_ASSERT_VALUES_EQUAL(tree.EraseInterval(0, 6), 5);
243+
UNIT_ASSERT_VALUES_EQUAL(tree.GetNumIntervals(), 1);
244+
UNIT_ASSERT_VALUES_EQUAL(tree.GetNumElements(), 4);
245+
UNIT_ASSERT(tree.Intersects(5, 10));
246+
}
235247
}
236248

237249
Y_UNIT_TEST(IntersectsTest) {

ydb/core/driver_lib/version/version.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ TCompatibilityInfo::TCompatibilityInfo() {
2828
.Application = "ydb",
2929
.Version = TVersionConstructor{
3030
.Year = 23,
31-
.Major = 3,
31+
.Major = 4,
3232
}
3333
}.ToPB();
3434

@@ -79,13 +79,13 @@ const TStored* TCompatibilityInfo::GetDefault(TComponentId componentId) const {
7979
// obsolete version control
8080
TMaybe<NActors::TInterconnectProxyCommon::TVersionInfo> VERSION = NActors::TInterconnectProxyCommon::TVersionInfo{
8181
// version of this binary
82-
"stable-23-3",
82+
"stable-23-4",
8383

8484
// compatible versions; must include all compatible old ones, including this one; version verification occurs on both
8585
// peers and connection is accepted if at least one of peers accepts the version of the other peer
8686
{
87-
"stable-23-2",
88-
"stable-23-3"
87+
"stable-23-3",
88+
"stable-23-4"
8989
}
9090
};
9191

ydb/core/persqueue/writer/writer.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
205205
auto ev = MakeRequest(PartitionId, PipeClient);
206206

207207
auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
208-
if (Opts.UseDeduplication) {
209-
cmd.SetOwner(SourceId);
210-
} else {
211-
cmd.SetOwner(CreateGuidAsString());
212-
}
208+
cmd.SetOwner(SourceId);
213209
cmd.SetForce(true);
214210

215211
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
@@ -659,7 +655,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
659655
, TabletId(tabletId)
660656
, PartitionId(partitionId)
661657
, ExpectedGeneration(expectedGeneration)
662-
, SourceId(sourceId)
658+
, SourceId(opts.UseDeduplication ? sourceId : CreateGuidAsString())
663659
, Opts(opts)
664660
{
665661
if (Opts.MeteringMode) {

ydb/core/protos/flat_tx_scheme.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,13 @@ message TEvDescribeSchemeResult {
103103
optional string Reason = 2;
104104
optional string Path = 3;
105105
optional NKikimrSchemeOp.TPathDescription PathDescription = 4;
106-
optional fixed64 PathOwner = 5;
106+
optional fixed64 DEPRECATED_PathOwner = 5; // replaced by PathOwnerId
107107
optional fixed64 PathId = 6;
108108

109109
optional string LastExistedPrefixPath = 7;
110110
optional fixed64 LastExistedPrefixPathId = 8;
111111
optional NKikimrSchemeOp.TPathDescription LastExistedPrefixDescription = 9;
112+
112113
optional fixed64 PathOwnerId = 10;
113114
}
114115

ydb/core/protos/scheme_board.proto

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import "ydb/core/protos/flat_tx_scheme.proto";
1+
import "ydb/core/protos/base.proto";
22

33
package NKikimrSchemeBoard;
44
option java_package = "ru.yandex.kikimr.proto";
@@ -13,22 +13,76 @@ message TEvHandshake {
1313
optional uint64 Generation = 2;
1414
}
1515

16-
// here and below
17-
// Owner is the tablet id of schemeshard witch holds the records
18-
// LocalPathId is a second part of TPathId
19-
// PathOwnerId is a first part of TPathId
16+
// Here and below.
17+
// Owner is the tablet id of schemeshard which holds the records.
18+
// (PathOwnerId, LocalPathId) constitute TPathId of the object.
2019

20+
// TEvUpdate.DescribeSchemeResultSerialized is a NKikimrScheme.TEvDescribeSchemeResult
21+
// in the form of opaque payload.
22+
// Originally, that field existed as a properly typed TEvDescribeSchemeResult message.
23+
// However, that induce additional overhead to serialize and deserialize this message
24+
// when transferring over wire.
25+
// This performance cost is usually either negligible or imperceptible.
26+
// But in specific situations, particularly when rapidly updating partitioning information
27+
// for tables with huge number of shards, this overhead could lead to significant issues.
28+
// Schemeboard replicas could get overloaded and become unresponsive to further requests.
29+
// This is problematic, especially considering the schemeboard subsystem's critical role
30+
// in servicing all databases within a cluster, making it a Single Point of Failure (SPOF).
31+
//
32+
// The core realization is that the schemeboard components do not require the full content of
33+
// a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of
34+
// fields (path, path-id, version and info about subdomain/database) is required for processing.
35+
// And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.
36+
//
37+
// Type change from TEvDescribeSchemeResult to (repeated) bytes without changing field number
38+
// is a safe move. Actual value of the field remains unchanged at the wire-format level.
39+
// Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message
40+
// and proceed with deserialization as usual. And newer implementations will recognize the data
41+
// as a binary blob and will deserialize it explicitly only when necessary.
42+
//
43+
// Note that the `repeated` label for the `DescribeSchemeResultSerialized` field is essential
44+
// to remain backward-compatible with the previous implementation. This is because even if
45+
// DescribeSchemeResult previously was labeled `optional` but actual value used at
46+
// the wire-format level was (and is) a pack of TEvDescribeSchemeResult messages.
47+
// Automerge of consecutive messages for the same field is a prominent feature of the protobuf.
48+
// Schemeshard use that feature to supply full TEvDescribeSchemeResult as a sequence of
49+
// partially filled TEvDescribeSchemeResult's.
50+
//
51+
// - Path
52+
// - PathOwnerId, LocalPathId
53+
// - PathDirEntryPathVersion
54+
// - PathSubdomainPathId
55+
// - PathAbandonedTenantsSchemeShards
56+
// are taken from the original TEvDescribeSchemeResult (one way or another).
57+
//
2158
message TEvUpdate {
2259
optional uint64 Owner = 1;
2360
optional uint64 Generation = 2;
2461
optional TLocalPathIdRange DeletedLocalPathIds = 3;
25-
optional string Path = 4;
26-
optional uint64 LocalPathId = 5;
62+
63+
optional string Path = 4; // extracted from DescribeSchemeResult.Path
64+
optional uint64 LocalPathId = 5; // extracted from DescribeSchemeResult.PathId
65+
2766
optional bool IsDeletion = 6 [default = false];
28-
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 7;
67+
68+
repeated bytes DescribeSchemeResultSerialized = 7;
69+
2970
optional bool NeedAck = 8 [default = false];
30-
optional uint64 PathOwnerId = 9;
71+
72+
optional uint64 PathOwnerId = 9; // extracted from DescribeSchemeResult.PathOwnerId, DescribeSchemeResult.PathDescription.Self.SchemeshardId in order of presence
73+
3174
optional TLocalPathIdRange MigratedLocalPathIds = 10;
75+
76+
// Explicit values extracted from DescribeSchemeResultSerialized
77+
78+
// DescribeSchemeResult.PathDescription.Self.PathVersion
79+
optional uint64 PathDirEntryPathVersion = 11;
80+
81+
// DescribeSchemeResult.PathDescription.DomainDescription.DomainKey
82+
optional NKikimrProto.TPathID PathSubdomainPathId = 13;
83+
84+
// DescribeSchemeResult.PathDescription.AbandonedTenantsSchemeShards
85+
repeated uint64 PathAbandonedTenantsSchemeShards = 14;
3286
}
3387

3488
message TEvUpdateAck {
@@ -65,16 +119,22 @@ message TEvUnsubscribe {
65119
optional uint64 LocalPathId = 3;
66120
}
67121

122+
// See comments for TEvUpdate.
68123
message TEvNotify {
69124
optional string Path = 1;
70125
// and/or
71126
optional uint64 PathOwnerId = 2;
72127
optional uint64 LocalPathId = 3;
73128
// common fields
74129
optional bool IsDeletion = 4 [default = false];
75-
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 5;
76-
optional uint64 Version = 6;
130+
131+
optional bytes DescribeSchemeResultSerialized = 5;
132+
133+
optional uint64 Version = 6; // same as TEvUpdate.PathDirEntryPathVersion
77134
optional bool Strong = 7 [default = false];
135+
136+
optional NKikimrProto.TPathID PathSubdomainPathId = 8;
137+
repeated uint64 PathAbandonedTenantsSchemeShards = 9;
78138
}
79139

80140
message TEvNotifyAck {

ydb/core/tx/columnshard/blobs_action/counters/storage.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,8 @@ TStorageCounters::TStorageCounters(const TString& storageId)
99
}
1010

1111
std::shared_ptr<NKikimr::NOlap::NBlobOperations::TConsumerCounters> TStorageCounters::GetConsumerCounter(const TString& consumerId) {
12-
auto it = ConsumerCounters.find(consumerId);
13-
if (it == ConsumerCounters.end()) {
14-
it = ConsumerCounters.emplace(consumerId, std::make_shared<TConsumerCounters>(consumerId, *this)).first;
15-
}
16-
return it->second;
12+
Y_UNUSED(ConsumerCounters);
13+
return std::make_shared<TConsumerCounters>(consumerId, *this);
1714
}
1815

1916
TConsumerCounters::TConsumerCounters(const TString& consumerId, const TStorageCounters& parent)

ydb/core/tx/columnshard/blobs_action/tier/storage.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* ti
4848
} else {
4949
settings.SetEndpoint("nowhere");
5050
}
51+
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
52+
return;
53+
}
54+
CurrentS3Settings = settings;
5155
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(settings);
5256
AFL_VERIFY(extStorageConfig);
5357
auto extStorageOperator = extStorageConfig->ConstructStorageOperator(false);

ydb/core/tx/columnshard/blobs_action/tier/storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class TOperator: public IBlobsStorageOperator {
1717
TAtomicCounter CurrentOperatorIdx = 0;
1818
std::deque<NWrappers::NExternalStorage::IExternalStorageOperator::TPtr> ExternalStorageOperators;
1919
std::shared_ptr<TGCInfo> GCInfo = std::make_shared<TGCInfo>();
20+
std::optional<NKikimrSchemeOp::TS3Settings> CurrentS3Settings;
2021
NWrappers::NExternalStorage::IExternalStorageConfig::TPtr ExternalStorageConfig;
2122
NWrappers::NExternalStorage::IExternalStorageOperator::TPtr ExternalStorageOperator;
2223

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(co
115115
for (auto& [writeId, data] : Inserted) {
116116
if (data.GetMeta().GetDirtyWriteTime() && data.GetMeta().GetDirtyWriteTime() < timeBorder) {
117117
toAbort.insert(writeId);
118+
if (toAbort.size() == 20000) {
119+
break;
120+
}
118121
}
119122
}
120123
return toAbort;

ydb/core/tx/columnshard/engines/portions/column_record.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo&
1414
if (context.GetMetaProto().HasRawBytes()) {
1515
RawBytes = context.GetMetaProto().GetRawBytes();
1616
}
17-
if (context.GetMetaProto().HasMinValue()) {
18-
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
19-
Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type());
20-
}
2117
if (context.GetMetaProto().HasMaxValue()) {
2218
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
2319
Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type());
@@ -37,9 +33,9 @@ NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
3733
if (RawBytes) {
3834
meta.SetRawBytes(*RawBytes);
3935
}
40-
if (HasMinMax()) {
41-
ScalarToConstant(*Min, *meta.MutableMinValue());
36+
if (HasMax()) {
4237
ScalarToConstant(*Max, *meta.MutableMaxValue());
38+
ScalarToConstant(*Max, *meta.MutableMinValue());
4339
}
4440
return meta;
4541
}

0 commit comments

Comments
 (0)