New scheduler mechanism
This commit is contained in:
		
					parent
					
						
							
								1ca8a89087
							
						
					
				
			
			
				commit
				
					
						0845089a0f
					
				
			
		
					 1 changed files with 84 additions and 2 deletions
				
			
		|  | @ -892,6 +892,85 @@ class Worker | |||
| 		return $passing_slow; | ||||
| 	} | ||||
| 
 | ||||
| 	public static function nextProcess() | ||||
| 	{ | ||||
| 		$priority = self::nextPriority(); | ||||
| 		if (empty($priority)) { | ||||
| 			Logger::log('No tasks found', Logger::DEBUG); | ||||
| 			return []; | ||||
| 		} | ||||
| 
 | ||||
| 		if ($priority <= PRIORITY_MEDIUM) { | ||||
| 			$limit = Config::get('system', 'worker_fetch_limit', 1); | ||||
| 		} else { | ||||
| 			$limit = 1; | ||||
| 		} | ||||
| 
 | ||||
| 		$ids = []; | ||||
| 		$stamp = (float)microtime(true); | ||||
| 		$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; | ||||
| 		$tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]); | ||||
| 		self::$db_duration += (microtime(true) - $stamp); | ||||
| 		while ($task = DBA::fetch($tasks)) { | ||||
| 			$ids[] = $task['id']; | ||||
| 		} | ||||
| 		DBA::close($tasks); | ||||
| 
 | ||||
| 		Logger::log('Found task(s) ' . implode(', ', $ids) . ' with priority ' .$priority, Logger::DEBUG); | ||||
| 		return $ids; | ||||
| 	} | ||||
| 
 | ||||
| 	public static function nextPriority() | ||||
| 	{ | ||||
| 		$waiting = []; | ||||
| 		$priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE]; | ||||
| 		foreach ($priorities as $priority) { | ||||
| 			$stamp = (float)microtime(true); | ||||
| 			if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) { | ||||
| 				$waiting[$priority] = true; | ||||
| 			} | ||||
| 			self::$db_duration += (microtime(true) - $stamp); | ||||
| 		} | ||||
| 
 | ||||
| 		if (!empty($waiting[PRIORITY_CRITICAL])) { | ||||
| 			return PRIORITY_CRITICAL; | ||||
| 		} | ||||
| 
 | ||||
| 		$running = []; | ||||
| 		$stamp = (float)microtime(true); | ||||
| 		$processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
 | ||||
| 			INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` | ||||
| 			WHERE NOT `done` GROUP BY `priority`");
 | ||||
| 		self::$db_duration += (microtime(true) - $stamp); | ||||
| 		while ($process = DBA::fetch($processes)) { | ||||
| 			$running[$process['priority']] = $process['running']; | ||||
| 		} | ||||
| 		DBA::close($processes); | ||||
| 
 | ||||
| 		$active = self::activeWorkers(); | ||||
| 
 | ||||
| 		foreach ($priorities as $priority) { | ||||
| 			if (!empty($waiting[$priority]) && empty($running[$priority])) { | ||||
| 				return $priority; | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		// Temp
 | ||||
| 		if (!empty($running[PRIORITY_LOW]) && ($running[PRIORITY_LOW] < 3)) { | ||||
| 			return PRIORITY_LOW; | ||||
| 		} | ||||
| 
 | ||||
| 		if (!empty($running[PRIORITY_NEGLIGIBLE]) && ($running[PRIORITY_NEGLIGIBLE] < 2)) { | ||||
| 			return PRIORITY_NEGLIGIBLE; | ||||
| 		} | ||||
| 
 | ||||
| 		if (!empty($waiting)) { | ||||
| 			return array_shift(array_keys($waiting)); | ||||
| 		} | ||||
| 
 | ||||
| 		return false; | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * @brief Find and claim the next worker process for us | ||||
| 	 * | ||||
|  | @ -923,8 +1002,11 @@ class Worker | |||
| 		$limit = min($queue_length, ceil($slope * pow($entries, $exponent))); | ||||
| 
 | ||||
| 		Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); | ||||
| 		$ids = []; | ||||
| 		if (self::passingSlow($highest_priority)) { | ||||
| 
 | ||||
| 		$ids = self::nextProcess(); | ||||
| 		$found = (count($ids) > 0); | ||||
| 
 | ||||
| 		if (!$found && self::passingSlow($highest_priority)) { | ||||
| 			// Are there waiting processes with a higher priority than the currently highest?
 | ||||
| 			$stamp = (float)microtime(true); | ||||
| 			$result = DBA::select( | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue