diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 1cde61351..7758e06a9 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -91,12 +91,13 @@ class Worker self::runCron(); } - $starttime = time(); + $last_check = $starttime = time(); self::$state = self::STATE_STARTUP; // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { - $refetched = false; + // Don't refetch when a worker fetches tasks for multiple workers + $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { // Assure that the priority is an integer value $entry['priority'] = (int)$entry['priority']; @@ -119,7 +120,7 @@ class Worker } // To avoid the quitting of multiple workers only one worker at a time will execute the check - if (!self::getWaitingJobForPID()) { + if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { self::$state = self::STATE_LONG_LOOP; if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { @@ -138,11 +139,13 @@ class Worker } DI::lock()->release(self::LOCK_WORKER); } + $last_check = time(); } // Quit the worker once every cron interval if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); + self::unclaimProcess(); self::spawnWorker(); return; } @@ -782,9 +785,34 @@ class Worker $stamp = (float)microtime(true); $count = DBA::count('process', ['command' => 'Worker.php']); self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_count += (microtime(true) - $stamp); return $count; } + /** + * Returns the process ids together with the number of unfinished queue entries assigned to each + * + * @return array Map of process id => number of unfinished workerqueue entries + * @throws \Exception + */ + private static function getWorkerPIDList() + { + $ids = []; + $stamp = (float)microtime(true); + + $queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process` + LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT 
`workerqueue`.`done` + GROUP BY `process`.`pid`"); + while ($queue = DBA::fetch($queues)) { + $ids[$queue['pid']] = $queue['entries']; + } + DBA::close($queues); + + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_count += (microtime(true) - $stamp); + return $ids; + } + /** * Returns waiting jobs for the current process id * @@ -806,11 +834,11 @@ class Worker /** * Returns the next jobs that should be executed - * + * @param int $limit * @return array array with next jobs * @throws \Exception */ - private static function nextProcess() + private static function nextProcess(int $limit) { $priority = self::nextPriority(); if (empty($priority)) { @@ -818,8 +846,6 @@ class Worker return []; } - $limit = DI::config()->get('system', 'worker_fetch_limit', 1); - $ids = []; $stamp = (float)microtime(true); $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; @@ -918,14 +944,29 @@ class Worker */ private static function findWorkerProcesses() { - $mypid = getmypid(); + $fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1); - $ids = self::nextProcess(); + if (DI::config()->get('system', 'worker_multiple_fetch')) { + $pids = []; + foreach (self::getWorkerPIDList() as $pid => $count) { + if ($count <= $fetch_limit) { + $pids[] = $pid; + } + } + if (empty($pids)) { + return; + } + $limit = $fetch_limit * count($pids); + } else { + $pids = [getmypid()]; + $limit = $fetch_limit; + } - // If there is no result we check without priority limit - if (empty($ids)) { - $limit = DI::config()->get('system', 'worker_fetch_limit', 1); + $ids = self::nextProcess($limit); + $limit -= count($ids); + // If there are not enough results we check without priority limit + if ($limit > 0) { $stamp = (float)microtime(true); $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 
'order' => ['priority', 'created']]); @@ -942,15 +983,28 @@ class Worker DBA::close($tasks); } - if (!empty($ids)) { - $stamp = (float)microtime(true); - $condition = ['id' => $ids, 'done' => false, 'pid' => 0]; - DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); + if (empty($ids)) { + return; } - return !empty($ids); + // Assign the task ids to the workers + $worker = []; + foreach (array_unique($ids) as $id) { + $pid = next($pids); + if (!$pid) { + $pid = reset($pids); + } + $worker[$pid][] = $id; + } + + $stamp = (float)microtime(true); + foreach ($worker as $worker_pid => $worker_ids) { + Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]); + DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid], + ['id' => $worker_ids, 'done' => false, 'pid' => 0]); + } + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } /** @@ -973,17 +1027,11 @@ class Worker } self::$lock_duration += (microtime(true) - $stamp); - $found = self::findWorkerProcesses(); + self::findWorkerProcesses(); DI::lock()->release(self::LOCK_PROCESS); - if ($found) { - $stamp = (float)microtime(true); - $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); - self::$db_duration += (microtime(true) - $stamp); - return DBA::toArray($r); - } - return false; + return self::getWaitingJobForPID(); } /** diff --git a/static/defaults.config.php b/static/defaults.config.php index 947ac1dc0..4d595010c 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -499,6 +499,11 @@ return [ // Setting 0 would allow maximum worker queues at all times, which is not recommended. 
'worker_load_exponent' => 3, + // worker_multiple_fetch (Boolean) + // When activated, the worker fetches jobs for multiple workers (not only for itself). + // This setting is experimental; its performance impact is not yet known. + 'worker_multiple_fetch' => false, + // worker_defer_limit (Integer) // Per default the systems tries delivering for 15 times before dropping it. 'worker_defer_limit' => 15,