diff --git a/bin/daemon.php b/bin/daemon.php index b51dd392ef..8ba86b3de0 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -6,8 +6,38 @@ * * This script was taken from http://php.net/manual/en/function.pcntl-fork.php */ -function shutdown() { - posix_kill(posix_getpid(), SIGHUP); + +use Friendica\App; +use Friendica\BaseObject; +use Friendica\Core\Config; +use Friendica\Core\Worker; + +// Ensure that daemon.php is executed from the base path of the installation +if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) { + $directory = dirname($_SERVER["argv"][0]); + + if (substr($directory, 0, 1) != "/") { + $directory = $_SERVER["PWD"]."/".$directory; + } + $directory = realpath($directory."/.."); + + chdir($directory); +} + +require_once "boot.php"; +require_once "include/dba.php"; + +$a = new App(dirname(__DIR__)); +BaseObject::setApp($a); + +require_once ".htconfig.php"; +dba::connect($db_host, $db_user, $db_pass, $db_data); + +Config::load(); + +if (!isset($pidfile)) { + die('Please specify a pid file in the variable $pidfile in the .htconfig.php. For example:'."\n". + '$pidfile = "/path/to/daemon.pid";'."\n"); } if (in_array("start", $_SERVER["argv"])) { @@ -30,27 +60,11 @@ if (empty($_SERVER["argv"][0])) { die("Unexpected script behaviour. This message should never occur.\n"); } -// Fetch the base directory -$directory = dirname($_SERVER["argv"][0]); +$pid = @file_get_contents($pidfile); -if (substr($directory, 0, 1) != "/") { - $directory = $_SERVER["PWD"]."/".$directory; -} -$directory = realpath($directory."/.."); - -include $directory."/.htconfig.php"; - -if (!isset($pidfile)) { - die('Please specify a pid file in the variable $pidfile in the .htconfig.php. For example:'."\n". - '$pidfile = "/path/to/daemon.pid";'."\n"); -} - -if (in_array($mode, array("stop", "status"))) { - $pid = @file_get_contents($pidfile); - - if (!$pid) { - die("Pidfile wasn't found. Is the daemon running?\n"); - } +if (empty($pid) && in_array($mode, ["stop", "status"])) { + Config::set('system', 'worker_daemon_mode', false); + die("Pidfile wasn't found. Is the daemon running?\n"); } if ($mode == "status") { @@ -60,6 +74,7 @@ if ($mode == "status") { unlink($pidfile); + Config::set('system', 'worker_daemon_mode', false); die("Daemon process $pid isn't running.\n"); } @@ -68,17 +83,19 @@ if ($mode == "stop") { unlink($pidfile); + logger("Worker daemon process $pid was killed.", LOGGER_DEBUG); + + Config::set('system', 'worker_daemon_mode', false); die("Worker daemon process $pid was killed.\n"); } -echo "Starting worker daemon.\n"; - -if (isset($a->config['php_path'])) { - $php = $a->config['php_path']; -} else { - $php = "php"; +if (!empty($pid) && posix_kill($pid, 0)) { + die("Daemon process $pid is already running.\n"); } +logger('Starting worker daemon.', LOGGER_DEBUG); +echo "Starting worker daemon.\n"; + // Switch over to daemon mode. if ($pid = pcntl_fork()) return; // Parent @@ -95,32 +112,32 @@ if (posix_setsid() < 0) if ($pid = pcntl_fork()) return; // Parent +// We lose the database connection upon forking +dba::connect($db_host, $db_user, $db_pass, $db_data); +unset($db_host, $db_user, $db_pass, $db_data); + +Config::set('system', 'worker_daemon_mode', true); + +// Just to be sure that this script really runs endlessly +set_time_limit(0); + $pid = getmypid(); file_put_contents($pidfile, $pid); +$wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60; + // Now running as a daemon. while (true) { - // Just to be sure that this script really runs endlessly - set_time_limit(0); + logger('Call the worker', LOGGER_DEBUG); + Worker::spawnWorker(); - // Call the worker - $cmdline = $php.' bin/worker.php'; - - $executed = false; - - if (function_exists('proc_open')) { - $resource = proc_open($cmdline . ' &', array(), $foo, $directory); - - if (is_resource($resource)) { - $executed = true; - proc_close($resource); - } - } - - if (!$executed) { - exec($cmdline.' spawn'); - } - - // Now sleep for 5 minutes - sleep(300); + logger("Sleep for $wait_interval seconds - or when a worker needs to be called", LOGGER_DEBUG); + $i = 0; + do { + sleep(1); + } while (($i++ < $wait_interval) && !Worker::IPCJobsExists()); +} + +function shutdown() { + posix_kill(posix_getpid(), SIGHUP); } diff --git a/include/dba.php b/include/dba.php index aa63a5b361..5245538ced 100644 --- a/include/dba.php +++ b/include/dba.php @@ -26,7 +26,7 @@ class dba { private static $relation = []; public static function connect($serveraddr, $user, $pass, $db) { - if (!is_null(self::$db)) { + if (!is_null(self::$db) && self::connected()) { return true; } diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 81cdf1bbd2..e2135f0daf 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -53,10 +53,15 @@ class Worker // We now start the process. This is done after the load check since this could increase the load. self::startProcess(); + // The daemon doesn't need to fork new workers anymore, since we started a process + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(false); + } + // Kill stale processes every 5 minutes - $last_cleanup = Config::get('system', 'poller_last_cleaned', 0); + $last_cleanup = Config::get('system', 'worker_last_cleaned', 0); if (time() > ($last_cleanup + 300)) { - Config::set('system', 'poller_last_cleaned', time()); + Config::set('system', 'worker_last_cleaned', time()); self::killStaleWorkers(); } @@ -108,16 +113,16 @@ class Worker } // If possible we will fetch new jobs for this worker - if (!$refetched && Lock::set('poller_worker_process', 0)) { + if (!$refetched && Lock::set('worker_process', 0)) { $stamp = (float)microtime(true); $refetched = self::findWorkerProcesses($passing_slow); self::$db_duration += (microtime(true) - $stamp); - Lock::remove('poller_worker_process'); + Lock::remove('worker_process'); } } // To avoid the quitting of multiple workers only one worker at a time will execute the check - if (Lock::set('poller_worker', 0)) { + if (Lock::set('worker', 0)) { $stamp = (float)microtime(true); // Count active workers and compare them with a maximum value that depends on the load if (self::tooMuchWorkers()) { @@ -130,7 +135,7 @@ class Worker logger('Memory limit reached, quitting.', LOGGER_DEBUG); return; } - Lock::remove('poller_worker'); + Lock::remove('worker'); self::$db_duration += (microtime(true) - $stamp); } @@ -140,6 +145,9 @@ class Worker return; } } + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(false); + } logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } @@ -244,7 +252,7 @@ class Worker $stamp = (float)microtime(true); if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) { - Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow()); + Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); @@ -285,7 +293,7 @@ class Worker $stamp = (float)microtime(true); if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) { - Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow()); + Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); } else { @@ -851,6 +859,11 @@ class Worker dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); } + // The daemon doesn't need to fork new workers anymore, since we are inside the worker + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(false); + } + return $found; } @@ -873,7 +886,7 @@ class Worker dba::close($r); $stamp = (float)microtime(true); - if (!Lock::set('poller_worker_process')) { + if (!Lock::set('worker_process')) { return false; } self::$lock_duration = (microtime(true) - $stamp); @@ -882,7 +895,7 @@ class Worker $found = self::findWorkerProcesses($passing_slow); self::$db_duration += (microtime(true) - $stamp); - Lock::remove('poller_worker_process'); + Lock::remove('worker_process'); if ($found) { $r = dba::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); @@ -1071,19 +1084,25 @@ class Worker dba::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]); } + // We tell the daemon that a new job entry exists + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(true); + return true; + } + // Should we quit and wait for the worker to be called as a cronjob? if ($dont_fork) { return true; } // If there is a lock then we don't have to check for too much worker - if (!Lock::set('poller_worker', 0)) { + if (!Lock::set('worker', 0)) { return true; } // If there are already enough workers running, don't fork another one $quit = self::tooMuchWorkers(); - Lock::remove('poller_worker'); + Lock::remove('worker'); if ($quit) { return true; @@ -1121,4 +1140,30 @@ class Worker { return Process::deleteByPid(); } + + private static function checkIPC() + { + dba::e("CREATE TABLE IF NOT EXISTS `worker-ipc` (`key` integer, `jobs` boolean) ENGINE = MEMORY;"); + } + + public static function IPCSetJobState($jobs) + { + self::checkIPC(); + + dba::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); + } + + public static function IPCJobsExists() + { + self::checkIPC(); + + $row = dba::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); + + // When we don't have a row, no job is running + if (!DBM::is_result($row)) { + return false; + } + + return (bool)$row['jobs']; + } }