Refetch new jobs only once per execution loop
This commit is contained in:
		
					parent
					
						
							
								5df246e9af
							
						
					
				
			
			
				commit
				
					
						ea71a4e83a
					
				
			
		
					 1 changed files with 24 additions and 21 deletions
				
			
		| 
						 | 
				
			
			@ -23,9 +23,9 @@ use Friendica\Util\Network;
 | 
			
		|||
class Worker
 | 
			
		||||
{
 | 
			
		||||
	const STATE_STARTUP    = 1; // Worker is in startup. This takes most time.
 | 
			
		||||
	const STATE_SHORT_LOOP = 2; // Worker is processing preassigned jobs, thus saving much time.
 | 
			
		||||
	const STATE_LONG_LOOP  = 2; // Worker is processing the whole - long - loop.
 | 
			
		||||
	const STATE_REFETCH    = 3; // Worker had refetched jobs in the execution loop.
 | 
			
		||||
	const STATE_LONG_LOOP  = 4; // Worker is processing the whole - long - loop.
 | 
			
		||||
	const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
 | 
			
		||||
 | 
			
		||||
	private static $up_start;
 | 
			
		||||
	private static $db_duration = 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -102,6 +102,7 @@ class Worker
 | 
			
		|||
 | 
			
		||||
		// We fetch the next queue entry that is about to be executed
 | 
			
		||||
		while ($r = self::workerProcess()) {
 | 
			
		||||
			$refetched = false;
 | 
			
		||||
			foreach ($r as $entry) {
 | 
			
		||||
				// Assure that the priority is an integer value
 | 
			
		||||
				$entry['priority'] = (int)$entry['priority'];
 | 
			
		||||
| 
						 | 
				
			
			@ -112,34 +113,37 @@ class Worker
 | 
			
		|||
					return;
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				// If possible we will fetch new jobs for this worker
 | 
			
		||||
				if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
 | 
			
		||||
				// Trying to fetch new processes - but only once when successful
 | 
			
		||||
				if (!$refetched && Lock::acquire('worker_process', 0)) {
 | 
			
		||||
					self::findWorkerProcesses();
 | 
			
		||||
					Lock::release('worker_process');
 | 
			
		||||
					self::$state = self::STATE_REFETCH;
 | 
			
		||||
					$refetched = true;
 | 
			
		||||
				} else {
 | 
			
		||||
					self::$state = self::STATE_SHORT_LOOP;
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if (self::$state != self::STATE_REFETCH) {
 | 
			
		||||
				self::$state = self::STATE_LONG_LOOP;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// To avoid the quitting of multiple workers only one worker at a time will execute the check
 | 
			
		||||
			if (Lock::acquire('worker', 0)) {
 | 
			
		||||
				// Count active workers and compare them with a maximum value that depends on the load
 | 
			
		||||
				if (self::tooMuchWorkers()) {
 | 
			
		||||
					Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
 | 
			
		||||
					Lock::release('worker');
 | 
			
		||||
					return;
 | 
			
		||||
				}
 | 
			
		||||
			if (!self::getWaitingJobForPID()) {
 | 
			
		||||
				self::$state = self::STATE_LONG_LOOP;
 | 
			
		||||
 | 
			
		||||
				// Check free memory
 | 
			
		||||
				if ($a->isMinMemoryReached()) {
 | 
			
		||||
					Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
 | 
			
		||||
				if (Lock::acquire('worker', 0)) {
 | 
			
		||||
				// Count active workers and compare them with a maximum value that depends on the load
 | 
			
		||||
					if (self::tooMuchWorkers()) {
 | 
			
		||||
						Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
 | 
			
		||||
						Lock::release('worker');
 | 
			
		||||
						return;
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					// Check free memory
 | 
			
		||||
					if ($a->isMinMemoryReached()) {
 | 
			
		||||
						Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
 | 
			
		||||
						Lock::release('worker');
 | 
			
		||||
						return;
 | 
			
		||||
					}
 | 
			
		||||
					Lock::release('worker');
 | 
			
		||||
					return;
 | 
			
		||||
				}
 | 
			
		||||
				Lock::release('worker');
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Quit the worker once every cron interval
 | 
			
		||||
| 
						 | 
				
			
			@ -424,7 +428,6 @@ class Worker
 | 
			
		|||
		self::$db_duration_stat = 0;
 | 
			
		||||
		self::$db_duration_write = 0;
 | 
			
		||||
		self::$lock_duration = 0;
 | 
			
		||||
		self::$state = self::STATE_SHORT_LOOP;
 | 
			
		||||
 | 
			
		||||
		if ($duration > 3600) {
 | 
			
		||||
			$logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue