Merge pull request #5223 from annando/fast-daemon
Multiple daemon improvements (foreground, faster spawning)
This commit is contained in:
commit
41d1f0173a
2 changed files with 67 additions and 46 deletions
|
@ -52,6 +52,8 @@ if (in_array("status", $_SERVER["argv"])) {
|
||||||
$mode = "status";
|
$mode = "status";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$foreground = in_array("--foreground", $_SERVER["argv"]);
|
||||||
|
|
||||||
if (!isset($mode)) {
|
if (!isset($mode)) {
|
||||||
die("Please use either 'start', 'stop' or 'status'.\n");
|
die("Please use either 'start', 'stop' or 'status'.\n");
|
||||||
}
|
}
|
||||||
|
@ -94,31 +96,38 @@ if (!empty($pid) && posix_kill($pid, 0)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
logger('Starting worker daemon.', LOGGER_DEBUG);
|
logger('Starting worker daemon.', LOGGER_DEBUG);
|
||||||
echo "Starting worker daemon.\n";
|
|
||||||
|
|
||||||
// Switch over to daemon mode.
|
if (!$foreground) {
|
||||||
if ($pid = pcntl_fork()) {
|
echo "Starting worker daemon.\n";
|
||||||
|
|
||||||
|
// Switch over to daemon mode.
|
||||||
|
if ($pid = pcntl_fork()) {
|
||||||
return; // Parent
|
return; // Parent
|
||||||
}
|
}
|
||||||
|
|
||||||
fclose(STDIN); // Close all of the standard
|
fclose(STDIN); // Close all of the standard
|
||||||
fclose(STDOUT); // file descriptors as we
|
fclose(STDOUT); // file descriptors as we
|
||||||
fclose(STDERR); // are running as a daemon.
|
fclose(STDERR); // are running as a daemon.
|
||||||
|
|
||||||
dba::disconnect();
|
dba::disconnect();
|
||||||
|
|
||||||
register_shutdown_function('shutdown');
|
register_shutdown_function('shutdown');
|
||||||
|
|
||||||
if (posix_setsid() < 0) {
|
if (posix_setsid() < 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($pid = pcntl_fork()) {
|
if ($pid = pcntl_fork()) {
|
||||||
return; // Parent
|
return; // Parent
|
||||||
|
}
|
||||||
|
|
||||||
|
$pid = getmypid();
|
||||||
|
file_put_contents($pidfile, $pid);
|
||||||
|
|
||||||
|
// We lose the database connection upon forking
|
||||||
|
dba::connect($db_host, $db_user, $db_pass, $db_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
// We lose the database connection upon forking
|
|
||||||
dba::connect($db_host, $db_user, $db_pass, $db_data);
|
|
||||||
unset($db_host, $db_user, $db_pass, $db_data);
|
unset($db_host, $db_user, $db_pass, $db_data);
|
||||||
|
|
||||||
Config::set('system', 'worker_daemon_mode', true);
|
Config::set('system', 'worker_daemon_mode', true);
|
||||||
|
@ -126,9 +135,6 @@ Config::set('system', 'worker_daemon_mode', true);
|
||||||
// Just to be sure that this script really runs endlessly
|
// Just to be sure that this script really runs endlessly
|
||||||
set_time_limit(0);
|
set_time_limit(0);
|
||||||
|
|
||||||
$pid = getmypid();
|
|
||||||
file_put_contents($pidfile, $pid);
|
|
||||||
|
|
||||||
$wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60;
|
$wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60;
|
||||||
|
|
||||||
$do_cron = true;
|
$do_cron = true;
|
||||||
|
@ -152,12 +158,21 @@ while (true) {
|
||||||
}
|
}
|
||||||
|
|
||||||
logger("Sleeping", LOGGER_DEBUG);
|
logger("Sleeping", LOGGER_DEBUG);
|
||||||
$i = 0;
|
$start = time();
|
||||||
do {
|
do {
|
||||||
sleep(1);
|
$seconds = (time() - $start);
|
||||||
} while (($i++ < $wait_interval) && !Worker::IPCJobsExists());
|
|
||||||
|
|
||||||
if ($i >= $wait_interval) {
|
// logarithmic wait time calculation.
|
||||||
|
// Background: After jobs had been started, they often fork many workers.
|
||||||
|
// To not waste too much time, the sleep period increases.
|
||||||
|
$arg = (($seconds + 1) / ($wait_interval / 9)) + 1;
|
||||||
|
$sleep = round(log10($arg) * 1000000, 0);
|
||||||
|
usleep($sleep);
|
||||||
|
|
||||||
|
$timeout = ($seconds >= $wait_interval);
|
||||||
|
} while (!$timeout && !Worker::IPCJobsExists());
|
||||||
|
|
||||||
|
if ($timeout) {
|
||||||
$do_cron = true;
|
$do_cron = true;
|
||||||
logger("Woke up after $wait_interval seconds.", LOGGER_DEBUG);
|
logger("Woke up after $wait_interval seconds.", LOGGER_DEBUG);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -53,11 +53,6 @@ class Worker
|
||||||
// We now start the process. This is done after the load check since this could increase the load.
|
// We now start the process. This is done after the load check since this could increase the load.
|
||||||
self::startProcess();
|
self::startProcess();
|
||||||
|
|
||||||
// The daemon doesn't need to fork new workers anymore, since we started a process
|
|
||||||
if (Config::get('system', 'worker_daemon_mode', false)) {
|
|
||||||
self::IPCSetJobState(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Kill stale processes every 5 minutes
|
// Kill stale processes every 5 minutes
|
||||||
$last_cleanup = Config::get('system', 'worker_last_cleaned', 0);
|
$last_cleanup = Config::get('system', 'worker_last_cleaned', 0);
|
||||||
if (time() > ($last_cleanup + 300)) {
|
if (time() > ($last_cleanup + 300)) {
|
||||||
|
@ -145,6 +140,8 @@ class Worker
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleaning up. Possibly not needed, but it doesn't harm anything.
|
||||||
if (Config::get('system', 'worker_daemon_mode', false)) {
|
if (Config::get('system', 'worker_daemon_mode', false)) {
|
||||||
self::IPCSetJobState(false);
|
self::IPCSetJobState(false);
|
||||||
}
|
}
|
||||||
|
@ -698,9 +695,18 @@ class Worker
|
||||||
// Are there fewer workers running as possible? Then fork a new one.
|
// Are there fewer workers running as possible? Then fork a new one.
|
||||||
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
|
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
|
||||||
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
|
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
|
||||||
|
if (Config::get('system', 'worker_daemon_mode', false)) {
|
||||||
|
self::IPCSetJobState(true);
|
||||||
|
} else {
|
||||||
self::spawnWorker();
|
self::spawnWorker();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there are too much worker, we down't spawn a new one.
|
||||||
|
if (Config::get('system', 'worker_daemon_mode', false) && ($active >= $queues)) {
|
||||||
|
self::IPCSetJobState(false);
|
||||||
|
}
|
||||||
|
|
||||||
return $active >= $queues;
|
return $active >= $queues;
|
||||||
}
|
}
|
||||||
|
@ -859,11 +865,6 @@ class Worker
|
||||||
dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids);
|
dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
// The daemon doesn't need to fork new workers anymore, since we are inside the worker
|
|
||||||
if (Config::get('system', 'worker_daemon_mode', false)) {
|
|
||||||
self::IPCSetJobState(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $found;
|
return $found;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1024,6 +1025,11 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
get_app()->proc_run($args);
|
get_app()->proc_run($args);
|
||||||
|
|
||||||
|
// after spawning we have to remove the flag.
|
||||||
|
if (Config::get('system', 'worker_daemon_mode', false)) {
|
||||||
|
self::IPCSetJobState(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1094,12 +1100,6 @@ class Worker
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// We tell the daemon that a new job entry exists
|
|
||||||
if (Config::get('system', 'worker_daemon_mode', false)) {
|
|
||||||
self::IPCSetJobState(true);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there is a lock then we don't have to check for too much worker
|
// If there is a lock then we don't have to check for too much worker
|
||||||
if (!Lock::set('worker', 0)) {
|
if (!Lock::set('worker', 0)) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -1113,6 +1113,12 @@ class Worker
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We tell the daemon that a new job entry exists
|
||||||
|
if (Config::get('system', 'worker_daemon_mode', false)) {
|
||||||
|
// We don't have to set the IPC flag - this is done in "tooMuchWorkers"
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// Now call the worker to execute the jobs that we just added to the queue
|
// Now call the worker to execute the jobs that we just added to the queue
|
||||||
self::spawnWorker();
|
self::spawnWorker();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue