Improved daemon test

This commit is contained in:
Michael 2021-01-02 08:43:55 +00:00
parent 78f67c1e0e
commit 7a03b72060

View file

@ -50,6 +50,7 @@ class Worker
private static $lock_duration = 0; private static $lock_duration = 0;
private static $last_update; private static $last_update;
private static $state; private static $state;
private static $daemon_mode = null;
/** /**
* Processes the tasks that are in the workerqueue table * Processes the tasks that are in the workerqueue table
@ -146,13 +147,17 @@ class Worker
if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
Logger::info('Process lifetime reached, respawning.'); Logger::info('Process lifetime reached, respawning.');
self::unclaimProcess(); self::unclaimProcess();
self::spawnWorker(); if (self::isDaemonMode()) {
self::IPCSetJobState(true);
} else {
self::spawnWorker();
}
return; return;
} }
} }
// Cleaning up. Possibly not needed, but it doesn't harm anything. // Cleaning up. Possibly not needed, but it doesn't harm anything.
if (DI::config()->get('system', 'worker_daemon_mode', false)) { if (self::isDaemonMode()) {
self::IPCSetJobState(false); self::IPCSetJobState(false);
} }
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
@ -190,7 +195,7 @@ class Worker
Logger::warning('Maximum processes reached, quitting.'); Logger::warning('Maximum processes reached, quitting.');
return false; return false;
} }
return true; return true;
} }
@ -771,7 +776,7 @@ class Worker
// Are there fewer workers running as possible? Then fork a new one. // Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]); Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
if (DI::config()->get('system', 'worker_daemon_mode', false)) { if (self::isDaemonMode()) {
self::IPCSetJobState(true); self::IPCSetJobState(true);
} else { } else {
self::spawnWorker(); self::spawnWorker();
@ -780,7 +785,7 @@ class Worker
} }
// if there are too much worker, we don't spawn a new one. // if there are too much worker, we don't spawn a new one.
if (DI::config()->get('system', 'worker_daemon_mode', false) && ($active > $queues)) { if (self::isDaemonMode() && ($active > $queues)) {
self::IPCSetJobState(false); self::IPCSetJobState(false);
} }
@ -1208,7 +1213,7 @@ class Worker
Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]); Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]);
return; return;
} }
// We now are in the new worker // We now are in the new worker
DBA::connect(); DBA::connect();
Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]); Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]);
@ -1233,6 +1238,7 @@ class Worker
*/ */
public static function spawnWorker($do_cron = false) public static function spawnWorker($do_cron = false)
{ {
Logger::notice("Spawn", ['do_cron' => $do_cron, 'callstack' => System::callstack(20)]);
// Worker and daemon are started from the command line. // Worker and daemon are started from the command line.
// This means that this is executed by a PHP interpreter without runtime limitations // This means that this is executed by a PHP interpreter without runtime limitations
if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) { if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) {
@ -1244,7 +1250,7 @@ class Worker
} }
// after spawning we have to remove the flag. // after spawning we have to remove the flag.
if (DI::config()->get('system', 'worker_daemon_mode', false)) { if (self::isDaemonMode()) {
self::IPCSetJobState(false); self::IPCSetJobState(false);
} }
} }
@ -1336,7 +1342,7 @@ class Worker
} }
// Set the IPC flag to ensure an immediate process execution via daemon // Set the IPC flag to ensure an immediate process execution via daemon
if (DI::config()->get('system', 'worker_daemon_mode', false)) { if (self::isDaemonMode()) {
self::IPCSetJobState(true); self::IPCSetJobState(true);
} }
@ -1361,7 +1367,7 @@ class Worker
} }
// Quit on daemon mode // Quit on daemon mode
if (DI::config()->get('system', 'worker_daemon_mode', false)) { if (self::isDaemonMode()) {
return $added; return $added;
} }
@ -1485,6 +1491,46 @@ class Worker
return (bool)$row['jobs']; return (bool)$row['jobs'];
} }
/**
* Checks if the worker is running in the daemon mode.
*
* @return boolean
*/
public static function isDaemonMode()
{
if (!is_null(self::$daemon_mode)) {
return self::$daemon_mode;
}
if (DI::mode()->getExecutor() == Mode::DAEMON) {
return true;
}
$daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
if ($daemon_mode) {
return $daemon_mode;
}
$pidfile = DI::config()->get('system', 'pidfile');
if (empty($pidfile)) {
// No pid file, no daemon
self::$daemon_mode = false;
return false;
}
if (!is_readable($pidfile)) {
// No pid file. We assume that the daemon had been intentionally stopped.
self::$daemon_mode = false;
return false;
}
$pid = intval(file_get_contents($pidfile));
$running = posix_kill($pid, 0);
self::$daemon_mode = $running;
return $running;
}
/** /**
* Test if the daemon is running. If not, it will be started * Test if the daemon is running. If not, it will be started
* *