From 8f336bffc24a636462f770ab91661436cf85ef53 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 8 Jun 2017 20:43:30 +0000 Subject: [PATCH 01/22] further improvements to the workerqueue --- include/poller.php | 104 ++++++++++++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 29 deletions(-) diff --git a/include/poller.php b/include/poller.php index fcabe5d8e..1588cecf2 100644 --- a/include/poller.php +++ b/include/poller.php @@ -86,13 +86,21 @@ function poller_run($argv, $argc){ // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { + dba::unlock(); continue; + } else { + // Fetch all workerqueue data while the table is still locked + // This is redundant, but this speeds up the processing + $entries = poller_total_entries(); + $top_priority = poller_highest_priority(); + $high_running = poller_process_with_priority_active($top_priority); + dba::unlock(); } // To avoid the quitting of multiple pollers only one poller at a time will execute the check if (Lock::set('poller_worker', 0)) { // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers()) { + if (poller_too_much_workers($entries, $top_priority, $high_running)) { logger('Active worker limit reached, quitting.', LOGGER_DEBUG); return; } @@ -120,6 +128,39 @@ function poller_run($argv, $argc){ logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } +/** + * @brief Returns the number of non executed entries in the worker queue + * + * @return integer Number of non executed entries in the worker queue + */ +function poller_total_entries() { + $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); + return $s[0]["total"]; +} + +/** + * @brief Returns the highest priority in the worker queue that isn't executed + * + * @return integer Number of active poller processes + */ +function poller_highest_priority() { + $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE)); + return $s[0]["priority"]; +} + +/** + * @brief Returns if a process with the given priority is running + * + * @param integer $priority The priority that should be checked + * + * @return integer Is there a process running with that priority? + */ +function poller_process_with_priority_active($priority) { + $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1", + intval($priority), dbesc(NULL_DATE)); + return dbm::is_result($s); +} + /** * @brief Execute a worker entry * @@ -421,9 +462,13 @@ function poller_kill_stale_workers() { /** * @brief Checks if the number of active workers exceeds the given limits * + * @param integer $entries The number of not executed entries in the worker queue + * @param integer $top_priority The highest not executed priority in the worker queue + * @param boolean $high_running Is a process with priority "$top_priority" running? + * * @return bool Are there too much workers running? */ -function poller_too_much_workers() { +function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) { $queues = Config::get("system", "worker_queues", 4); $maxqueues = $queues; @@ -442,39 +487,40 @@ function poller_too_much_workers() { $slope = $maxworkers / pow($maxsysload, $exponent); $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent)); - // Create a list of queue entries grouped by their priority - $listitem = array(); + if (Config::get('system', 'worker_debug')) { + // Create a list of queue entries grouped by their priority + $listitem = array(); - // Adding all processes with no workerqueue entry - $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)"); - if ($process = dba::fetch($processes)) { - $listitem[0] = "0:".$process["running"]; - } - dba::close($processes); - - // Now adding all processes with workerqueue entries - $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`"); - while ($entry = dba::fetch($entries)) { - $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]); + // Adding all processes with no workerqueue entry + $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)"); if ($process = dba::fetch($processes)) { - $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; + $listitem[0] = "0:".$process["running"]; } dba::close($processes); + + // Now adding all processes with workerqueue entries + $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`"); + while ($entry = dba::fetch($entries)) { + $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]); + if ($process = dba::fetch($processes)) { + $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; + } + dba::close($processes); + } + dba::close($entries); + $processlist = ' ('.implode(', ', $listitem).')'; } - dba::close($entries); - $processlist = ' ('.implode(', ', $listitem).')'; - - $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); - $entries = $s[0]["total"]; + if (is_null($entries)) { + $entries = poller_total_entries(); + } if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) { - $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE)); - $top_priority = $s[0]["priority"]; - - $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1", - intval($top_priority), dbesc(NULL_DATE)); - $high_running = dbm::is_result($s); - + if (is_null($top_priority)) { + $top_priority = poller_highest_priority(); + } + if (is_null($high_running)) { + $high_running = poller_process_with_priority_active($top_priority); + } if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) { logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG); $queues = $active + 1; @@ -620,7 +666,7 @@ function poller_claim_process($queue) { $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), array('id' => $queue["id"], 'pid' => 0)); - dba::unlock(); + //dba::unlock(); if (!$success) { logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); From d419962dbf54e5c526b386d9ad37c90b0b8ec31e Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 8 Jun 2017 20:50:25 +0000 Subject: [PATCH 02/22] Added documentation --- doc/htconfig.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/htconfig.md b/doc/htconfig.md index 7887184d8..5fbd6e0cb 100644 --- a/doc/htconfig.md +++ b/doc/htconfig.md @@ -68,6 +68,7 @@ Example: To set the directory value please add this line to your .htconfig.php: * **ostatus_poll_timeframe** - Defines how old an item can be to try to complete the conversation with it. * **paranoia** (Boolean) - Log out users if their IP address changed. * **permit_crawling** (Boolean) - Restricts the search for not logged in users to one search per minute. +* **worker_debug** (Boolean) - If activated, it prints out the number of running processes split by priority. * **profiler** (Boolean) - Enable internal timings to help optimize code. Needed for "rendertime" addon. Default is false. * **free_crawls** - Number of "free" searches when "permit_crawling" is activated (Default value is 10) * **crawl_permit_period** - Period in seconds between allowed searches when the number of free searches is reached and "permit_crawling" is activated (Default value is 60) From f1119b41981e63f9c0469738b1008519c86c4fea Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 8 Jun 2017 20:53:21 +0000 Subject: [PATCH 03/22] Removed commented out line --- include/poller.php | 1 - 1 file changed, 1 deletion(-) diff --git a/include/poller.php b/include/poller.php index 1588cecf2..a1266d62c 100644 --- a/include/poller.php +++ b/include/poller.php @@ -666,7 +666,6 @@ function poller_claim_process($queue) { $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), array('id' => $queue["id"], 'pid' => 0)); - //dba::unlock(); if (!$success) { logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); From 4f70682f7ad272543c08d0d46b8554590caa1a30 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 11 Jun 2017 07:41:38 +0000 Subject: [PATCH 04/22] Inherit the creation date --- boot.php | 6 +++++- include/notifier.php | 13 +++++++++---- include/pubsubpublish.php | 13 ++++++++++++- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/boot.php b/boot.php index 15341c0fe..2ad057d4c 100644 --- a/boot.php +++ b/boot.php @@ -1069,6 +1069,7 @@ function proc_run($cmd) { $priority = PRIORITY_MEDIUM; $dont_fork = get_config("system", "worker_dont_fork"); + $created = datetime_convert(); if (is_int($run_parameter)) { $priority = $run_parameter; @@ -1076,6 +1077,9 @@ function proc_run($cmd) { if (isset($run_parameter['priority'])) { $priority = $run_parameter['priority']; } + if (isset($run_parameter['created'])) { + $created = $run_parameter['created']; + } if (isset($run_parameter['dont_fork'])) { $dont_fork = $run_parameter['dont_fork']; } @@ -1088,7 +1092,7 @@ function proc_run($cmd) { $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1)); if (!dbm::is_result($found)) { - dba::insert('workerqueue', array('parameter' => $parameters, 'created' => datetime_convert(), 'priority' => $priority)); + dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority)); } // Should we quit and wait for the poller to be called as a cronjob? diff --git a/include/notifier.php b/include/notifier.php index a08057f67..a5f378a55 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -56,13 +56,15 @@ function notifier_run(&$argv, &$argc){ } // Inherit the priority - $queue = dba::select('workerqueue', array('priority'), array('pid' => getmypid()), array('limit' => 1)); + $queue = dba::select('workerqueue', array('priority', 'created'), array('pid' => getmypid()), array('limit' => 1)); if (dbm::is_result($queue)) { $priority = (int)$queue['priority']; + $process_created = $queue['created']; logger('inherited priority: '.$priority); } else { // Normally this shouldn't happen. $priority = PRIORITY_HIGH; + $process_created = datetime_convert(); logger('no inherited priority! Something is wrong.'); } @@ -498,7 +500,8 @@ function notifier_run(&$argv, &$argc){ } logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG); - proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']); + proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true), + 'include/delivery.php', $cmd, $item_id, $contact['id']); } } @@ -563,7 +566,8 @@ function notifier_run(&$argv, &$argc){ if ((! $mail) && (! $fsuggest) && (! $followup)) { logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]); - proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']); + proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true), + 'include/delivery.php', $cmd, $item_id, $rr['id']); } } } @@ -603,7 +607,8 @@ function notifier_run(&$argv, &$argc){ } // Handling the pubsubhubbub requests - proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php'); + proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true), + 'include/pubsubpublish.php'); } logger('notifier: calling hooks', LOGGER_DEBUG); diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 1112969f2..7c70059f9 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -11,13 +11,24 @@ function pubsubpublish_run(&$argv, &$argc){ if ($argc > 1) { $pubsubpublish_id = intval($argv[1]); } else { + // Inherit the creation time + $queue = dba::select('workerqueue', array('created'), array('pid' => getmypid()), array('limit' => 1)); + if (dbm::is_result($queue)) { + $process_created = $queue['created']; + } else { + // Normally this shouldn't happen. + $process_created = datetime_convert(); + logger('no inherited priority! Something is wrong.'); + } + // We'll push to each subscriber that has push > 0, // i.e. there has been an update (set in notifier.php). $r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0"); foreach ($r as $rr) { logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); - proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]); + proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true), + 'include/pubsubpublish.php', $rr["id"]); } } From 7d0a7f6be9e6c281ddafdfed799bf8b46785424a Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 11 Jun 2017 19:51:18 +0000 Subject: [PATCH 05/22] We got rid of two workerqueue queries, yeah! --- include/notifier.php | 21 ++++----------------- include/poller.php | 5 +++++ include/pubsubpublish.php | 13 ++----------- src/App.php | 1 + 4 files changed, 12 insertions(+), 28 deletions(-) diff --git a/include/notifier.php b/include/notifier.php index d7e8a9cce..28d437f4c 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -55,19 +55,6 @@ function notifier_run(&$argv, &$argc){ return; } - // Inherit the priority - $queue = dba::select('workerqueue', array('priority', 'created'), array('pid' => getmypid()), array('limit' => 1)); - if (dbm::is_result($queue)) { - $priority = (int)$queue['priority']; - $process_created = $queue['created']; - logger('inherited priority: '.$priority); - } else { - // Normally this shouldn't happen. - $priority = PRIORITY_HIGH; - $process_created = datetime_convert(); - logger('no inherited priority! Something is wrong.'); - } - logger('notifier: invoked: ' . print_r($argv,true), LOGGER_DEBUG); $cmd = $argv[1]; @@ -361,7 +348,7 @@ function notifier_run(&$argv, &$argc){ // a delivery fork. private groups (forum_mode == 2) do not uplink if ((intval($parent['forum_mode']) == 1) && (! $top_level) && ($cmd !== 'uplink')) { - proc_run($priority, 'include/notifier.php', 'uplink', $item_id); + proc_run($a->queue['priority'], 'include/notifier.php', 'uplink', $item_id); } $conversants = array(); @@ -500,7 +487,7 @@ function notifier_run(&$argv, &$argc){ } logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG); - proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true), + proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']); } } @@ -566,7 +553,7 @@ function notifier_run(&$argv, &$argc){ if ((! $mail) && (! $fsuggest) && (! $followup)) { logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]); - proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true), + proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']); } } @@ -607,7 +594,7 @@ function notifier_run(&$argv, &$argc){ } // Handling the pubsubhubbub requests - proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true), + proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), 'include/pubsubpublish.php'); } diff --git a/include/poller.php b/include/poller.php index 8784931d3..89b0a24db 100644 --- a/include/poller.php +++ b/include/poller.php @@ -84,6 +84,9 @@ function poller_run($argv, $argc){ // We fetch the next queue entry that is about to be executed while ($r = poller_worker_process()) { + // Assure that the priority is an integer value + $r[0]['priority'] = (int)$r[0]['priority']; + // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { dba::unlock(); @@ -255,10 +258,12 @@ function poller_exec_function($queue, $funcname, $argv) { // But preserve the old one for the worker $old_process_id = $a->process_id; $a->process_id = uniqid("wrk", true); + $a->queue = $queue; $funcname($argv, $argc); $a->process_id = $old_process_id; + unset($a->queue); $duration = number_format(microtime(true) - $stamp, 3); diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 7c70059f9..3265fd1e1 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -7,27 +7,18 @@ require_once('include/items.php'); require_once('include/ostatus.php'); function pubsubpublish_run(&$argv, &$argc){ + global $a; if ($argc > 1) { $pubsubpublish_id = intval($argv[1]); } else { - // Inherit the creation time - $queue = dba::select('workerqueue', array('created'), array('pid' => getmypid()), array('limit' => 1)); - if (dbm::is_result($queue)) { - $process_created = $queue['created']; - } else { - // Normally this shouldn't happen. - $process_created = datetime_convert(); - logger('no inherited priority! Something is wrong.'); - } - // We'll push to each subscriber that has push > 0, // i.e. there has been an update (set in notifier.php). $r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0"); foreach ($r as $rr) { logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); - proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true), + proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]); } } diff --git a/src/App.php b/src/App.php index f6b568ae8..94ca00751 100644 --- a/src/App.php +++ b/src/App.php @@ -100,6 +100,7 @@ class App { */ public $template_engine_instance = array(); public $process_id; + public $queue; private $ldelim = array( 'internal' => '', 'smarty3' => '{{' From 4fdaca861b5df3c72e6c5f294c2b2f3da872d996 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 11 Jun 2017 20:25:51 +0000 Subject: [PATCH 06/22] Contact ID is integer --- include/notifier.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/notifier.php b/include/notifier.php index 28d437f4c..c110984dd 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -488,7 +488,7 @@ function notifier_run(&$argv, &$argc){ logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG); proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true), - 'include/delivery.php', $cmd, $item_id, $contact['id']); + 'include/delivery.php', $cmd, $item_id, (int)$contact['id']); } } @@ -554,7 +554,7 @@ function notifier_run(&$argv, &$argc){ if ((! $mail) && (! $fsuggest) && (! $followup)) { logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]); proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true), - 'include/delivery.php', $cmd, $item_id, $rr['id']); + 'include/delivery.php', $cmd, $item_id, (int)$rr['id']); } } } From d4fc7c3f6862fa9326d482077aaf1793e0f7f4d3 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 11 Jun 2017 20:41:01 +0000 Subject: [PATCH 07/22] And some more integer --- include/pubsubpublish.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 3265fd1e1..580e3ffce 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -19,7 +19,7 @@ function pubsubpublish_run(&$argv, &$argc){ foreach ($r as $rr) { logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), - 'include/pubsubpublish.php', $rr["id"]); + 'include/pubsubpublish.php', (int)$rr["id"]); } } From 8be52424f5d2727fd53fc564a9e41ac29ba622ac Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 09:44:46 +0000 Subject: [PATCH 08/22] Only check for stale processes every 5 minutes --- boot.php | 11 ++++++++++- include/poller.php | 11 +++++------ include/pubsubpublish.php | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/boot.php b/boot.php index fe1ee63e0..b93810d77 100644 --- a/boot.php +++ b/boot.php @@ -22,6 +22,7 @@ require_once(__DIR__ . DIRECTORY_SEPARATOR . 'vendor' . DIRECTORY_SEPARATOR . 'a use Friendica\App; use Friendica\Core\Config; +use Friendica\Util\Lock; require_once 'include/config.php'; require_once 'include/network.php'; @@ -1100,8 +1101,16 @@ function proc_run($cmd) { return; } + // If there is a lock then we don't have to check for too much worker + if (!Lock::set('poller_worker', 0)) { + return; + } + // If there are already enough workers running, don't fork another one - if (poller_too_much_workers()) { + $quit = poller_too_much_workers(); + Lock::remove('poller_worker'); + + if ($quit) { return; } diff --git a/include/poller.php b/include/poller.php index 89b0a24db..04dcfa431 100644 --- a/include/poller.php +++ b/include/poller.php @@ -47,13 +47,12 @@ function poller_run($argv, $argc){ // We now start the process. This is done after the load check since this could increase the load. $a->start_process(); - // At first we check the number of workers and quit if there are too much of them - // This is done at the top to avoid that too much code is executed without a need to do so, - // since the poller mostly quits here. - if (poller_too_much_workers()) { + // Kill stale processes every 5 minutes + $last_cleanup = Config::get('system', 'poller_last_cleaned', 0); + if (time() > ($last_cleanup + 300)) { + logger('CLEAN: '.time().' > '.($last_cleanup + 300).' - '.$last_cleanup); + Config::set('system', 'poller_last_cleaned', time()); poller_kill_stale_workers(); - logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG); - return; } // Do we have too few memory? diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 3265fd1e1..580e3ffce 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -19,7 +19,7 @@ function pubsubpublish_run(&$argv, &$argc){ foreach ($r as $rr) { logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), - 'include/pubsubpublish.php', $rr["id"]); + 'include/pubsubpublish.php', (int)$rr["id"]); } } From dc00d89b9aafdd2136e97154588e5dd36816b785 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 09:45:13 +0000 Subject: [PATCH 09/22] Remove test message --- include/poller.php | 1 - 1 file changed, 1 deletion(-) diff --git a/include/poller.php b/include/poller.php index 04dcfa431..4b5d6287e 100644 --- a/include/poller.php +++ b/include/poller.php @@ -50,7 +50,6 @@ function poller_run($argv, $argc){ // Kill stale processes every 5 minutes $last_cleanup = Config::get('system', 'poller_last_cleaned', 0); if (time() > ($last_cleanup + 300)) { - logger('CLEAN: '.time().' > '.($last_cleanup + 300).' - '.$last_cleanup); Config::set('system', 'poller_last_cleaned', time()); poller_kill_stale_workers(); } From 2372ef7fd52eb6d213bb8bccd2cf192ab622fc48 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 12:28:48 +0000 Subject: [PATCH 10/22] Locking seems to be better here --- boot.php | 4 ++++ include/pubsubpublish.php | 1 + 2 files changed, 5 insertions(+) diff --git a/boot.php b/boot.php index b93810d77..d30d978ca 100644 --- a/boot.php +++ b/boot.php @@ -1089,6 +1089,8 @@ function proc_run($cmd) { $argv = $args; array_shift($argv); + dba::lock('workerqueue'); + $parameters = json_encode($argv); $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1)); @@ -1096,6 +1098,8 @@ function proc_run($cmd) { dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority)); } + dba::unlock(); + // Should we quit and wait for the poller to be called as a cronjob? if ($dont_fork) { return; diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 580e3ffce..0cefa5531 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -20,6 +20,7 @@ function pubsubpublish_run(&$argv, &$argc){ logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), 'include/pubsubpublish.php', (int)$rr["id"]); + logger("Publish feed to ".$rr["callback_url"].' - done', LOGGER_DEBUG); } } From 1932a6d634ad1bc79d42d80d98997e4f0b2c03cd Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 14:19:47 +0000 Subject: [PATCH 11/22] Replaced "table lock" - it is not so good, it seems --- boot.php | 4 ---- include/poller.php | 15 ++++++++------- include/pubsubpublish.php | 1 - 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/boot.php b/boot.php index d30d978ca..b93810d77 100644 --- a/boot.php +++ b/boot.php @@ -1089,8 +1089,6 @@ function proc_run($cmd) { $argv = $args; array_shift($argv); - dba::lock('workerqueue'); - $parameters = json_encode($argv); $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1)); @@ -1098,8 +1096,6 @@ function proc_run($cmd) { dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority)); } - dba::unlock(); - // Should we quit and wait for the poller to be called as a cronjob? if ($dont_fork) { return; diff --git a/include/poller.php b/include/poller.php index 4b5d6287e..53f855785 100644 --- a/include/poller.php +++ b/include/poller.php @@ -87,7 +87,7 @@ function poller_run($argv, $argc){ // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { - dba::unlock(); + Lock::remove('poller_fetch_worker'); continue; } else { // Fetch all workerqueue data while the table is still locked @@ -95,7 +95,7 @@ function poller_run($argv, $argc){ $entries = poller_total_entries(); $top_priority = poller_highest_priority(); $high_running = poller_process_with_priority_active($top_priority); - dba::unlock(); + Lock::remove('poller_fetch_worker'); } // To avoid the quitting of multiple pollers only one poller at a time will execute the check @@ -616,9 +616,11 @@ function poller_worker_process() { // Check if we should pass some low priority process $highest_priority = 0; - if (poller_passing_slow($highest_priority)) { - dba::lock('workerqueue'); + if (!Lock::set('poller_fetch_worker')) { + return false; + } + if (poller_passing_slow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' AND `priority` < %d @@ -638,8 +640,6 @@ function poller_worker_process() { if (dbm::is_result($r)) { return $r; } - } else { - dba::lock('workerqueue'); } // If there is no result (or we shouldn't pass lower processes) we check without priority limit @@ -649,7 +649,7 @@ function poller_worker_process() { // We only unlock the tables here, when we got no data if (!dbm::is_result($r)) { - dba::unlock(); + Lock::remove('poller_fetch_worker'); } return $r; @@ -800,6 +800,7 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); Lock::remove('poller_worker'); + Lock::remove('poller_fetch_worker'); killme(); } diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 0cefa5531..580e3ffce 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -20,7 +20,6 @@ function pubsubpublish_run(&$argv, &$argc){ logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), 'include/pubsubpublish.php', (int)$rr["id"]); - logger("Publish feed to ".$rr["callback_url"].' - done', LOGGER_DEBUG); } } From 16df7715b96b436ac3133a92830b37a2673633de Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 15:42:54 +0000 Subject: [PATCH 12/22] Avoid locking problems in lock class --- src/Util/Lock.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Util/Lock.php b/src/Util/Lock.php index b2c5afc66..6d7952ffa 100644 --- a/src/Util/Lock.php +++ b/src/Util/Lock.php @@ -61,6 +61,8 @@ class Lock { $cachekey = get_app()->get_hostname().";lock:".$fn_name; do { + // We only lock to be sure that nothing happens at exactly the same time + dba::lock('locks'); $lock = $memcache->get($cachekey); if (!is_bool($lock)) { @@ -76,6 +78,9 @@ class Lock { $memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300); $got_lock = true; } + + dba::unlock(); + if (!$got_lock && ($timeout > 0)) { usleep($wait_sec * 1000000); } From 1301a2950536afafd9993c4ff2c542699771e351 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 19:20:50 +0000 Subject: [PATCH 13/22] It's faster without locks, gnarl ... --- include/poller.php | 11 +---------- src/Util/Lock.php | 7 ++----- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/include/poller.php b/include/poller.php index 53f855785..42bf58913 100644 --- a/include/poller.php +++ b/include/poller.php @@ -88,6 +88,7 @@ function poller_run($argv, $argc){ // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { Lock::remove('poller_fetch_worker'); + usleep(rand(0, 200000)); continue; } else { // Fetch all workerqueue data while the table is still locked @@ -95,7 +96,6 @@ function poller_run($argv, $argc){ $entries = poller_total_entries(); $top_priority = poller_highest_priority(); $high_running = poller_process_with_priority_active($top_priority); - Lock::remove('poller_fetch_worker'); } // To avoid the quitting of multiple pollers only one poller at a time will execute the check @@ -616,10 +616,6 @@ function poller_worker_process() { // Check if we should pass some low priority process $highest_priority = 0; - if (!Lock::set('poller_fetch_worker')) { - return false; - } - if (poller_passing_slow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? $r = q("SELECT * FROM `workerqueue` @@ -647,11 +643,6 @@ function poller_worker_process() { $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE)); } - // We only unlock the tables here, when we got no data - if (!dbm::is_result($r)) { - Lock::remove('poller_fetch_worker'); - } - return $r; } diff --git a/src/Util/Lock.php b/src/Util/Lock.php index 6d7952ffa..36f408cf3 100644 --- a/src/Util/Lock.php +++ b/src/Util/Lock.php @@ -57,7 +57,6 @@ class Lock { $memcache = self::connectMemcache(); if (is_object($memcache)) { - $wait_sec = 0.2; $cachekey = get_app()->get_hostname().";lock:".$fn_name; do { @@ -82,15 +81,13 @@ class Lock { dba::unlock(); if (!$got_lock && ($timeout > 0)) { - usleep($wait_sec * 1000000); + usleep(rand(10000, 200000)); } } while (!$got_lock && ((time() - $start) < $timeout)); return $got_lock; } - $wait_sec = 2; - do { dba::lock('locks'); $lock = dba::select('locks', array('locked', 'pid'), array('name' => $fn_name), array('limit' => 1)); @@ -118,7 +115,7 @@ class Lock { dba::unlock(); if (!$got_lock && ($timeout > 0)) { - sleep($wait_sec); + usleep(rand(100000, 2000000)); } } while (!$got_lock && ((time() - $start) < $timeout)); From 228993596a532ad1be54e9ac4fa04cca25d6e1ca Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 21:39:20 +0000 Subject: [PATCH 14/22] Really fast, sadly with deadlocks --- include/poller.php | 201 +++++++++++++++++++++------------------------ 1 file changed, 93 insertions(+), 108 deletions(-) diff --git a/include/poller.php b/include/poller.php index 42bf58913..9da8dddf7 100644 --- a/include/poller.php +++ b/include/poller.php @@ -54,6 +54,12 @@ function poller_run($argv, $argc){ poller_kill_stale_workers(); } + // Count active workers and compare them with a maximum value that depends on the load + if (poller_too_much_workers()) { + logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } + // Do we have too few memory? if ($a->min_memory_reached()) { logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG); @@ -81,43 +87,31 @@ function poller_run($argv, $argc){ // We fetch the next queue entry that is about to be executed while ($r = poller_worker_process()) { + foreach ($r AS $entry) { + // Assure that the priority is an integer value + $entry['priority'] = (int)$entry['priority']; - // Assure that the priority is an integer value - $r[0]['priority'] = (int)$r[0]['priority']; - - // If we got that queue entry we claim it for us - if (!poller_claim_process($r[0])) { - Lock::remove('poller_fetch_worker'); - usleep(rand(0, 200000)); - continue; - } else { - // Fetch all workerqueue data while the table is still locked - // This is redundant, but this speeds up the processing - $entries = poller_total_entries(); - $top_priority = poller_highest_priority(); - $high_running = poller_process_with_priority_active($top_priority); - } - - // To avoid the quitting of multiple pollers only one poller at a time will execute the check - if (Lock::set('poller_worker', 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers($entries, $top_priority, $high_running)) { - logger('Active worker limit reached, quitting.', LOGGER_DEBUG); + // The work will be done + if (!poller_execute($entry)) { + logger('Process execution failed, quitting.', LOGGER_DEBUG); return; } - // Check free memory - if ($a->min_memory_reached()) { - logger('Memory limit reached, quitting.', LOGGER_DEBUG); - return; - } - Lock::remove('poller_worker'); - } + // To avoid the quitting of multiple pollers only one poller at a time will execute the check + if (Lock::set('poller_worker', 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (poller_too_much_workers()) { + logger('Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } - // finally the work will be done - if (!poller_execute($r[0])) { - logger('Process execution failed, quitting.', LOGGER_DEBUG); - return; + // Check free memory + if ($a->min_memory_reached()) { + logger('Memory limit reached, quitting.', LOGGER_DEBUG); + return; + } + Lock::remove('poller_worker'); + } } // Quit the poller once every hour @@ -200,7 +194,10 @@ function poller_execute($queue) { if (!validate_include($include)) { logger("Include file ".$argv[0]." is not valid!"); - dba::delete('workerqueue', array('id' => $queue["id"])); + $timeout = 10; + while (!dba::delete('workerqueue', array('id' => $queue["id"])) && (--$timeout > 0)) { + sleep(1); + } return true; } @@ -210,7 +207,11 @@ function poller_execute($queue) { if (function_exists($funcname)) { poller_exec_function($queue, $funcname, $argv); - dba::delete('workerqueue', array('id' => $queue["id"])); + $timeout = 10; + while (!dba::delete('workerqueue', array('id' => $queue["id"])) && (--$timeout > 0)) { + logger('Delete ID '.$queue["id"], LOGGER_DEBUG); + sleep(1); + } } else { logger("Function ".$funcname." does not exist"); } @@ -471,7 +472,7 @@ function poller_kill_stale_workers() { * * @return bool Are there too much workers running? */ -function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) { +function poller_too_much_workers() { $queues = Config::get("system", "worker_queues", 4); $maxqueues = $queues; @@ -514,16 +515,12 @@ function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_ru $processlist = ' ('.implode(', ', $listitem).')'; } - if (is_null($entries)) { - $entries = poller_total_entries(); - } + $entries = poller_total_entries(); + if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) { - if (is_null($top_priority)) { - $top_priority = poller_highest_priority(); - } - if (is_null($high_running)) { - $high_running = poller_process_with_priority_active($top_priority); - } + $top_priority = poller_highest_priority(); + $high_running = poller_process_with_priority_active($top_priority); + if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) { logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG); $queues = $active + 1; @@ -606,6 +603,52 @@ function poller_passing_slow(&$highest_priority) { return $passing_slow; } +/** + * @brief Find and claim the next worker process for us + * + * @return boolean Have we found something? + */ +function find_worker_processes() { + // Check if we should pass some low priority process + $highest_priority = 0; + $found = false; + + if (poller_passing_slow($highest_priority)) { + // Are there waiting processes with a higher priority than the currently highest? + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + WHERE `executed` <= ? AND `priority` < ? + ORDER BY `priority`, `created` LIMIT 1", + datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + if (dbm::is_result($result)) { + $found = (dba::num_rows($result) > 0); + } + dba::close($result); + + if (!$found) { + // Give slower processes some processing time + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + WHERE `executed` <= ? AND `priority` > ? + ORDER BY `priority`, `created` LIMIT 1", + datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + if (dbm::is_result($result)) { + $found = (dba::num_rows($result) > 0); + } + dba::close($result); + } + } + + // If there is no result (or we shouldn't pass lower processes) we check without priority limit + if (!$found) { + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 1", + datetime_convert(), getmypid(), NULL_DATE); + if (dbm::is_result($result)) { + $found = (dba::num_rows($result) > 0); + } + dba::close($result); + } + return $found; +} + /** * @brief Returns the next worker process * @@ -613,74 +656,17 @@ function poller_passing_slow(&$highest_priority) { */ function poller_worker_process() { - // Check if we should pass some low priority process - $highest_priority = 0; + $timeout = 10; + do { + $found = find_worker_processes(); + } while (!$found && (poller_total_entries() > 0) && (--$timeout > 0)); - if (poller_passing_slow($highest_priority)) { - // Are there waiting processes with a higher priority than the currently highest? - $r = q("SELECT * FROM `workerqueue` - WHERE `executed` <= '%s' AND `priority` < %d - ORDER BY `priority`, `created` LIMIT 1", - dbesc(NULL_DATE), - intval($highest_priority)); - if (dbm::is_result($r)) { - return $r; - } - // Give slower processes some processing time - $r = q("SELECT * FROM `workerqueue` - WHERE `executed` <= '%s' AND `priority` > %d - ORDER BY `priority`, `created` LIMIT 1", - dbesc(NULL_DATE), - intval($highest_priority)); - - if (dbm::is_result($r)) { - return $r; - } + if ($found) { + $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d", intval(getmypid())); } - - // If there is no result (or we shouldn't pass lower processes) we check without priority limit - if (!dbm::is_result($r)) { - $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE)); - } - return $r; } -/** - * @brief Assigns a workerqueue entry to the current process - * - * When we are sure that the table locks are working correctly, we can remove the checks from here - * - * @param array $queue Workerqueue entry - * - * @return boolean "true" if the claiming was successful - */ -function poller_claim_process($queue) { - $mypid = getmypid(); - - $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), - array('id' => $queue["id"], 'pid' => 0)); - - if (!$success) { - logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); - return false; - } - - // Assure that there are no tasks executed twice - $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); - if (!$id) { - logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG); - return false; - } elseif ((strtotime($id[0]["executed"]) <= 0) || ($id[0]["pid"] == 0)) { - logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); - return false; - } elseif ($id[0]["pid"] != $mypid) { - logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); - return false; - } - return true; -} - /** * @brief Removes a workerqueue entry from the current process */ @@ -791,7 +777,6 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); Lock::remove('poller_worker'); - Lock::remove('poller_fetch_worker'); killme(); } From 97be344a4a103d4f4fbd5945640afc9912fd3c81 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 05:52:59 +0000 Subject: [PATCH 15/22] Handle deadlocks centrally --- include/dba.php | 53 ++++++++++++++++++++++++++++++++++++---------- include/poller.php | 11 ++-------- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/include/dba.php b/include/dba.php index e4846899d..e1330d442 100644 --- a/include/dba.php +++ b/include/dba.php @@ -625,8 +625,21 @@ class dba { } if (self::$dbo->errorno != 0) { - logger('DB Error '.self::$dbo->errorno.': '.self::$dbo->error."\n". - $a->callstack(8))."\n".self::replace_parameters($sql, $args); + $trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1); + $called_from = array_shift($trace); + + // We are having an own error logging in the function "p" + if ($called_from['function'] != 'p') { + // We have to preserve the error code, somewhere in the logging it get lost + $error = self::$dbo->error; + $errorno = self::$dbo->errorno; + + logger('DB Error '.self::$dbo->errorno.': '.self::$dbo->error."\n". + $a->callstack(8))."\n".self::replace_parameters($sql, $args); + + self::$dbo->error = $error; + self::$dbo->errorno = $errorno; + } } $a->save_timestamp($stamp1, 'database'); @@ -662,18 +675,36 @@ class dba { $args = func_get_args(); - $stmt = call_user_func_array('self::p', $args); + // In a case of a deadlock we are repeating the query 10 times + $timeout = 10; - if (is_bool($stmt)) { - $retval = $stmt; - } elseif (is_object($stmt)) { - $retval = true; - } else { - $retval = false; + do { + $stmt = call_user_func_array('self::p', $args); + + if (is_bool($stmt)) { + $retval = $stmt; + } elseif (is_object($stmt)) { + $retval = true; + } else { + $retval = false; + } + + self::close($stmt); + + } while ((self::$dbo->errorno = 1213) && (--$timeout > 0)); + + if (self::$dbo->errorno != 0) { + // We have to preserve the error code, somewhere in the logging it get lost + $error = self::$dbo->error; + $errorno = self::$dbo->errorno; + + logger('DB Error '.self::$dbo->errorno.': '.self::$dbo->error."\n". + $a->callstack(8))."\n".self::replace_parameters($sql, $args); + + self::$dbo->error = $error; + self::$dbo->errorno = $errorno; } - self::close($stmt); - $a->save_timestamp($stamp, "database_write"); return $retval; diff --git a/include/poller.php b/include/poller.php index 9da8dddf7..e0b83dd73 100644 --- a/include/poller.php +++ b/include/poller.php @@ -194,10 +194,7 @@ function poller_execute($queue) { if (!validate_include($include)) { logger("Include file ".$argv[0]." is not valid!"); - $timeout = 10; - while (!dba::delete('workerqueue', array('id' => $queue["id"])) && (--$timeout > 0)) { - sleep(1); - } + dba::delete('workerqueue', array('id' => $queue["id"])); return true; } @@ -207,11 +204,7 @@ function poller_execute($queue) { if (function_exists($funcname)) { poller_exec_function($queue, $funcname, $argv); - $timeout = 10; - while (!dba::delete('workerqueue', array('id' => $queue["id"])) && (--$timeout > 0)) { - logger('Delete ID '.$queue["id"], LOGGER_DEBUG); - sleep(1); - } + dba::delete('workerqueue', array('id' => $queue["id"])); } else { logger("Function ".$funcname." does not exist"); } From cd129665ef3baab31489dae51282341d5d8763e3 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 05:56:02 +0000 Subject: [PATCH 16/22] =?UTF-8?q?=C3=B6rgs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/dba.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/dba.php b/include/dba.php index e1330d442..3e62da892 100644 --- a/include/dba.php +++ b/include/dba.php @@ -691,7 +691,7 @@ class dba { self::close($stmt); - } while ((self::$dbo->errorno = 1213) && (--$timeout > 0)); + } while ((self::$dbo->errorno == 1213) && (--$timeout > 0)); if (self::$dbo->errorno != 0) { // We have to preserve the error code, somewhere in the logging it get lost From e232c683b612800c4c3a8b15dd43c9a3fed6191b Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 09:03:19 +0000 Subject: [PATCH 17/22] Changed timeout, logging added --- include/dba.php | 4 ++-- include/poller.php | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/dba.php b/include/dba.php index 3e62da892..d310ad04c 100644 --- a/include/dba.php +++ b/include/dba.php @@ -675,8 +675,8 @@ class dba { $args = func_get_args(); - // In a case of a deadlock we are repeating the query 10 times - $timeout = 10; + // In a case of a deadlock we are repeating the query 20 times + $timeout = 20; do { $stmt = call_user_func_array('self::p', $args); diff --git a/include/poller.php b/include/poller.php index e0b83dd73..0c92d0087 100644 --- a/include/poller.php +++ b/include/poller.php @@ -649,11 +649,15 @@ function find_worker_processes() { */ function poller_worker_process() { + $stamp = (float)microtime(true); + $timeout = 10; do { $found = find_worker_processes(); } while (!$found && (poller_total_entries() > 0) && (--$timeout > 0)); + logger('Duration: '.number_format(microtime(true) - $stamp, 3), LOGGER_DEBUG); + if ($found) { $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d", intval(getmypid())); } From 21e84e4d2558cb9be02ba89adc7cefe6971e898c Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 13:51:25 +0000 Subject: [PATCH 18/22] Fetching new queue tasks in a bulk to increase speed --- include/poller.php | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/include/poller.php b/include/poller.php index 0c92d0087..4801ea8fd 100644 --- a/include/poller.php +++ b/include/poller.php @@ -96,26 +96,26 @@ function poller_run($argv, $argc){ logger('Process execution failed, quitting.', LOGGER_DEBUG); return; } - - // To avoid the quitting of multiple pollers only one poller at a time will execute the check - if (Lock::set('poller_worker', 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers()) { - logger('Active worker limit reached, quitting.', LOGGER_DEBUG); - return; - } - - // Check free memory - if ($a->min_memory_reached()) { - logger('Memory limit reached, quitting.', LOGGER_DEBUG); - return; - } - Lock::remove('poller_worker'); - } } - // Quit the poller once every hour - if (time() > ($starttime + 3600)) { + // To avoid the quitting of multiple pollers only one poller at a time will execute the check + if (Lock::set('poller_worker', 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (poller_too_much_workers()) { + logger('Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } + + // Check free memory + if ($a->min_memory_reached()) { + logger('Memory limit reached, quitting.', LOGGER_DEBUG); + return; + } + Lock::remove('poller_worker'); + } + + // Quit the poller once every 5 minutes + if (time() > ($starttime + 300)) { logger('Process lifetime reached, quitting.', LOGGER_DEBUG); return; } @@ -610,7 +610,7 @@ function find_worker_processes() { // Are there waiting processes with a higher priority than the currently highest? $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` < ? - ORDER BY `priority`, `created` LIMIT 1", + ORDER BY `priority`, `created` LIMIT 5", datetime_convert(), getmypid(), NULL_DATE, $highest_priority); if (dbm::is_result($result)) { $found = (dba::num_rows($result) > 0); @@ -632,7 +632,7 @@ function find_worker_processes() { // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { - $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 1", + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 5", datetime_convert(), getmypid(), NULL_DATE); if (dbm::is_result($result)) { $found = (dba::num_rows($result) > 0); From 5183de8075c4d460038f0f99f2e71429de3bfa44 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 15:14:53 +0000 Subject: [PATCH 19/22] Don't fork these calls --- include/items.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/include/items.php b/include/items.php index eb3076390..8649a1bd1 100644 --- a/include/items.php +++ b/include/items.php @@ -1139,7 +1139,7 @@ function item_store($arr, $force_parent = false, $notify = false, $dontcache = f check_item_notification($current_post, $uid); if ($notify) { - proc_run(PRIORITY_HIGH, "include/notifier.php", $notify_type, $current_post); + proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "include/notifier.php", $notify_type, $current_post); } return $current_post; @@ -1430,7 +1430,7 @@ function tag_deliver($uid, $item_id) { ); update_thread($item_id); - proc_run(PRIORITY_HIGH,'include/notifier.php', 'tgroup', $item_id); + proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/notifier.php', 'tgroup', $item_id); } @@ -2076,7 +2076,7 @@ function item_expire($uid, $days, $network = "", $force = false) { drop_item($item['id'], false); } - proc_run(PRIORITY_LOW, "include/notifier.php", "expire", $uid); + proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/notifier.php", "expire", $uid); } @@ -2099,7 +2099,7 @@ function drop_items($items) { // multiple threads may have been deleted, send an expire notification if ($uid) { - proc_run(PRIORITY_LOW, "include/notifier.php", "expire", $uid); + proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/notifier.php", "expire", $uid); } } @@ -2295,7 +2295,7 @@ function drop_item($id, $interactive = true) { $drop_id = intval($item['id']); $priority = ($interactive ? PRIORITY_HIGH : PRIORITY_LOW); - proc_run($priority, "include/notifier.php", "drop", $drop_id); + proc_run(array('priority' => $priority, 'dont_fork' => true), "include/notifier.php", "drop", $drop_id); if (! $interactive) { return $owner; From a056afd566e796c9d45204a908416549f6c58d63 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 20:51:24 +0000 Subject: [PATCH 20/22] Small corrections --- doc/htconfig.md | 2 +- include/poller.php | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/doc/htconfig.md b/doc/htconfig.md index ec961c200..ed2c816de 100644 --- a/doc/htconfig.md +++ b/doc/htconfig.md @@ -68,7 +68,7 @@ Example: To set the directory value please add this line to your .htconfig.php: * **ostatus_poll_timeframe** - Defines how old an item can be to try to complete the conversation with it. * **paranoia** (Boolean) - Log out users if their IP address changed. * **permit_crawling** (Boolean) - Restricts the search for not logged in users to one search per minute. -* **worker_debug** (Boolean) - If activated, it prints out the number of running processes split by priority. +* **worker_debug** (Boolean) - If enabled, it prints out the number of running processes split by priority. * **profiler** (Boolean) - Enable internal timings to help optimize code. Needed for "rendertime" addon. Default is false. * **free_crawls** - Number of "free" searches when "permit_crawling" is activated (Default value is 10) * **crawl_permit_period** - Period in seconds between allowed searches when the number of free searches is reached and "permit_crawling" is activated (Default value is 60) diff --git a/include/poller.php b/include/poller.php index 4801ea8fd..6c2f9a0d6 100644 --- a/include/poller.php +++ b/include/poller.php @@ -130,7 +130,11 @@ function poller_run($argv, $argc){ */ function poller_total_entries() { $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); - return $s[0]["total"]; + if (dbm::is_result($s)) { + return $s[0]["total"]; + } else { + return 0; + } } /** @@ -140,7 +144,11 @@ function poller_total_entries() { */ function poller_highest_priority() { $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE)); - return $s[0]["priority"]; + if (dbm::is_result($s)) { + return $s[0]["priority"]; + } else { + return 0; + } } /** @@ -459,10 +467,6 @@ function poller_kill_stale_workers() { /** * @brief Checks if the number of active workers exceeds the given limits * - * @param integer $entries The number of not executed entries in the worker queue - * @param integer $top_priority The highest not executed priority in the worker queue - * @param boolean $high_running Is a process with priority "$top_priority" running? - * * @return bool Are there too much workers running? */ function poller_too_much_workers() { From a7526f12917a628204a241c639687b10f513d580 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 21:56:50 +0000 Subject: [PATCH 21/22] New function for affected rows --- include/dba.php | 16 ++++++++++++++++ include/poller.php | 21 +++++++++------------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/include/dba.php b/include/dba.php index d310ad04c..1c63fa505 100644 --- a/include/dba.php +++ b/include/dba.php @@ -21,6 +21,8 @@ class dba { private $driver; public $connected = false; public $error = false; + public $errorno = 0; + public $affected_rows = 0; private $_server_info = ''; private static $in_transaction = false; private static $dbo; @@ -551,6 +553,7 @@ class dba { self::$dbo->error = ''; self::$dbo->errorno = 0; + self::$dbo->affected_rows = 0; switch (self::$dbo->driver) { case 'pdo': @@ -573,6 +576,7 @@ class dba { $retval = false; } else { $retval = $stmt; + self::$dbo->affected_rows = $retval->rowCount(); } break; case 'mysqli': @@ -612,6 +616,7 @@ class dba { } else { $stmt->store_result(); $retval = $stmt; + self::$dbo->affected_rows = $retval->affected_rows; } break; case 'mysql': @@ -620,6 +625,8 @@ class dba { if (mysql_errno(self::$dbo->db)) { self::$dbo->error = mysql_error(self::$dbo->db); self::$dbo->errorno = mysql_errno(self::$dbo->db); + } else { + self::$dbo->affected_rows = mysql_affected_rows($retval); } break; } @@ -754,6 +761,15 @@ class dba { return $retval; } + /** + * @brief Returns the number of affected rows of the last statement + * + * @return int Number of rows + */ + static public function affected_rows() { + return self::$dbo->affected_rows; + } + /** * @brief Returns the number of rows of a statement * diff --git a/include/poller.php b/include/poller.php index 6c2f9a0d6..dbfc51100 100644 --- a/include/poller.php +++ b/include/poller.php @@ -612,36 +612,33 @@ function find_worker_processes() { if (poller_passing_slow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? - $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` < ? ORDER BY `priority`, `created` LIMIT 5", datetime_convert(), getmypid(), NULL_DATE, $highest_priority); - if (dbm::is_result($result)) { - $found = (dba::num_rows($result) > 0); + if ($result) { + $found = (dba::affected_rows() > 0); } - dba::close($result); if (!$found) { // Give slower processes some processing time - $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` > ? ORDER BY `priority`, `created` LIMIT 1", datetime_convert(), getmypid(), NULL_DATE, $highest_priority); - if (dbm::is_result($result)) { - $found = (dba::num_rows($result) > 0); + if ($result) { + $found = (dba::affected_rows() > 0); } - dba::close($result); } } // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { - $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 5", + $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 5", datetime_convert(), getmypid(), NULL_DATE); - if (dbm::is_result($result)) { - $found = (dba::num_rows($result) > 0); + if ($result) { + $found = (dba::affected_rows() > 0); } - dba::close($result); } return $found; } From bafa26c060bd198eb55cdb7a205eb1a8c98133f3 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Jun 2017 22:25:24 +0000 Subject: [PATCH 22/22] We don't need the timeout there anymore --- include/poller.php | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/include/poller.php b/include/poller.php index dbfc51100..ef6fbd69d 100644 --- a/include/poller.php +++ b/include/poller.php @@ -652,10 +652,7 @@ function poller_worker_process() { $stamp = (float)microtime(true); - $timeout = 10; - do { - $found = find_worker_processes(); - } while (!$found && (poller_total_entries() > 0) && (--$timeout > 0)); + $found = find_worker_processes(); logger('Duration: '.number_format(microtime(true) - $stamp, 3), LOGGER_DEBUG);