More checks for strange priorities
This commit is contained in:
parent
19c1b31ab2
commit
f7b6507438
1 changed files with 23 additions and 19 deletions
|
@ -107,6 +107,10 @@ class Worker
|
||||||
foreach ($r as $entry) {
|
foreach ($r as $entry) {
|
||||||
// Assure that the priority is an integer value
|
// Assure that the priority is an integer value
|
||||||
$entry['priority'] = (int)$entry['priority'];
|
$entry['priority'] = (int)$entry['priority'];
|
||||||
|
if (!in_array($entry['priority'], PRIORITIES)) {
|
||||||
|
Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
|
||||||
|
$entry['priority'] = PRIORITY_MEDIUM;
|
||||||
|
}
|
||||||
|
|
||||||
// The work will be done
|
// The work will be done
|
||||||
if (!self::execute($entry)) {
|
if (!self::execute($entry)) {
|
||||||
|
@ -261,7 +265,7 @@ class Worker
|
||||||
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
|
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
if (DBA::isResult($workerqueue)) {
|
if (DBA::isResult($workerqueue)) {
|
||||||
return $workerqueue["priority"];
|
return $workerqueue['priority'];
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -466,13 +470,13 @@ class Worker
|
||||||
|
|
||||||
$cooldown = DI::config()->get("system", "worker_cooldown", 0);
|
$cooldown = DI::config()->get("system", "worker_cooldown", 0);
|
||||||
if ($cooldown > 0) {
|
if ($cooldown > 0) {
|
||||||
Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
|
Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
|
||||||
sleep($cooldown);
|
sleep($cooldown);
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger::enableWorker($funcname);
|
Logger::enableWorker($funcname);
|
||||||
|
|
||||||
Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]);
|
Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]);
|
||||||
|
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
|
|
||||||
|
@ -529,21 +533,21 @@ class Worker
|
||||||
self::$lock_duration = 0;
|
self::$lock_duration = 0;
|
||||||
|
|
||||||
if ($duration > 3600) {
|
if ($duration > 3600) {
|
||||||
Logger::info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
} elseif ($duration > 600) {
|
} elseif ($duration > 600) {
|
||||||
Logger::info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
} elseif ($duration > 300) {
|
} elseif ($duration > 300) {
|
||||||
Logger::info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
} elseif ($duration > 120) {
|
} elseif ($duration > 120) {
|
||||||
Logger::info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger::info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
|
Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
|
||||||
|
|
||||||
DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
|
DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
|
||||||
|
|
||||||
if ($cooldown > 0) {
|
if ($cooldown > 0) {
|
||||||
Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
|
Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
|
||||||
sleep($cooldown);
|
sleep($cooldown);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -661,14 +665,14 @@ class Worker
|
||||||
} else {
|
} else {
|
||||||
// Kill long running processes
|
// Kill long running processes
|
||||||
// Check if the priority is in a valid range
|
// Check if the priority is in a valid range
|
||||||
if (!in_array($entry["priority"], PRIORITIES)) {
|
if (!in_array($entry['priority'], PRIORITIES)) {
|
||||||
Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
|
Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
|
||||||
$entry["priority"] = PRIORITY_MEDIUM;
|
$entry['priority'] = PRIORITY_MEDIUM;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Define the maximum durations
|
// Define the maximum durations
|
||||||
$max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
|
$max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
|
||||||
$max_duration = $max_duration_defaults[$entry["priority"]];
|
$max_duration = $max_duration_defaults[$entry['priority']];
|
||||||
|
|
||||||
$argv = json_decode($entry['parameter'], true);
|
$argv = json_decode($entry['parameter'], true);
|
||||||
if (!empty($entry['command'])) {
|
if (!empty($entry['command'])) {
|
||||||
|
@ -690,12 +694,12 @@ class Worker
|
||||||
// We killed the stale process.
|
// We killed the stale process.
|
||||||
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
|
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
|
||||||
// Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
|
// Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
|
||||||
$new_priority = $entry["priority"];
|
$new_priority = $entry['priority'];
|
||||||
if ($entry["priority"] == PRIORITY_HIGH) {
|
if ($entry['priority'] == PRIORITY_HIGH) {
|
||||||
$new_priority = PRIORITY_MEDIUM;
|
$new_priority = PRIORITY_MEDIUM;
|
||||||
} elseif ($entry["priority"] == PRIORITY_MEDIUM) {
|
} elseif ($entry['priority'] == PRIORITY_MEDIUM) {
|
||||||
$new_priority = PRIORITY_LOW;
|
$new_priority = PRIORITY_LOW;
|
||||||
} elseif ($entry["priority"] != PRIORITY_CRITICAL) {
|
} elseif ($entry['priority'] != PRIORITY_CRITICAL) {
|
||||||
$new_priority = PRIORITY_NEGLIGIBLE;
|
$new_priority = PRIORITY_NEGLIGIBLE;
|
||||||
}
|
}
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
|
@ -779,12 +783,12 @@ class Worker
|
||||||
self::$db_duration_stat += (microtime(true) - $stamp);
|
self::$db_duration_stat += (microtime(true) - $stamp);
|
||||||
while ($entry = DBA::fetch($jobs)) {
|
while ($entry = DBA::fetch($jobs)) {
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$running = DBA::count('workerqueue-view', ['priority' => $entry["priority"]]);
|
$running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]);
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
self::$db_duration_stat += (microtime(true) - $stamp);
|
self::$db_duration_stat += (microtime(true) - $stamp);
|
||||||
$idle_workers -= $running;
|
$idle_workers -= $running;
|
||||||
$waiting_processes += $entry["entries"];
|
$waiting_processes += $entry["entries"];
|
||||||
$listitem[$entry["priority"]] = $entry["priority"] . ":" . $running . "/" . $entry["entries"];
|
$listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"];
|
||||||
}
|
}
|
||||||
DBA::close($jobs);
|
DBA::close($jobs);
|
||||||
} else {
|
} else {
|
||||||
|
@ -796,7 +800,7 @@ class Worker
|
||||||
|
|
||||||
while ($entry = DBA::fetch($jobs)) {
|
while ($entry = DBA::fetch($jobs)) {
|
||||||
$idle_workers -= $entry["running"];
|
$idle_workers -= $entry["running"];
|
||||||
$listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"];
|
$listitem[$entry['priority']] = $entry['priority'].":".$entry["running"];
|
||||||
}
|
}
|
||||||
DBA::close($jobs);
|
DBA::close($jobs);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue