diff --git a/mod/worker.php b/mod/worker.php index 1afbfe81c1..23dfd6e000 100644 --- a/mod/worker.php +++ b/mod/worker.php @@ -41,11 +41,7 @@ function worker_init() Worker::callWorker(); - $passing_slow = false; - $entries = 0; - $deferred = 0; - - if ($r = Worker::workerProcess($passing_slow, $entries, $deferred)) { + if ($r = Worker::workerProcess()) { // On most configurations this parameter wouldn't have any effect. // But since it doesn't destroy anything, we just try to get more execution time in any way. set_time_limit(0); diff --git a/src/Core/Worker.php b/src/Core/Worker.php index b1e4aa41e1..efb90ae6b6 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -92,15 +92,8 @@ class Worker $starttime = time(); - $entries = 0; - $deferred = 0; - // We fetch the next queue entry that is about to be executed - while ($r = self::workerProcess($passing_slow, $entries, $deferred)) { - // When we are processing jobs with a lower priority, we don't refetch new jobs - // Otherwise fast jobs could wait behind slow ones and could be blocked. - $refetched = $passing_slow; - + while ($r = self::workerProcess()) { foreach ($r as $entry) { // Assure that the priority is an integer value $entry['priority'] = (int)$entry['priority']; @@ -112,20 +105,16 @@ class Worker } // If possible we will fetch new jobs for this worker - if (!$refetched) { - $entries = self::totalEntries(); - $deferred = self::deferredEntries(); - if (Lock::acquire('worker_process', 0)) { - $refetched = self::findWorkerProcesses($passing_slow, $entries, $deferred); - Lock::release('worker_process'); - } + if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) { + self::findWorkerProcesses(); + Lock::release('worker_process'); } } // To avoid the quitting of multiple workers only one worker at a time will execute the check if (Lock::acquire('worker', 0)) { // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers($entries, $deferred)) { + if (self::tooMuchWorkers()) { Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); Lock::release('worker'); return; @@ -689,13 +678,10 @@ class Worker /** * @brief Checks if the number of active workers exceeds the given limits * - * @param integer $entries Total number of queue entries - * @param integer $deferred Number of deferred queue entries - * * @return bool Are there too much workers running? * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function tooMuchWorkers($entries = 0, $deferred = 0) + private static function tooMuchWorkers() { $queues = Config::get("system", "worker_queues", 4); @@ -732,7 +718,7 @@ class Worker $stamp = (float)microtime(true); $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_stat += (microtime(true) - $stamp); + //self::$db_duration_stat += (microtime(true) - $stamp); if ($job = DBA::fetch($jobs)) { $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); } @@ -746,12 +732,7 @@ class Worker $idle_workers = $active; - if (empty($deferred) && empty($entries)) { - $deferred = self::deferredEntries(); - $entries = max(self::totalEntries() - $deferred, 0); - } - - $waiting_processes = max(0, $entries - $deferred); + $deferred = self::deferredEntries(); if (Config::get('system', 'worker_debug')) { $waiting_processes = 0; @@ -773,10 +754,14 @@ class Worker DBA::close($processes); } DBA::close($jobs); + $entries = $deferred + $waiting_processes; } else { + $entries = self::totalEntries(); + $waiting_processes = max(0, $entries - $deferred); $stamp = (float)microtime(true); $jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority`"); self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_stat += (microtime(true) - $stamp); while ($entry = DBA::fetch($jobs)) { $idle_workers -= $entry["running"]; @@ -834,7 +819,20 @@ class Worker return $count; } - public static function nextProcess() + private static function getWaitingJobForPID() + { + $stamp = (float)microtime(true); + $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); + self::$db_duration += (microtime(true) - $stamp); + if (DBA::isResult($r)) { + return DBA::toArray($r); + } + DBA::close($r); + + return false; + } + + private static function nextProcess() { $priority = self::nextPriority(); if (empty($priority)) { @@ -862,7 +860,7 @@ class Worker return $ids; } - public static function nextPriority() + private static function nextPriority() { $waiting = []; $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE]; @@ -932,23 +930,17 @@ class Worker /** * @brief Find and claim the next worker process for us * - * @param boolean $passing_slow Returns if we had passed low priority processes - * @param integer $entries Total number of queue entries - * @param integer $deferred Number of deferred queue entries * @return boolean Have we found something? * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - private static function findWorkerProcesses(&$passing_slow, $entries, $deferred) + private static function findWorkerProcesses() { $mypid = getmypid(); - $passing_slow = false; - $ids = self::nextProcess(); - $found = (count($ids) > 0); - // If there is no result (or we shouldn't pass lower processes) we check without priority limit - if (!$found) { + // If there is no result we check without priority limit + if (empty($ids)) { $stamp = (float)microtime(true); $result = DBA::select( 'workerqueue', @@ -963,11 +955,9 @@ class Worker $ids[] = $id["id"]; } DBA::close($result); - - $found = (count($ids) > 0); } - if ($found) { + if (!empty($ids)) { $stamp = (float)microtime(true); $condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`"; array_unshift($ids, $condition); @@ -976,33 +966,22 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } - return $found; + return !empty($ids); } /** * @brief Returns the next worker process * - * @param boolean $passing_slow Returns if we had passed low priority processes - * @param integer $entries Returns total number of queue entries - * @param integer $deferred Returns number of deferred queue entries - * * @return string SQL statement * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function workerProcess(&$passing_slow, &$entries, &$deferred) + public static function workerProcess() { // There can already be jobs for us in the queue. - $stamp = (float)microtime(true); - $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); - self::$db_duration += (microtime(true) - $stamp); - if (DBA::isResult($r)) { - return DBA::toArray($r); + $waiting = self::getWaitingJobForPID(); + if (!empty($waiting)) { + return $waiting; } - DBA::close($r); - - // Counting the rows outside the lock reduces the lock time - $entries = self::totalEntries(); - $deferred = self::deferredEntries(); $stamp = (float)microtime(true); if (!Lock::acquire('worker_process')) { @@ -1010,7 +989,7 @@ class Worker } self::$lock_duration += (microtime(true) - $stamp); - $found = self::findWorkerProcesses($passing_slow, $entries, $deferred); + $found = self::findWorkerProcesses(); Lock::release('worker_process');