Merge pull request #6831 from annando/worker-fetch-once

Refetch new jobs only once per execution loop
This commit is contained in:
Tobias Diekershoff 2019-03-08 22:32:13 +01:00 committed by GitHub
commit 75d144aee7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -23,9 +23,9 @@ use Friendica\Util\Network;
class Worker class Worker
{ {
const STATE_STARTUP = 1; // Worker is in startup. This takes most time. const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
const STATE_SHORT_LOOP = 2; // Worker is processing preassigned jobs, thus saving much time. const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop. const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
const STATE_LONG_LOOP = 4; // Worker is processing the whole - long - loop. const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
private static $up_start; private static $up_start;
private static $db_duration = 0; private static $db_duration = 0;
@ -102,6 +102,7 @@ class Worker
// We fetch the next queue entry that is about to be executed // We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) { while ($r = self::workerProcess()) {
$refetched = false;
foreach ($r as $entry) { foreach ($r as $entry) {
// Assure that the priority is an integer value // Assure that the priority is an integer value
$entry['priority'] = (int)$entry['priority']; $entry['priority'] = (int)$entry['priority'];
@ -112,19 +113,21 @@ class Worker
return; return;
} }
// If possible we will fetch new jobs for this worker // Trying to fetch new processes - but only once when successful
if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) { if (!$refetched && Lock::acquire('worker_process', 0)) {
self::findWorkerProcesses(); self::findWorkerProcesses();
Lock::release('worker_process'); Lock::release('worker_process');
self::$state = self::STATE_REFETCH; self::$state = self::STATE_REFETCH;
$refetched = true;
} else {
self::$state = self::STATE_SHORT_LOOP;
} }
} }
if (self::$state != self::STATE_REFETCH) {
self::$state = self::STATE_LONG_LOOP;
}
// To avoid the quitting of multiple workers only one worker at a time will execute the check // To avoid the quitting of multiple workers only one worker at a time will execute the check
if (!self::getWaitingJobForPID()) {
self::$state = self::STATE_LONG_LOOP;
if (Lock::acquire('worker', 0)) { if (Lock::acquire('worker', 0)) {
// Count active workers and compare them with a maximum value that depends on the load // Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) { if (self::tooMuchWorkers()) {
@ -141,6 +144,7 @@ class Worker
} }
Lock::release('worker'); Lock::release('worker');
} }
}
// Quit the worker once every cron interval // Quit the worker once every cron interval
if (time() > ($starttime + (Config::get('system', 'cron_interval') * 60))) { if (time() > ($starttime + (Config::get('system', 'cron_interval') * 60))) {
@ -424,7 +428,6 @@ class Worker
self::$db_duration_stat = 0; self::$db_duration_stat = 0;
self::$db_duration_write = 0; self::$db_duration_write = 0;
self::$lock_duration = 0; self::$lock_duration = 0;
self::$state = self::STATE_SHORT_LOOP;
if ($duration > 3600) { if ($duration > 3600) {
$logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);