Degrade priority step by step
This commit is contained in:
parent
7726353601
commit
0cd9db9cb7
1 changed files with 25 additions and 22 deletions
|
@ -237,7 +237,9 @@ function poller_execute($queue) {
|
||||||
$poller_last_update = time();
|
$poller_last_update = time();
|
||||||
|
|
||||||
if ($age > 1) {
|
if ($age > 1) {
|
||||||
|
$stamp = (float)microtime(true);
|
||||||
dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false));
|
dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false));
|
||||||
|
$poller_db_duration += (microtime(true) - $stamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
poller_exec_function($queue, $funcname, $argv);
|
poller_exec_function($queue, $funcname, $argv);
|
||||||
|
@ -468,46 +470,47 @@ function poller_max_connections_reached() {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
function poller_kill_stale_workers() {
|
function poller_kill_stale_workers() {
|
||||||
$r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE));
|
$entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0", NULL_DATE);
|
||||||
|
|
||||||
if (!dbm::is_result($r)) {
|
while ($entry = dba::fetch($entries)) {
|
||||||
// No processing here needed
|
if (!posix_kill($entry["pid"], 0)) {
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($r AS $pid) {
|
|
||||||
if (!posix_kill($pid["pid"], 0)) {
|
|
||||||
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
|
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
|
||||||
array('pid' => $pid["pid"], 'done' => false));
|
array('pid' => $entry["pid"], 'done' => false));
|
||||||
} else {
|
} else {
|
||||||
// Kill long running processes
|
// Kill long running processes
|
||||||
|
|
||||||
// Check if the priority is in a valid range
|
// Check if the priority is in a valid range
|
||||||
if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
|
if (!in_array($entry["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
|
||||||
$pid["priority"] = PRIORITY_MEDIUM;
|
$entry["priority"] = PRIORITY_MEDIUM;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Define the maximum durations
|
// Define the maximum durations
|
||||||
$max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
|
$max_duration_defaults = array(PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720);
|
||||||
$max_duration = $max_duration_defaults[$pid["priority"]];
|
$max_duration = $max_duration_defaults[$entry["priority"]];
|
||||||
|
|
||||||
$argv = json_decode($pid["parameter"]);
|
$argv = json_decode($entry["parameter"]);
|
||||||
$argv[0] = basename($argv[0]);
|
$argv[0] = basename($argv[0]);
|
||||||
|
|
||||||
// How long is the process already running?
|
// How long is the process already running?
|
||||||
$duration = (time() - strtotime($pid["executed"])) / 60;
|
$duration = (time() - strtotime($entry["executed"])) / 60;
|
||||||
if ($duration > $max_duration) {
|
if ($duration > $max_duration) {
|
||||||
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
|
logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
|
||||||
posix_kill($pid["pid"], SIGTERM);
|
posix_kill($entry["pid"], SIGTERM);
|
||||||
|
|
||||||
// We killed the stale process.
|
// We killed the stale process.
|
||||||
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
|
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
|
||||||
// Additionally we are lowering the priority.
|
// Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
|
||||||
|
if ($entry["priority"] == PRIORITY_HIGH) {
|
||||||
|
$new_priority = PRIORITY_MEDIUM;
|
||||||
|
} elseif ($entry["priority"] == PRIORITY_MEDIUM) {
|
||||||
|
$new_priority = PRIORITY_LOW;
|
||||||
|
} elseif ($entry["priority"] != PRIORITY_CRITICAL) {
|
||||||
|
$new_priority = PRIORITY_NEGLIGIBLE;
|
||||||
|
}
|
||||||
dba::update('workerqueue',
|
dba::update('workerqueue',
|
||||||
array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
|
array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => $new_priority, 'pid' => 0),
|
||||||
array('pid' => $pid["pid"], 'done' => false));
|
array('pid' => $entry["pid"], 'done' => false));
|
||||||
} else {
|
} else {
|
||||||
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
|
logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue