further improvements to the workerqueue
This commit is contained in:
parent
bee6ad5916
commit
8f336bffc2
1 changed files with 75 additions and 29 deletions
|
@ -86,13 +86,21 @@ function poller_run($argv, $argc){
|
||||||
|
|
||||||
// If we got that queue entry we claim it for us
|
// If we got that queue entry we claim it for us
|
||||||
if (!poller_claim_process($r[0])) {
|
if (!poller_claim_process($r[0])) {
|
||||||
|
dba::unlock();
|
||||||
continue;
|
continue;
|
||||||
|
} else {
|
||||||
|
// Fetch all workerqueue data while the table is still locked
|
||||||
|
// This is redundant, but this speeds up the processing
|
||||||
|
$entries = poller_total_entries();
|
||||||
|
$top_priority = poller_highest_priority();
|
||||||
|
$high_running = poller_process_with_priority_active($top_priority);
|
||||||
|
dba::unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
|
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
|
||||||
if (Lock::set('poller_worker', 0)) {
|
if (Lock::set('poller_worker', 0)) {
|
||||||
// Count active workers and compare them with a maximum value that depends on the load
|
// Count active workers and compare them with a maximum value that depends on the load
|
||||||
if (poller_too_much_workers()) {
|
if (poller_too_much_workers($entries, $top_priority, $high_running)) {
|
||||||
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
|
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -120,6 +128,39 @@ function poller_run($argv, $argc){
|
||||||
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
|
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Returns the number of non executed entries in the worker queue
|
||||||
|
*
|
||||||
|
* @return integer Number of non executed entries in the worker queue
|
||||||
|
*/
|
||||||
|
function poller_total_entries() {
|
||||||
|
$s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
|
||||||
|
return $s[0]["total"];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Returns the highest priority in the worker queue that isn't executed
|
||||||
|
*
|
||||||
|
* @return integer Number of active poller processes
|
||||||
|
*/
|
||||||
|
function poller_highest_priority() {
|
||||||
|
$s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
|
||||||
|
return $s[0]["priority"];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Returns if a process with the given priority is running
|
||||||
|
*
|
||||||
|
* @param integer $priority The priority that should be checked
|
||||||
|
*
|
||||||
|
* @return integer Is there a process running with that priority?
|
||||||
|
*/
|
||||||
|
function poller_process_with_priority_active($priority) {
|
||||||
|
$s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
|
||||||
|
intval($priority), dbesc(NULL_DATE));
|
||||||
|
return dbm::is_result($s);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Execute a worker entry
|
* @brief Execute a worker entry
|
||||||
*
|
*
|
||||||
|
@ -421,9 +462,13 @@ function poller_kill_stale_workers() {
|
||||||
/**
|
/**
|
||||||
* @brief Checks if the number of active workers exceeds the given limits
|
* @brief Checks if the number of active workers exceeds the given limits
|
||||||
*
|
*
|
||||||
|
* @param integer $entries The number of not executed entries in the worker queue
|
||||||
|
* @param integer $top_priority The highest not executed priority in the worker queue
|
||||||
|
* @param boolean $high_running Is a process with priority "$top_priority" running?
|
||||||
|
*
|
||||||
* @return bool Are there too much workers running?
|
* @return bool Are there too much workers running?
|
||||||
*/
|
*/
|
||||||
function poller_too_much_workers() {
|
function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) {
|
||||||
$queues = Config::get("system", "worker_queues", 4);
|
$queues = Config::get("system", "worker_queues", 4);
|
||||||
|
|
||||||
$maxqueues = $queues;
|
$maxqueues = $queues;
|
||||||
|
@ -442,39 +487,40 @@ function poller_too_much_workers() {
|
||||||
$slope = $maxworkers / pow($maxsysload, $exponent);
|
$slope = $maxworkers / pow($maxsysload, $exponent);
|
||||||
$queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
|
$queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
|
||||||
|
|
||||||
// Create a list of queue entries grouped by their priority
|
if (Config::get('system', 'worker_debug')) {
|
||||||
$listitem = array();
|
// Create a list of queue entries grouped by their priority
|
||||||
|
$listitem = array();
|
||||||
|
|
||||||
// Adding all processes with no workerqueue entry
|
// Adding all processes with no workerqueue entry
|
||||||
$processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
|
$processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
|
||||||
if ($process = dba::fetch($processes)) {
|
|
||||||
$listitem[0] = "0:".$process["running"];
|
|
||||||
}
|
|
||||||
dba::close($processes);
|
|
||||||
|
|
||||||
// Now adding all processes with workerqueue entries
|
|
||||||
$entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
|
|
||||||
while ($entry = dba::fetch($entries)) {
|
|
||||||
$processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
|
|
||||||
if ($process = dba::fetch($processes)) {
|
if ($process = dba::fetch($processes)) {
|
||||||
$listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
|
$listitem[0] = "0:".$process["running"];
|
||||||
}
|
}
|
||||||
dba::close($processes);
|
dba::close($processes);
|
||||||
|
|
||||||
|
// Now adding all processes with workerqueue entries
|
||||||
|
$entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
|
||||||
|
while ($entry = dba::fetch($entries)) {
|
||||||
|
$processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
|
||||||
|
if ($process = dba::fetch($processes)) {
|
||||||
|
$listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
|
||||||
|
}
|
||||||
|
dba::close($processes);
|
||||||
|
}
|
||||||
|
dba::close($entries);
|
||||||
|
$processlist = ' ('.implode(', ', $listitem).')';
|
||||||
}
|
}
|
||||||
dba::close($entries);
|
|
||||||
$processlist = ' ('.implode(', ', $listitem).')';
|
|
||||||
|
|
||||||
$s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
|
|
||||||
$entries = $s[0]["total"];
|
|
||||||
|
|
||||||
|
if (is_null($entries)) {
|
||||||
|
$entries = poller_total_entries();
|
||||||
|
}
|
||||||
if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) {
|
if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) {
|
||||||
$s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
|
if (is_null($top_priority)) {
|
||||||
$top_priority = $s[0]["priority"];
|
$top_priority = poller_highest_priority();
|
||||||
|
}
|
||||||
$s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
|
if (is_null($high_running)) {
|
||||||
intval($top_priority), dbesc(NULL_DATE));
|
$high_running = poller_process_with_priority_active($top_priority);
|
||||||
$high_running = dbm::is_result($s);
|
}
|
||||||
|
|
||||||
if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) {
|
if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) {
|
||||||
logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
|
logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
|
||||||
$queues = $active + 1;
|
$queues = $active + 1;
|
||||||
|
@ -620,7 +666,7 @@ function poller_claim_process($queue) {
|
||||||
|
|
||||||
$success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
|
$success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
|
||||||
array('id' => $queue["id"], 'pid' => 0));
|
array('id' => $queue["id"], 'pid' => 0));
|
||||||
dba::unlock();
|
//dba::unlock();
|
||||||
|
|
||||||
if (!$success) {
|
if (!$success) {
|
||||||
logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
|
logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
|
||||||
|
|
Loading…
Reference in a new issue