The number of workers per priority is now calculated dynamically
This commit is contained in:
		
					parent
					
						
							
								0845089a0f
							
						
					
				
			
			
				commit
				
					
						01d6ba85ff
					
				
			
		
					 1 changed files with 24 additions and 144 deletions
				
			
		|  | @ -834,64 +834,6 @@ class Worker | ||||||
| 		return $count; | 		return $count; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** |  | ||||||
| 	 * @brief Check if we should pass some slow processes |  | ||||||
| 	 * |  | ||||||
| 	 * When the active processes of the highest priority are using more than 2/3 |  | ||||||
| 	 * of all processes, we let pass slower processes. |  | ||||||
| 	 * |  | ||||||
| 	 * @param string $highest_priority Returns the currently highest priority |  | ||||||
| 	 * @return bool We let pass a slower process than $highest_priority |  | ||||||
| 	 * @throws \Exception |  | ||||||
| 	 */ |  | ||||||
| 	private static function passingSlow(&$highest_priority) |  | ||||||
| 	{ |  | ||||||
| 		$highest_priority = 0; |  | ||||||
| 
 |  | ||||||
| 		$stamp = (float)microtime(true); |  | ||||||
| 		$r = DBA::p( |  | ||||||
| 			"SELECT `priority`
 |  | ||||||
| 				FROM `process` |  | ||||||
| 				INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`" |  | ||||||
| 		); |  | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 		// No active processes at all? Fine
 |  | ||||||
| 		if (!DBA::isResult($r)) { |  | ||||||
| 			return false; |  | ||||||
| 		} |  | ||||||
| 		$priorities = []; |  | ||||||
| 		while ($line = DBA::fetch($r)) { |  | ||||||
| 			$priorities[] = $line["priority"]; |  | ||||||
| 		} |  | ||||||
| 		DBA::close($r); |  | ||||||
| 
 |  | ||||||
| 		// Should not happen
 |  | ||||||
| 		if (count($priorities) == 0) { |  | ||||||
| 			return false; |  | ||||||
| 		} |  | ||||||
| 		$highest_priority = min($priorities); |  | ||||||
| 
 |  | ||||||
| 		// The highest process is already the slowest one?
 |  | ||||||
| 		// Then we quit
 |  | ||||||
| 		if ($highest_priority == PRIORITY_NEGLIGIBLE) { |  | ||||||
| 			return false; |  | ||||||
| 		} |  | ||||||
| 		$high = 0; |  | ||||||
| 		foreach ($priorities as $priority) { |  | ||||||
| 			if ($priority == $highest_priority) { |  | ||||||
| 				++$high; |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		Logger::log("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, Logger::DEBUG); |  | ||||||
| 		$passing_slow = (($high/count($priorities)) > (2/3)); |  | ||||||
| 
 |  | ||||||
| 		if ($passing_slow) { |  | ||||||
| 			Logger::log("Passing slower processes than priority ".$highest_priority, Logger::DEBUG); |  | ||||||
| 		} |  | ||||||
| 		return $passing_slow; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	public static function nextProcess() | 	public static function nextProcess() | ||||||
| 	{ | 	{ | ||||||
| 		$priority = self::nextPriority(); | 		$priority = self::nextPriority(); | ||||||
|  | @ -937,6 +879,7 @@ class Worker | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		$running = []; | 		$running = []; | ||||||
|  | 		$running_total = 0; | ||||||
| 		$stamp = (float)microtime(true); | 		$stamp = (float)microtime(true); | ||||||
| 		$processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
 | 		$processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
 | ||||||
| 			INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` | 			INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` | ||||||
|  | @ -944,28 +887,43 @@ class Worker | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 		while ($process = DBA::fetch($processes)) { | 		while ($process = DBA::fetch($processes)) { | ||||||
| 			$running[$process['priority']] = $process['running']; | 			$running[$process['priority']] = $process['running']; | ||||||
|  | 			$running_total += $process['running']; | ||||||
| 		} | 		} | ||||||
| 		DBA::close($processes); | 		DBA::close($processes); | ||||||
| 
 | 
 | ||||||
| 		$active = self::activeWorkers(); |  | ||||||
| 
 |  | ||||||
| 		foreach ($priorities as $priority) { | 		foreach ($priorities as $priority) { | ||||||
| 			if (!empty($waiting[$priority]) && empty($running[$priority])) { | 			if (!empty($waiting[$priority]) && empty($running[$priority])) { | ||||||
|  | 				Logger::log('No running worker found with priority ' . $priority . ' - assigning it.', Logger::DEBUG); | ||||||
| 				return $priority; | 				return $priority; | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// Temp
 | 		$active = max(self::activeWorkers(), $running_total); | ||||||
| 		if (!empty($running[PRIORITY_LOW]) && ($running[PRIORITY_LOW] < 3)) { | 		$priorities = max(count($waiting), count($running)); | ||||||
| 			return PRIORITY_LOW; | 		$exponent = 2; | ||||||
|  | 
 | ||||||
|  | 		$total = 0; | ||||||
|  | 		for ($i = 1; $i <= $priorities; ++$i) { | ||||||
|  | 			$total += pow($i, $exponent); | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if (!empty($running[PRIORITY_NEGLIGIBLE]) && ($running[PRIORITY_NEGLIGIBLE] < 2)) { | 		$limit = []; | ||||||
| 			return PRIORITY_NEGLIGIBLE; | 		for ($i = 1; $i <= $priorities; ++$i) { | ||||||
|  | 			$limit[$priorities - $i] = max(1, round($active * (pow($i, $exponent) / $total))); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$i = 0; | ||||||
|  | 		foreach ($running as $priority => $workers) { | ||||||
|  | 			if ($workers < $limit[$i++]) { | ||||||
|  | 				Logger::log('Priority ' . $priority . ' has got ' . $workers . ' workers out of a limit of ' . $limit[$i - 1], Logger::DEBUG); | ||||||
|  | 				return $priority; | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if (!empty($waiting)) { | 		if (!empty($waiting)) { | ||||||
| 			return array_shift(array_keys($waiting)); | 			$priority =  array_shift(array_keys($waiting)); | ||||||
|  | 			Logger::log('No underassigned priority found, now taking the highest priority (' . $priority . ').', Logger::DEBUG); | ||||||
|  | 			return $priority; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		return false; | 		return false; | ||||||
|  | @ -984,89 +942,11 @@ class Worker | ||||||
| 	{ | 	{ | ||||||
| 		$mypid = getmypid(); | 		$mypid = getmypid(); | ||||||
| 
 | 
 | ||||||
| 		// Check if we should pass some low priority process
 |  | ||||||
| 		$highest_priority = 0; |  | ||||||
| 		$found = false; |  | ||||||
| 		$passing_slow = false; | 		$passing_slow = false; | ||||||
| 
 | 
 | ||||||
| 		// The higher the number of parallel workers, the more we prefetch to prevent concurring access
 |  | ||||||
| 		// We decrease the limit with the number of entries left in the queue
 |  | ||||||
| 		$worker_queues = Config::get("system", "worker_queues", 4); |  | ||||||
| 		$queue_length = Config::get('system', 'worker_fetch_limit', 1); |  | ||||||
| 		$lower_job_limit = $worker_queues * $queue_length * 2; |  | ||||||
| 		$entries = max($entries - $deferred, 0); |  | ||||||
| 
 |  | ||||||
| 		// Now do some magic
 |  | ||||||
| 		$exponent = 2; |  | ||||||
| 		$slope = $queue_length / pow($lower_job_limit, $exponent); |  | ||||||
| 		$limit = min($queue_length, ceil($slope * pow($entries, $exponent))); |  | ||||||
| 
 |  | ||||||
| 		Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); |  | ||||||
| 
 |  | ||||||
| 		$ids = self::nextProcess(); | 		$ids = self::nextProcess(); | ||||||
| 		$found = (count($ids) > 0); | 		$found = (count($ids) > 0); | ||||||
| 
 | 
 | ||||||
| 		if (!$found && self::passingSlow($highest_priority)) { |  | ||||||
| 			// Are there waiting processes with a higher priority than the currently highest?
 |  | ||||||
| 			$stamp = (float)microtime(true); |  | ||||||
| 			$result = DBA::select( |  | ||||||
| 				'workerqueue', |  | ||||||
| 				['id'], |  | ||||||
| 				["`pid` = 0 AND `priority` < ? AND NOT `done` AND `next_try` < ?", |  | ||||||
| 				$highest_priority, DateTimeFormat::utcNow()], |  | ||||||
| 				['limit' => 1, 'order' => ['priority', 'created']] |  | ||||||
| 			); |  | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 			while ($id = DBA::fetch($result)) { |  | ||||||
| 				$ids[] = $id["id"]; |  | ||||||
| 			} |  | ||||||
| 			DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 			$found = (count($ids) > 0); |  | ||||||
| 
 |  | ||||||
| 			if (!$found) { |  | ||||||
| 				// Give slower processes some processing time
 |  | ||||||
| 				$stamp = (float)microtime(true); |  | ||||||
| 				$result = DBA::select( |  | ||||||
| 					'workerqueue', |  | ||||||
| 					['id'], |  | ||||||
| 					["`pid` = 0 AND `priority` > ? AND NOT `done` AND `next_try` < ?", |  | ||||||
| 					$highest_priority, DateTimeFormat::utcNow()], |  | ||||||
| 					['limit' => 1, 'order' => ['priority', 'created']] |  | ||||||
| 				); |  | ||||||
| 				self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 				while ($id = DBA::fetch($result)) { |  | ||||||
| 					$ids[] = $id["id"]; |  | ||||||
| 				} |  | ||||||
| 				DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 				$found = (count($ids) > 0); |  | ||||||
| 				$passing_slow = $found; |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// At first try to fetch a bunch of high or medium tasks
 |  | ||||||
| 		if (!$found && ($limit > 1)) { |  | ||||||
| 			$stamp = (float)microtime(true); |  | ||||||
| 			$result = DBA::select( |  | ||||||
| 				'workerqueue', |  | ||||||
| 				['id'], |  | ||||||
| 				["`pid` = 0 AND NOT `done` AND `priority` <= ? AND `next_try` < ? AND `retrial` = 0", |  | ||||||
| 				PRIORITY_MEDIUM, DateTimeFormat::utcNow()], |  | ||||||
| 				['limit' => $limit, 'order' => ['created']] |  | ||||||
| 			); |  | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 			while ($id = DBA::fetch($result)) { |  | ||||||
| 				$ids[] = $id["id"]; |  | ||||||
| 			} |  | ||||||
| 			DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 			$found = (count($ids) > 0); |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 | 		// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 | ||||||
| 		if (!$found) { | 		if (!$found) { | ||||||
| 			$stamp = (float)microtime(true); | 			$stamp = (float)microtime(true); | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue