Wait for child being ready
This commit is contained in:
		
					parent
					
						
							
								69c7e9af20
							
						
					
				
			
			
				commit
				
					
						7e89bf5af8
					
				
			
		
					 2 changed files with 82 additions and 104 deletions
				
			
		|  | @ -94,101 +94,70 @@ class Worker | ||||||
| 
 | 
 | ||||||
| 		$last_check = $starttime = time(); | 		$last_check = $starttime = time(); | ||||||
| 		self::$state = self::STATE_STARTUP; | 		self::$state = self::STATE_STARTUP; | ||||||
| 		$wait_interval = self::isDaemonMode() ? 360 : 10; |  | ||||||
| 		$start = time(); |  | ||||||
| 
 | 
 | ||||||
| 		do { | 		// We fetch the next queue entry that is about to be executed
 | ||||||
| 			// We fetch the next queue entry that is about to be executed
 | 		while ($r = self::workerProcess()) { | ||||||
| 			while ($r = self::workerProcess()) { | 			if (self::IPCJobsExists(getmypid())) { | ||||||
| 				// Don't refetch when a worker fetches tasks for multiple workers
 | 				self::IPCSetJobState(false, getmypid()); | ||||||
| 				$refetched = DI::config()->get('system', 'worker_multiple_fetch'); | 			} | ||||||
| 				foreach ($r as $entry) { | 			// Don't refetch when a worker fetches tasks for multiple workers
 | ||||||
| 					// Assure that the priority is an integer value
 | 			$refetched = DI::config()->get('system', 'worker_multiple_fetch'); | ||||||
| 					$entry['priority'] = (int)$entry['priority']; | 			foreach ($r as $entry) { | ||||||
|  | 				// Assure that the priority is an integer value
 | ||||||
|  | 				$entry['priority'] = (int)$entry['priority']; | ||||||
| 
 | 
 | ||||||
| 					// The work will be done
 | 				// The work will be done
 | ||||||
| 					if (!self::execute($entry)) { | 				if (!self::execute($entry)) { | ||||||
| 						Logger::notice('Process execution failed, quitting.'); | 					Logger::notice('Process execution failed, quitting.'); | ||||||
|  | 					return; | ||||||
|  | 				} | ||||||
|  | 
 | ||||||
|  | 				// Trying to fetch new processes - but only once when successful
 | ||||||
|  | 				if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { | ||||||
|  | 					self::findWorkerProcesses(); | ||||||
|  | 					DI::lock()->release(self::LOCK_PROCESS); | ||||||
|  | 					self::$state = self::STATE_REFETCH; | ||||||
|  | 					$refetched = true; | ||||||
|  | 				} else { | ||||||
|  | 					self::$state = self::STATE_SHORT_LOOP; | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			// To avoid the quitting of multiple workers only one worker at a time will execute the check
 | ||||||
|  | 			if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { | ||||||
|  | 				self::$state = self::STATE_LONG_LOOP; | ||||||
|  | 
 | ||||||
|  | 				if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { | ||||||
|  | 				// Count active workers and compare them with a maximum value that depends on the load
 | ||||||
|  | 					if (self::tooMuchWorkers()) { | ||||||
|  | 						Logger::notice('Active worker limit reached, quitting.'); | ||||||
|  | 						DI::lock()->release(self::LOCK_WORKER); | ||||||
| 						return; | 						return; | ||||||
| 					} | 					} | ||||||
| 
 | 
 | ||||||
| 					// Trying to fetch new processes - but only once when successful
 | 					// Check free memory
 | ||||||
| 					if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { | 					if (DI::process()->isMinMemoryReached()) { | ||||||
| 						self::findWorkerProcesses(); | 						Logger::warning('Memory limit reached, quitting.'); | ||||||
| 						DI::lock()->release(self::LOCK_PROCESS); |  | ||||||
| 						self::$state = self::STATE_REFETCH; |  | ||||||
| 						$refetched = true; |  | ||||||
| 					} else { |  | ||||||
| 						self::$state = self::STATE_SHORT_LOOP; |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 
 |  | ||||||
| 				// To avoid the quitting of multiple workers only one worker at a time will execute the check
 |  | ||||||
| 				if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { |  | ||||||
| 					self::$state = self::STATE_LONG_LOOP; |  | ||||||
| 
 |  | ||||||
| 					if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { |  | ||||||
| 					// Count active workers and compare them with a maximum value that depends on the load
 |  | ||||||
| 						if (self::tooMuchWorkers()) { |  | ||||||
| 							Logger::notice('Active worker limit reached, quitting.'); |  | ||||||
| 							DI::lock()->release(self::LOCK_WORKER); |  | ||||||
| 							return; |  | ||||||
| 						} |  | ||||||
| 
 |  | ||||||
| 						// Check free memory
 |  | ||||||
| 						if (DI::process()->isMinMemoryReached()) { |  | ||||||
| 							Logger::warning('Memory limit reached, quitting.'); |  | ||||||
| 							DI::lock()->release(self::LOCK_WORKER); |  | ||||||
| 							return; |  | ||||||
| 						} |  | ||||||
| 						DI::lock()->release(self::LOCK_WORKER); | 						DI::lock()->release(self::LOCK_WORKER); | ||||||
|  | 						return; | ||||||
| 					} | 					} | ||||||
| 					$last_check = time(); | 					DI::lock()->release(self::LOCK_WORKER); | ||||||
| 				} | 				} | ||||||
| 
 | 				$last_check = time(); | ||||||
| 				// Quit the worker once every cron interval
 |  | ||||||
| 				if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { |  | ||||||
| 					Logger::info('Process lifetime reached, respawning.'); |  | ||||||
| 					self::unclaimProcess(); |  | ||||||
| 					if (self::isDaemonMode()) { |  | ||||||
| 						self::IPCSetJobState(true); |  | ||||||
| 					} else { |  | ||||||
| 						self::spawnWorker(); |  | ||||||
| 					} |  | ||||||
| 					return; |  | ||||||
| 				} |  | ||||||
| 				$start = time(); |  | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			$seconds = (time() - $start); | 			// Quit the worker once every cron interval
 | ||||||
| 
 | 			if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { | ||||||
| 			// logarithmic wait time calculation.
 | 				Logger::info('Process lifetime reached, respawning.'); | ||||||
| 			$arg = (($seconds + 1) / ($wait_interval / 9)) + 1; | 				self::unclaimProcess(); | ||||||
| 			$sleep = min(1000000, round(log10($arg) * 1000000, 0)); | 				if (self::isDaemonMode()) { | ||||||
| 			usleep($sleep); | 					self::IPCSetJobState(true); | ||||||
| 
 | 				} else { | ||||||
| 			$timeout = ($seconds >= $wait_interval); | 					self::spawnWorker(); | ||||||
| 			Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]); |  | ||||||
| 
 |  | ||||||
| 			if (!$timeout) { |  | ||||||
| 				if (DI::process()->isMaxLoadReached()) { |  | ||||||
| 					Logger::notice('maximum load reached, quitting.'); |  | ||||||
| 					return; |  | ||||||
| 				} |  | ||||||
| 
 |  | ||||||
| 				// Kill stale processes every 5 minutes
 |  | ||||||
| 				$last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); |  | ||||||
| 				if (time() > ($last_cleanup + 300)) { |  | ||||||
| 					DI::config()->set('system', 'worker_last_cleaned', time()); |  | ||||||
| 					self::killStaleWorkers(); |  | ||||||
| 				} |  | ||||||
| 
 |  | ||||||
| 				// Check if the system is ready
 |  | ||||||
| 				if (!self::isReady()) { |  | ||||||
| 					return; |  | ||||||
| 				} | 				} | ||||||
|  | 				return; | ||||||
| 			} | 			} | ||||||
| 		} while (!$timeout); | 		} | ||||||
| 
 | 
 | ||||||
| 		// Cleaning up. Possibly not needed, but it doesn't harm anything.
 | 		// Cleaning up. Possibly not needed, but it doesn't harm anything.
 | ||||||
| 		if (self::isDaemonMode()) { | 		if (self::isDaemonMode()) { | ||||||
|  | @ -1248,33 +1217,36 @@ class Worker | ||||||
| 		} elseif ($pid) { | 		} elseif ($pid) { | ||||||
| 			// The parent process continues here
 | 			// The parent process continues here
 | ||||||
| 			DBA::connect(); | 			DBA::connect(); | ||||||
| 			Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]); | 			Logger::info('Spawned new worker', ['pid' => $pid]); | ||||||
|  | 			self::IPCSetJobState(true, $pid); | ||||||
|  | 
 | ||||||
|  | 			$cycles = 0; | ||||||
|  | 			while (self::IPCJobsExists($pid) && (++$cycles < 100)) { | ||||||
|  | 				usleep(10000); | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// We now are in the new worker
 | 		// We now are in the new worker
 | ||||||
| 		DBA::connect(); | 		DBA::connect(); | ||||||
| 		Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]); | 		Logger::info('Worker spawned', ['pid' => getmypid()]); | ||||||
| 
 | 
 | ||||||
| 		DI::process()->start(); | 		$cycles = 0; | ||||||
|  | 		while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { | ||||||
|  | 			usleep(10000); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]); | ||||||
| 
 | 
 | ||||||
| 		self::processQueue($do_cron); | 		self::processQueue($do_cron); | ||||||
| 
 | 
 | ||||||
| 		self::unclaimProcess(); | 		self::unclaimProcess(); | ||||||
| 
 | 
 | ||||||
|  | 		self::IPCSetJobState(false, getmypid()); | ||||||
| 		DI::process()->end(); | 		DI::process()->end(); | ||||||
| 		Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]); | 		Logger::info('Worker ended', ['pid' => getmypid()]); | ||||||
| 
 |  | ||||||
| 		DBA::disconnect(); |  | ||||||
| /* |  | ||||||
| 		$php = '/usr/bin/php'; |  | ||||||
| 		$param = ['bin/worker.php']; |  | ||||||
| 		if ($do_cron) { |  | ||||||
| 			$param[] = 'no_cron'; |  | ||||||
| 		} |  | ||||||
| 		pcntl_exec($php, $param); |  | ||||||
| 		Logger::warning('Error calling worker', ['cron' => $do_cron, 'pid' => getmypid()]); |  | ||||||
| */ |  | ||||||
| 		exit(); | 		exit(); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -1287,14 +1259,16 @@ class Worker | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function spawnWorker($do_cron = false) | 	public static function spawnWorker($do_cron = false) | ||||||
| 	{ | 	{ | ||||||
| 		if (self::isDaemonMode()) { | 		if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { | ||||||
| 			self::forkProcess($do_cron); | 			self::forkProcess($do_cron); | ||||||
| 			self::IPCSetJobState(false); |  | ||||||
| 		} else { | 		} else { | ||||||
| 			$process = new Core\Process(DI::logger(), DI::mode(), DI::config(), | 			$process = new Core\Process(DI::logger(), DI::mode(), DI::config(), | ||||||
| 				DI::modelProcess(), DI::app()->getBasePath(), getmypid()); | 				DI::modelProcess(), DI::app()->getBasePath(), getmypid()); | ||||||
| 			$process->run('bin/worker.php', ['no_cron' => !$do_cron]); | 			$process->run('bin/worker.php', ['no_cron' => !$do_cron]); | ||||||
| 		} | 		} | ||||||
|  | 		if (self::isDaemonMode()) { | ||||||
|  | 			self::IPCSetJobState(false); | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
|  | @ -1505,10 +1479,10 @@ class Worker | ||||||
| 	 * @param boolean $jobs Is there a waiting job? | 	 * @param boolean $jobs Is there a waiting job? | ||||||
| 	 * @throws \Exception | 	 * @throws \Exception | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function IPCSetJobState($jobs) | 	public static function IPCSetJobState(bool $jobs, int $key = 0) | ||||||
| 	{ | 	{ | ||||||
| 		$stamp = (float)microtime(true); | 		$stamp = (float)microtime(true); | ||||||
| 		DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); | 		DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]); | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 		self::$db_duration_write += (microtime(true) - $stamp); | 		self::$db_duration_write += (microtime(true) - $stamp); | ||||||
| 	} | 	} | ||||||
|  | @ -1519,10 +1493,10 @@ class Worker | ||||||
| 	 * @return bool | 	 * @return bool | ||||||
| 	 * @throws \Exception | 	 * @throws \Exception | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function IPCJobsExists() | 	public static function IPCJobsExists(int $key = 0) | ||||||
| 	{ | 	{ | ||||||
| 		$stamp = (float)microtime(true); | 		$stamp = (float)microtime(true); | ||||||
| 		$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); | 		$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]); | ||||||
| 		self::$db_duration += (microtime(true) - $stamp); | 		self::$db_duration += (microtime(true) - $stamp); | ||||||
| 
 | 
 | ||||||
| 		// When we don't have a row, no job is running
 | 		// When we don't have a row, no job is running
 | ||||||
|  |  | ||||||
|  | @ -538,6 +538,10 @@ return [ | ||||||
| 		// Number of worker tasks that are fetched in a single query.
 | 		// Number of worker tasks that are fetched in a single query.
 | ||||||
| 		'worker_fetch_limit' => 1, | 		'worker_fetch_limit' => 1, | ||||||
| 
 | 
 | ||||||
|  | 		// worker_fork (Boolean)
 | ||||||
|  | 		// Experimental setting. use pcntl_fork to spawn a new worker process
 | ||||||
|  | 		'worker_fork' => false, | ||||||
|  | 
 | ||||||
| 		// worker_jpm (Boolean)
 | 		// worker_jpm (Boolean)
 | ||||||
| 		// If enabled, it prints out the jobs per minute.
 | 		// If enabled, it prints out the jobs per minute.
 | ||||||
| 		'worker_jpm' => false, | 		'worker_jpm' => false, | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue