Skip to content

Commit 9ac8c8c

Browse files
ivanmorozov333ivanmorozov333
andauthored
additional signals (#19745)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
1 parent caaaef5 commit 9ac8c8c

File tree

10 files changed

+205
-85
lines changed

10 files changed

+205
-85
lines changed

ydb/core/tx/columnshard/data_reader/fetcher.h

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,56 @@
1111
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
1212

1313
#include <ydb/library/accessor/accessor.h>
14+
#include <ydb/library/signals/states.h>
1415

1516
namespace NKikimr::NOlap::NDataFetcher {
1617

18+
class TClassCounters {
19+
private:
20+
std::shared_ptr<NCounters::TStateSignalsOperator<EFetchingStage>> StateSignals;
21+
22+
public:
23+
TClassCounters(NColumnShard::TCommonCountersOwner& owner) {
24+
StateSignals = std::make_shared<NCounters::TStateSignalsOperator<EFetchingStage>>(owner, "fetching_stage");
25+
}
26+
27+
NCounters::TStateSignalsOperator<EFetchingStage>::TGuard GetGuard(const std::optional<EFetchingStage> start) {
28+
return StateSignals->BuildGuard(start);
29+
}
30+
};
31+
32+
class TCounters: public NColumnShard::TCommonCountersOwner {
33+
private:
34+
using TBase = NColumnShard::TCommonCountersOwner;
35+
TMutex Mutex;
36+
THashMap<TString, std::shared_ptr<TClassCounters>> ClassCounters;
37+
38+
public:
39+
TCounters()
40+
: TBase("data_fetcher") {
41+
}
42+
43+
std::shared_ptr<TClassCounters> GetClassCounters(const TString& className) {
44+
TGuard<TMutex> g(Mutex);
45+
auto it = ClassCounters.find(className);
46+
if (it == ClassCounters.end()) {
47+
it = ClassCounters.emplace(className, std::make_shared<TClassCounters>(*this)).first;
48+
}
49+
return it->second;
50+
}
51+
};
52+
1753
class TPortionsDataFetcher: TNonCopyable {
1854
private:
1955
const TRequestInput Input;
2056
const std::shared_ptr<IFetchCallback> Callback;
57+
std::shared_ptr<TClassCounters> ClassCounters;
58+
NCounters::TStateSignalsOperator<EFetchingStage>::TGuard Guard;
2159
TScriptExecution Script;
2260
TCurrentContext CurrentContext;
2361
std::shared_ptr<TEnvironment> Environment;
2462
const NConveyorComposite::ESpecialTaskCategory ConveyorCategory;
2563
bool IsFinishedFlag = false;
26-
EFetchingStage Stage = EFetchingStage::Created;
2764

2865
public:
2966
void AskMemoryAllocation(const std::shared_ptr<NGroupedMemoryManager::IAllocation>& task) {
@@ -35,6 +72,8 @@ class TPortionsDataFetcher: TNonCopyable {
3572
const std::shared_ptr<TScript>& script, const NConveyorComposite::ESpecialTaskCategory conveyorCategory)
3673
: Input(std::move(input))
3774
, Callback(std::move(callback))
75+
, ClassCounters(Singleton<TCounters>()->GetClassCounters(Callback->GetClassName()))
76+
, Guard(ClassCounters->GetGuard(EFetchingStage::Created))
3877
, Script(script)
3978
, Environment(environment)
4079
, ConveyorCategory(conveyorCategory) {
@@ -43,7 +82,8 @@ class TPortionsDataFetcher: TNonCopyable {
4382
}
4483

4584
~TPortionsDataFetcher() {
46-
AFL_VERIFY(IsFinishedFlag || Stage == EFetchingStage::Created || Callback->IsAborted())("stage", Stage)("class_name", Callback->GetClassName());
85+
AFL_VERIFY(NActors::TActorSystem::IsStopped() || IsFinishedFlag || Guard.GetStage() == EFetchingStage::Created
86+
|| Callback->IsAborted())("stage", Guard.GetStage())("class_name", Callback->GetClassName());
4787
}
4888

4989
static void StartAccessorPortionsFetching(TRequestInput&& input, std::shared_ptr<IFetchCallback>&& callback,
@@ -78,7 +118,7 @@ class TPortionsDataFetcher: TNonCopyable {
78118

79119
void SetStage(const EFetchingStage stage) {
80120
Callback->OnStageStarting(stage);
81-
Stage = stage;
121+
Guard.SetState(stage);
82122
}
83123

84124
void OnError(const TString& errMessage) {

ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
namespace NKikimr::NOlap {
1111

1212
void TColumnEngineChanges::SetStage(const NChanges::EStage stage) {
13-
Counters->SetStage(stage);
13+
StateGuard.SetState(stage);
1414
}
1515

1616
TString TColumnEngineChanges::DebugString() const {
@@ -23,8 +23,8 @@ TString TColumnEngineChanges::DebugString() const {
2323

2424
TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& context) noexcept {
2525
const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("task_id", GetTaskIdentifier())("task_class", TypeString());
26-
AFL_VERIFY(Counters->GetStage() == NChanges::EStage::ReadyForConstruct || Counters->GetStage() == NChanges::EStage::Started)(
27-
"actual_stage", Counters->GetStage());
26+
AFL_VERIFY(StateGuard.GetStage() == NChanges::EStage::ReadyForConstruct || StateGuard.GetStage() == NChanges::EStage::Started)(
27+
"actual_stage", StateGuard.GetStage());
2828

2929
context.Counters.CompactionInputSize(Blobs.GetTotalBlobsSize());
3030
const TMonotonic start = TMonotonic::Now();
@@ -39,16 +39,16 @@ TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& con
3939
}
4040

4141
void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
42-
AFL_VERIFY(Counters->GetStage() != NChanges::EStage::Aborted);
43-
AFL_VERIFY(Counters->GetStage() <= NChanges::EStage::Written);
44-
AFL_VERIFY(Counters->GetStage() >= NChanges::EStage::Compiled);
42+
AFL_VERIFY(StateGuard.GetStage() != NChanges::EStage::Aborted);
43+
AFL_VERIFY(StateGuard.GetStage() <= NChanges::EStage::Written);
44+
AFL_VERIFY(StateGuard.GetStage() >= NChanges::EStage::Compiled);
4545

4646
DoWriteIndexOnExecute(self, context);
4747
SetStage(NChanges::EStage::Written);
4848
}
4949

5050
void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
51-
Y_ABORT_UNLESS(Counters->GetStage() == NChanges::EStage::Written || !self);
51+
Y_ABORT_UNLESS(StateGuard.GetStage() == NChanges::EStage::Written || !self);
5252
SetStage(NChanges::EStage::Finished);
5353
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
5454
DoWriteIndexOnComplete(self, context);
@@ -59,7 +59,7 @@ void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self
5959
}
6060

6161
void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
62-
AFL_VERIFY(Counters->GetStage() != NChanges::EStage::Aborted);
62+
AFL_VERIFY(StateGuard.GetStage() != NChanges::EStage::Aborted);
6363

6464
DoCompile(context);
6565
DoOnAfterCompile();
@@ -73,7 +73,8 @@ TColumnEngineChanges::~TColumnEngineChanges() {
7373

7474
void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
7575
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Abort")("reason", context.ErrorMessage);
76-
AFL_VERIFY(Counters->GetStage() != NChanges::EStage::Finished && Counters->GetStage() != NChanges::EStage::Created && Counters->GetStage() != NChanges::EStage::Aborted)("stage", Counters->GetStage())(
76+
AFL_VERIFY(StateGuard.GetStage() != NChanges::EStage::Finished && StateGuard.GetStage() != NChanges::EStage::Created && StateGuard.GetStage() != NChanges::EStage::Aborted)(
77+
"stage", StateGuard.GetStage())(
7778
"reason", context.ErrorMessage)("prev_reason", AbortedReason);
7879
SetStage(NChanges::EStage::Aborted);
7980
AbortedReason = context.ErrorMessage;
@@ -83,7 +84,7 @@ void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinis
8384
void TColumnEngineChanges::Start(NColumnShard::TColumnShard& self) {
8485
AFL_VERIFY(!LockGuard);
8586
LockGuard = self.DataLocksManager->RegisterLock(BuildDataLock());
86-
Y_ABORT_UNLESS(Counters->GetStage() == NChanges::EStage::Created);
87+
Y_ABORT_UNLESS(StateGuard.GetStage() == NChanges::EStage::Created);
8788
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(self.TabletID(), *this);
8889
DoStart(self);
8990
SetStage(NChanges::EStage::Started);
@@ -93,7 +94,7 @@ void TColumnEngineChanges::Start(NColumnShard::TColumnShard& self) {
9394
}
9495

9596
void TColumnEngineChanges::StartEmergency() {
96-
Y_ABORT_UNLESS(Counters->GetStage() == NChanges::EStage::Created);
97+
Y_ABORT_UNLESS(StateGuard.GetStage() == NChanges::EStage::Created);
9798
SetStage(NChanges::EStage::Started);
9899
// if (!NeedConstruction()) {
99100
// SetStage(NChanges::EStage::Constructed);
@@ -102,7 +103,7 @@ void TColumnEngineChanges::StartEmergency() {
102103

103104
void TColumnEngineChanges::AbortEmergency(const TString& reason) {
104105
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "AbortEmergency")("reason", reason)("prev_reason", AbortedReason);
105-
if (Counters->GetStage() == NChanges::EStage::Aborted) {
106+
if (StateGuard.GetStage() == NChanges::EStage::Aborted) {
106107
AbortedReason += "; AnotherReason: " + reason;
107108
} else {
108109
SetStage(NChanges::EStage::Aborted);

ydb/core/tx/columnshard/engines/changes/abstract/abstract.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ class TColumnEngineChanges: public TMoveOnly {
255255
TString AbortedReason;
256256
const TString TaskIdentifier = TGUID::CreateTimebased().AsGuidString();
257257
std::shared_ptr<const TAtomicCounter> ActivityFlag;
258-
std::shared_ptr<NChanges::TChangesCounters::TStageCountersGuard> Counters;
258+
NCounters::TStateSignalsOperator<NChanges::EStage>::TGuard StateGuard;
259259

260260
protected:
261261
std::optional<TDataAccessorsResult> FetchedDataAccessors;
@@ -354,15 +354,15 @@ class TColumnEngineChanges: public TMoveOnly {
354354
}
355355

356356
TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager, const NBlobOperations::EConsumer consumerId)
357-
: Counters(NChanges::TChangesCounters::GetStageCounters(consumerId))
357+
: StateGuard(NChanges::TChangesCounters::GetStageCounters(consumerId))
358358
, BlobsAction(storagesManager, consumerId) {
359359
}
360360

361361
TConclusionStatus ConstructBlobs(TConstructionContext& context) noexcept;
362362
virtual ~TColumnEngineChanges();
363363

364364
bool IsAborted() const {
365-
return Counters->GetCurrentStage() == NChanges::EStage::Aborted;
365+
return StateGuard.GetStage() == NChanges::EStage::Aborted;
366366
}
367367

368368
void StartEmergency();
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
#include "changes.h"
22

3-
namespace NKikimr::NOlap::NChanges {}
3+
namespace NKikimr::NOlap::NChanges {
4+
}

ydb/core/tx/columnshard/engines/changes/counters/changes.h

Lines changed: 9 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#pragma once
22

33
#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h>
4-
#include <ydb/library/signals/owner.h>
54

65
#include <ydb/library/actors/core/log.h>
6+
#include <ydb/library/signals/owner.h>
7+
#include <ydb/library/signals/states.h>
78

89
#include <library/cpp/monlib/dynamic_counters/counters.h>
910
#include <util/generic/hash.h>
@@ -60,77 +61,26 @@ class TChangesCounters: public NColumnShard::TCommonCountersOwner {
6061
}
6162
};
6263

63-
class TStageCounters: NColumnShard::TCommonCountersOwner {
64-
private:
65-
using TBase = NColumnShard::TCommonCountersOwner;
66-
std::array<std::shared_ptr<TTaskCounters>, (ui64)EStage::COUNT> Stages;
67-
68-
public:
69-
TStageCounters(const NColumnShard::TCommonCountersOwner& owner, const NBlobOperations::EConsumer consumerId)
70-
: TBase(owner, "consumer", ToString(consumerId)) {
71-
for (size_t i = 0; i < (ui64)EStage::COUNT; ++i) {
72-
Stages[i] = std::make_shared<TTaskCounters>(TBase::CreateSubGroup("stage", ToString(static_cast<EStage>(i))));
73-
}
74-
}
75-
76-
void OnStageChanged(const std::optional<EStage> stageFrom, const std::optional<EStage> stageTo) const {
77-
if (stageFrom) {
78-
AFL_VERIFY(static_cast<size_t>(*stageFrom) < Stages.size())("index", *stageFrom)("size", Stages.size());
79-
Stages[static_cast<size_t>(*stageFrom)]->Dec();
80-
}
81-
if (stageTo) {
82-
AFL_VERIFY(static_cast<size_t>(*stageTo) < Stages.size())("index", *stageTo)("size", Stages.size());
83-
Stages[static_cast<size_t>(*stageTo)]->Inc();
84-
}
85-
}
86-
};
87-
8864
private:
89-
std::array<std::shared_ptr<TStageCounters>, static_cast<size_t>(NBlobOperations::EConsumer::COUNT)> StagesByConsumer;
65+
std::array<std::shared_ptr<NCounters::TStateSignalsOperator<EStage>>, static_cast<size_t>(NBlobOperations::EConsumer::COUNT)> StagesByConsumer;
9066

91-
std::shared_ptr<TStageCounters> GetStageCountersImpl(const NBlobOperations::EConsumer consumerId) {
67+
std::shared_ptr<NCounters::TStateSignalsOperator<EStage>> GetStageCountersImpl(const NBlobOperations::EConsumer consumerId) {
9268
AFL_VERIFY((ui64)consumerId < StagesByConsumer.size())("index", consumerId)("size", StagesByConsumer.size());
9369
return StagesByConsumer[(ui64)consumerId];
9470
}
9571

9672
public:
97-
class TStageCountersGuard: TNonCopyable {
98-
private:
99-
const std::shared_ptr<TStageCounters> Counters;
100-
YDB_READONLY(EStage, CurrentStage, EStage::Created);
101-
public:
102-
EStage GetStage() const {
103-
return CurrentStage;
104-
}
105-
106-
TStageCountersGuard(const std::shared_ptr<TStageCounters>& counters, const EStage startStage)
107-
: Counters(counters)
108-
, CurrentStage(startStage) {
109-
Counters->OnStageChanged(std::nullopt, startStage);
110-
}
111-
112-
~TStageCountersGuard() {
113-
Counters->OnStageChanged(CurrentStage, std::nullopt);
114-
}
115-
116-
void SetStage(const EStage stageTo) {
117-
AFL_VERIFY(stageTo >= CurrentStage)("current", CurrentStage)("to", stageTo);
118-
if (CurrentStage != stageTo) {
119-
Counters->OnStageChanged(CurrentStage, stageTo);
120-
CurrentStage = stageTo;
121-
}
122-
}
123-
};
124-
12573
TChangesCounters()
12674
: TBase("ColumnEngineChanges") {
12775
for (ui64 i = 0; i < (ui64)NBlobOperations::EConsumer::COUNT; ++i) {
128-
StagesByConsumer[i] = std::make_shared<TStageCounters>(*this, static_cast<NBlobOperations::EConsumer>(i));
76+
auto base = this->CreateSubGroup("consumer", ::ToString(static_cast<NBlobOperations::EConsumer>(i)));
77+
StagesByConsumer[i] = std::make_shared<NCounters::TStateSignalsOperator<EStage>>(
78+
base, "indexation_stage");
12979
}
13080
}
13181

132-
static std::shared_ptr<TStageCountersGuard> GetStageCounters(const NBlobOperations::EConsumer consumerId) {
133-
return std::make_shared<TStageCountersGuard>(Singleton<TChangesCounters>()->GetStageCountersImpl(consumerId), EStage::Created);
82+
static NCounters::TStateSignalsOperator<EStage>::TGuard GetStageCounters(const NBlobOperations::EConsumer consumerId) {
83+
return Singleton<TChangesCounters>()->GetStageCountersImpl(consumerId)->BuildGuard(EStage::Created);
13484
}
13585
};
13686

ydb/library/actors/core/actorsystem.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,6 @@ namespace NActors {
8484
, DefSelfID(NodeId, "actorsystem")
8585
, AppData0(appData)
8686
, LoggerSettings0(loggerSettings)
87-
, StartExecuted(false)
88-
, StopExecuted(false)
89-
, CleanupExecuted(false)
9087
{
9188
ServiceMap.Reset(new TServiceMap());
9289
}
@@ -95,7 +92,14 @@ namespace NActors {
9592
Cleanup();
9693
}
9794

98-
static void CheckEventMemory(IEventBase *ev) {
95+
bool TActorSystem::IsStopped() {
96+
if (!TlsActivationContext) {
97+
return true;
98+
}
99+
return TlsActivationContext->ActorSystem()->StopExecuted || !TlsActivationContext->ActorSystem()->StartExecuted;
100+
}
101+
102+
static void CheckEventMemory(IEventBase* ev) {
99103
if constexpr (!NSan::MSanIsOn()) {
100104
return;
101105
}

ydb/library/actors/core/actorsystem.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,9 @@ namespace NActors {
162162
TMutex ProxyCreationLock;
163163
mutable std::vector<TActorId> DynamicProxies;
164164

165-
bool StartExecuted;
166-
bool StopExecuted;
167-
bool CleanupExecuted;
165+
bool StartExecuted = false;
166+
bool StopExecuted = false;
167+
bool CleanupExecuted = false;
168168

169169
std::deque<std::function<void()>> DeferredPreStop;
170170
public:
@@ -176,6 +176,8 @@ namespace NActors {
176176
void Stop();
177177
void Cleanup();
178178

179+
static bool IsStopped();
180+
179181
template <ESendingType SendingType = ESendingType::Common>
180182
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
181183
ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());

ydb/library/signals/states.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "states.h"
2+
3+
namespace NKikimr::NOlap::NCounters {
4+
5+
}

0 commit comments

Comments
 (0)