From 228993596a532ad1be54e9ac4fa04cca25d6e1ca Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 12 Jun 2017 21:39:20 +0000 Subject: [PATCH] Really fast, sadly with deadlocks --- include/poller.php | 201 +++++++++++++++++++++------------------------ 1 file changed, 93 insertions(+), 108 deletions(-) diff --git a/include/poller.php b/include/poller.php index 42bf589137..9da8dddf75 100644 --- a/include/poller.php +++ b/include/poller.php @@ -54,6 +54,12 @@ function poller_run($argv, $argc){ poller_kill_stale_workers(); } + // Count active workers and compare them with a maximum value that depends on the load + if (poller_too_much_workers()) { + logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } + // Do we have too few memory? if ($a->min_memory_reached()) { logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG); @@ -81,43 +87,31 @@ function poller_run($argv, $argc){ // We fetch the next queue entry that is about to be executed while ($r = poller_worker_process()) { + foreach ($r AS $entry) { + // Assure that the priority is an integer value + $entry['priority'] = (int)$entry['priority']; - // Assure that the priority is an integer value - $r[0]['priority'] = (int)$r[0]['priority']; - - // If we got that queue entry we claim it for us - if (!poller_claim_process($r[0])) { - Lock::remove('poller_fetch_worker'); - usleep(rand(0, 200000)); - continue; - } else { - // Fetch all workerqueue data while the table is still locked - // This is redundant, but this speeds up the processing - $entries = poller_total_entries(); - $top_priority = poller_highest_priority(); - $high_running = poller_process_with_priority_active($top_priority); - } - - // To avoid the quitting of multiple pollers only one poller at a time will execute the check - if (Lock::set('poller_worker', 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers($entries, $top_priority, $high_running)) { - logger('Active worker limit reached, quitting.', LOGGER_DEBUG); + // The work will be done + if (!poller_execute($entry)) { + logger('Process execution failed, quitting.', LOGGER_DEBUG); return; } - // Check free memory - if ($a->min_memory_reached()) { - logger('Memory limit reached, quitting.', LOGGER_DEBUG); - return; - } - Lock::remove('poller_worker'); - } + // To avoid the quitting of multiple pollers only one poller at a time will execute the check + if (Lock::set('poller_worker', 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (poller_too_much_workers()) { + logger('Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } - // finally the work will be done - if (!poller_execute($r[0])) { - logger('Process execution failed, quitting.', LOGGER_DEBUG); - return; + // Check free memory + if ($a->min_memory_reached()) { + logger('Memory limit reached, quitting.', LOGGER_DEBUG); + return; + } + Lock::remove('poller_worker'); + } } // Quit the poller once every hour @@ -200,7 +194,10 @@ function poller_execute($queue) { if (!validate_include($include)) { logger("Include file ".$argv[0]." is not valid!"); - dba::delete('workerqueue', array('id' => $queue["id"])); + $timeout = 10; + while (!dba::delete('workerqueue', array('id' => $queue["id"])) && (--$timeout > 0)) { + sleep(1); + } return true; } @@ -210,7 +207,11 @@ function poller_execute($queue) { if (function_exists($funcname)) { poller_exec_function($queue, $funcname, $argv); - dba::delete('workerqueue', array('id' => $queue["id"])); + $timeout = 10; + while (!dba::delete('workerqueue', array('id' => $queue["id"])) && (--$timeout > 0)) { + logger('Delete ID '.$queue["id"], LOGGER_DEBUG); + sleep(1); + } } else { logger("Function ".$funcname." does not exist"); } @@ -471,7 +472,7 @@ function poller_kill_stale_workers() { * * @return bool Are there too much workers running? */ -function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) { +function poller_too_much_workers() { $queues = Config::get("system", "worker_queues", 4); $maxqueues = $queues; @@ -514,16 +515,12 @@ function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_ru $processlist = ' ('.implode(', ', $listitem).')'; } - if (is_null($entries)) { - $entries = poller_total_entries(); - } + $entries = poller_total_entries(); + if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) { - if (is_null($top_priority)) { - $top_priority = poller_highest_priority(); - } - if (is_null($high_running)) { - $high_running = poller_process_with_priority_active($top_priority); - } + $top_priority = poller_highest_priority(); + $high_running = poller_process_with_priority_active($top_priority); + if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) { logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG); $queues = $active + 1; @@ -606,6 +603,52 @@ function poller_passing_slow(&$highest_priority) { return $passing_slow; } +/** + * @brief Find and claim the next worker process for us + * + * @return boolean Have we found something? + */ +function find_worker_processes() { + // Check if we should pass some low priority process + $highest_priority = 0; + $found = false; + + if (poller_passing_slow($highest_priority)) { + // Are there waiting processes with a higher priority than the currently highest? + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + WHERE `executed` <= ? AND `priority` < ? + ORDER BY `priority`, `created` LIMIT 1", + datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + if (dbm::is_result($result)) { + $found = (dba::num_rows($result) > 0); + } + dba::close($result); + + if (!$found) { + // Give slower processes some processing time + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + WHERE `executed` <= ? AND `priority` > ? + ORDER BY `priority`, `created` LIMIT 1", + datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + if (dbm::is_result($result)) { + $found = (dba::num_rows($result) > 0); + } + dba::close($result); + } + } + + // If there is no result (or we shouldn't pass lower processes) we check without priority limit + if (!$found) { + $result = dba::p("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 1", + datetime_convert(), getmypid(), NULL_DATE); + if (dbm::is_result($result)) { + $found = (dba::num_rows($result) > 0); + } + dba::close($result); + } + return $found; +} + /** * @brief Returns the next worker process * @@ -613,74 +656,17 @@ function poller_passing_slow(&$highest_priority) { */ function poller_worker_process() { - // Check if we should pass some low priority process - $highest_priority = 0; + $timeout = 10; + do { + $found = find_worker_processes(); + } while (!$found && (poller_total_entries() > 0) && (--$timeout > 0)); - if (poller_passing_slow($highest_priority)) { - // Are there waiting processes with a higher priority than the currently highest? - $r = q("SELECT * FROM `workerqueue` - WHERE `executed` <= '%s' AND `priority` < %d - ORDER BY `priority`, `created` LIMIT 1", - dbesc(NULL_DATE), - intval($highest_priority)); - if (dbm::is_result($r)) { - return $r; - } - // Give slower processes some processing time - $r = q("SELECT * FROM `workerqueue` - WHERE `executed` <= '%s' AND `priority` > %d - ORDER BY `priority`, `created` LIMIT 1", - dbesc(NULL_DATE), - intval($highest_priority)); - - if (dbm::is_result($r)) { - return $r; - } + if ($found) { + $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d", intval(getmypid())); } - - // If there is no result (or we shouldn't pass lower processes) we check without priority limit - if (!dbm::is_result($r)) { - $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE)); - } - return $r; } -/** - * @brief Assigns a workerqueue entry to the current process - * - * When we are sure that the table locks are working correctly, we can remove the checks from here - * - * @param array $queue Workerqueue entry - * - * @return boolean "true" if the claiming was successful - */ -function poller_claim_process($queue) { - $mypid = getmypid(); - - $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), - array('id' => $queue["id"], 'pid' => 0)); - - if (!$success) { - logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); - return false; - } - - // Assure that there are no tasks executed twice - $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); - if (!$id) { - logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG); - return false; - } elseif ((strtotime($id[0]["executed"]) <= 0) || ($id[0]["pid"] == 0)) { - logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); - return false; - } elseif ($id[0]["pid"] != $mypid) { - logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); - return false; - } - return true; -} - /** * @brief Removes a workerqueue entry from the current process */ @@ -791,7 +777,6 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); Lock::remove('poller_worker'); - Lock::remove('poller_fetch_worker'); killme(); }