diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 1cde61351..04cd4cc60 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -785,6 +785,29 @@ class Worker return $count; } + /** + * Returns the number of active worker processes + * + * @return array List of worker process ids + * @throws \Exception + */ + private static function getWorkerPIDList() + { + $ids = []; + $stamp = (float)microtime(true); + + $queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process` + LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` + GROUP BY `process`.`pid`"); + while ($queue = DBA::fetch($queues)) { + $ids[$queue['pid']] = $queue['entries']; + } + DBA::close($queues); + + self::$db_duration += (microtime(true) - $stamp); + return $ids; + } + /** * Returns waiting jobs for the current process id * @@ -806,11 +829,11 @@ class Worker /** * Returns the next jobs that should be executed - * + * @param int $limit * @return array array with next jobs * @throws \Exception */ - private static function nextProcess() + private static function nextProcess(int $limit) { $priority = self::nextPriority(); if (empty($priority)) { @@ -818,8 +841,6 @@ class Worker return []; } - $limit = DI::config()->get('system', 'worker_fetch_limit', 1); - $ids = []; $stamp = (float)microtime(true); $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; @@ -918,14 +939,30 @@ class Worker */ private static function findWorkerProcesses() { - $mypid = getmypid(); + $fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1); - $ids = self::nextProcess(); + if (DI::config()->get('system', 'worker_multiple_fetch')) { + $pids = []; + $worker_pids = self::getWorkerPIDList(); + foreach ($worker_pids as $pid => $count) { + if ($count <= $fetch_limit) { + $pids[] = $pid; + } + } + if (empty($pids)) { + return; + } + $limit = $fetch_limit * count($pids); + } else { + $pids = [getmypid()]; + $limit = $fetch_limit; + } - // If there is no result we check without priority limit - if (empty($ids)) { - $limit = DI::config()->get('system', 'worker_fetch_limit', 1); + $ids = self::nextProcess($limit); + $limit -= count($ids); + // If there is not enough results we check without priority limit + if ($limit > 0) { $stamp = (float)microtime(true); $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]); @@ -943,9 +980,21 @@ class Worker } if (!empty($ids)) { + $worker = []; + foreach (array_unique($ids) as $id) { + $pid = next($pids); + if (!$pid) { + $pid = reset($pids); + } + $worker[$pid][] = $id; + } + $stamp = (float)microtime(true); - $condition = ['id' => $ids, 'done' => false, 'pid' => 0]; - DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition); + foreach ($worker as $worker_pid => $worker_ids) { + Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]); + DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid], + ['id' => $worker_ids, 'done' => false, 'pid' => 0]); + } self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } diff --git a/static/defaults.config.php b/static/defaults.config.php index 947ac1dc0..4d595010c 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -499,6 +499,11 @@ return [ // Setting 0 would allow maximum worker queues at all times, which is not recommended. 'worker_load_exponent' => 3, + // worker_multiple_fetch (Boolean) + // When activated, the worker fetches jobs for multiple workers (not only for itself). + // This is an experimental setting without knowing the performance impact. + 'worker_multiple_fetch' => false, + // worker_defer_limit (Integer) // Per default the systems tries delivering for 15 times before dropping it. 'worker_defer_limit' => 15,