diff --git a/include/poller.php b/include/poller.php index bcdad586eb..934641c13c 100644 --- a/include/poller.php +++ b/include/poller.php @@ -237,7 +237,9 @@ function poller_execute($queue) { $poller_last_update = time(); if ($age > 1) { + $stamp = (float)microtime(true); dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false)); + $poller_db_duration += (microtime(true) - $stamp); } poller_exec_function($queue, $funcname, $argv); @@ -468,46 +470,47 @@ function poller_max_connections_reached() { * */ function poller_kill_stale_workers() { - $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE)); + $entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0", NULL_DATE); - if (!dbm::is_result($r)) { - // No processing here needed - return; - } - - foreach ($r AS $pid) { - if (!posix_kill($pid["pid"], 0)) { + while ($entry = dba::fetch($entries)) { + if (!posix_kill($entry["pid"], 0)) { dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), - array('pid' => $pid["pid"], 'done' => false)); + array('pid' => $entry["pid"], 'done' => false)); } else { // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) { - $pid["priority"] = PRIORITY_MEDIUM; + if (!in_array($entry["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) { + $entry["priority"] = PRIORITY_MEDIUM; } // Define the maximum durations - $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360); - $max_duration = $max_duration_defaults[$pid["priority"]]; + $max_duration_defaults = array(PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720); + $max_duration = $max_duration_defaults[$entry["priority"]]; - $argv = json_decode($pid["parameter"]); + $argv = json_decode($entry["parameter"]); $argv[0] = basename($argv[0]); // How long is the process already running? - $duration = (time() - strtotime($pid["executed"])) / 60; + $duration = (time() - strtotime($entry["executed"])) / 60; if ($duration > $max_duration) { - logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now."); - posix_kill($pid["pid"], SIGTERM); + logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now."); + posix_kill($entry["pid"], SIGTERM); // We killed the stale process. // To avoid a blocking situation we reschedule the process at the beginning of the queue. - // Additionally we are lowering the priority. + // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) + if ($entry["priority"] == PRIORITY_HIGH) { + $new_priority = PRIORITY_MEDIUM; + } elseif ($entry["priority"] == PRIORITY_MEDIUM) { + $new_priority = PRIORITY_LOW; + } elseif ($entry["priority"] != PRIORITY_CRITICAL) { + $new_priority = PRIORITY_NEGLIGIBLE; + } dba::update('workerqueue', - array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0), - array('pid' => $pid["pid"], 'done' => false)); + array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => $new_priority, 'pid' => 0), + array('pid' => $entry["pid"], 'done' => false)); } else { - logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); + logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); } } }