From 9a6141dcbeb8c0334356d7e92060748906fd6e46 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 1 Jan 2021 19:35:29 +0000 Subject: [PATCH 01/11] Use "pcntl_fork" to fork processes --- bin/daemon.php | 42 ++++++++++++++++++---------------- bin/worker.php | 4 +++- index.php | 2 ++ src/App/Mode.php | 36 +++++++++++++++++++++++++++++ src/Core/Worker.php | 55 +++++++++++++++++++++++++++++++++++++++------ 5 files changed, 112 insertions(+), 27 deletions(-) diff --git a/bin/daemon.php b/bin/daemon.php index d08aa37d15..ec507305cc 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -29,6 +29,7 @@ if (php_sapi_name() !== 'cli') { } use Dice\Dice; +use Friendica\App\Mode; use Friendica\Core\Logger; use Friendica\Core\Worker; use Friendica\Database\DBA; @@ -65,6 +66,8 @@ if (DI::mode()->isInstall()) { die("Friendica isn't properly installed yet.\n"); } +DI::mode()->setExecutor(Mode::DAEMON); + DI::config()->load(); if (empty(DI::config()->get('system', 'pidfile'))) { @@ -144,34 +147,35 @@ Logger::notice('Starting worker daemon.', ["pid" => $pid]); if (!$foreground) { echo "Starting worker daemon.\n"; - // Switch over to daemon mode. - if ($pid = pcntl_fork()) { - return; // Parent - } - - fclose(STDIN); // Close all of the standard - - // Enabling this seem to block a running php process with 100% CPU usage when there is an outpout - // fclose(STDOUT); // file descriptors as we - // fclose(STDERR); // are running as a daemon. - DBA::disconnect(); + // Fork a daemon process + $pid = pcntl_fork(); + if ($pid == -1) { + echo "Daemon couldn't be forked.\n"; + Logger::warning('Could not fork daemon'); + exit(1); + } elseif ($pid) { + // The parent process continues here + echo 'Child process started with pid ' . $pid . ".\n"; + Logger::notice('Child process started', ['pid' => $pid]); + file_put_contents($pidfile, $pid); + exit(0); + } + + // We now are in the child process register_shutdown_function('shutdown'); + // Make the child the main process, detach it from the terminal if (posix_setsid() < 0) { return; } - if ($pid = pcntl_fork()) { - return; // Parent - } + // Closing all existing connections with the outside + fclose(STDIN); - $pid = getmypid(); - file_put_contents($pidfile, $pid); - - // We lose the database connection upon forking - DBA::reconnect(); + // And now connect the database again + DBA::connect(); } DI::config()->set('system', 'worker_daemon_mode', true); diff --git a/bin/worker.php b/bin/worker.php index 5698cf16dd..52400a045a 100755 --- a/bin/worker.php +++ b/bin/worker.php @@ -28,7 +28,7 @@ if (php_sapi_name() !== 'cli') { use Dice\Dice; use Friendica\App; -use Friendica\Core\Process; +use Friendica\App\Mode; use Friendica\Core\Update; use Friendica\Core\Worker; use Friendica\DI; @@ -59,6 +59,8 @@ $dice = $dice->addRule(LoggerInterface::class,['constructParams' => ['worker']]) DI::init($dice); $a = DI::app(); +DI::mode()->setExecutor(Mode::WORKER); + // Check the database structure and possibly fixes it Update::check($a->getBasePath(), true, DI::mode()); diff --git a/index.php b/index.php index fdb15fdd88..baa6818b09 100644 --- a/index.php +++ b/index.php @@ -36,6 +36,8 @@ $dice = $dice->addRule(Friendica\App\Mode::class, ['call' => [['determineRunMode $a = \Friendica\DI::app(); +\Friendica\DI::mode()->setExecutor(\Friendica\App\Mode::INDEX); + $a->runFrontend( $dice->create(\Friendica\App\Module::class), $dice->create(\Friendica\App\Router::class), diff --git a/src/App/Mode.php b/src/App/Mode.php index e19de8f07f..8aa812c93d 100644 --- a/src/App/Mode.php +++ b/src/App/Mode.php @@ -38,6 +38,11 @@ class Mode const DBCONFIGAVAILABLE = 4; const MAINTENANCEDISABLED = 8; + const UNDEFINED = 0; + const INDEX = 1; + const DAEMON = 2; + const WORKER = 3; + const BACKEND_CONTENT_TYPES = ['application/jrd+json', 'text/xml', 'application/rss+xml', 'application/atom+xml', 'application/activity+json']; @@ -47,6 +52,12 @@ class Mode */ private $mode; + /*** + * @var int Who executes this Application + * + */ + private $executor = self::UNDEFINED; + /** * @var bool True, if the call is a backend call */ @@ -163,6 +174,31 @@ class Mode return ($this->mode & $mode) > 0; } + /** + * Set the execution mode + * + * @param integer $executor Execution Mode + * @return void + */ + public function setExecutor(int $executor) + { + $this->executor = $executor; + + // Daemon and worker are always backend + if (in_array($executor, [self::DAEMON, self::WORKER])) { + $this->isBackend = true; + } + } + + /*isBackend = true;* + * get the execution mode + * + * @return int Execution Mode + */ + public function getExecutor() + { + return $this->executor; + } /** * Install mode is when the local config file is missing or the DB schema hasn't been installed yet. diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 2f39a82fe5..e90747e41a 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -21,6 +21,7 @@ namespace Friendica\Core; +use Friendica\App\Mode; use Friendica\Core; use Friendica\Database\DBA; use Friendica\DI; @@ -1180,6 +1181,44 @@ class Worker self::killStaleWorkers(); } + /** + * Fork a child process + * + * @param boolean $do_cron + * @return void + */ + private static function forkProcess(bool $do_cron) + { + // Children inherit their parent's database connection. + // To avoid problems we disconnect and connect both parent and child + DBA::disconnect(); + $pid = pcntl_fork(); + if ($pid == -1) { + DBA::connect(); + Logger::warning('Could not spawn worker'); + return; + } elseif ($pid) { + // The parent process continues here + DBA::connect(); + Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]); + return; + } + + // We now are in the new worker + DBA::connect(); + Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]); + + DI::process()->start(); + + self::processQueue($do_cron); + + self::unclaimProcess(); + + DI::process()->end(); + Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]); + exit(); + } + /** * Spawns a new worker * @@ -1189,13 +1228,15 @@ class Worker */ public static function spawnWorker($do_cron = false) { - $command = 'bin/worker.php'; - - $args = ['no_cron' => !$do_cron]; - - $a = DI::app(); - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid()); - $process->run($command, $args); + // Worker and daemon are started from the command line. + // This means that this is executed by a PHP interpreter without runtime limitations + if (in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) { + self::forkProcess($do_cron); + } else { + $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), + DI::modelProcess(), DI::app()->getBasePath(), getmypid()); + $process->run('bin/worker.php', ['no_cron' => !$do_cron]); + } // after spawning we have to remove the flag. if (DI::config()->get('system', 'worker_daemon_mode', false)) { From 78f67c1e0ef699d47966b2560f6e43eb5e63ddc0 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 1 Jan 2021 23:10:38 +0000 Subject: [PATCH 02/11] Check for reaching the memory limit --- src/Core/Worker.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index e90747e41a..009912ee7f 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1189,6 +1189,11 @@ class Worker */ private static function forkProcess(bool $do_cron) { + if (DI::process()->isMinMemoryReached()) { + Logger::warning('Memory limit reached - quitting'); + return; + } + // Children inherit their parent's database connection. // To avoid problems we disconnect and connect both parent and child DBA::disconnect(); @@ -1230,7 +1235,7 @@ class Worker { // Worker and daemon are started from the command line. // This means that this is executed by a PHP interpreter without runtime limitations - if (in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) { + if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) { self::forkProcess($do_cron); } else { $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), From 7a03b72060c915271f0037e9f2a72e2beccc9607 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 2 Jan 2021 08:43:55 +0000 Subject: [PATCH 03/11] Improved daemon test --- src/Core/Worker.php | 64 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 009912ee7f..eb769ad1c0 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -50,6 +50,7 @@ class Worker private static $lock_duration = 0; private static $last_update; private static $state; + private static $daemon_mode = null; /** * Processes the tasks that are in the workerqueue table @@ -146,13 +147,17 @@ class Worker if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); self::unclaimProcess(); - self::spawnWorker(); + if (self::isDaemonMode()) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); + } return; } } // Cleaning up. Possibly not needed, but it doesn't harm anything. - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(false); } Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); @@ -190,7 +195,7 @@ class Worker Logger::warning('Maximum processes reached, quitting.'); return false; } - + return true; } @@ -771,7 +776,7 @@ class Worker // Are there fewer workers running as possible? Then fork a new one. if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]); - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(true); } else { self::spawnWorker(); @@ -780,7 +785,7 @@ class Worker } // if there are too much worker, we don't spawn a new one. - if (DI::config()->get('system', 'worker_daemon_mode', false) && ($active > $queues)) { + if (self::isDaemonMode() && ($active > $queues)) { self::IPCSetJobState(false); } @@ -1208,7 +1213,7 @@ class Worker Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]); return; } - + // We now are in the new worker DBA::connect(); Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]); @@ -1233,6 +1238,7 @@ class Worker */ public static function spawnWorker($do_cron = false) { + Logger::notice("Spawn", ['do_cron' => $do_cron, 'callstack' => System::callstack(20)]); // Worker and daemon are started from the command line. // This means that this is executed by a PHP interpreter without runtime limitations if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) { @@ -1244,7 +1250,7 @@ class Worker } // after spawning we have to remove the flag. - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(false); } } @@ -1336,7 +1342,7 @@ class Worker } // Set the IPC flag to ensure an immediate process execution via daemon - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(true); } @@ -1361,7 +1367,7 @@ class Worker } // Quit on daemon mode - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { return $added; } @@ -1485,6 +1491,46 @@ class Worker return (bool)$row['jobs']; } + /** + * Checks if the worker is running in the daemon mode. + * + * @return boolean + */ + public static function isDaemonMode() + { + if (!is_null(self::$daemon_mode)) { + return self::$daemon_mode; + } + + if (DI::mode()->getExecutor() == Mode::DAEMON) { + return true; + } + + $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true); + if ($daemon_mode) { + return $daemon_mode; + } + + $pidfile = DI::config()->get('system', 'pidfile'); + if (empty($pidfile)) { + // No pid file, no daemon + self::$daemon_mode = false; + return false; + } + + if (!is_readable($pidfile)) { + // No pid file. We assume that the daemon had been intentionally stopped. + self::$daemon_mode = false; + return false; + } + + $pid = intval(file_get_contents($pidfile)); + $running = posix_kill($pid, 0); + + self::$daemon_mode = $running; + return $running; + } + /** * Test if the daemon is running. If not, it will be started * From 74d7d7e164364c09b7fdfbf64e15168ccd56508a Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 3 Jan 2021 22:57:25 +0000 Subject: [PATCH 04/11] Check for childf status --- bin/daemon.php | 3 +++ src/Core/Worker.php | 27 ++++++++++++++++++--------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/bin/daemon.php b/bin/daemon.php index ec507305cc..cc86a9f201 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -223,6 +223,9 @@ while (true) { $sleep = min(1000000, round(log10($arg) * 1000000, 0)); usleep($sleep); + $pid = pcntl_waitpid(-1, $status, WNOHANG); + Logger::info('Checked children status via pcntl_waitpid', ['pid' => $pid, 'status' => $status]); + $timeout = ($seconds >= $wait_interval); } while (!$timeout && !Worker::IPCJobsExists()); diff --git a/src/Core/Worker.php b/src/Core/Worker.php index ec317a676d..559ae68370 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1230,6 +1230,17 @@ class Worker DI::process()->end(); Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]); + + DBA::disconnect(); +/* + $php = '/usr/bin/php'; + $param = ['bin/worker.php']; + if ($do_cron) { + $param[] = 'no_cron'; + } + pcntl_exec($php, $param); + Logger::warning('Error calling worker', ['cron' => $do_cron, 'pid' => getmypid()]); +*/ exit(); } @@ -1242,21 +1253,14 @@ class Worker */ public static function spawnWorker($do_cron = false) { - Logger::notice("Spawn", ['do_cron' => $do_cron, 'callstack' => System::callstack(20)]); - // Worker and daemon are started from the command line. - // This means that this is executed by a PHP interpreter without runtime limitations - if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) { + if (self::isDaemonMode()) { self::forkProcess($do_cron); + self::IPCSetJobState(false); } else { $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), DI::app()->getBasePath(), getmypid()); $process->run('bin/worker.php', ['no_cron' => !$do_cron]); } - - // after spawning we have to remove the flag. - if (self::isDaemonMode()) { - self::IPCSetJobState(false); - } } /** @@ -1515,6 +1519,11 @@ class Worker return $daemon_mode; } + if (!function_exists('pcntl_fork')) { + self::$daemon_mode = false; + return false; + } + $pidfile = DI::config()->get('system', 'pidfile'); if (empty($pidfile)) { // No pid file, no daemon From 69c7e9af2043404052cd488fd5edf18917dcdaa5 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 4 Jan 2021 09:20:44 +0000 Subject: [PATCH 05/11] Let the worker run for an hour in daemon mode --- bin/daemon.php | 6 +- src/Core/Worker.php | 142 +++++++++++++++++++++++++++----------------- 2 files changed, 92 insertions(+), 56 deletions(-) diff --git a/bin/daemon.php b/bin/daemon.php index cc86a9f201..fcdd735668 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -224,8 +224,10 @@ while (true) { usleep($sleep); $pid = pcntl_waitpid(-1, $status, WNOHANG); - Logger::info('Checked children status via pcntl_waitpid', ['pid' => $pid, 'status' => $status]); - + if ($pid > 0) { + Logger::info('Children quit via pcntl_waitpid', ['pid' => $pid, 'status' => $status]); + } + $timeout = ($seconds >= $wait_interval); } while (!$timeout && !Worker::IPCJobsExists()); diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 559ae68370..bb17e430c1 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -94,67 +94,101 @@ class Worker $last_check = $starttime = time(); self::$state = self::STATE_STARTUP; + $wait_interval = self::isDaemonMode() ? 360 : 10; + $start = time(); - // We fetch the next queue entry that is about to be executed - while ($r = self::workerProcess()) { - // Don't refetch when a worker fetches tasks for multiple workers - $refetched = DI::config()->get('system', 'worker_multiple_fetch'); - foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; + do { + // We fetch the next queue entry that is about to be executed + while ($r = self::workerProcess()) { + // Don't refetch when a worker fetches tasks for multiple workers + $refetched = DI::config()->get('system', 'worker_multiple_fetch'); + foreach ($r as $entry) { + // Assure that the priority is an integer value + $entry['priority'] = (int)$entry['priority']; - // The work will be done - if (!self::execute($entry)) { - Logger::notice('Process execution failed, quitting.'); + // The work will be done + if (!self::execute($entry)) { + Logger::notice('Process execution failed, quitting.'); + return; + } + + // Trying to fetch new processes - but only once when successful + if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { + self::findWorkerProcesses(); + DI::lock()->release(self::LOCK_PROCESS); + self::$state = self::STATE_REFETCH; + $refetched = true; + } else { + self::$state = self::STATE_SHORT_LOOP; + } + } + + // To avoid the quitting of multiple workers only one worker at a time will execute the check + if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { + self::$state = self::STATE_LONG_LOOP; + + if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (self::tooMuchWorkers()) { + Logger::notice('Active worker limit reached, quitting.'); + DI::lock()->release(self::LOCK_WORKER); + return; + } + + // Check free memory + if (DI::process()->isMinMemoryReached()) { + Logger::warning('Memory limit reached, quitting.'); + DI::lock()->release(self::LOCK_WORKER); + return; + } + DI::lock()->release(self::LOCK_WORKER); + } + $last_check = time(); + } + + // Quit the worker once every cron interval + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + Logger::info('Process lifetime reached, respawning.'); + self::unclaimProcess(); + if (self::isDaemonMode()) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); + } + return; + } + $start = time(); + } + + $seconds = (time() - $start); + + // logarithmic wait time calculation. + $arg = (($seconds + 1) / ($wait_interval / 9)) + 1; + $sleep = min(1000000, round(log10($arg) * 1000000, 0)); + usleep($sleep); + + $timeout = ($seconds >= $wait_interval); + Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]); + + if (!$timeout) { + if (DI::process()->isMaxLoadReached()) { + Logger::notice('maximum load reached, quitting.'); return; } - // Trying to fetch new processes - but only once when successful - if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { - self::findWorkerProcesses(); - DI::lock()->release(self::LOCK_PROCESS); - self::$state = self::STATE_REFETCH; - $refetched = true; - } else { - self::$state = self::STATE_SHORT_LOOP; + // Kill stale processes every 5 minutes + $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); + if (time() > ($last_cleanup + 300)) { + DI::config()->set('system', 'worker_last_cleaned', time()); + self::killStaleWorkers(); } + + // Check if the system is ready + if (!self::isReady()) { + return; + } } - - // To avoid the quitting of multiple workers only one worker at a time will execute the check - if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { - self::$state = self::STATE_LONG_LOOP; - - if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers()) { - Logger::notice('Active worker limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } - - // Check free memory - if (DI::process()->isMinMemoryReached()) { - Logger::warning('Memory limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } - DI::lock()->release(self::LOCK_WORKER); - } - $last_check = time(); - } - - // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { - Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); - if (self::isDaemonMode()) { - self::IPCSetJobState(true); - } else { - self::spawnWorker(); - } - return; - } - } + } while (!$timeout); // Cleaning up. Possibly not needed, but it doesn't harm anything. if (self::isDaemonMode()) { From 7e89bf5af89c18c1934b5af0d162897eda3ad7e7 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 10:18:25 +0000 Subject: [PATCH 06/11] Wait for child being ready --- src/Core/Worker.php | 182 ++++++++++++++++--------------------- static/defaults.config.php | 4 + 2 files changed, 82 insertions(+), 104 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index bb17e430c1..f84dd29475 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -94,101 +94,70 @@ class Worker $last_check = $starttime = time(); self::$state = self::STATE_STARTUP; - $wait_interval = self::isDaemonMode() ? 360 : 10; - $start = time(); - do { - // We fetch the next queue entry that is about to be executed - while ($r = self::workerProcess()) { - // Don't refetch when a worker fetches tasks for multiple workers - $refetched = DI::config()->get('system', 'worker_multiple_fetch'); - foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; + // We fetch the next queue entry that is about to be executed + while ($r = self::workerProcess()) { + if (self::IPCJobsExists(getmypid())) { + self::IPCSetJobState(false, getmypid()); + } + // Don't refetch when a worker fetches tasks for multiple workers + $refetched = DI::config()->get('system', 'worker_multiple_fetch'); + foreach ($r as $entry) { + // Assure that the priority is an integer value + $entry['priority'] = (int)$entry['priority']; - // The work will be done - if (!self::execute($entry)) { - Logger::notice('Process execution failed, quitting.'); + // The work will be done + if (!self::execute($entry)) { + Logger::notice('Process execution failed, quitting.'); + return; + } + + // Trying to fetch new processes - but only once when successful + if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { + self::findWorkerProcesses(); + DI::lock()->release(self::LOCK_PROCESS); + self::$state = self::STATE_REFETCH; + $refetched = true; + } else { + self::$state = self::STATE_SHORT_LOOP; + } + } + + // To avoid the quitting of multiple workers only one worker at a time will execute the check + if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { + self::$state = self::STATE_LONG_LOOP; + + if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (self::tooMuchWorkers()) { + Logger::notice('Active worker limit reached, quitting.'); + DI::lock()->release(self::LOCK_WORKER); return; } - // Trying to fetch new processes - but only once when successful - if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { - self::findWorkerProcesses(); - DI::lock()->release(self::LOCK_PROCESS); - self::$state = self::STATE_REFETCH; - $refetched = true; - } else { - self::$state = self::STATE_SHORT_LOOP; - } - } - - // To avoid the quitting of multiple workers only one worker at a time will execute the check - if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { - self::$state = self::STATE_LONG_LOOP; - - if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers()) { - Logger::notice('Active worker limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } - - // Check free memory - if (DI::process()->isMinMemoryReached()) { - Logger::warning('Memory limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } + // Check free memory + if (DI::process()->isMinMemoryReached()) { + Logger::warning('Memory limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); + return; } - $last_check = time(); + DI::lock()->release(self::LOCK_WORKER); } - - // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { - Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); - if (self::isDaemonMode()) { - self::IPCSetJobState(true); - } else { - self::spawnWorker(); - } - return; - } - $start = time(); + $last_check = time(); } - $seconds = (time() - $start); - - // logarithmic wait time calculation. - $arg = (($seconds + 1) / ($wait_interval / 9)) + 1; - $sleep = min(1000000, round(log10($arg) * 1000000, 0)); - usleep($sleep); - - $timeout = ($seconds >= $wait_interval); - Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]); - - if (!$timeout) { - if (DI::process()->isMaxLoadReached()) { - Logger::notice('maximum load reached, quitting.'); - return; + // Quit the worker once every cron interval + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + Logger::info('Process lifetime reached, respawning.'); + self::unclaimProcess(); + if (self::isDaemonMode()) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); } - - // Kill stale processes every 5 minutes - $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); - if (time() > ($last_cleanup + 300)) { - DI::config()->set('system', 'worker_last_cleaned', time()); - self::killStaleWorkers(); - } - - // Check if the system is ready - if (!self::isReady()) { - return; - } + return; } - } while (!$timeout); + } // Cleaning up. Possibly not needed, but it doesn't harm anything. if (self::isDaemonMode()) { @@ -1248,33 +1217,36 @@ class Worker } elseif ($pid) { // The parent process continues here DBA::connect(); - Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]); + Logger::info('Spawned new worker', ['pid' => $pid]); + self::IPCSetJobState(true, $pid); + + $cycles = 0; + while (self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]); return; } // We now are in the new worker DBA::connect(); - Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]); + Logger::info('Worker spawned', ['pid' => getmypid()]); - DI::process()->start(); + $cycles = 0; + while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]); self::processQueue($do_cron); self::unclaimProcess(); + self::IPCSetJobState(false, getmypid()); DI::process()->end(); - Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]); - - DBA::disconnect(); -/* - $php = '/usr/bin/php'; - $param = ['bin/worker.php']; - if ($do_cron) { - $param[] = 'no_cron'; - } - pcntl_exec($php, $param); - Logger::warning('Error calling worker', ['cron' => $do_cron, 'pid' => getmypid()]); -*/ + Logger::info('Worker ended', ['pid' => getmypid()]); exit(); } @@ -1287,14 +1259,16 @@ class Worker */ public static function spawnWorker($do_cron = false) { - if (self::isDaemonMode()) { + if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { self::forkProcess($do_cron); - self::IPCSetJobState(false); } else { $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), DI::app()->getBasePath(), getmypid()); $process->run('bin/worker.php', ['no_cron' => !$do_cron]); } + if (self::isDaemonMode()) { + self::IPCSetJobState(false); + } } /** @@ -1505,10 +1479,10 @@ class Worker * @param boolean $jobs Is there a waiting job? * @throws \Exception */ - public static function IPCSetJobState($jobs) + public static function IPCSetJobState(bool $jobs, int $key = 0) { $stamp = (float)microtime(true); - DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); + DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -1519,10 +1493,10 @@ class Worker * @return bool * @throws \Exception */ - public static function IPCJobsExists() + public static function IPCJobsExists(int $key = 0) { $stamp = (float)microtime(true); - $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); + $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]); self::$db_duration += (microtime(true) - $stamp); // When we don't have a row, no job is running diff --git a/static/defaults.config.php b/static/defaults.config.php index 310d1ea08e..ed0f8f871a 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -538,6 +538,10 @@ return [ // Number of worker tasks that are fetched in a single query. 'worker_fetch_limit' => 1, + // worker_fork (Boolean) + // Experimental setting. use pcntl_fork to spawn a new worker process + 'worker_fork' => false, + // worker_jpm (Boolean) // If enabled, it prints out the jobs per minute. 'worker_jpm' => false, From 17fbe5c299d5a85abe1bb862233fcc515b597d26 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 16:01:05 +0000 Subject: [PATCH 07/11] Delete IPC entries --- src/Core/Worker.php | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index f84dd29475..619cb5bfa3 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -98,8 +98,9 @@ class Worker // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { if (self::IPCJobsExists(getmypid())) { - self::IPCSetJobState(false, getmypid()); + self::IPCDeleteJobState(getmypid()); } + // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { @@ -1217,8 +1218,9 @@ class Worker } elseif ($pid) { // The parent process continues here DBA::connect(); - Logger::info('Spawned new worker', ['pid' => $pid]); + self::IPCSetJobState(true, $pid); + Logger::info('Spawned new worker', ['pid' => $pid]); $cycles = 0; while (self::IPCJobsExists($pid) && (++$cycles < 100)) { @@ -1231,15 +1233,11 @@ class Worker // We now are in the new worker DBA::connect(); + /// @todo Reinitialize the logger to set a new process_id and uid + + self::IPCSetJobState(true, getmypid()); Logger::info('Worker spawned', ['pid' => getmypid()]); - $cycles = 0; - while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { - usleep(10000); - } - - Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]); - self::processQueue($do_cron); self::unclaimProcess(); @@ -1477,6 +1475,7 @@ class Worker * Set the flag if some job is waiting * * @param boolean $jobs Is there a waiting job? + * @param int $key Key number * @throws \Exception */ public static function IPCSetJobState(bool $jobs, int $key = 0) @@ -1487,9 +1486,24 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } + /** + * Delete a key entry + * + * @param int $key Key number + * @throws \Exception + */ + public static function IPCDeleteJobState(int $key) + { + $stamp = (float)microtime(true); + DBA::delete('worker-ipc', ['key' => $key]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); + } + /** * Checks if some worker job waits to be executed * + * @param int $key Key number * @return bool * @throws \Exception */ From 2f8e873cc7b7ff56bd77480893e03bb9ff3b5315 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 16:47:55 +0000 Subject: [PATCH 08/11] Wait for parent --- src/Core/Worker.php | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 619cb5bfa3..d2468a2a80 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1234,9 +1234,13 @@ class Worker // We now are in the new worker DBA::connect(); /// @todo Reinitialize the logger to set a new process_id and uid - - self::IPCSetJobState(true, getmypid()); - Logger::info('Worker spawned', ['pid' => getmypid()]); + + $cycles = 0; + while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Worker spawned', ['pid' => getmypid(), 'wait_cycles' => $cycles]); self::processQueue($do_cron); From 6ef0f9646f4853d3ac957cb6a74d054a2e305806 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 21:39:53 +0000 Subject: [PATCH 09/11] Set the new pid --- src/Core/Process.php | 11 +++++++++++ src/Core/Worker.php | 1 + 2 files changed, 12 insertions(+) diff --git a/src/Core/Process.php b/src/Core/Process.php index 919d37dea1..447d312d44 100644 --- a/src/Core/Process.php +++ b/src/Core/Process.php @@ -77,6 +77,17 @@ class Process $this->pid = $pid; } + /** + * Set the process id + * + * @param integer $pid + * @return void + */ + public function setPid(int $pid) + { + $this->pid = $pid; + } + /** * Log active processes into the "process" table */ diff --git a/src/Core/Worker.php b/src/Core/Worker.php index d2468a2a80..54bfdedf3e 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1234,6 +1234,7 @@ class Worker // We now are in the new worker DBA::connect(); /// @todo Reinitialize the logger to set a new process_id and uid + DI::process()->setPid($pid); $cycles = 0; while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { From 594b1a75b567337e4fac13947c846723298fa87d Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 21:48:29 +0000 Subject: [PATCH 10/11] Use the correct pid --- src/Core/Worker.php | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 54bfdedf3e..504a7b9f54 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1232,6 +1232,8 @@ class Worker } // We now are in the new worker + $pid = getmypid(); + DBA::connect(); /// @todo Reinitialize the logger to set a new process_id and uid DI::process()->setPid($pid); @@ -1241,15 +1243,15 @@ class Worker usleep(10000); } - Logger::info('Worker spawned', ['pid' => getmypid(), 'wait_cycles' => $cycles]); + Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]); self::processQueue($do_cron); self::unclaimProcess(); - self::IPCSetJobState(false, getmypid()); + self::IPCSetJobState(false, $pid); DI::process()->end(); - Logger::info('Worker ended', ['pid' => getmypid()]); + Logger::info('Worker ended', ['pid' => $pid]); exit(); } From e944e5d7a2106de57fccb1a9061a17946026eb36 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 22:23:46 +0000 Subject: [PATCH 11/11] Added description --- static/defaults.config.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/static/defaults.config.php b/static/defaults.config.php index ed0f8f871a..2bc2bea5b1 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -539,7 +539,8 @@ return [ 'worker_fetch_limit' => 1, // worker_fork (Boolean) - // Experimental setting. use pcntl_fork to spawn a new worker process + // Experimental setting. Use pcntl_fork to spawn a new worker process. + // Does not work when "worker_multiple_fetch" is enabled (Needs more testing) 'worker_fork' => false, // worker_jpm (Boolean) @@ -559,6 +560,7 @@ return [ // worker_multiple_fetch (Boolean) // When activated, the worker fetches jobs for multiple workers (not only for itself). // This is an experimental setting without knowing the performance impact. + // Does not work when "worker_fork" is enabled (Needs more testing) 'worker_multiple_fetch' => false, // worker_defer_limit (Integer)