diff --git a/bin/daemon.php b/bin/daemon.php index cc86a9f201..fcdd735668 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -224,8 +224,10 @@ while (true) { usleep($sleep); $pid = pcntl_waitpid(-1, $status, WNOHANG); - Logger::info('Checked children status via pcntl_waitpid', ['pid' => $pid, 'status' => $status]); - + if ($pid > 0) { + Logger::info('Children quit via pcntl_waitpid', ['pid' => $pid, 'status' => $status]); + } + $timeout = ($seconds >= $wait_interval); } while (!$timeout && !Worker::IPCJobsExists()); diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 559ae68370..bb17e430c1 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -94,67 +94,101 @@ class Worker $last_check = $starttime = time(); self::$state = self::STATE_STARTUP; + $wait_interval = self::isDaemonMode() ? 360 : 10; + $start = time(); - // We fetch the next queue entry that is about to be executed - while ($r = self::workerProcess()) { - // Don't refetch when a worker fetches tasks for multiple workers - $refetched = DI::config()->get('system', 'worker_multiple_fetch'); - foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; + do { + // We fetch the next queue entry that is about to be executed + while ($r = self::workerProcess()) { + // Don't refetch when a worker fetches tasks for multiple workers + $refetched = DI::config()->get('system', 'worker_multiple_fetch'); + foreach ($r as $entry) { + // Assure that the priority is an integer value + $entry['priority'] = (int)$entry['priority']; - // The work will be done - if (!self::execute($entry)) { - Logger::notice('Process execution failed, quitting.'); + // The work will be done + if (!self::execute($entry)) { + Logger::notice('Process execution failed, quitting.'); + return; + } + + // Trying to fetch new processes - but only once when successful + if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { + self::findWorkerProcesses(); + DI::lock()->release(self::LOCK_PROCESS); + self::$state = self::STATE_REFETCH; + $refetched = true; + } else { + self::$state = self::STATE_SHORT_LOOP; + } + } + + // To avoid the quitting of multiple workers only one worker at a time will execute the check + if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { + self::$state = self::STATE_LONG_LOOP; + + if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (self::tooMuchWorkers()) { + Logger::notice('Active worker limit reached, quitting.'); + DI::lock()->release(self::LOCK_WORKER); + return; + } + + // Check free memory + if (DI::process()->isMinMemoryReached()) { + Logger::warning('Memory limit reached, quitting.'); + DI::lock()->release(self::LOCK_WORKER); + return; + } + DI::lock()->release(self::LOCK_WORKER); + } + $last_check = time(); + } + + // Quit the worker once every cron interval + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + Logger::info('Process lifetime reached, respawning.'); + self::unclaimProcess(); + if (self::isDaemonMode()) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); + } + return; + } + $start = time(); + } + + $seconds = (time() - $start); + + // logarithmic wait time calculation. + $arg = (($seconds + 1) / ($wait_interval / 9)) + 1; + $sleep = min(1000000, round(log10($arg) * 1000000, 0)); + usleep($sleep); + + $timeout = ($seconds >= $wait_interval); + Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]); + + if (!$timeout) { + if (DI::process()->isMaxLoadReached()) { + Logger::notice('maximum load reached, quitting.'); return; } - // Trying to fetch new processes - but only once when successful - if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { - self::findWorkerProcesses(); - DI::lock()->release(self::LOCK_PROCESS); - self::$state = self::STATE_REFETCH; - $refetched = true; - } else { - self::$state = self::STATE_SHORT_LOOP; + // Kill stale processes every 5 minutes + $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); + if (time() > ($last_cleanup + 300)) { + DI::config()->set('system', 'worker_last_cleaned', time()); + self::killStaleWorkers(); } + + // Check if the system is ready + if (!self::isReady()) { + return; + } } - - // To avoid the quitting of multiple workers only one worker at a time will execute the check - if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { - self::$state = self::STATE_LONG_LOOP; - - if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers()) { - Logger::notice('Active worker limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } - - // Check free memory - if (DI::process()->isMinMemoryReached()) { - Logger::warning('Memory limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } - DI::lock()->release(self::LOCK_WORKER); - } - $last_check = time(); - } - - // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { - Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); - if (self::isDaemonMode()) { - self::IPCSetJobState(true); - } else { - self::spawnWorker(); - } - return; - } - } + } while (!$timeout); // Cleaning up. Possibly not needed, but it doesn't harm anything. if (self::isDaemonMode()) {