From 8f336bffc24a636462f770ab91661436cf85ef53 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 8 Jun 2017 20:43:30 +0000 Subject: [PATCH] further improvements to the workerqueue --- include/poller.php | 104 ++++++++++++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 29 deletions(-) diff --git a/include/poller.php b/include/poller.php index fcabe5d8ee..1588cecf20 100644 --- a/include/poller.php +++ b/include/poller.php @@ -86,13 +86,21 @@ function poller_run($argv, $argc){ // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { + dba::unlock(); continue; + } else { + // Fetch all workerqueue data while the table is still locked + // This is redundant, but this speeds up the processing + $entries = poller_total_entries(); + $top_priority = poller_highest_priority(); + $high_running = poller_process_with_priority_active($top_priority); + dba::unlock(); } // To avoid the quitting of multiple pollers only one poller at a time will execute the check if (Lock::set('poller_worker', 0)) { // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers()) { + if (poller_too_much_workers($entries, $top_priority, $high_running)) { logger('Active worker limit reached, quitting.', LOGGER_DEBUG); return; } @@ -120,6 +128,39 @@ function poller_run($argv, $argc){ logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } +/** + * @brief Returns the number of non executed entries in the worker queue + * + * @return integer Number of non executed entries in the worker queue + */ +function poller_total_entries() { + $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); + return $s[0]["total"]; +} + +/** + * @brief Returns the highest priority in the worker queue that isn't executed + * + * @return integer Number of active poller processes + */ +function poller_highest_priority() { + $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE)); + return $s[0]["priority"]; +} + +/** + * @brief Returns if a process with the given priority is running + * + * @param integer $priority The priority that should be checked + * + * @return integer Is there a process running with that priority? + */ +function poller_process_with_priority_active($priority) { + $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1", + intval($priority), dbesc(NULL_DATE)); + return dbm::is_result($s); +} + /** * @brief Execute a worker entry * @@ -421,9 +462,13 @@ function poller_kill_stale_workers() { /** * @brief Checks if the number of active workers exceeds the given limits * + * @param integer $entries The number of not executed entries in the worker queue + * @param integer $top_priority The highest not executed priority in the worker queue + * @param boolean $high_running Is a process with priority "$top_priority" running? + * * @return bool Are there too much workers running? */ -function poller_too_much_workers() { +function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) { $queues = Config::get("system", "worker_queues", 4); $maxqueues = $queues; @@ -442,39 +487,40 @@ function poller_too_much_workers() { $slope = $maxworkers / pow($maxsysload, $exponent); $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent)); - // Create a list of queue entries grouped by their priority - $listitem = array(); + if (Config::get('system', 'worker_debug')) { + // Create a list of queue entries grouped by their priority + $listitem = array(); - // Adding all processes with no workerqueue entry - $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)"); - if ($process = dba::fetch($processes)) { - $listitem[0] = "0:".$process["running"]; - } - dba::close($processes); - - // Now adding all processes with workerqueue entries - $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`"); - while ($entry = dba::fetch($entries)) { - $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]); + // Adding all processes with no workerqueue entry + $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)"); if ($process = dba::fetch($processes)) { - $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; + $listitem[0] = "0:".$process["running"]; } dba::close($processes); + + // Now adding all processes with workerqueue entries + $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`"); + while ($entry = dba::fetch($entries)) { + $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]); + if ($process = dba::fetch($processes)) { + $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; + } + dba::close($processes); + } + dba::close($entries); + $processlist = ' ('.implode(', ', $listitem).')'; } - dba::close($entries); - $processlist = ' ('.implode(', ', $listitem).')'; - - $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); - $entries = $s[0]["total"]; + if (is_null($entries)) { + $entries = poller_total_entries(); + } if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) { - $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE)); - $top_priority = $s[0]["priority"]; - - $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1", - intval($top_priority), dbesc(NULL_DATE)); - $high_running = dbm::is_result($s); - + if (is_null($top_priority)) { + $top_priority = poller_highest_priority(); + } + if (is_null($high_running)) { + $high_running = poller_process_with_priority_active($top_priority); + } if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) { logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG); $queues = $active + 1; @@ -620,7 +666,7 @@ function poller_claim_process($queue) { $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), array('id' => $queue["id"], 'pid' => 0)); - dba::unlock(); + //dba::unlock(); if (!$success) { logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);