diff --git a/bin/daemon.php b/bin/daemon.php index d08aa37d15..fcdd735668 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -29,6 +29,7 @@ if (php_sapi_name() !== 'cli') { } use Dice\Dice; +use Friendica\App\Mode; use Friendica\Core\Logger; use Friendica\Core\Worker; use Friendica\Database\DBA; @@ -65,6 +66,8 @@ if (DI::mode()->isInstall()) { die("Friendica isn't properly installed yet.\n"); } +DI::mode()->setExecutor(Mode::DAEMON); + DI::config()->load(); if (empty(DI::config()->get('system', 'pidfile'))) { @@ -144,34 +147,35 @@ Logger::notice('Starting worker daemon.', ["pid" => $pid]); if (!$foreground) { echo "Starting worker daemon.\n"; - // Switch over to daemon mode. - if ($pid = pcntl_fork()) { - return; // Parent - } - - fclose(STDIN); // Close all of the standard - - // Enabling this seem to block a running php process with 100% CPU usage when there is an outpout - // fclose(STDOUT); // file descriptors as we - // fclose(STDERR); // are running as a daemon. - DBA::disconnect(); + // Fork a daemon process + $pid = pcntl_fork(); + if ($pid == -1) { + echo "Daemon couldn't be forked.\n"; + Logger::warning('Could not fork daemon'); + exit(1); + } elseif ($pid) { + // The parent process continues here + echo 'Child process started with pid ' . $pid . ".\n"; + Logger::notice('Child process started', ['pid' => $pid]); + file_put_contents($pidfile, $pid); + exit(0); + } + + // We now are in the child process register_shutdown_function('shutdown'); + // Make the child the main process, detach it from the terminal if (posix_setsid() < 0) { return; } - if ($pid = pcntl_fork()) { - return; // Parent - } + // Closing all existing connections with the outside + fclose(STDIN); - $pid = getmypid(); - file_put_contents($pidfile, $pid); - - // We lose the database connection upon forking - DBA::reconnect(); + // And now connect the database again + DBA::connect(); } DI::config()->set('system', 'worker_daemon_mode', true); @@ -219,6 +223,11 @@ while (true) { $sleep = min(1000000, round(log10($arg) * 1000000, 0)); usleep($sleep); + $pid = pcntl_waitpid(-1, $status, WNOHANG); + if ($pid > 0) { + Logger::info('Children quit via pcntl_waitpid', ['pid' => $pid, 'status' => $status]); + } + $timeout = ($seconds >= $wait_interval); } while (!$timeout && !Worker::IPCJobsExists()); diff --git a/bin/worker.php b/bin/worker.php index 5698cf16dd..52400a045a 100755 --- a/bin/worker.php +++ b/bin/worker.php @@ -28,7 +28,7 @@ if (php_sapi_name() !== 'cli') { use Dice\Dice; use Friendica\App; -use Friendica\Core\Process; +use Friendica\App\Mode; use Friendica\Core\Update; use Friendica\Core\Worker; use Friendica\DI; @@ -59,6 +59,8 @@ $dice = $dice->addRule(LoggerInterface::class,['constructParams' => ['worker']]) DI::init($dice); $a = DI::app(); +DI::mode()->setExecutor(Mode::WORKER); + // Check the database structure and possibly fixes it Update::check($a->getBasePath(), true, DI::mode()); diff --git a/index.php b/index.php index fdb15fdd88..baa6818b09 100644 --- a/index.php +++ b/index.php @@ -36,6 +36,8 @@ $dice = $dice->addRule(Friendica\App\Mode::class, ['call' => [['determineRunMode $a = \Friendica\DI::app(); +\Friendica\DI::mode()->setExecutor(\Friendica\App\Mode::INDEX); + $a->runFrontend( $dice->create(\Friendica\App\Module::class), $dice->create(\Friendica\App\Router::class), diff --git a/src/App/Mode.php b/src/App/Mode.php index e19de8f07f..8aa812c93d 100644 --- a/src/App/Mode.php +++ b/src/App/Mode.php @@ -38,6 +38,11 @@ class Mode const DBCONFIGAVAILABLE = 4; const MAINTENANCEDISABLED = 8; + const UNDEFINED = 0; + const INDEX = 1; + const DAEMON = 2; + const WORKER = 3; + const BACKEND_CONTENT_TYPES = ['application/jrd+json', 'text/xml', 'application/rss+xml', 'application/atom+xml', 'application/activity+json']; @@ -47,6 +52,12 @@ class Mode */ private $mode; + /*** + * @var int Who executes this Application + * + */ + private $executor = self::UNDEFINED; + /** * @var bool True, if the call is a backend call */ @@ -163,6 +174,31 @@ class Mode return ($this->mode & $mode) > 0; } + /** + * Set the execution mode + * + * @param integer $executor Execution Mode + * @return void + */ + public function setExecutor(int $executor) + { + $this->executor = $executor; + + // Daemon and worker are always backend + if (in_array($executor, [self::DAEMON, self::WORKER])) { + $this->isBackend = true; + } + } + + /*isBackend = true;* + * get the execution mode + * + * @return int Execution Mode + */ + public function getExecutor() + { + return $this->executor; + } /** * Install mode is when the local config file is missing or the DB schema hasn't been installed yet. diff --git a/src/Core/Process.php b/src/Core/Process.php index 919d37dea1..447d312d44 100644 --- a/src/Core/Process.php +++ b/src/Core/Process.php @@ -77,6 +77,17 @@ class Process $this->pid = $pid; } + /** + * Set the process id + * + * @param integer $pid + * @return void + */ + public function setPid(int $pid) + { + $this->pid = $pid; + } + /** * Log active processes into the "process" table */ diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 1759fae19e..504a7b9f54 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -21,6 +21,7 @@ namespace Friendica\Core; +use Friendica\App\Mode; use Friendica\Core; use Friendica\Database\DBA; use Friendica\DI; @@ -49,6 +50,7 @@ class Worker private static $lock_duration = 0; private static $last_update; private static $state; + private static $daemon_mode = null; /** * Processes the tasks that are in the workerqueue table @@ -95,6 +97,10 @@ class Worker // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { + if (self::IPCJobsExists(getmypid())) { + self::IPCDeleteJobState(getmypid()); + } + // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { @@ -145,13 +151,17 @@ class Worker if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); self::unclaimProcess(); - self::spawnWorker(); + if (self::isDaemonMode()) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); + } return; } } // Cleaning up. Possibly not needed, but it doesn't harm anything. - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(false); } Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); @@ -189,7 +199,7 @@ class Worker Logger::warning('Maximum processes reached, quitting.'); return false; } - + return true; } @@ -774,7 +784,7 @@ class Worker // Are there fewer workers running as possible? Then fork a new one. if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]); - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(true); } else { self::spawnWorker(); @@ -783,7 +793,7 @@ class Worker } // if there are too much worker, we don't spawn a new one. - if (DI::config()->get('system', 'worker_daemon_mode', false) && ($active > $queues)) { + if (self::isDaemonMode() && ($active > $queues)) { self::IPCSetJobState(false); } @@ -1184,6 +1194,67 @@ class Worker self::killStaleWorkers(); } + /** + * Fork a child process + * + * @param boolean $do_cron + * @return void + */ + private static function forkProcess(bool $do_cron) + { + if (DI::process()->isMinMemoryReached()) { + Logger::warning('Memory limit reached - quitting'); + return; + } + + // Children inherit their parent's database connection. + // To avoid problems we disconnect and connect both parent and child + DBA::disconnect(); + $pid = pcntl_fork(); + if ($pid == -1) { + DBA::connect(); + Logger::warning('Could not spawn worker'); + return; + } elseif ($pid) { + // The parent process continues here + DBA::connect(); + + self::IPCSetJobState(true, $pid); + Logger::info('Spawned new worker', ['pid' => $pid]); + + $cycles = 0; + while (self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]); + return; + } + + // We now are in the new worker + $pid = getmypid(); + + DBA::connect(); + /// @todo Reinitialize the logger to set a new process_id and uid + DI::process()->setPid($pid); + + $cycles = 0; + while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]); + + self::processQueue($do_cron); + + self::unclaimProcess(); + + self::IPCSetJobState(false, $pid); + DI::process()->end(); + Logger::info('Worker ended', ['pid' => $pid]); + exit(); + } + /** * Spawns a new worker * @@ -1193,16 +1264,14 @@ class Worker */ public static function spawnWorker($do_cron = false) { - $command = 'bin/worker.php'; - - $args = ['no_cron' => !$do_cron]; - - $a = DI::app(); - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid()); - $process->run($command, $args); - - // after spawning we have to remove the flag. - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { + self::forkProcess($do_cron); + } else { + $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), + DI::modelProcess(), DI::app()->getBasePath(), getmypid()); + $process->run('bin/worker.php', ['no_cron' => !$do_cron]); + } + if (self::isDaemonMode()) { self::IPCSetJobState(false); } } @@ -1294,7 +1363,7 @@ class Worker } // Set the IPC flag to ensure an immediate process execution via daemon - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { self::IPCSetJobState(true); } @@ -1319,7 +1388,7 @@ class Worker } // Quit on daemon mode - if (DI::config()->get('system', 'worker_daemon_mode', false)) { + if (self::isDaemonMode()) { return $added; } @@ -1413,12 +1482,27 @@ class Worker * Set the flag if some job is waiting * * @param boolean $jobs Is there a waiting job? + * @param int $key Key number * @throws \Exception */ - public static function IPCSetJobState($jobs) + public static function IPCSetJobState(bool $jobs, int $key = 0) { $stamp = (float)microtime(true); - DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); + DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); + } + + /** + * Delete a key entry + * + * @param int $key Key number + * @throws \Exception + */ + public static function IPCDeleteJobState(int $key) + { + $stamp = (float)microtime(true); + DBA::delete('worker-ipc', ['key' => $key]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -1426,13 +1510,14 @@ class Worker /** * Checks if some worker job waits to be executed * + * @param int $key Key number * @return bool * @throws \Exception */ - public static function IPCJobsExists() + public static function IPCJobsExists(int $key = 0) { $stamp = (float)microtime(true); - $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); + $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]); self::$db_duration += (microtime(true) - $stamp); // When we don't have a row, no job is running @@ -1443,6 +1528,51 @@ class Worker return (bool)$row['jobs']; } + /** + * Checks if the worker is running in the daemon mode. + * + * @return boolean + */ + public static function isDaemonMode() + { + if (!is_null(self::$daemon_mode)) { + return self::$daemon_mode; + } + + if (DI::mode()->getExecutor() == Mode::DAEMON) { + return true; + } + + $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true); + if ($daemon_mode) { + return $daemon_mode; + } + + if (!function_exists('pcntl_fork')) { + self::$daemon_mode = false; + return false; + } + + $pidfile = DI::config()->get('system', 'pidfile'); + if (empty($pidfile)) { + // No pid file, no daemon + self::$daemon_mode = false; + return false; + } + + if (!is_readable($pidfile)) { + // No pid file. We assume that the daemon had been intentionally stopped. + self::$daemon_mode = false; + return false; + } + + $pid = intval(file_get_contents($pidfile)); + $running = posix_kill($pid, 0); + + self::$daemon_mode = $running; + return $running; + } + /** * Test if the daemon is running. If not, it will be started * diff --git a/static/defaults.config.php b/static/defaults.config.php index 310d1ea08e..2bc2bea5b1 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -538,6 +538,11 @@ return [ // Number of worker tasks that are fetched in a single query. 'worker_fetch_limit' => 1, + // worker_fork (Boolean) + // Experimental setting. Use pcntl_fork to spawn a new worker process. + // Does not work when "worker_multiple_fetch" is enabled (Needs more testing) + 'worker_fork' => false, + // worker_jpm (Boolean) // If enabled, it prints out the jobs per minute. 'worker_jpm' => false, @@ -555,6 +560,7 @@ return [ // worker_multiple_fetch (Boolean) // When activated, the worker fetches jobs for multiple workers (not only for itself). // This is an experimental setting without knowing the performance impact. + // Does not work when "worker_fork" is enabled (Needs more testing) 'worker_multiple_fetch' => false, // worker_defer_limit (Integer)