Merge pull request #6764 from annando/worker2
Better worker lifetime check
This commit is contained in:
commit
1f27290494
1 changed files with 31 additions and 16 deletions
|
@ -22,6 +22,11 @@ use Friendica\Util\Network;
|
||||||
*/
|
*/
|
||||||
class Worker
|
class Worker
|
||||||
{
|
{
|
||||||
|
const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
|
||||||
|
const STATE_SHORT_LOOP = 2; // Worker is processing preassigned jobs, thus saving much time.
|
||||||
|
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
|
||||||
|
const STATE_LONG_LOOP = 4; // Worker is processing the whole - long - loop.
|
||||||
|
|
||||||
private static $up_start;
|
private static $up_start;
|
||||||
private static $db_duration = 0;
|
private static $db_duration = 0;
|
||||||
private static $db_duration_count = 0;
|
private static $db_duration_count = 0;
|
||||||
|
@ -29,6 +34,7 @@ class Worker
|
||||||
private static $db_duration_stat = 0;
|
private static $db_duration_stat = 0;
|
||||||
private static $lock_duration = 0;
|
private static $lock_duration = 0;
|
||||||
private static $last_update;
|
private static $last_update;
|
||||||
|
private static $state;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Processes the tasks that are in the workerqueue table
|
* @brief Processes the tasks that are in the workerqueue table
|
||||||
|
@ -92,6 +98,7 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
$starttime = time();
|
$starttime = time();
|
||||||
|
self::$state = self::STATE_STARTUP;
|
||||||
|
|
||||||
// We fetch the next queue entry that is about to be executed
|
// We fetch the next queue entry that is about to be executed
|
||||||
while ($r = self::workerProcess()) {
|
while ($r = self::workerProcess()) {
|
||||||
|
@ -109,9 +116,14 @@ class Worker
|
||||||
if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
|
if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
|
||||||
self::findWorkerProcesses();
|
self::findWorkerProcesses();
|
||||||
Lock::release('worker_process');
|
Lock::release('worker_process');
|
||||||
|
self::$state = self::STATE_REFETCH;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (self::$state != self::STATE_REFETCH) {
|
||||||
|
self::$state = self::STATE_LONG_LOOP;
|
||||||
|
}
|
||||||
|
|
||||||
// To avoid the quitting of multiple workers only one worker at a time will execute the check
|
// To avoid the quitting of multiple workers only one worker at a time will execute the check
|
||||||
if (Lock::acquire('worker', 0)) {
|
if (Lock::acquire('worker', 0)) {
|
||||||
// Count active workers and compare them with a maximum value that depends on the load
|
// Count active workers and compare them with a maximum value that depends on the load
|
||||||
|
@ -130,9 +142,10 @@ class Worker
|
||||||
Lock::release('worker');
|
Lock::release('worker');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Quit the worker once every 5 minutes
|
// Quit the worker once every cron interval
|
||||||
if (time() > ($starttime + 300)) {
|
if (time() > ($starttime + (Config::get('system', 'cron_interval') * 60))) {
|
||||||
Logger::log('Process lifetime reached, quitting.', Logger::DEBUG);
|
Logger::info('Process lifetime reached, respawning.');
|
||||||
|
self::spawnWorker();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -394,15 +407,16 @@ class Worker
|
||||||
* The execution time is the productive time.
|
* The execution time is the productive time.
|
||||||
* By changing parameters like the maximum number of workers we can check the effectivness.
|
* By changing parameters like the maximum number of workers we can check the effectivness.
|
||||||
*/
|
*/
|
||||||
$dbtotal = number_format(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 4);
|
$dbtotal = round(self::$db_duration, 2);
|
||||||
$dbcount = number_format(self::$db_duration_count, 4);
|
$dbread = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2);
|
||||||
$dbstat = number_format(self::$db_duration_stat, 4);
|
$dbcount = round(self::$db_duration_count, 2);
|
||||||
$dbwrite = number_format(self::$db_duration_write, 4);
|
$dbstat = round(self::$db_duration_stat, 2);
|
||||||
$dblock = number_format(self::$lock_duration, 4);
|
$dbwrite = round(self::$db_duration_write, 2);
|
||||||
$rest = number_format(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 4);
|
$dblock = round(self::$lock_duration, 2);
|
||||||
$exec = number_format($duration, 4);
|
$rest = round(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 2);
|
||||||
|
$exec = round($duration, 2);
|
||||||
|
|
||||||
$logger->info('Performance log.', ['total' => $dbtotal, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'block' => $dblock, 'rest' => $rest, 'exec' => $exec]);
|
$logger->info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
|
||||||
|
|
||||||
self::$up_start = microtime(true);
|
self::$up_start = microtime(true);
|
||||||
self::$db_duration = 0;
|
self::$db_duration = 0;
|
||||||
|
@ -410,18 +424,19 @@ class Worker
|
||||||
self::$db_duration_stat = 0;
|
self::$db_duration_stat = 0;
|
||||||
self::$db_duration_write = 0;
|
self::$db_duration_write = 0;
|
||||||
self::$lock_duration = 0;
|
self::$lock_duration = 0;
|
||||||
|
self::$state = self::STATE_SHORT_LOOP;
|
||||||
|
|
||||||
if ($duration > 3600) {
|
if ($duration > 3600) {
|
||||||
$logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
$logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
} elseif ($duration > 600) {
|
} elseif ($duration > 600) {
|
||||||
$logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
$logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
} elseif ($duration > 300) {
|
} elseif ($duration > 300) {
|
||||||
$logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
$logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
} elseif ($duration > 120) {
|
} elseif ($duration > 120) {
|
||||||
$logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
$logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
$workerLogger->info('Process done. ', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => number_format($duration, 4)]);
|
$workerLogger->info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
|
||||||
|
|
||||||
$a->getProfiler()->saveLog($a->getLogger(), "ID " . $queue["id"] . ": " . $funcname);
|
$a->getProfiler()->saveLog($a->getLogger(), "ID " . $queue["id"] . ": " . $funcname);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue