From e97a3c25e7eeecc05102e98d7b7e4c23d5788e02 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 15 Jun 2018 18:18:20 +0000 Subject: [PATCH] Multiple daemon improvements (foreground, faster spawning) --- bin/daemon.php | 73 +++++++++++++++++++++++++++------------------ src/Core/Worker.php | 40 ++++++++++++++----------- 2 files changed, 67 insertions(+), 46 deletions(-) diff --git a/bin/daemon.php b/bin/daemon.php index acb26d8199..f7dcce1642 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -52,6 +52,8 @@ if (in_array("status", $_SERVER["argv"])) { $mode = "status"; } +$foreground = in_array("--foreground", $_SERVER["argv"]); + if (!isset($mode)) { die("Please use either 'start', 'stop' or 'status'.\n"); } @@ -94,31 +96,38 @@ if (!empty($pid) && posix_kill($pid, 0)) { } logger('Starting worker daemon.', LOGGER_DEBUG); -echo "Starting worker daemon.\n"; -// Switch over to daemon mode. -if ($pid = pcntl_fork()) { - return; // Parent +if (!$foreground) { + echo "Starting worker daemon.\n"; + + // Switch over to daemon mode. + if ($pid = pcntl_fork()) { + return; // Parent + } + + fclose(STDIN); // Close all of the standard + fclose(STDOUT); // file descriptors as we + fclose(STDERR); // are running as a daemon. + + dba::disconnect(); + + register_shutdown_function('shutdown'); + + if (posix_setsid() < 0) { + return; + } + + if ($pid = pcntl_fork()) { + return; // Parent + } + + $pid = getmypid(); + file_put_contents($pidfile, $pid); + + // We lose the database connection upon forking + dba::connect($db_host, $db_user, $db_pass, $db_data); } -fclose(STDIN); // Close all of the standard -fclose(STDOUT); // file descriptors as we -fclose(STDERR); // are running as a daemon. - -dba::disconnect(); - -register_shutdown_function('shutdown'); - -if (posix_setsid() < 0) { - return; -} - -if ($pid = pcntl_fork()) { - return; // Parent -} - -// We lose the database connection upon forking -dba::connect($db_host, $db_user, $db_pass, $db_data); unset($db_host, $db_user, $db_pass, $db_data); Config::set('system', 'worker_daemon_mode', true); @@ -126,9 +135,6 @@ Config::set('system', 'worker_daemon_mode', true); // Just to be sure that this script really runs endlessly set_time_limit(0); -$pid = getmypid(); -file_put_contents($pidfile, $pid); - $wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60; $do_cron = true; @@ -152,12 +158,21 @@ while (true) { } logger("Sleeping", LOGGER_DEBUG); - $i = 0; + $start = time(); do { - sleep(1); - } while (($i++ < $wait_interval) && !Worker::IPCJobsExists()); + $seconds = (time() - $start); - if ($i >= $wait_interval) { + // logarithmic wait time calculation. + // Background: After jobs had been started, they often fork many workers. + // To not waste too much time, the sleep period increases. + $arg = (($seconds + 1) / ($wait_interval / 9)) + 1; + $sleep = round(log10($arg) * 1000000, 0); + usleep($sleep); + + $timeout = ($seconds >= $wait_interval); + } while (!$timeout && !Worker::IPCJobsExists()); + + if ($timeout) { $do_cron = true; logger("Woke up after $wait_interval seconds.", LOGGER_DEBUG); } else { diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 4539dcc843..4659d22832 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -53,11 +53,6 @@ class Worker // We now start the process. This is done after the load check since this could increase the load. self::startProcess(); - // The daemon doesn't need to fork new workers anymore, since we started a process - if (Config::get('system', 'worker_daemon_mode', false)) { - self::IPCSetJobState(false); - } - // Kill stale processes every 5 minutes $last_cleanup = Config::get('system', 'worker_last_cleaned', 0); if (time() > ($last_cleanup + 300)) { @@ -145,6 +140,8 @@ class Worker return; } } + + // Cleaning up. Possibly not needed, but it doesn't harm anything. if (Config::get('system', 'worker_daemon_mode', false)) { self::IPCSetJobState(false); } @@ -698,10 +695,19 @@ class Worker // Are there fewer workers running as possible? Then fork a new one. if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG); - self::spawnWorker(); + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); + } } } + // if there are too much worker, we down't spawn a new one. + if (Config::get('system', 'worker_daemon_mode', false) && ($active >= $queues)) { + self::IPCSetJobState(false); + } + return $active >= $queues; } @@ -859,11 +865,6 @@ class Worker dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); } - // The daemon doesn't need to fork new workers anymore, since we are inside the worker - if (Config::get('system', 'worker_daemon_mode', false)) { - self::IPCSetJobState(false); - } - return $found; } @@ -1024,6 +1025,11 @@ class Worker } get_app()->proc_run($args); + + // after spawning we have to remove the flag. + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(false); + } } /** @@ -1094,12 +1100,6 @@ class Worker return true; } - // We tell the daemon that a new job entry exists - if (Config::get('system', 'worker_daemon_mode', false)) { - self::IPCSetJobState(true); - return true; - } - // If there is a lock then we don't have to check for too much worker if (!Lock::set('worker', 0)) { return true; @@ -1113,6 +1113,12 @@ class Worker return true; } + // We tell the daemon that a new job entry exists + if (Config::get('system', 'worker_daemon_mode', false)) { + // We don't have to set the IPC flag - this is done in "tooMuchWorkers" + return true; + } + // Now call the worker to execute the jobs that we just added to the queue self::spawnWorker();