|
|
@ -53,11 +53,6 @@ class Worker |
|
|
|
// We now start the process. This is done after the load check since this could increase the load.
|
|
|
|
self::startProcess(); |
|
|
|
|
|
|
|
// The daemon doesn't need to fork new workers anymore, since we started a process
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
self::IPCSetJobState(false); |
|
|
|
} |
|
|
|
|
|
|
|
// Kill stale processes every 5 minutes
|
|
|
|
$last_cleanup = Config::get('system', 'worker_last_cleaned', 0); |
|
|
|
if (time() > ($last_cleanup + 300)) { |
|
|
@ -145,6 +140,8 @@ class Worker |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Cleaning up. Possibly not needed, but it doesn't harm anything.
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
self::IPCSetJobState(false); |
|
|
|
} |
|
|
@ -698,10 +695,19 @@ class Worker |
|
|
|
// Are there fewer workers running as possible? Then fork a new one.
|
|
|
|
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { |
|
|
|
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG); |
|
|
|
self::spawnWorker(); |
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
self::IPCSetJobState(true); |
|
|
|
} else { |
|
|
|
self::spawnWorker(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// if there are too much worker, we down't spawn a new one.
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false) && ($active >= $queues)) { |
|
|
|
self::IPCSetJobState(false); |
|
|
|
} |
|
|
|
|
|
|
|
return $active >= $queues; |
|
|
|
} |
|
|
|
|
|
|
@ -859,11 +865,6 @@ class Worker |
|
|
|
dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); |
|
|
|
} |
|
|
|
|
|
|
|
// The daemon doesn't need to fork new workers anymore, since we are inside the worker
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
self::IPCSetJobState(false); |
|
|
|
} |
|
|
|
|
|
|
|
return $found; |
|
|
|
} |
|
|
|
|
|
|
@ -1024,6 +1025,11 @@ class Worker |
|
|
|
} |
|
|
|
|
|
|
|
get_app()->proc_run($args); |
|
|
|
|
|
|
|
// after spawning we have to remove the flag.
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
self::IPCSetJobState(false); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
@ -1094,12 +1100,6 @@ class Worker |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
// We tell the daemon that a new job entry exists
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
self::IPCSetJobState(true); |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
// If there is a lock then we don't have to check for too much worker
|
|
|
|
if (!Lock::set('worker', 0)) { |
|
|
|
return true; |
|
|
@ -1113,6 +1113,12 @@ class Worker |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
// We tell the daemon that a new job entry exists
|
|
|
|
if (Config::get('system', 'worker_daemon_mode', false)) { |
|
|
|
// We don't have to set the IPC flag - this is done in "tooMuchWorkers"
|
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
// Now call the worker to execute the jobs that we just added to the queue
|
|
|
|
self::spawnWorker(); |
|
|
|
|
|
|
|