From 069786cd7ff859bb3cc842cf6947160f1b7faba2 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 29 Aug 2020 10:44:38 +0000 Subject: [PATCH] Simplified the code --- src/Core/Worker.php | 52 ++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 04cd4cc60..f4df32249 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -943,8 +943,7 @@ class Worker if (DI::config()->get('system', 'worker_multiple_fetch')) { $pids = []; - $worker_pids = self::getWorkerPIDList(); - foreach ($worker_pids as $pid => $count) { + foreach (self::getWorkerPIDList() as $pid => $count) { if ($count <= $fetch_limit) { $pids[] = $pid; } @@ -979,27 +978,28 @@ class Worker DBA::close($tasks); } - if (!empty($ids)) { - $worker = []; - foreach (array_unique($ids) as $id) { - $pid = next($pids); - if (!$pid) { - $pid = reset($pids); - } - $worker[$pid][] = $id; - } - - $stamp = (float)microtime(true); - foreach ($worker as $worker_pid => $worker_ids) { - Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]); - DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid], - ['id' => $worker_ids, 'done' => false, 'pid' => 0]); - } - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); + if (empty($ids)) { + return; } - return !empty($ids); + // Assign the task ids to the workers + $worker = []; + foreach (array_unique($ids) as $id) { + $pid = next($pids); + if (!$pid) { + $pid = reset($pids); + } + $worker[$pid][] = $id; + } + + $stamp = (float)microtime(true); + foreach ($worker as $worker_pid => $worker_ids) { + Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]); + DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid], + ['id' => $worker_ids, 'done' => false, 'pid' => 0]); + } + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } /** @@ -1022,17 +1022,11 @@ class Worker } self::$lock_duration += (microtime(true) - $stamp); - $found = self::findWorkerProcesses(); + self::findWorkerProcesses(); DI::lock()->release(self::LOCK_PROCESS); - if ($found) { - $stamp = (float)microtime(true); - $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); - self::$db_duration += (microtime(true) - $stamp); - return DBA::toArray($r); - } - return false; + return self::getWaitingJobForPID(); } /**