diff --git a/src/Core/Worker.php b/src/Core/Worker.php index a435d86b6..b1e4aa41e 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -834,64 +834,6 @@ class Worker return $count; } - /** - * @brief Check if we should pass some slow processes - * - * When the active processes of the highest priority are using more than 2/3 - * of all processes, we let pass slower processes. - * - * @param string $highest_priority Returns the currently highest priority - * @return bool We let pass a slower process than $highest_priority - * @throws \Exception - */ - private static function passingSlow(&$highest_priority) - { - $highest_priority = 0; - - $stamp = (float)microtime(true); - $r = DBA::p( - "SELECT `priority` - FROM `process` - INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`" - ); - self::$db_duration += (microtime(true) - $stamp); - - // No active processes at all? Fine - if (!DBA::isResult($r)) { - return false; - } - $priorities = []; - while ($line = DBA::fetch($r)) { - $priorities[] = $line["priority"]; - } - DBA::close($r); - - // Should not happen - if (count($priorities) == 0) { - return false; - } - $highest_priority = min($priorities); - - // The highest process is already the slowest one? - // Then we quit - if ($highest_priority == PRIORITY_NEGLIGIBLE) { - return false; - } - $high = 0; - foreach ($priorities as $priority) { - if ($priority == $highest_priority) { - ++$high; - } - } - Logger::log("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, Logger::DEBUG); - $passing_slow = (($high/count($priorities)) > (2/3)); - - if ($passing_slow) { - Logger::log("Passing slower processes than priority ".$highest_priority, Logger::DEBUG); - } - return $passing_slow; - } - public static function nextProcess() { $priority = self::nextPriority(); @@ -937,6 +879,7 @@ class Worker } $running = []; + $running_total = 0; $stamp = (float)microtime(true); $processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` @@ -944,28 +887,43 @@ class Worker self::$db_duration += (microtime(true) - $stamp); while ($process = DBA::fetch($processes)) { $running[$process['priority']] = $process['running']; + $running_total += $process['running']; } DBA::close($processes); - $active = self::activeWorkers(); - foreach ($priorities as $priority) { if (!empty($waiting[$priority]) && empty($running[$priority])) { + Logger::log('No running worker found with priority ' . $priority . ' - assigning it.', Logger::DEBUG); return $priority; } } - // Temp - if (!empty($running[PRIORITY_LOW]) && ($running[PRIORITY_LOW] < 3)) { - return PRIORITY_LOW; + $active = max(self::activeWorkers(), $running_total); + $priorities = max(count($waiting), count($running)); + $exponent = 2; + + $total = 0; + for ($i = 1; $i <= $priorities; ++$i) { + $total += pow($i, $exponent); } - if (!empty($running[PRIORITY_NEGLIGIBLE]) && ($running[PRIORITY_NEGLIGIBLE] < 2)) { - return PRIORITY_NEGLIGIBLE; + $limit = []; + for ($i = 1; $i <= $priorities; ++$i) { + $limit[$priorities - $i] = max(1, round($active * (pow($i, $exponent) / $total))); + } + + $i = 0; + foreach ($running as $priority => $workers) { + if ($workers < $limit[$i++]) { + Logger::log('Priority ' . $priority . ' has got ' . $workers . ' workers out of a limit of ' . $limit[$i - 1], Logger::DEBUG); + return $priority; + } } if (!empty($waiting)) { - return array_shift(array_keys($waiting)); + $priority = array_shift(array_keys($waiting)); + Logger::log('No underassigned priority found, now taking the highest priority (' . $priority . ').', Logger::DEBUG); + return $priority; } return false; @@ -984,89 +942,11 @@ class Worker { $mypid = getmypid(); - // Check if we should pass some low priority process - $highest_priority = 0; - $found = false; $passing_slow = false; - // The higher the number of parallel workers, the more we prefetch to prevent concurring access - // We decrease the limit with the number of entries left in the queue - $worker_queues = Config::get("system", "worker_queues", 4); - $queue_length = Config::get('system', 'worker_fetch_limit', 1); - $lower_job_limit = $worker_queues * $queue_length * 2; - $entries = max($entries - $deferred, 0); - - // Now do some magic - $exponent = 2; - $slope = $queue_length / pow($lower_job_limit, $exponent); - $limit = min($queue_length, ceil($slope * pow($entries, $exponent))); - - Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); - $ids = self::nextProcess(); $found = (count($ids) > 0); - if (!$found && self::passingSlow($highest_priority)) { - // Are there waiting processes with a higher priority than the currently highest? - $stamp = (float)microtime(true); - $result = DBA::select( - 'workerqueue', - ['id'], - ["`pid` = 0 AND `priority` < ? AND NOT `done` AND `next_try` < ?", - $highest_priority, DateTimeFormat::utcNow()], - ['limit' => 1, 'order' => ['priority', 'created']] - ); - self::$db_duration += (microtime(true) - $stamp); - - while ($id = DBA::fetch($result)) { - $ids[] = $id["id"]; - } - DBA::close($result); - - $found = (count($ids) > 0); - - if (!$found) { - // Give slower processes some processing time - $stamp = (float)microtime(true); - $result = DBA::select( - 'workerqueue', - ['id'], - ["`pid` = 0 AND `priority` > ? AND NOT `done` AND `next_try` < ?", - $highest_priority, DateTimeFormat::utcNow()], - ['limit' => 1, 'order' => ['priority', 'created']] - ); - self::$db_duration += (microtime(true) - $stamp); - - while ($id = DBA::fetch($result)) { - $ids[] = $id["id"]; - } - DBA::close($result); - - $found = (count($ids) > 0); - $passing_slow = $found; - } - } - - // At first try to fetch a bunch of high or medium tasks - if (!$found && ($limit > 1)) { - $stamp = (float)microtime(true); - $result = DBA::select( - 'workerqueue', - ['id'], - ["`pid` = 0 AND NOT `done` AND `priority` <= ? AND `next_try` < ? AND `retrial` = 0", - PRIORITY_MEDIUM, DateTimeFormat::utcNow()], - ['limit' => $limit, 'order' => ['created']] - ); - self::$db_duration += (microtime(true) - $stamp); - - while ($id = DBA::fetch($result)) { - $ids[] = $id["id"]; - } - DBA::close($result); - - $found = (count($ids) > 0); - } - // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { $stamp = (float)microtime(true);