From 5e81c105e0070f5cf409f9dac2ec1210fede6ee8 Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Sun, 24 Jul 2016 20:25:11 +0200 Subject: [PATCH] We now work with the "priority" field - that's better --- boot.php | 21 +++++++++++++--- include/poller.php | 62 +++++++++++----------------------------------- 2 files changed, 32 insertions(+), 51 deletions(-) diff --git a/boot.php b/boot.php index 298010dd0d..056e47b7d2 100644 --- a/boot.php +++ b/boot.php @@ -1786,12 +1786,27 @@ function proc_run($cmd){ $found = q("SELECT `id` FROM `workerqueue` WHERE `parameter` = '%s'", dbesc($parameters)); + $funcname = str_replace(".php", "", basename($argv[0]))."_run"; + + // Define the processes that have priority over any other process + /// @todo Better check for priority processes + $high_priority = array("delivery_run", "notifier_run", "pubsubpublish_run"); + $low_priority = array("queue_run", "gprobe_run", "discover_poco_run"); + + if (in_array($funcname, $high_priority)) + $priority = 1; + elseif (in_array($funcname, $low_priority)) + $priority = 3; + else + $priority = 2; + if (!$found) - q("INSERT INTO `workerqueue` (`parameter`, `created`, `priority`) - VALUES ('%s', '%s', %d)", + q("INSERT INTO `workerqueue` (`function`, `parameter`, `created`, `priority`) + VALUES ('%s', '%s', '%s', %d)", + dbesc($funcname), dbesc($parameters), dbesc(datetime_convert()), - intval(0)); + intval($priority)); // Should we quit and wait for the poller to be called as a cronjob? if (get_config("system", "worker_dont_fork")) diff --git a/include/poller.php b/include/poller.php index f71872c579..ec6653b488 100644 --- a/include/poller.php +++ b/include/poller.php @@ -39,7 +39,7 @@ function poller_run(&$argv, &$argc){ return; // Checking the number of workers - if (poller_too_much_workers($entries, $queues, $maxqueues, $active)) { + if (poller_too_much_workers()) { poller_kill_stale_workers(); return; } @@ -58,16 +58,14 @@ function poller_run(&$argv, &$argc){ sleep(4); // Checking number of workers - if (poller_too_much_workers($entries, $queues, $maxqueues, $active)) + if (poller_too_much_workers()) return; $cooldown = Config::get("system", "worker_cooldown", 0); $starttime = time(); - $delayed = 0; - - while ($r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `created` LIMIT 1")) { + while ($r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority`, `created` LIMIT 1")) { // Constantly check the number of parallel database processes if ($a->max_processes_reached()) @@ -78,46 +76,9 @@ function poller_run(&$argv, &$argc){ return; // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers($entries, $queues, $maxqueues, $active)) + if (poller_too_much_workers()) return; - $argv = json_decode($r[0]["parameter"]); - - $argc = count($argv); - - $funcname = str_replace(".php", "", basename($argv[0]))."_run"; - - // Define the processes that have priority over any other process - /// @todo Better check for priority processes - $priority = array("delivery_run", "notifier_run", "pubsubpublish_run"); - - // Reserve a speed lane - if (($active == ($queues - 1)) AND ($maxqueues >= 2) AND !in_array($funcname, $priority)) { - logger("Delay call to '".$funcname."' to reserve a speed lane for high priority processes", LOGGER_DEBUG); - $delay = true; - } - - // Delay other processes if the system has a high load and there are many pending processes. - // The "delayed" value is a safety mechanism for the case when there are no high priority processes in the queue. - if (!$delay AND (($queues / $maxqueues) <= 0.5) AND (($entries > 100) AND ($delayed < 100)) AND !in_array($funcname, $priority)) { - // Count the number of delayed processes - ++$delayed; - logger("Delay call to '".$funcname."' for performance reasons (Delay ".$delayed.")", LOGGER_DEBUG); - $delay = true; - } - - if ($delay) { - q("UPDATE `workerqueue` SET `created` = '%s' WHERE `id` = %d", - dbesc(datetime_convert()), - intval($r[0]["id"])); - continue; - } - - // If we delivered a process that has high priority we reset the delayed counter. - // When we reached the limit we will process any entry until we reach a high priority process. - if (in_array($funcname, $priority)) - $delayed = 0; - q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'", dbesc(datetime_convert()), intval(getmypid()), @@ -132,6 +93,10 @@ function poller_run(&$argv, &$argc){ continue; } + $argv = json_decode($r[0]["parameter"]); + + $argc = count($argv); + // Check for existance and validity of the include file $include = $argv[0]; @@ -143,16 +108,18 @@ function poller_run(&$argv, &$argc){ require_once($include); + $funcname = str_replace(".php", "", basename($argv[0]))."_run"; + if (function_exists($funcname)) { - logger("Process ".getmypid()." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]); + logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]); $funcname($argv, $argc); if ($cooldown > 0) { - logger("Process ".getmypid()." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds"); + logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds"); sleep($cooldown); } - logger("Process ".getmypid()." - ID ".$r[0]["id"].": ".$funcname." - done"); + logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done"); q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"])); } else @@ -277,9 +244,8 @@ function poller_kill_stale_workers() { } } -function poller_too_much_workers(&$entries, &$queues, &$maxqueues, &$active) { +function poller_too_much_workers() { - $entries = 0; $queues = get_config("system", "worker_queues");