Skip to content

Commit 4c573ce

Browse files
authored
Do not call TaskRunner for periodic stats updates (#18143)
1 parent 52058e4 commit 4c573ce

File tree

4 files changed

+26
-22
lines changed

4 files changed

+26
-22
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+2-8
Original file line numberDiff line numberDiff line change
@@ -1133,15 +1133,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
11331133
break;
11341134
}
11351135
case EEvWakeupTag::PeriodicStatsTag: {
1136-
const auto maxInterval = RuntimeSettings.ReportStatsSettings->MaxInterval;
11371136
if (Running && State == NDqProto::COMPUTE_STATE_EXECUTING) {
1138-
if (ProcessOutputsState.LastRunStatus == ERunStatus::Finished) {
1139-
// wait until all outputs are drained
1140-
ReportStats();
1141-
} else {
1142-
DoExecute();
1143-
}
1144-
this->Schedule(maxInterval, new NActors::TEvents::TEvWakeup(EEvWakeupTag::PeriodicStatsTag));
1137+
ReportStats();
1138+
this->Schedule(RuntimeSettings.ReportStatsSettings->MaxInterval, new NActors::TEvents::TEvWakeup(EEvWakeupTag::PeriodicStatsTag));
11451139
}
11461140
break;
11471141
}

ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp

+8-2
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,14 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& ta
6565

6666
protoTask->SetWaitInputTimeUs(taskStats.WaitInputTime.MicroSeconds());
6767
protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds());
68-
protoTask->SetCurrentWaitInputTimeUs(taskStats.CurrentWaitInputTime.MicroSeconds());
69-
protoTask->SetCurrentWaitOutputTimeUs(taskStats.CurrentWaitOutputTime.MicroSeconds());
68+
69+
auto now = TInstant::Now();
70+
if (taskStats.CurrentWaitInputStartTime) {
71+
protoTask->SetCurrentWaitInputTimeUs((now - taskStats.CurrentWaitInputStartTime).MicroSeconds());
72+
}
73+
if (taskStats.CurrentWaitOutputStartTime) {
74+
protoTask->SetCurrentWaitOutputTimeUs((now - taskStats.CurrentWaitOutputStartTime).MicroSeconds());
75+
}
7076

7177
if (StatsLevelCollectFull(level)) {
7278
protoTask->SetSpillingComputeWriteBytes(taskStats.SpillingComputeWriteBytes);

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

+13-10
Original file line numberDiff line numberDiff line change
@@ -786,27 +786,34 @@ class TDqTaskRunner : public IDqTaskRunner {
786786
}
787787

788788
if (Y_LIKELY(CollectBasic())) {
789+
auto now = TInstant::Now();
789790
switch (runStatus) {
790791
case ERunStatus::Finished:
791792
// finished => waiting for nothing
792-
Stats->CurrentWaitInputTime = TDuration::Zero();
793-
Stats->CurrentWaitOutputTime = TDuration::Zero();
794-
Stats->FinishTs = TInstant::Now();
793+
Stats->CurrentWaitInputStartTime = TInstant::Zero();
794+
Stats->CurrentWaitOutputStartTime = TInstant::Zero();
795+
Stats->FinishTs = now;
795796
break;
796797
case ERunStatus::PendingInput:
797798
// output is checked first => not waiting for output
798-
Stats->CurrentWaitOutputTime = TDuration::Zero();
799+
Stats->CurrentWaitOutputStartTime = TInstant::Zero();
799800
if (Y_LIKELY(InputConsumed)) {
800801
// did smth => waiting for nothing
801-
Stats->CurrentWaitInputTime = TDuration::Zero();
802+
Stats->CurrentWaitInputStartTime = TInstant::Zero();
802803
} else {
803804
StartWaitingInput();
805+
if (Y_LIKELY(!Stats->CurrentWaitInputStartTime)) {
806+
Stats->CurrentWaitInputStartTime = now;
807+
}
804808
}
805809
break;
806810
case ERunStatus::PendingOutput:
807811
// waiting for output => not waiting for input
808-
Stats->CurrentWaitInputTime = TDuration::Zero();
812+
Stats->CurrentWaitInputStartTime = TInstant::Zero();
809813
StartWaitingOutput();
814+
if (Y_LIKELY(!Stats->CurrentWaitOutputStartTime)) {
815+
Stats->CurrentWaitOutputStartTime = now;
816+
}
810817
break;
811818
}
812819
}
@@ -1077,7 +1084,6 @@ class TDqTaskRunner : public IDqTaskRunner {
10771084
} else {
10781085
Stats->WaitStartTime += delta;
10791086
}
1080-
Stats->CurrentWaitInputTime += delta;
10811087
}
10821088
StartWaitInputTime = now;
10831089
}
@@ -1087,7 +1093,6 @@ class TDqTaskRunner : public IDqTaskRunner {
10871093
if (Y_LIKELY(StartWaitOutputTime)) {
10881094
auto delta = now - *StartWaitOutputTime;
10891095
Stats->WaitOutputTime += delta;
1090-
Stats->CurrentWaitOutputTime += delta;
10911096
}
10921097
StartWaitOutputTime = now;
10931098
}
@@ -1101,14 +1106,12 @@ class TDqTaskRunner : public IDqTaskRunner {
11011106
} else {
11021107
Stats->WaitStartTime += delta;
11031108
}
1104-
Stats->CurrentWaitInputTime += delta;
11051109
StartWaitInputTime.reset();
11061110
TDuration::Zero();
11071111
}
11081112
if (StartWaitOutputTime) {
11091113
auto delta = now - *StartWaitOutputTime;
11101114
Stats->WaitOutputTime += delta;
1111-
Stats->CurrentWaitOutputTime += delta;
11121115
StartWaitOutputTime.reset();
11131116
}
11141117
}

ydb/library/yql/dq/runtime/dq_tasks_runner.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ struct TDqTaskRunnerStats {
5757
TDuration WaitStartTime;
5858
TDuration WaitInputTime;
5959
TDuration WaitOutputTime;
60-
TDuration CurrentWaitInputTime;
61-
TDuration CurrentWaitOutputTime;
60+
61+
TInstant CurrentWaitInputStartTime;
62+
TInstant CurrentWaitOutputStartTime;
6263

6364
ui64 SpillingComputeWriteBytes;
6465
ui64 SpillingChannelWriteBytes;

0 commit comments

Comments
 (0)