From 0845089a0fa790840c20a9f94708cb12a214f8d0 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 16 Feb 2019 15:03:37 +0000 Subject: [PATCH] New scheduler mechanism --- src/Core/Worker.php | 86 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 75d4e48740..a435d86b64 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -892,6 +892,85 @@ class Worker return $passing_slow; } + public static function nextProcess() + { + $priority = self::nextPriority(); + if (empty($priority)) { + Logger::log('No tasks found', Logger::DEBUG); + return []; + } + + if ($priority <= PRIORITY_MEDIUM) { + $limit = Config::get('system', 'worker_fetch_limit', 1); + } else { + $limit = 1; + } + + $ids = []; + $stamp = (float)microtime(true); + $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; + $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]); + self::$db_duration += (microtime(true) - $stamp); + while ($task = DBA::fetch($tasks)) { + $ids[] = $task['id']; + } + DBA::close($tasks); + + Logger::log('Found task(s) ' . implode(', ', $ids) . ' with priority ' .$priority, Logger::DEBUG); + return $ids; + } + + public static function nextPriority() + { + $waiting = []; + $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE]; + foreach ($priorities as $priority) { + $stamp = (float)microtime(true); + if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) { + $waiting[$priority] = true; + } + self::$db_duration += (microtime(true) - $stamp); + } + + if (!empty($waiting[PRIORITY_CRITICAL])) { + return PRIORITY_CRITICAL; + } + + $running = []; + $stamp = (float)microtime(true); + $processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process` + INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` + WHERE NOT `done` GROUP BY `priority`"); + self::$db_duration += (microtime(true) - $stamp); + while ($process = DBA::fetch($processes)) { + $running[$process['priority']] = $process['running']; + } + DBA::close($processes); + + $active = self::activeWorkers(); + + foreach ($priorities as $priority) { + if (!empty($waiting[$priority]) && empty($running[$priority])) { + return $priority; + } + } + + // Temp + if (!empty($running[PRIORITY_LOW]) && ($running[PRIORITY_LOW] < 3)) { + return PRIORITY_LOW; + } + + if (!empty($running[PRIORITY_NEGLIGIBLE]) && ($running[PRIORITY_NEGLIGIBLE] < 2)) { + return PRIORITY_NEGLIGIBLE; + } + + if (!empty($waiting)) { + return array_shift(array_keys($waiting)); + } + + return false; + } + /** * @brief Find and claim the next worker process for us * @@ -923,8 +1002,11 @@ class Worker $limit = min($queue_length, ceil($slope * pow($entries, $exponent))); Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); - $ids = []; - if (self::passingSlow($highest_priority)) { + + $ids = self::nextProcess(); + $found = (count($ids) > 0); + + if (!$found && self::passingSlow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? $stamp = (float)microtime(true); $result = DBA::select(