From 13e4144ba6b65c8aaeb8f8356b3e24e00c5c526b Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 1 May 2022 09:29:31 +0000 Subject: [PATCH] Use a centralized function to check the priority --- src/Core/Worker.php | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 3443e6608e..5bd281638f 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -22,7 +22,6 @@ namespace Friendica\Core; use Friendica\App\Mode; -use Friendica\Core; use Friendica\Core\Worker\Entity\Process; use Friendica\Database\DBA; use Friendica\DI; @@ -105,12 +104,7 @@ class Worker // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; - if (!in_array($entry['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); - $entry['priority'] = PRIORITY_MEDIUM; - } + $entry = self::checkPriority($entry); // The work will be done if (!self::execute($entry)) { @@ -172,6 +166,24 @@ class Worker Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); } + /** + * Check and fix the priority of a worker task + * @param array $entry + * @return array + */ + private static function checkPriority(array $entry) + { + $entry['priority'] = (int)$entry['priority']; + + if (!in_array($entry['priority'], PRIORITIES)) { + Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); + DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]); + $entry['priority'] = PRIORITY_MEDIUM; + } + + return $entry; + } + /** * Checks if the system is ready. * @@ -484,11 +496,6 @@ class Worker // For this reason the variables have to be initialized. DI::profiler()->reset(); - if (!in_array($queue['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]); - $queue['priority'] = PRIORITY_MEDIUM; - } - $a->setQueue($queue); $up_duration = microtime(true) - self::$up_start; @@ -653,6 +660,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); while ($entry = DBA::fetch($entries)) { + $entry = self::checkPriority($entry); + if (!posix_kill($entry["pid"], 0)) { $stamp = (float)microtime(true); DBA::update( @@ -664,11 +673,6 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } else { // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($entry['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); - $entry['priority'] = PRIORITY_MEDIUM; - } // Define the maximum durations $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; @@ -1393,14 +1397,11 @@ class Worker return false; } + $queue = self::checkPriority($queue); + $id = $queue['id']; $priority = $queue['priority']; - if (!in_array($priority, PRIORITIES)) { - Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]); - $priority = PRIORITY_MEDIUM; - } - $max_level = DI::config()->get('system', 'worker_defer_limit'); $new_retrial = self::getNextRetrial($queue, $max_level);