Skip to content

Commit 8286ffa

Browse files
committed
[native] Add optional Task queuing framework.
1 parent f419d2f commit 8286ffa

File tree

12 files changed

+408
-100
lines changed

12 files changed

+408
-100
lines changed

presto-docs/src/main/sphinx/presto_cpp/properties.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,25 @@ memory use. Ignored if zero.
617617
CPU threshold in % above which the worker is considered overloaded in terms of
618618
CPU use. Ignored if zero.
619619

620+
``worker-overloaded-cooldown-period-sec``
621+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
622+
623+
* **Type:** ``integer``
624+
* **Default value:** ``5``
625+
626+
Specifies how many seconds worker has to be not overloaded (in terms of
627+
memory and CPU) before its status changes to not overloaded.
628+
This is to prevent spiky fluctuation of the overloaded status.
629+
630+
``worker-overloaded-task-queuing-enabled``
631+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
632+
633+
* **Type:** ``boolean``
634+
* **Default value:** ``false``
635+
636+
If true, the worker starts queuing new tasks when overloaded, and
637+
starts them gradually when it stops being overloaded.
638+
620639
Environment Variables As Values For Worker Properties
621640
-----------------------------------------------------
622641

presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,19 +334,23 @@ void PeriodicTaskManager::updateTaskStats() {
334334
kCounterNumTasksBytesProcessed, taskManager_->getBytesProcessed());
335335
RECORD_METRIC_VALUE(
336336
kCounterNumTasksRunning,
337-
taskNumbers[static_cast<int>(velox::exec::TaskState::kRunning)]);
337+
taskNumbers[static_cast<int>(PrestoTaskState::kRunning)]);
338338
RECORD_METRIC_VALUE(
339339
kCounterNumTasksFinished,
340-
taskNumbers[static_cast<int>(velox::exec::TaskState::kFinished)]);
340+
taskNumbers[static_cast<int>(PrestoTaskState::kFinished)]);
341341
RECORD_METRIC_VALUE(
342342
kCounterNumTasksCancelled,
343-
taskNumbers[static_cast<int>(velox::exec::TaskState::kCanceled)]);
343+
taskNumbers[static_cast<int>(PrestoTaskState::kCanceled)]);
344344
RECORD_METRIC_VALUE(
345345
kCounterNumTasksAborted,
346-
taskNumbers[static_cast<int>(velox::exec::TaskState::kAborted)]);
346+
taskNumbers[static_cast<int>(PrestoTaskState::kAborted)]);
347347
RECORD_METRIC_VALUE(
348348
kCounterNumTasksFailed,
349-
taskNumbers[static_cast<int>(velox::exec::TaskState::kFailed)]);
349+
taskNumbers[static_cast<int>(PrestoTaskState::kFailed)]);
350+
RECORD_METRIC_VALUE(
351+
kCounterNumTasksPlanned,
352+
taskNumbers[static_cast<int>(PrestoTaskState::kPlanned)]);
353+
RECORD_METRIC_VALUE(kCounterNumTasksQueued, taskManager_->numQueuedTasks());
350354

351355
const auto driverCounts = taskManager_->getDriverCounts();
352356
RECORD_METRIC_VALUE(kCounterNumQueuedDrivers, driverCounts.numQueuedDrivers);

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1425,6 +1425,9 @@ void PrestoServer::populateMemAndCPUInfo() {
14251425
poolInfo.reservedBytes += bytes;
14261426
});
14271427
RECORD_METRIC_VALUE(kCounterNumQueryContexts, numContexts);
1428+
RECORD_METRIC_VALUE(
1429+
kCounterMemoryManagerTotalBytes,
1430+
velox::memory::memoryManager()->getTotalBytes());
14281431
cpuMon_.update();
14291432
checkOverload();
14301433
**memoryInfo_.wlock() = std::move(memoryInfo);
@@ -1439,41 +1442,74 @@ void PrestoServer::checkOverload() {
14391442
const auto currentUsedMemoryBytes = (memoryChecker_ != nullptr)
14401443
? memoryChecker_->cachedSystemUsedMemoryBytes()
14411444
: 0;
1442-
const bool isMemOverloaded =
1445+
const bool memOverloaded =
14431446
(currentUsedMemoryBytes > overloadedThresholdMemBytes);
1444-
if (isMemOverloaded) {
1445-
LOG(WARNING) << "Server memory is overloaded. Currently used: "
1447+
if (memOverloaded && !memOverloaded_) {
1448+
LOG(WARNING) << "OVERLOAD: Server memory is overloaded. Currently used: "
14461449
<< velox::succinctBytes(currentUsedMemoryBytes)
1450+
<< ", including tracked: "
1451+
<< velox::succinctBytes(
1452+
velox::memory::memoryManager()->getTotalBytes())
14471453
<< ", threshold: "
14481454
<< velox::succinctBytes(overloadedThresholdMemBytes);
1449-
} else if (isMemOverloaded_) {
1450-
LOG(INFO) << "Server memory is no longer overloaded. Currently used: "
1451-
<< velox::succinctBytes(currentUsedMemoryBytes)
1452-
<< ", threshold: "
1453-
<< velox::succinctBytes(overloadedThresholdMemBytes);
1455+
} else if (!memOverloaded && memOverloaded_) {
1456+
LOG(INFO)
1457+
<< "OVERLOAD: Server memory is no longer overloaded. Currently used: "
1458+
<< velox::succinctBytes(currentUsedMemoryBytes)
1459+
<< ", including tracked: "
1460+
<< velox::succinctBytes(
1461+
velox::memory::memoryManager()->getTotalBytes())
1462+
<< ", threshold: "
1463+
<< velox::succinctBytes(overloadedThresholdMemBytes);
14541464
}
1455-
RECORD_METRIC_VALUE(kCounterOverloadedMem, isMemOverloaded ? 100 : 0);
1456-
isMemOverloaded_ = isMemOverloaded;
1465+
RECORD_METRIC_VALUE(kCounterOverloadedMem, memOverloaded ? 100 : 0);
1466+
memOverloaded_ = memOverloaded;
14571467
}
14581468

14591469
const auto overloadedThresholdCpuPct =
14601470
systemConfig->workerOverloadedThresholdCpuPct();
14611471
if (overloadedThresholdCpuPct > 0) {
14621472
const auto currentUsedCpuPct = cpuMon_.getCPULoadPct();
1463-
const bool isCpuOverloaded =
1464-
(currentUsedCpuPct > overloadedThresholdCpuPct);
1465-
if (isCpuOverloaded) {
1466-
LOG(WARNING) << "Server CPU is overloaded. Currently used: "
1473+
const bool cpuOverloaded = (currentUsedCpuPct > overloadedThresholdCpuPct);
1474+
if (cpuOverloaded && !cpuOverloaded_) {
1475+
LOG(WARNING) << "OVERLOAD: Server CPU is overloaded. Currently used: "
14671476
<< currentUsedCpuPct
14681477
<< "%, threshold: " << overloadedThresholdCpuPct << "%";
1469-
} else if (isCpuOverloaded_) {
1470-
LOG(INFO) << "Server CPU is no longer overloaded. Currently used: "
1471-
<< currentUsedCpuPct
1472-
<< "%, threshold: " << overloadedThresholdCpuPct << "%";
1478+
} else if (!cpuOverloaded && cpuOverloaded_) {
1479+
LOG(INFO)
1480+
<< "OVERLOAD: Server CPU is no longer overloaded. Currently used: "
1481+
<< currentUsedCpuPct << "%, threshold: " << overloadedThresholdCpuPct
1482+
<< "%";
14731483
}
1474-
RECORD_METRIC_VALUE(kCounterOverloadedCpu, isCpuOverloaded ? 100 : 0);
1475-
isCpuOverloaded_ = isCpuOverloaded;
1484+
RECORD_METRIC_VALUE(kCounterOverloadedCpu, cpuOverloaded ? 100 : 0);
1485+
cpuOverloaded_ = cpuOverloaded;
14761486
}
1487+
1488+
// Determine if the server is overloaded. We require both memory and CPU to be
1489+
// not overloaded for some time (continuous period) to consider the server as
1490+
// not overloaded.
1491+
const uint64_t currentTimeSecs = velox::getCurrentTimeSec();
1492+
if (memOverloaded_ || cpuOverloaded_) {
1493+
lastOverloadedTimeInSecs_ = currentTimeSecs;
1494+
}
1495+
VELOX_CHECK_GE(currentTimeSecs, lastOverloadedTimeInSecs_);
1496+
const bool serverOverloaded =
1497+
((cpuOverloaded_ || memOverloaded_) ||
1498+
((currentTimeSecs - lastOverloadedTimeInSecs_) <
1499+
systemConfig->workerOverloadedCooldownPeriodSec()));
1500+
1501+
if (serverOverloaded && !serverOverloaded_) {
1502+
LOG(WARNING) << "OVERLOAD: Server is overloaded.";
1503+
} else if (!serverOverloaded && serverOverloaded_) {
1504+
LOG(INFO) << "OVERLOAD: Server is no longer overloaded.";
1505+
}
1506+
RECORD_METRIC_VALUE(kCounterOverloaded, serverOverloaded ? 100 : 0);
1507+
serverOverloaded_ = serverOverloaded;
1508+
1509+
if (systemConfig->workerOverloadedTaskQueuingEnabled()) {
1510+
taskManager_->setServerOverloaded(serverOverloaded_);
1511+
}
1512+
taskManager_->maybeStartNextQueuedTask();
14771513
}
14781514

14791515
static protocol::Duration getUptime(

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,16 @@ class PrestoServer {
275275
std::unique_ptr<PeriodicTaskManager> periodicTaskManager_;
276276
std::unique_ptr<PrestoServerOperations> prestoServerOperations_;
277277
std::unique_ptr<PeriodicMemoryChecker> memoryChecker_;
278-
bool isMemOverloaded_{false};
279-
bool isCpuOverloaded_{false};
278+
279+
// Last known memory overloaded status.
280+
bool memOverloaded_{false};
281+
// Last known CPU overloaded status.
282+
bool cpuOverloaded_{false};
283+
// Current worker overloaded status. It can still be true when memory and CPU
284+
// overloaded flags are not due to cooldown period.
285+
bool serverOverloaded_{false};
286+
// Last time point (in seconds) when the worker was overloaded.
287+
uint64_t lastOverloadedTimeInSecs_{0};
280288

281289
// We update these members asynchronously and return in http requests w/o
282290
// delay.

presto-native-execution/presto_cpp/main/PrestoTask.cpp

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,25 @@ using namespace facebook::velox;
2424

2525
namespace facebook::presto {
2626

27+
std::string prestoTaskStateString(PrestoTaskState state) {
28+
switch (state) {
29+
case PrestoTaskState::kRunning:
30+
return "Running";
31+
case PrestoTaskState::kFinished:
32+
return "Finished";
33+
case PrestoTaskState::kCanceled:
34+
return "Canceled";
35+
case PrestoTaskState::kAborted:
36+
return "Aborted";
37+
case PrestoTaskState::kFailed:
38+
return "Failed";
39+
case PrestoTaskState::kPlanned:
40+
return "Planned";
41+
default:
42+
return fmt::format("UNKNOWN[{}]", static_cast<int>(state));
43+
}
44+
}
45+
2746
namespace {
2847

2948
#define TASK_STATS_SUM(taskStats, statsName, taskStatusSum) \
@@ -37,7 +56,25 @@ namespace {
3756
} \
3857
} while (0)
3958

40-
protocol::TaskState toPrestoTaskState(exec::TaskState state) {
59+
// Convert Velox task state to Presto task state.
60+
PrestoTaskState toPrestoTaskState(exec::TaskState state) {
61+
switch (state) {
62+
case exec::TaskState::kRunning:
63+
return PrestoTaskState::kRunning;
64+
case exec::TaskState::kFinished:
65+
return PrestoTaskState::kFinished;
66+
case exec::TaskState::kCanceled:
67+
return PrestoTaskState::kCanceled;
68+
case exec::TaskState::kAborted:
69+
return PrestoTaskState::kAborted;
70+
case exec::TaskState::kFailed:
71+
return PrestoTaskState::kFailed;
72+
}
73+
// Should not be here.
74+
return PrestoTaskState::kAborted;
75+
}
76+
77+
protocol::TaskState toProtocolTaskState(exec::TaskState state) {
4178
switch (state) {
4279
case exec::TaskState::kRunning:
4380
return protocol::TaskState::RUNNING;
@@ -466,6 +503,21 @@ PrestoTask::PrestoTask(
466503
createTimeMs = getCurrentTimeMs();
467504
}
468505

506+
PrestoTaskState PrestoTask::taskState() const {
507+
if (task != nullptr) {
508+
const auto prestoTaskState = toPrestoTaskState(task->state());
509+
// Velox Task is created with 'Running' state even though it is not running
510+
// until start() is called. Here we check for this and return 'Planned'
511+
// state if it is the case.
512+
if (prestoTaskState == PrestoTaskState::kRunning && !taskStarted) {
513+
return PrestoTaskState::kPlanned;
514+
}
515+
return prestoTaskState;
516+
}
517+
// Fallback to 'aborted' if there is no Velox task.
518+
return PrestoTaskState::kAborted;
519+
}
520+
469521
void PrestoTask::updateHeartbeatLocked() {
470522
lastHeartbeatMs = velox::getCurrentTimeMs();
471523
info.lastHeartbeatInMillis = lastHeartbeatMs;
@@ -527,7 +579,7 @@ protocol::TaskStatus PrestoTask::updateStatusLocked() {
527579

528580
const auto veloxTaskStats = task->taskStats();
529581

530-
info.taskStatus.state = toPrestoTaskState(task->state());
582+
info.taskStatus.state = toProtocolTaskState(task->state());
531583

532584
// Presto has a Driver per split. When splits represent partitions
533585
// of data, there is a queue of them per Task. We represent
@@ -856,14 +908,14 @@ void PrestoTask::updateExecutionInfoLocked(
856908
}
857909

858910
/*static*/ std::string PrestoTask::taskStatesToString(
859-
const std::array<size_t, 5>& taskStates) {
911+
const std::array<size_t, 6>& taskStates) {
860912
std::string str;
861913
for (size_t i = 0; i < taskStates.size(); ++i) {
862914
if (taskStates[i] != 0) {
863915
folly::toAppend(
864916
fmt::format(
865917
"{}={} ",
866-
velox::exec::taskStateString(velox::exec::TaskState(i)),
918+
prestoTaskStateString(PrestoTaskState(i)),
867919
taskStates[i]),
868920
&str);
869921
}

presto-native-execution/presto_cpp/main/PrestoTask.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,19 @@ struct RuntimeMetric;
2525

2626
namespace facebook::presto {
2727

28+
/// Velox Task does not have Planned state, so we add this enum to have this
29+
/// state.
30+
enum class PrestoTaskState : int {
31+
kRunning = 0,
32+
kFinished = 1,
33+
kCanceled = 2,
34+
kAborted = 3,
35+
kFailed = 4,
36+
kPlanned = 5
37+
};
38+
39+
std::string prestoTaskStateString(PrestoTaskState state);
40+
2841
template <typename T>
2942
struct PromiseHolder {
3043
explicit PromiseHolder(folly::Promise<T> p) : promise(std::move(p)) {}
@@ -139,6 +152,10 @@ struct PrestoTask {
139152
const std::string& nodeId,
140153
long startProcessCpuTime = 0);
141154

155+
/// Returns current task state, including 'planning'.
156+
/// If Velox task is null, it returns 'aborted'.
157+
PrestoTaskState taskState() const;
158+
142159
/// Updates when this task was touched last time.
143160
void updateHeartbeatLocked();
144161

@@ -169,7 +186,7 @@ struct PrestoTask {
169186

170187
/// Turns the task numbers (per state) into a string.
171188
static std::string taskStatesToString(
172-
const std::array<size_t, 5>& taskStates);
189+
const std::array<size_t, 6>& taskStates);
173190

174191
/// Invoked to update presto task status from the updated velox task stats.
175192
protocol::TaskStatus updateStatusLocked();

0 commit comments

Comments
 (0)