Furtner improval of the worker speed
This commit is contained in:
		
					parent
					
						
							
								3aa77685fc
							
						
					
				
			
			
				commit
				
					
						171cfe8f44
					
				
			
		
					 4 changed files with 159 additions and 47 deletions
				
			
		|  | @ -397,6 +397,10 @@ return [ | ||||||
| 		// If enabled, it prints out the jobs per minute.
 | 		// If enabled, it prints out the jobs per minute.
 | ||||||
| 		'worker_jpm' => false, | 		'worker_jpm' => false, | ||||||
| 
 | 
 | ||||||
|  | 		// worker_jpm_limit (String)
 | ||||||
|  | 		// List of minutes for the JPM calculation
 | ||||||
|  | 		'worker_jpm_limit' => '1, 10, 60', | ||||||
|  | 
 | ||||||
| 		// worker_load_exponent (Integer)
 | 		// worker_load_exponent (Integer)
 | ||||||
| 		// Default 3, which allows only 25% of the maximum worker queues when server load reaches around 37% of maximum load.
 | 		// Default 3, which allows only 25% of the maximum worker queues when server load reaches around 37% of maximum load.
 | ||||||
| 		// For a linear response where 25% of worker queues are allowed at 75% of maximum load, set this to 1.
 | 		// For a linear response where 25% of worker queues are allowed at 75% of maximum load, set this to 1.
 | ||||||
|  |  | ||||||
|  | @ -39,8 +39,10 @@ function worker_init() | ||||||
| 	Worker::callWorker(); | 	Worker::callWorker(); | ||||||
| 
 | 
 | ||||||
| 	$passing_slow = false; | 	$passing_slow = false; | ||||||
|  | 	$entries = 0; | ||||||
|  | 	$deferred = 0; | ||||||
| 
 | 
 | ||||||
| 	if ($r = Worker::workerProcess($passing_slow)) { | 	if ($r = Worker::workerProcess($passing_slow, $entries, $deferred)) { | ||||||
| 		// On most configurations this parameter wouldn't have any effect.
 | 		// On most configurations this parameter wouldn't have any effect.
 | ||||||
| 		// But since it doesn't destroy anything, we just try to get more execution time in any way.
 | 		// But since it doesn't destroy anything, we just try to get more execution time in any way.
 | ||||||
| 		set_time_limit(0); | 		set_time_limit(0); | ||||||
|  |  | ||||||
|  | @ -22,9 +22,12 @@ use Friendica\Util\Network; | ||||||
| class Worker | class Worker | ||||||
| { | { | ||||||
| 	private static $up_start; | 	private static $up_start; | ||||||
| 	private static $db_duration; | 	private static $db_duration = 0; | ||||||
|  | 	private static $db_duration_count = 0; | ||||||
|  | 	private static $db_duration_write = 0; | ||||||
|  | 	private static $db_duration_stat = 0; | ||||||
|  | 	private static $lock_duration = 0; | ||||||
| 	private static $last_update; | 	private static $last_update; | ||||||
| 	private static $lock_duration; |  | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
| 	 * @brief Processes the tasks that are in the workerqueue table | 	 * @brief Processes the tasks that are in the workerqueue table | ||||||
|  | @ -86,8 +89,11 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		$starttime = time(); | 		$starttime = time(); | ||||||
| 
 | 
 | ||||||
|  | 		$entries = 0; | ||||||
|  | 		$deferred = 0; | ||||||
|  | 
 | ||||||
| 		// We fetch the next queue entry that is about to be executed
 | 		// We fetch the next queue entry that is about to be executed
 | ||||||
| 		while ($r = self::workerProcess($passing_slow)) { | 		while ($r = self::workerProcess($passing_slow, $entries, $deferred)) { | ||||||
| 			// When we are processing jobs with a lower priority, we don't refetch new jobs
 | 			// When we are processing jobs with a lower priority, we don't refetch new jobs
 | ||||||
| 			// Otherwise fast jobs could wait behind slow ones and could be blocked.
 | 			// Otherwise fast jobs could wait behind slow ones and could be blocked.
 | ||||||
| 			$refetched = $passing_slow; | 			$refetched = $passing_slow; | ||||||
|  | @ -107,9 +113,7 @@ class Worker | ||||||
| 					$entries = self::totalEntries(); | 					$entries = self::totalEntries(); | ||||||
| 					$deferred = self::deferredEntries(); | 					$deferred = self::deferredEntries(); | ||||||
| 					if (Lock::acquire('worker_process', 0)) { | 					if (Lock::acquire('worker_process', 0)) { | ||||||
| 						$stamp = (float)microtime(true); |  | ||||||
| 						$refetched = self::findWorkerProcesses($passing_slow, $entries, $deferred); | 						$refetched = self::findWorkerProcesses($passing_slow, $entries, $deferred); | ||||||
| 						self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 						Lock::release('worker_process'); | 						Lock::release('worker_process'); | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
|  | @ -117,9 +121,8 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 			// To avoid the quitting of multiple workers only one worker at a time will execute the check
 | 			// To avoid the quitting of multiple workers only one worker at a time will execute the check
 | ||||||
| 			if (Lock::acquire('worker', 0)) { | 			if (Lock::acquire('worker', 0)) { | ||||||
| 				$stamp = (float)microtime(true); |  | ||||||
| 				// Count active workers and compare them with a maximum value that depends on the load
 | 				// Count active workers and compare them with a maximum value that depends on the load
 | ||||||
| 				if (self::tooMuchWorkers()) { | 				if (self::tooMuchWorkers($entries, $deferred)) { | ||||||
| 					Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); | 					Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); | ||||||
| 					Lock::release('worker'); | 					Lock::release('worker'); | ||||||
| 					return; | 					return; | ||||||
|  | @ -132,7 +135,6 @@ class Worker | ||||||
| 					return; | 					return; | ||||||
| 				} | 				} | ||||||
| 				Lock::release('worker'); | 				Lock::release('worker'); | ||||||
| 				self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			// Quit the worker once every 5 minutes
 | 			// Quit the worker once every 5 minutes
 | ||||||
|  | @ -149,6 +151,20 @@ class Worker | ||||||
| 		Logger::log("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", Logger::DEBUG); | 		Logger::log("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", Logger::DEBUG); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * @brief Check if non executed tasks do exist in the worker queue | ||||||
|  | 	 * | ||||||
|  | 	 * @return boolean Returns "true" if tasks are existing | ||||||
|  | 	 * @throws \Exception | ||||||
|  | 	 */ | ||||||
|  | 	private static function entriesExists() | ||||||
|  | 	{ | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
|  | 		$exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		return $exists; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	/** | 	/** | ||||||
| 	 * @brief Returns the number of deferred entries in the worker queue | 	 * @brief Returns the number of deferred entries in the worker queue | ||||||
| 	 * | 	 * | ||||||
|  | @ -157,7 +173,11 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	private static function deferredEntries() | 	private static function deferredEntries() | ||||||
| 	{ | 	{ | ||||||
| 		return DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` > ?", DateTimeFormat::utcNow()]); | 		$stamp = (float)microtime(true); | ||||||
|  | 		$count = DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` > ?", DateTimeFormat::utcNow()]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		self::$db_duration_count += (microtime(true) - $stamp); | ||||||
|  | 		return $count; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -168,7 +188,11 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	private static function totalEntries() | 	private static function totalEntries() | ||||||
| 	{ | 	{ | ||||||
| 		return DBA::count('workerqueue', ['done' => false, 'pid' => 0]); | 		$stamp = (float)microtime(true); | ||||||
|  | 		$count = DBA::count('workerqueue', ['done' => false, 'pid' => 0]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		self::$db_duration_count += (microtime(true) - $stamp); | ||||||
|  | 		return $count; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -179,8 +203,10 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	private static function highestPriority() | 	private static function highestPriority() | ||||||
| 	{ | 	{ | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; | 		$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; | ||||||
| 		$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); | 		$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 		if (DBA::isResult($workerqueue)) { | 		if (DBA::isResult($workerqueue)) { | ||||||
| 			return $workerqueue["priority"]; | 			return $workerqueue["priority"]; | ||||||
| 		} else { | 		} else { | ||||||
|  | @ -252,6 +278,7 @@ class Worker | ||||||
| 				$stamp = (float)microtime(true); | 				$stamp = (float)microtime(true); | ||||||
| 				DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]); | 				DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]); | ||||||
| 				self::$db_duration += (microtime(true) - $stamp); | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 				self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			array_shift($argv); | 			array_shift($argv); | ||||||
|  | @ -259,12 +286,12 @@ class Worker | ||||||
| 			self::execFunction($queue, $include, $argv, true); | 			self::execFunction($queue, $include, $argv, true); | ||||||
| 
 | 
 | ||||||
| 			$stamp = (float)microtime(true); | 			$stamp = (float)microtime(true); | ||||||
| 
 |  | ||||||
| 			$condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()]; | 			$condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()]; | ||||||
| 			if (DBA::update('workerqueue', ['done' => true], $condition)) { | 			if (DBA::update('workerqueue', ['done' => true], $condition)) { | ||||||
| 				Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); | 				Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); | ||||||
| 			} | 			} | ||||||
| 			self::$db_duration = (microtime(true) - $stamp); | 			self::$db_duration = (microtime(true) - $stamp); | ||||||
|  | 			self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 			return true; | 			return true; | ||||||
| 		} | 		} | ||||||
|  | @ -276,7 +303,10 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		if (!validate_include($include)) { | 		if (!validate_include($include)) { | ||||||
| 			Logger::log("Include file ".$argv[0]." is not valid!"); | 			Logger::log("Include file ".$argv[0]." is not valid!"); | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			DBA::delete('workerqueue', ['id' => $queue["id"]]); | 			DBA::delete('workerqueue', ['id' => $queue["id"]]); | ||||||
|  | 			self::$db_duration = (microtime(true) - $stamp); | ||||||
|  | 			self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 			return true; | 			return true; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -297,6 +327,7 @@ class Worker | ||||||
| 				$stamp = (float)microtime(true); | 				$stamp = (float)microtime(true); | ||||||
| 				DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]); | 				DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]); | ||||||
| 				self::$db_duration += (microtime(true) - $stamp); | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 				self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			self::execFunction($queue, $funcname, $argv, false); | 			self::execFunction($queue, $funcname, $argv, false); | ||||||
|  | @ -306,9 +337,13 @@ class Worker | ||||||
| 				Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); | 				Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); | ||||||
| 			} | 			} | ||||||
| 			self::$db_duration = (microtime(true) - $stamp); | 			self::$db_duration = (microtime(true) - $stamp); | ||||||
|  | 			self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 		} else { | 		} else { | ||||||
| 			Logger::log("Function ".$funcname." does not exist"); | 			Logger::log("Function ".$funcname." does not exist"); | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			DBA::delete('workerqueue', ['id' => $queue["id"]]); | 			DBA::delete('workerqueue', ['id' => $queue["id"]]); | ||||||
|  | 			self::$db_duration = (microtime(true) - $stamp); | ||||||
|  | 			self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		return true; | 		return true; | ||||||
|  | @ -361,7 +396,7 @@ class Worker | ||||||
| 		$a->process_id = $new_process_id; | 		$a->process_id = $new_process_id; | ||||||
| 		$a->queue = $queue; | 		$a->queue = $queue; | ||||||
| 
 | 
 | ||||||
| 		$up_duration = number_format(microtime(true) - self::$up_start, 3); | 		$up_duration = microtime(true) - self::$up_start; | ||||||
| 
 | 
 | ||||||
| 		// Reset global data to avoid interferences
 | 		// Reset global data to avoid interferences
 | ||||||
| 		unset($_SESSION); | 		unset($_SESSION); | ||||||
|  | @ -377,21 +412,27 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		$duration = (microtime(true) - $stamp); | 		$duration = (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		self::$up_start = microtime(true); |  | ||||||
| 
 |  | ||||||
| 		/* With these values we can analyze how effective the worker is. | 		/* With these values we can analyze how effective the worker is. | ||||||
| 		 * The database and rest time should be low since this is the unproductive time. | 		 * The database and rest time should be low since this is the unproductive time. | ||||||
| 		 * The execution time is the productive time. | 		 * The execution time is the productive time. | ||||||
| 		 * By changing parameters like the maximum number of workers we can check the effectivness. | 		 * By changing parameters like the maximum number of workers we can check the effectivness. | ||||||
| 		*/ | 		*/ | ||||||
| 		Logger::log( | 		Logger::log( | ||||||
| 			'DB: '.number_format(self::$db_duration, 2). | 			'DB: '.number_format(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 4). | ||||||
| 			' - Lock: '.number_format(self::$lock_duration, 2). | 			' - DB-Count: '.number_format(self::$db_duration_count, 4). | ||||||
| 			' - Rest: '.number_format($up_duration - self::$db_duration - self::$lock_duration, 2). | 			' - DB-Stat: '.number_format(self::$db_duration_stat, 4). | ||||||
| 			' - Execution: '.number_format($duration, 2), | 			' - DB-Write: '.number_format(self::$db_duration_write, 4). | ||||||
|  | 			' - Lock: '.number_format(self::$lock_duration, 4). | ||||||
|  | 			' - Rest: '.number_format(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 4). | ||||||
|  | 			' - Execution: '.number_format($duration, 4), | ||||||
| 			Logger::DEBUG | 			Logger::DEBUG | ||||||
| 		); | 		); | ||||||
| 
 | 
 | ||||||
|  | 		self::$up_start = microtime(true); | ||||||
|  | 		self::$db_duration = 0; | ||||||
|  | 		self::$db_duration_count = 0; | ||||||
|  | 		self::$db_duration_stat = 0; | ||||||
|  | 		self::$db_duration_write = 0; | ||||||
| 		self::$lock_duration = 0; | 		self::$lock_duration = 0; | ||||||
| 
 | 
 | ||||||
| 		if ($duration > 3600) { | 		if ($duration > 3600) { | ||||||
|  | @ -506,7 +547,9 @@ class Worker | ||||||
| 				$max = $r["Value"]; | 				$max = $r["Value"]; | ||||||
| 			} | 			} | ||||||
| 			// Or it can be granted. This overrides the system variable
 | 			// Or it can be granted. This overrides the system variable
 | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			$r = DBA::p('SHOW GRANTS'); | 			$r = DBA::p('SHOW GRANTS'); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
| 			while ($grants = DBA::fetch($r)) { | 			while ($grants = DBA::fetch($r)) { | ||||||
| 				$grant = array_pop($grants); | 				$grant = array_pop($grants); | ||||||
| 				if (stristr($grant, "GRANT USAGE ON")) { | 				if (stristr($grant, "GRANT USAGE ON")) { | ||||||
|  | @ -521,7 +564,9 @@ class Worker | ||||||
| 		// If $max is set we will use the processlist to determine the current number of connections
 | 		// If $max is set we will use the processlist to determine the current number of connections
 | ||||||
| 		// The processlist only shows entries of the current user
 | 		// The processlist only shows entries of the current user
 | ||||||
| 		if ($max != 0) { | 		if ($max != 0) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			$r = DBA::p('SHOW PROCESSLIST'); | 			$r = DBA::p('SHOW PROCESSLIST'); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
| 			$used = DBA::numRows($r); | 			$used = DBA::numRows($r); | ||||||
| 			DBA::close($r); | 			DBA::close($r); | ||||||
| 
 | 
 | ||||||
|  | @ -571,20 +616,25 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	private static function killStaleWorkers() | 	private static function killStaleWorkers() | ||||||
| 	{ | 	{ | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$entries = DBA::select( | 		$entries = DBA::select( | ||||||
| 			'workerqueue', | 			'workerqueue', | ||||||
| 			['id', 'pid', 'executed', 'priority', 'parameter'], | 			['id', 'pid', 'executed', 'priority', 'parameter'], | ||||||
| 			['NOT `done` AND `pid` != 0'], | 			['NOT `done` AND `pid` != 0'], | ||||||
| 			['order' => ['priority', 'created']] | 			['order' => ['priority', 'created']] | ||||||
| 		); | 		); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		while ($entry = DBA::fetch($entries)) { | 		while ($entry = DBA::fetch($entries)) { | ||||||
| 			if (!posix_kill($entry["pid"], 0)) { | 			if (!posix_kill($entry["pid"], 0)) { | ||||||
|  | 				$stamp = (float)microtime(true); | ||||||
| 				DBA::update( | 				DBA::update( | ||||||
| 					'workerqueue', | 					'workerqueue', | ||||||
| 					['executed' => DBA::NULL_DATETIME, 'pid' => 0], | 					['executed' => DBA::NULL_DATETIME, 'pid' => 0], | ||||||
| 					['id' => $entry["id"]] | 					['id' => $entry["id"]] | ||||||
| 				); | 				); | ||||||
|  | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 				self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 			} else { | 			} else { | ||||||
| 				// Kill long running processes
 | 				// Kill long running processes
 | ||||||
| 				// Check if the priority is in a valid range
 | 				// Check if the priority is in a valid range
 | ||||||
|  | @ -616,11 +666,14 @@ class Worker | ||||||
| 					} elseif ($entry["priority"] != PRIORITY_CRITICAL) { | 					} elseif ($entry["priority"] != PRIORITY_CRITICAL) { | ||||||
| 						$new_priority = PRIORITY_NEGLIGIBLE; | 						$new_priority = PRIORITY_NEGLIGIBLE; | ||||||
| 					} | 					} | ||||||
|  | 					$stamp = (float)microtime(true); | ||||||
| 					DBA::update( | 					DBA::update( | ||||||
| 						'workerqueue', | 						'workerqueue', | ||||||
| 						['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0], | 						['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0], | ||||||
| 						['id' => $entry["id"]] | 						['id' => $entry["id"]] | ||||||
| 					); | 					); | ||||||
|  | 					self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 					self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 				} else { | 				} else { | ||||||
| 					Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", Logger::DEBUG); | 					Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", Logger::DEBUG); | ||||||
| 				} | 				} | ||||||
|  | @ -631,10 +684,13 @@ class Worker | ||||||
| 	/** | 	/** | ||||||
| 	 * @brief Checks if the number of active workers exceeds the given limits | 	 * @brief Checks if the number of active workers exceeds the given limits | ||||||
| 	 * | 	 * | ||||||
|  | 	 * @param integer $entries Total number of queue entries | ||||||
|  | 	 * @param integer $deferred Number of deferred queue entries | ||||||
|  | 	 * | ||||||
| 	 * @return bool Are there too much workers running? | 	 * @return bool Are there too much workers running? | ||||||
| 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function tooMuchWorkers() | 	public static function tooMuchWorkers($entries = 0, $deferred = 0) | ||||||
| 	{ | 	{ | ||||||
| 		$queues = Config::get("system", "worker_queues", 4); | 		$queues = Config::get("system", "worker_queues", 4); | ||||||
| 
 | 
 | ||||||
|  | @ -659,10 +715,13 @@ class Worker | ||||||
| 			$processlist = ''; | 			$processlist = ''; | ||||||
| 
 | 
 | ||||||
| 			if (Config::get('system', 'worker_jpm')) { | 			if (Config::get('system', 'worker_jpm')) { | ||||||
| 				$intervals = [1, 10, 60]; | 				$intervals = explode(',', Config::get('system', 'worker_jpm_range')); | ||||||
| 				$jobs_per_minute = []; | 				$jobs_per_minute = []; | ||||||
| 				foreach ($intervals as $interval) { | 				foreach ($intervals as $interval) { | ||||||
|  | 					$stamp = (float)microtime(true); | ||||||
| 					$jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); | 					$jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); | ||||||
|  | 					self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 					self::$db_duration_stat += (microtime(true) - $stamp); | ||||||
| 					if ($job = DBA::fetch($jobs)) { | 					if ($job = DBA::fetch($jobs)) { | ||||||
| 						$jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); | 						$jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); | ||||||
| 					} | 					} | ||||||
|  | @ -671,33 +730,51 @@ class Worker | ||||||
| 				$processlist = ' - jpm: '.implode('/', $jobs_per_minute); | 				$processlist = ' - jpm: '.implode('/', $jobs_per_minute); | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
|  | 			// Create a list of queue entries grouped by their priority
 | ||||||
|  | 			$listitem = [0 => '']; | ||||||
|  | 
 | ||||||
|  | 			$idle_workers = $active; | ||||||
|  | 
 | ||||||
| 			if (Config::get('system', 'worker_debug')) { | 			if (Config::get('system', 'worker_debug')) { | ||||||
| 				// Create a list of queue entries grouped by their priority
 |  | ||||||
| 				$listitem = [0 => '']; |  | ||||||
| 
 |  | ||||||
| 				$idle_workers = $active; |  | ||||||
| 
 |  | ||||||
| 				// Now adding all processes with workerqueue entries
 | 				// Now adding all processes with workerqueue entries
 | ||||||
| 				$entries = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`"); | 				$stamp = (float)microtime(true); | ||||||
| 				while ($entry = DBA::fetch($entries)) { | 				$jobs = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` AND `next_try` < ? GROUP BY `priority`", DateTimeFormat::utcNow()); | ||||||
|  | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 				self::$db_duration_stat += (microtime(true) - $stamp); | ||||||
|  | 				while ($entry = DBA::fetch($jobs)) { | ||||||
|  | 					$stamp = (float)microtime(true); | ||||||
| 					$processes = DBA::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `priority` = ?", $entry["priority"]); | 					$processes = DBA::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `priority` = ?", $entry["priority"]); | ||||||
|  | 					self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 					self::$db_duration_stat += (microtime(true) - $stamp); | ||||||
| 					if ($process = DBA::fetch($processes)) { | 					if ($process = DBA::fetch($processes)) { | ||||||
| 						$idle_workers -= $process["running"]; | 						$idle_workers -= $process["running"]; | ||||||
| 						$listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; | 						$listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; | ||||||
| 					} | 					} | ||||||
| 					DBA::close($processes); | 					DBA::close($processes); | ||||||
| 				} | 				} | ||||||
| 				DBA::close($entries); | 				DBA::close($jobs); | ||||||
|  | 			} else { | ||||||
|  | 				$stamp = (float)microtime(true); | ||||||
|  | 				$jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority`"); | ||||||
|  | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 				$listitem[0] = "0:" . max(0, $idle_workers); | 				while ($entry = DBA::fetch($jobs)) { | ||||||
| 
 | 					$idle_workers -= $entry["running"]; | ||||||
| 				$processlist .= ' ('.implode(', ', $listitem).')'; | 					$listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"]; | ||||||
|  | 				} | ||||||
|  | 				DBA::close($jobs); | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			$deferred = self::deferredEntries(); | 			$listitem[0] = "0:" . max(0, $idle_workers); | ||||||
| 			$entries = max(self::totalEntries() - $deferred, 0); |  | ||||||
| 
 | 
 | ||||||
| 			if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) { | 			$processlist .= ' ('.implode(', ', $listitem).')'; | ||||||
|  | 
 | ||||||
|  | 			if (empty($deferred) && empty($entries)) { | ||||||
|  | 				$deferred = self::deferredEntries(); | ||||||
|  | 				$entries = max(self::totalEntries() - $deferred, 0); | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && self::entriesExists() && ($active >= $queues)) { | ||||||
| 				$top_priority = self::highestPriority(); | 				$top_priority = self::highestPriority(); | ||||||
| 				$high_running = self::processWithPriorityActive($top_priority); | 				$high_running = self::processWithPriorityActive($top_priority); | ||||||
| 
 | 
 | ||||||
|  | @ -707,7 +784,7 @@ class Worker | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $entries . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG); | 			Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . ($entries - $deferred) . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG); | ||||||
| 
 | 
 | ||||||
| 			// Are there fewer workers running as possible? Then fork a new one.
 | 			// Are there fewer workers running as possible? Then fork a new one.
 | ||||||
| 			if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { | 			if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { | ||||||
|  | @ -736,7 +813,10 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	private static function activeWorkers() | 	private static function activeWorkers() | ||||||
| 	{ | 	{ | ||||||
| 		return DBA::count('process', ['command' => 'Worker.php']); | 		$stamp = (float)microtime(true); | ||||||
|  | 		$count = DBA::count('process', ['command' => 'Worker.php']); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		return $count; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -753,11 +833,13 @@ class Worker | ||||||
| 	{ | 	{ | ||||||
| 		$highest_priority = 0; | 		$highest_priority = 0; | ||||||
| 
 | 
 | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$r = DBA::p( | 		$r = DBA::p( | ||||||
| 			"SELECT `priority`
 | 			"SELECT `priority`
 | ||||||
| 				FROM `process` | 				FROM `process` | ||||||
| 				INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`" | 				INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`" | ||||||
| 		); | 		); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		// No active processes at all? Fine
 | 		// No active processes at all? Fine
 | ||||||
| 		if (!DBA::isResult($r)) { | 		if (!DBA::isResult($r)) { | ||||||
|  | @ -829,6 +911,7 @@ class Worker | ||||||
| 		$ids = []; | 		$ids = []; | ||||||
| 		if (self::passingSlow($highest_priority)) { | 		if (self::passingSlow($highest_priority)) { | ||||||
| 			// Are there waiting processes with a higher priority than the currently highest?
 | 			// Are there waiting processes with a higher priority than the currently highest?
 | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			$result = DBA::select( | 			$result = DBA::select( | ||||||
| 				'workerqueue', | 				'workerqueue', | ||||||
| 				['id'], | 				['id'], | ||||||
|  | @ -836,6 +919,7 @@ class Worker | ||||||
| 				$highest_priority, DateTimeFormat::utcNow()], | 				$highest_priority, DateTimeFormat::utcNow()], | ||||||
| 				['limit' => $limit, 'order' => ['priority', 'created']] | 				['limit' => $limit, 'order' => ['priority', 'created']] | ||||||
| 			); | 			); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 			while ($id = DBA::fetch($result)) { | 			while ($id = DBA::fetch($result)) { | ||||||
| 				$ids[] = $id["id"]; | 				$ids[] = $id["id"]; | ||||||
|  | @ -846,6 +930,7 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 			if (!$found) { | 			if (!$found) { | ||||||
| 				// Give slower processes some processing time
 | 				// Give slower processes some processing time
 | ||||||
|  | 				$stamp = (float)microtime(true); | ||||||
| 				$result = DBA::select( | 				$result = DBA::select( | ||||||
| 					'workerqueue', | 					'workerqueue', | ||||||
| 					['id'], | 					['id'], | ||||||
|  | @ -853,6 +938,7 @@ class Worker | ||||||
| 					$highest_priority, DateTimeFormat::utcNow()], | 					$highest_priority, DateTimeFormat::utcNow()], | ||||||
| 					['limit' => $limit, 'order' => ['priority', 'created']] | 					['limit' => $limit, 'order' => ['priority', 'created']] | ||||||
| 				); | 				); | ||||||
|  | 				self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 				while ($id = DBA::fetch($result)) { | 				while ($id = DBA::fetch($result)) { | ||||||
| 					$ids[] = $id["id"]; | 					$ids[] = $id["id"]; | ||||||
|  | @ -866,6 +952,7 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 | 		// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 | ||||||
| 		if (!$found) { | 		if (!$found) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			$result = DBA::select( | 			$result = DBA::select( | ||||||
| 				'workerqueue', | 				'workerqueue', | ||||||
| 				['id'], | 				['id'], | ||||||
|  | @ -873,6 +960,7 @@ class Worker | ||||||
| 				DateTimeFormat::utcNow()], | 				DateTimeFormat::utcNow()], | ||||||
| 				['limit' => $limit, 'order' => ['priority', 'created']] | 				['limit' => $limit, 'order' => ['priority', 'created']] | ||||||
| 			); | 			); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 			while ($id = DBA::fetch($result)) { | 			while ($id = DBA::fetch($result)) { | ||||||
| 				$ids[] = $id["id"]; | 				$ids[] = $id["id"]; | ||||||
|  | @ -883,9 +971,12 @@ class Worker | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if ($found) { | 		if ($found) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			$condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`"; | 			$condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`"; | ||||||
| 			array_unshift($ids, $condition); | 			array_unshift($ids, $condition); | ||||||
| 			DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); | 			DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 			self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		return $found; | 		return $found; | ||||||
|  | @ -895,40 +986,41 @@ class Worker | ||||||
| 	 * @brief Returns the next worker process | 	 * @brief Returns the next worker process | ||||||
| 	 * | 	 * | ||||||
| 	 * @param boolean $passing_slow Returns if we had passed low priority processes | 	 * @param boolean $passing_slow Returns if we had passed low priority processes | ||||||
|  | 	 * @param integer $entries Returns total number of queue entries | ||||||
|  | 	 * @param integer $deferred Returns number of deferred queue entries | ||||||
|  | 	 * | ||||||
| 	 * @return string SQL statement | 	 * @return string SQL statement | ||||||
| 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | 	 * @throws \Friendica\Network\HTTPException\InternalServerErrorException | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function workerProcess(&$passing_slow) | 	public static function workerProcess(&$passing_slow, &$entries, &$deferred) | ||||||
| 	{ | 	{ | ||||||
| 		$stamp = (float)microtime(true); |  | ||||||
| 
 |  | ||||||
| 		// There can already be jobs for us in the queue.
 | 		// There can already be jobs for us in the queue.
 | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); | 		$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 		if (DBA::isResult($r)) { | 		if (DBA::isResult($r)) { | ||||||
| 			self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 			return DBA::toArray($r); | 			return DBA::toArray($r); | ||||||
| 		} | 		} | ||||||
| 		DBA::close($r); | 		DBA::close($r); | ||||||
| 
 | 
 | ||||||
| 		$stamp = (float)microtime(true); |  | ||||||
| 
 |  | ||||||
| 		// Counting the rows outside the lock reduces the lock time
 | 		// Counting the rows outside the lock reduces the lock time
 | ||||||
| 		$entries = self::totalEntries(); | 		$entries = self::totalEntries(); | ||||||
| 		$deferred = self::deferredEntries(); | 		$deferred = self::deferredEntries(); | ||||||
| 
 | 
 | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		if (!Lock::acquire('worker_process')) { | 		if (!Lock::acquire('worker_process')) { | ||||||
| 			return false; | 			return false; | ||||||
| 		} | 		} | ||||||
| 		self::$lock_duration = (microtime(true) - $stamp); | 		self::$lock_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		$stamp = (float)microtime(true); |  | ||||||
| 		$found = self::findWorkerProcesses($passing_slow, $entries, $deferred); | 		$found = self::findWorkerProcesses($passing_slow, $entries, $deferred); | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); |  | ||||||
| 
 | 
 | ||||||
| 		Lock::release('worker_process'); | 		Lock::release('worker_process'); | ||||||
| 
 | 
 | ||||||
| 		if ($found) { | 		if ($found) { | ||||||
|  | 			$stamp = (float)microtime(true); | ||||||
| 			$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); | 			$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); | ||||||
|  | 			self::$db_duration += (microtime(true) - $stamp); | ||||||
| 			return DBA::toArray($r); | 			return DBA::toArray($r); | ||||||
| 		} | 		} | ||||||
| 		return false; | 		return false; | ||||||
|  | @ -943,7 +1035,10 @@ class Worker | ||||||
| 	{ | 	{ | ||||||
| 		$mypid = getmypid(); | 		$mypid = getmypid(); | ||||||
| 
 | 
 | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); | 		DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -1022,9 +1117,12 @@ class Worker | ||||||
| 		$timeout = Config::get("system", "frontend_worker_timeout", 10); | 		$timeout = Config::get("system", "frontend_worker_timeout", 10); | ||||||
| 
 | 
 | ||||||
| 		/// @todo We should clean up the corresponding workerqueue entries as well
 | 		/// @todo We should clean up the corresponding workerqueue entries as well
 | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$condition = ["`created` < ? AND `command` = 'worker.php'", | 		$condition = ["`created` < ? AND `command` = 'worker.php'", | ||||||
| 				DateTimeFormat::utc("now - ".$timeout." minutes")]; | 				DateTimeFormat::utc("now - ".$timeout." minutes")]; | ||||||
| 		DBA::delete('process', $condition); | 		DBA::delete('process', $condition); | ||||||
|  | 		self::$db_duration = (microtime(true) - $stamp); | ||||||
|  | 		self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -1200,8 +1298,11 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue['priority'] . '/' . $priority, Logger::DEBUG); | 		Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue['priority'] . '/' . $priority, Logger::DEBUG); | ||||||
| 
 | 
 | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority]; | 		$fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority]; | ||||||
| 		DBA::update('workerqueue', $fields, ['id' => $id]); | 		DBA::update('workerqueue', $fields, ['id' => $id]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -1241,7 +1342,10 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function IPCSetJobState($jobs) | 	public static function IPCSetJobState($jobs) | ||||||
| 	{ | 	{ | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); | 		DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
|  | 		self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -1253,7 +1357,9 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function IPCJobsExists() | 	public static function IPCJobsExists() | ||||||
| 	{ | 	{ | ||||||
|  | 		$stamp = (float)microtime(true); | ||||||
| 		$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); | 		$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); | ||||||
|  | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		// When we don't have a row, no job is running
 | 		// When we don't have a row, no job is running
 | ||||||
| 		if (!DBA::isResult($row)) { | 		if (!DBA::isResult($row)) { | ||||||
|  |  | ||||||
|  | @ -285,7 +285,7 @@ class Cron | ||||||
| 
 | 
 | ||||||
| 			Logger::log("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact['priority'] . " " . $contact["nick"] . " " . $contact["name"]); | 			Logger::log("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact['priority'] . " " . $contact["nick"] . " " . $contact["name"]); | ||||||
| 
 | 
 | ||||||
| 			Worker::add(['priority' => $priority, 'dont_fork' => true], 'OnePoll', (int)$contact['id']); | 			Worker::add(['priority' => $priority, 'dont_fork' => true, 'force_priority' => true], 'OnePoll', (int)$contact['id']); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue