Merge pull request #6719 from annando/worker-scheduler
New scheduler mechanism - now to the correct branch
This commit is contained in:
		
				commit
				
					
						4375edd63e
					
				
			
		
					 4 changed files with 187 additions and 240 deletions
				
			
		|  | @ -34,7 +34,7 @@ | ||||||
| use Friendica\Database\DBA; | use Friendica\Database\DBA; | ||||||
| 
 | 
 | ||||||
| if (!defined('DB_UPDATE_VERSION')) { | if (!defined('DB_UPDATE_VERSION')) { | ||||||
| 	define('DB_UPDATE_VERSION', 1302); | 	define('DB_UPDATE_VERSION', 1303); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| return [ | return [ | ||||||
|  | @ -1378,9 +1378,8 @@ return [ | ||||||
| 			"PRIMARY" => ["id"], | 			"PRIMARY" => ["id"], | ||||||
| 			"done_parameter" => ["done", "parameter(64)"], | 			"done_parameter" => ["done", "parameter(64)"], | ||||||
| 			"done_executed" => ["done", "executed"], | 			"done_executed" => ["done", "executed"], | ||||||
| 			"done_priority" => ["done", "priority"], |  | ||||||
| 			"done_priority_created" => ["done", "priority", "created"], | 			"done_priority_created" => ["done", "priority", "created"], | ||||||
| 			"done_pid" => ["done", "pid"], | 			"done_priority_next_try" => ["done", "priority", "next_try"], | ||||||
| 			"done_pid_next_try" => ["done", "pid", "next_try"], | 			"done_pid_next_try" => ["done", "pid", "next_try"], | ||||||
| 			"done_pid_priority_created" => ["done", "pid", "priority", "created"] | 			"done_pid_priority_created" => ["done", "pid", "priority", "created"] | ||||||
| 		] | 		] | ||||||
|  |  | ||||||
|  | @ -41,11 +41,7 @@ function worker_init() | ||||||
| 
 | 
 | ||||||
| 	Worker::callWorker(); | 	Worker::callWorker(); | ||||||
| 
 | 
 | ||||||
| 	$passing_slow = false; | 	if ($r = Worker::workerProcess()) { | ||||||
| 	$entries = 0; |  | ||||||
| 	$deferred = 0; |  | ||||||
| 
 |  | ||||||
| 	if ($r = Worker::workerProcess($passing_slow, $entries, $deferred)) { |  | ||||||
| 		// On most configurations this parameter wouldn't have any effect.
 | 		// On most configurations this parameter wouldn't have any effect.
 | ||||||
| 		// But since it doesn't destroy anything, we just try to get more execution time in any way.
 | 		// But since it doesn't destroy anything, we just try to get more execution time in any way.
 | ||||||
| 		set_time_limit(0); | 		set_time_limit(0); | ||||||
|  |  | ||||||
|  | @ -92,15 +92,8 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		$starttime = time(); | 		$starttime = time(); | ||||||
| 
 | 
 | ||||||
| 		$entries = 0; |  | ||||||
| 		$deferred = 0; |  | ||||||
| 
 |  | ||||||
| 		// We fetch the next queue entry that is about to be executed
 | 		// We fetch the next queue entry that is about to be executed
 | ||||||
| 		while ($r = self::workerProcess($passing_slow, $entries, $deferred)) { | 		while ($r = self::workerProcess()) { | ||||||
| 			// When we are processing jobs with a lower priority, we don't refetch new jobs
 |  | ||||||
| 			// Otherwise fast jobs could wait behind slow ones and could be blocked.
 |  | ||||||
| 			$refetched = $passing_slow; |  | ||||||
| 
 |  | ||||||
| 			foreach ($r as $entry) { | 			foreach ($r as $entry) { | ||||||
| 				// Assure that the priority is an integer value
 | 				// Assure that the priority is an integer value
 | ||||||
| 				$entry['priority'] = (int)$entry['priority']; | 				$entry['priority'] = (int)$entry['priority']; | ||||||
|  | @ -112,20 +105,16 @@ class Worker | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				// If possible we will fetch new jobs for this worker
 | 				// If possible we will fetch new jobs for this worker
 | ||||||
| 				if (!$refetched) { | 				if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) { | ||||||
| 					$entries = self::totalEntries(); | 					self::findWorkerProcesses(); | ||||||
| 					$deferred = self::deferredEntries(); |  | ||||||
| 					if (Lock::acquire('worker_process', 0)) { |  | ||||||
| 						$refetched = self::findWorkerProcesses($passing_slow, $entries, $deferred); |  | ||||||
| 					Lock::release('worker_process'); | 					Lock::release('worker_process'); | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 			} |  | ||||||
| 
 | 
 | ||||||
| 			// To avoid the quitting of multiple workers only one worker at a time will execute the check
 | 			// To avoid the quitting of multiple workers only one worker at a time will execute the check
 | ||||||
| 			if (Lock::acquire('worker', 0)) { | 			if (Lock::acquire('worker', 0)) { | ||||||
| 				// Count active workers and compare them with a maximum value that depends on the load
 | 				// Count active workers and compare them with a maximum value that depends on the load
 | ||||||
| 				if (self::tooMuchWorkers($entries, $deferred)) { | 				if (self::tooMuchWorkers()) { | ||||||
| 					Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); | 					Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); | ||||||
| 					Lock::release('worker'); | 					Lock::release('worker'); | ||||||
| 					return; | 					return; | ||||||
|  | @ -409,16 +398,15 @@ class Worker | ||||||
| 		 * The execution time is the productive time. | 		 * The execution time is the productive time. | ||||||
| 		 * By changing parameters like the maximum number of workers we can check the effectivness. | 		 * By changing parameters like the maximum number of workers we can check the effectivness. | ||||||
| 		*/ | 		*/ | ||||||
| 		Logger::log( | 		$dbtotal = number_format(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 4); | ||||||
| 			'DB: '.number_format(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 4). | 		$dbcount = number_format(self::$db_duration_count, 4); | ||||||
| 			' - DB-Count: '.number_format(self::$db_duration_count, 4). | 		$dbstat  = number_format(self::$db_duration_stat, 4); | ||||||
| 			' - DB-Stat: '.number_format(self::$db_duration_stat, 4). | 		$dbwrite = number_format(self::$db_duration_write, 4); | ||||||
| 			' - DB-Write: '.number_format(self::$db_duration_write, 4). | 		$dblock  = number_format(self::$lock_duration, 4); | ||||||
| 			' - Lock: '.number_format(self::$lock_duration, 4). | 		$rest    = number_format(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 4); | ||||||
| 			' - Rest: '.number_format(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 4). | 		$exec    = number_format($duration, 4); | ||||||
| 			' - Execution: '.number_format($duration, 4), | 
 | ||||||
| 			Logger::DEBUG | 		Logger::info('Performance:', ['total' => $dbtotal, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'block' => $dblock, 'rest' => $rest, 'exec' => $exec]); | ||||||
| 		); |  | ||||||
| 
 | 
 | ||||||
| 		self::$up_start = microtime(true); | 		self::$up_start = microtime(true); | ||||||
| 		self::$db_duration = 0; | 		self::$db_duration = 0; | ||||||
|  | @ -607,13 +595,10 @@ class Worker | ||||||
| 	/** | 	/** | ||||||
| 	 * @brief Checks if the number of active workers exceeds the given limits | 	 * @brief Checks if the number of active workers exceeds the given limits | ||||||
| 	 * | 	 * | ||||||
| 	 * @param integer $entries Total number of queue entries |  | ||||||
| 	 * @param integer $deferred Number of deferred queue entries |  | ||||||
| 	 * |  | ||||||
| 	 * @return bool Are there too much workers running? | 	 * @return bool Are there too much workers running? | ||||||
| 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function tooMuchWorkers($entries = 0, $deferred = 0) | 	private static function tooMuchWorkers() | ||||||
| 	{ | 	{ | ||||||
| 		$queues = Config::get("system", "worker_queues", 4); | 		$queues = Config::get("system", "worker_queues", 4); | ||||||
| 
 | 
 | ||||||
|  | @ -664,12 +649,7 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 			$idle_workers = $active; | 			$idle_workers = $active; | ||||||
| 
 | 
 | ||||||
| 			if (empty($deferred) && empty($entries)) { |  | ||||||
| 			$deferred = self::deferredEntries(); | 			$deferred = self::deferredEntries(); | ||||||
| 				$entries = max(self::totalEntries() - $deferred, 0); |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			$waiting_processes = max(0, $entries - $deferred); |  | ||||||
| 
 | 
 | ||||||
| 			if (Config::get('system', 'worker_debug')) { | 			if (Config::get('system', 'worker_debug')) { | ||||||
| 				$waiting_processes = 0; | 				$waiting_processes = 0; | ||||||
|  | @ -691,10 +671,14 @@ class Worker | ||||||
| 					DBA::close($processes); | 					DBA::close($processes); | ||||||
| 				} | 				} | ||||||
| 				DBA::close($jobs); | 				DBA::close($jobs); | ||||||
|  | 				$entries = $deferred + $waiting_processes; | ||||||
| 			} else { | 			} else { | ||||||
|  | 				$entries = self::totalEntries(); | ||||||
|  | 				$waiting_processes = max(0, $entries - $deferred); | ||||||
| 				$stamp = (float)microtime(true); | 				$stamp = (float)microtime(true); | ||||||
| 				$jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority`"); | 				$jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority`"); | ||||||
| 				self::$db_duration += (microtime(true) - $stamp); | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 				self::$db_duration_stat += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 				while ($entry = DBA::fetch($jobs)) { | 				while ($entry = DBA::fetch($jobs)) { | ||||||
| 					$idle_workers -= $entry["running"]; | 					$idle_workers -= $entry["running"]; | ||||||
|  | @ -753,201 +737,13 @@ class Worker | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
| 	 * @brief Check if we should pass some slow processes | 	 * @brief Returns waiting jobs for the current process id | ||||||
| 	 * | 	 * | ||||||
| 	 * When the active processes of the highest priority are using more than 2/3 | 	 * @return array waiting workerqueue jobs | ||||||
| 	 * of all processes, we let pass slower processes. |  | ||||||
| 	 * |  | ||||||
| 	 * @param string $highest_priority Returns the currently highest priority |  | ||||||
| 	 * @return bool We let pass a slower process than $highest_priority |  | ||||||
| 	 * @throws \Exception | 	 * @throws \Exception | ||||||
| 	 */ | 	 */ | ||||||
| 	private static function passingSlow(&$highest_priority) | 	private static function getWaitingJobForPID() | ||||||
| 	{ | 	{ | ||||||
| 		$highest_priority = 0; |  | ||||||
| 
 |  | ||||||
| 		$stamp = (float)microtime(true); |  | ||||||
| 		$r = DBA::p( |  | ||||||
| 			"SELECT `priority`
 |  | ||||||
| 				FROM `process` |  | ||||||
| 				INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`" |  | ||||||
| 		); |  | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 		// No active processes at all? Fine
 |  | ||||||
| 		if (!DBA::isResult($r)) { |  | ||||||
| 			return false; |  | ||||||
| 		} |  | ||||||
| 		$priorities = []; |  | ||||||
| 		while ($line = DBA::fetch($r)) { |  | ||||||
| 			$priorities[] = $line["priority"]; |  | ||||||
| 		} |  | ||||||
| 		DBA::close($r); |  | ||||||
| 
 |  | ||||||
| 		// Should not happen
 |  | ||||||
| 		if (count($priorities) == 0) { |  | ||||||
| 			return false; |  | ||||||
| 		} |  | ||||||
| 		$highest_priority = min($priorities); |  | ||||||
| 
 |  | ||||||
| 		// The highest process is already the slowest one?
 |  | ||||||
| 		// Then we quit
 |  | ||||||
| 		if ($highest_priority == PRIORITY_NEGLIGIBLE) { |  | ||||||
| 			return false; |  | ||||||
| 		} |  | ||||||
| 		$high = 0; |  | ||||||
| 		foreach ($priorities as $priority) { |  | ||||||
| 			if ($priority == $highest_priority) { |  | ||||||
| 				++$high; |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		Logger::log("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, Logger::DEBUG); |  | ||||||
| 		$passing_slow = (($high/count($priorities)) > (2/3)); |  | ||||||
| 
 |  | ||||||
| 		if ($passing_slow) { |  | ||||||
| 			Logger::log("Passing slower processes than priority ".$highest_priority, Logger::DEBUG); |  | ||||||
| 		} |  | ||||||
| 		return $passing_slow; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	/** |  | ||||||
| 	 * @brief Find and claim the next worker process for us |  | ||||||
| 	 * |  | ||||||
| 	 * @param boolean $passing_slow Returns if we had passed low priority processes |  | ||||||
| 	 * @param integer $entries Total number of queue entries |  | ||||||
| 	 * @param integer $deferred Number of deferred queue entries |  | ||||||
| 	 * @return boolean Have we found something? |  | ||||||
| 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException |  | ||||||
| 	 */ |  | ||||||
| 	private static function findWorkerProcesses(&$passing_slow, $entries, $deferred) |  | ||||||
| 	{ |  | ||||||
| 		$mypid = getmypid(); |  | ||||||
| 
 |  | ||||||
| 		// Check if we should pass some low priority process
 |  | ||||||
| 		$highest_priority = 0; |  | ||||||
| 		$found = false; |  | ||||||
| 		$passing_slow = false; |  | ||||||
| 
 |  | ||||||
| 		// The higher the number of parallel workers, the more we prefetch to prevent concurring access
 |  | ||||||
| 		// We decrease the limit with the number of entries left in the queue
 |  | ||||||
| 		$worker_queues = Config::get("system", "worker_queues", 4); |  | ||||||
| 		$queue_length = Config::get('system', 'worker_fetch_limit', 1); |  | ||||||
| 		$lower_job_limit = $worker_queues * $queue_length * 2; |  | ||||||
| 		$entries = max($entries - $deferred, 0); |  | ||||||
| 
 |  | ||||||
| 		// Now do some magic
 |  | ||||||
| 		$exponent = 2; |  | ||||||
| 		$slope = $queue_length / pow($lower_job_limit, $exponent); |  | ||||||
| 		$limit = min($queue_length, ceil($slope * pow($entries, $exponent))); |  | ||||||
| 
 |  | ||||||
| 		Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); |  | ||||||
| 		$ids = []; |  | ||||||
| 		if (self::passingSlow($highest_priority)) { |  | ||||||
| 			// Are there waiting processes with a higher priority than the currently highest?
 |  | ||||||
| 			$stamp = (float)microtime(true); |  | ||||||
| 			$result = DBA::select( |  | ||||||
| 				'workerqueue', |  | ||||||
| 				['id'], |  | ||||||
| 				["`pid` = 0 AND `priority` < ? AND NOT `done` AND `next_try` < ?", |  | ||||||
| 				$highest_priority, DateTimeFormat::utcNow()], |  | ||||||
| 				['limit' => 1, 'order' => ['priority', 'created']] |  | ||||||
| 			); |  | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 			while ($id = DBA::fetch($result)) { |  | ||||||
| 				$ids[] = $id["id"]; |  | ||||||
| 			} |  | ||||||
| 			DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 			$found = (count($ids) > 0); |  | ||||||
| 
 |  | ||||||
| 			if (!$found) { |  | ||||||
| 				// Give slower processes some processing time
 |  | ||||||
| 				$stamp = (float)microtime(true); |  | ||||||
| 				$result = DBA::select( |  | ||||||
| 					'workerqueue', |  | ||||||
| 					['id'], |  | ||||||
| 					["`pid` = 0 AND `priority` > ? AND NOT `done` AND `next_try` < ?", |  | ||||||
| 					$highest_priority, DateTimeFormat::utcNow()], |  | ||||||
| 					['limit' => 1, 'order' => ['priority', 'created']] |  | ||||||
| 				); |  | ||||||
| 				self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 				while ($id = DBA::fetch($result)) { |  | ||||||
| 					$ids[] = $id["id"]; |  | ||||||
| 				} |  | ||||||
| 				DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 				$found = (count($ids) > 0); |  | ||||||
| 				$passing_slow = $found; |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// At first try to fetch a bunch of high or medium tasks
 |  | ||||||
| 		if (!$found && ($limit > 1)) { |  | ||||||
| 			$stamp = (float)microtime(true); |  | ||||||
| 			$result = DBA::select( |  | ||||||
| 				'workerqueue', |  | ||||||
| 				['id'], |  | ||||||
| 				["`pid` = 0 AND NOT `done` AND `priority` <= ? AND `next_try` < ? AND `retrial` = 0", |  | ||||||
| 				PRIORITY_MEDIUM, DateTimeFormat::utcNow()], |  | ||||||
| 				['limit' => $limit, 'order' => ['created']] |  | ||||||
| 			); |  | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 			while ($id = DBA::fetch($result)) { |  | ||||||
| 				$ids[] = $id["id"]; |  | ||||||
| 			} |  | ||||||
| 			DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 			$found = (count($ids) > 0); |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 |  | ||||||
| 		if (!$found) { |  | ||||||
| 			$stamp = (float)microtime(true); |  | ||||||
| 			$result = DBA::select( |  | ||||||
| 				'workerqueue', |  | ||||||
| 				['id'], |  | ||||||
| 				["`pid` = 0 AND NOT `done` AND `next_try` < ?", |  | ||||||
| 				DateTimeFormat::utcNow()], |  | ||||||
| 				['limit' => 1, 'order' => ['priority', 'created']] |  | ||||||
| 			); |  | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 |  | ||||||
| 			while ($id = DBA::fetch($result)) { |  | ||||||
| 				$ids[] = $id["id"]; |  | ||||||
| 			} |  | ||||||
| 			DBA::close($result); |  | ||||||
| 
 |  | ||||||
| 			$found = (count($ids) > 0); |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if ($found) { |  | ||||||
| 			$stamp = (float)microtime(true); |  | ||||||
| 			$condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`"; |  | ||||||
| 			array_unshift($ids, $condition); |  | ||||||
| 			DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); |  | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 			self::$db_duration_write += (microtime(true) - $stamp); |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return $found; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	/** |  | ||||||
| 	 * @brief Returns the next worker process |  | ||||||
| 	 * |  | ||||||
| 	 * @param boolean $passing_slow Returns if we had passed low priority processes |  | ||||||
| 	 * @param integer $entries Returns total number of queue entries |  | ||||||
| 	 * @param integer $deferred Returns number of deferred queue entries |  | ||||||
| 	 * |  | ||||||
| 	 * @return string SQL statement |  | ||||||
| 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException |  | ||||||
| 	 */ |  | ||||||
| 	public static function workerProcess(&$passing_slow, &$entries, &$deferred) |  | ||||||
| 	{ |  | ||||||
| 		// There can already be jobs for us in the queue.
 |  | ||||||
| 		$stamp = (float)microtime(true); | 		$stamp = (float)microtime(true); | ||||||
| 		$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); | 		$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | @ -956,9 +752,165 @@ class Worker | ||||||
| 		} | 		} | ||||||
| 		DBA::close($r); | 		DBA::close($r); | ||||||
| 
 | 
 | ||||||
| 		// Counting the rows outside the lock reduces the lock time
 | 		return false; | ||||||
| 		$entries = self::totalEntries(); | 	} | ||||||
| 		$deferred = self::deferredEntries(); | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * @brief Returns the next jobs that should be executed | ||||||
|  | 	 * | ||||||
|  | 	 * @return array array with next jobs | ||||||
|  | 	 * @throws \Exception | ||||||
|  | 	 */ | ||||||
|  | 	private static function nextProcess() | ||||||
|  | 	{ | ||||||
|  | 		$priority = self::nextPriority(); | ||||||
|  | 		if (empty($priority)) { | ||||||
|  | 			Logger::info('No tasks found'); | ||||||
|  | 			return []; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if ($priority <= PRIORITY_MEDIUM) { | ||||||
|  | 			$limit = Config::get('system', 'worker_fetch_limit', 1); | ||||||
|  | 		} else { | ||||||
|  | 			$limit = 1; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$ids = []; | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
|  | 		$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; | ||||||
|  | 		$tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		while ($task = DBA::fetch($tasks)) { | ||||||
|  | 			$ids[] = $task['id']; | ||||||
|  | 		} | ||||||
|  | 		DBA::close($tasks); | ||||||
|  | 
 | ||||||
|  | 		Logger::info('Found:', ['id' => $ids, 'priority' => $priority]); | ||||||
|  | 		return $ids; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * @brief Returns the priority of the next workerqueue job | ||||||
|  | 	 * | ||||||
|  | 	 * @return string priority | ||||||
|  | 	 * @throws \Exception | ||||||
|  | 	 */ | ||||||
|  | 	private static function nextPriority() | ||||||
|  | 	{ | ||||||
|  | 		$waiting = []; | ||||||
|  | 		$priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE]; | ||||||
|  | 		foreach ($priorities as $priority) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
|  | 			if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) { | ||||||
|  | 				$waiting[$priority] = true; | ||||||
|  | 			} | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if (!empty($waiting[PRIORITY_CRITICAL])) { | ||||||
|  | 			return PRIORITY_CRITICAL; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$running = []; | ||||||
|  | 		$running_total = 0; | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
|  | 		$processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
 | ||||||
|  | 			INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` | ||||||
|  | 			WHERE NOT `done` GROUP BY `priority`");
 | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		while ($process = DBA::fetch($processes)) { | ||||||
|  | 			$running[$process['priority']] = $process['running']; | ||||||
|  | 			$running_total += $process['running']; | ||||||
|  | 		} | ||||||
|  | 		DBA::close($processes); | ||||||
|  | 
 | ||||||
|  | 		foreach ($priorities as $priority) { | ||||||
|  | 			if (!empty($waiting[$priority]) && empty($running[$priority])) { | ||||||
|  | 				Logger::info('No running worker found with priority {priority} - assigning it.', ['priority' => $priority]); | ||||||
|  | 				return $priority; | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$active = max(self::activeWorkers(), $running_total); | ||||||
|  | 		$priorities = max(count($waiting), count($running)); | ||||||
|  | 		$exponent = 2; | ||||||
|  | 
 | ||||||
|  | 		$total = 0; | ||||||
|  | 		for ($i = 1; $i <= $priorities; ++$i) { | ||||||
|  | 			$total += pow($i, $exponent); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$limit = []; | ||||||
|  | 		for ($i = 1; $i <= $priorities; ++$i) { | ||||||
|  | 			$limit[$priorities - $i] = max(1, round($active * (pow($i, $exponent) / $total))); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$i = 0; | ||||||
|  | 		foreach ($running as $priority => $workers) { | ||||||
|  | 			if ($workers < $limit[$i++]) { | ||||||
|  | 				Logger::info('Priority {priority} has got {workers} workers out of a limit of {limit}', ['priority' => $priority, 'workers' => $workers, 'limit' => $limit[$i - 1]]); | ||||||
|  | 				return $priority; | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if (!empty($waiting)) { | ||||||
|  | 			$priority =  array_shift(array_keys($waiting)); | ||||||
|  | 			Logger::info('No underassigned priority found, now taking the highest priority.', ['priority' => $priority]); | ||||||
|  | 			return $priority; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return false; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * @brief Find and claim the next worker process for us | ||||||
|  | 	 * | ||||||
|  | 	 * @return boolean Have we found something? | ||||||
|  | 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | ||||||
|  | 	 */ | ||||||
|  | 	private static function findWorkerProcesses() | ||||||
|  | 	{ | ||||||
|  | 		$mypid = getmypid(); | ||||||
|  | 
 | ||||||
|  | 		$ids = self::nextProcess(); | ||||||
|  | 
 | ||||||
|  | 		// If there is no result we check without priority limit
 | ||||||
|  | 		if (empty($ids)) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
|  | 			$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; | ||||||
|  | 			$result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 
 | ||||||
|  | 			while ($id = DBA::fetch($result)) { | ||||||
|  | 				$ids[] = $id["id"]; | ||||||
|  | 			} | ||||||
|  | 			DBA::close($result); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if (!empty($ids)) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
|  | 			$condition = ['id' => $ids, 'done' => false, 'pid' => 0]; | ||||||
|  | 			DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 			self::$db_duration_write += (microtime(true) - $stamp); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return !empty($ids); | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * @brief Returns the next worker process | ||||||
|  | 	 * | ||||||
|  | 	 * @return string SQL statement | ||||||
|  | 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | ||||||
|  | 	 */ | ||||||
|  | 	public static function workerProcess() | ||||||
|  | 	{ | ||||||
|  | 		// There can already be jobs for us in the queue.
 | ||||||
|  | 		$waiting = self::getWaitingJobForPID(); | ||||||
|  | 		if (!empty($waiting)) { | ||||||
|  | 			return $waiting; | ||||||
|  | 		} | ||||||
| 
 | 
 | ||||||
| 		$stamp = (float)microtime(true); | 		$stamp = (float)microtime(true); | ||||||
| 		if (!Lock::acquire('worker_process')) { | 		if (!Lock::acquire('worker_process')) { | ||||||
|  | @ -966,7 +918,7 @@ class Worker | ||||||
| 		} | 		} | ||||||
| 		self::$lock_duration += (microtime(true) - $stamp); | 		self::$lock_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		$found = self::findWorkerProcesses($passing_slow, $entries, $deferred); | 		$found = self::findWorkerProcesses(); | ||||||
| 
 | 
 | ||||||
| 		Lock::release('worker_process'); | 		Lock::release('worker_process'); | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -616,7 +616,7 @@ class Contact extends BaseObject | ||||||
| 		DBA::update('contact', ['archive' => true, 'network' => Protocol::PHANTOM, 'deleted' => true], ['id' => $id]); | 		DBA::update('contact', ['archive' => true, 'network' => Protocol::PHANTOM, 'deleted' => true], ['id' => $id]); | ||||||
| 
 | 
 | ||||||
| 		// Delete it in the background
 | 		// Delete it in the background
 | ||||||
| 		Worker::add(PRIORITY_LOW, 'RemoveContact', $id); | 		Worker::add(PRIORITY_MEDIUM, 'RemoveContact', $id); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue