Merge pull request #9092 from annando/multiple-fetch
Worker: Fetch jobs for multiple workers
This commit is contained in:
commit
bc67808f09
2 changed files with 80 additions and 27 deletions
|
@ -91,12 +91,13 @@ class Worker
|
||||||
self::runCron();
|
self::runCron();
|
||||||
}
|
}
|
||||||
|
|
||||||
$starttime = time();
|
$last_check = $starttime = time();
|
||||||
self::$state = self::STATE_STARTUP;
|
self::$state = self::STATE_STARTUP;
|
||||||
|
|
||||||
// We fetch the next queue entry that is about to be executed
|
// We fetch the next queue entry that is about to be executed
|
||||||
while ($r = self::workerProcess()) {
|
while ($r = self::workerProcess()) {
|
||||||
$refetched = false;
|
// Don't refetch when a worker fetches tasks for multiple workers
|
||||||
|
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
|
||||||
foreach ($r as $entry) {
|
foreach ($r as $entry) {
|
||||||
// Assure that the priority is an integer value
|
// Assure that the priority is an integer value
|
||||||
$entry['priority'] = (int)$entry['priority'];
|
$entry['priority'] = (int)$entry['priority'];
|
||||||
|
@ -119,7 +120,7 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
// To avoid the quitting of multiple workers only one worker at a time will execute the check
|
// To avoid the quitting of multiple workers only one worker at a time will execute the check
|
||||||
if (!self::getWaitingJobForPID()) {
|
if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
|
||||||
self::$state = self::STATE_LONG_LOOP;
|
self::$state = self::STATE_LONG_LOOP;
|
||||||
|
|
||||||
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
|
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
|
||||||
|
@ -138,11 +139,13 @@ class Worker
|
||||||
}
|
}
|
||||||
DI::lock()->release(self::LOCK_WORKER);
|
DI::lock()->release(self::LOCK_WORKER);
|
||||||
}
|
}
|
||||||
|
$last_check = time();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Quit the worker once every cron interval
|
// Quit the worker once every cron interval
|
||||||
if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
|
if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
|
||||||
Logger::info('Process lifetime reached, respawning.');
|
Logger::info('Process lifetime reached, respawning.');
|
||||||
|
self::unclaimProcess();
|
||||||
self::spawnWorker();
|
self::spawnWorker();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -782,9 +785,34 @@ class Worker
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$count = DBA::count('process', ['command' => 'Worker.php']);
|
$count = DBA::count('process', ['command' => 'Worker.php']);
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
|
self::$db_duration_count += (microtime(true) - $stamp);
|
||||||
return $count;
|
return $count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of active worker processes
|
||||||
|
*
|
||||||
|
* @return array List of worker process ids
|
||||||
|
* @throws \Exception
|
||||||
|
*/
|
||||||
|
private static function getWorkerPIDList()
|
||||||
|
{
|
||||||
|
$ids = [];
|
||||||
|
$stamp = (float)microtime(true);
|
||||||
|
|
||||||
|
$queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
|
||||||
|
LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done`
|
||||||
|
GROUP BY `process`.`pid`");
|
||||||
|
while ($queue = DBA::fetch($queues)) {
|
||||||
|
$ids[$queue['pid']] = $queue['entries'];
|
||||||
|
}
|
||||||
|
DBA::close($queues);
|
||||||
|
|
||||||
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
|
self::$db_duration_count += (microtime(true) - $stamp);
|
||||||
|
return $ids;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns waiting jobs for the current process id
|
* Returns waiting jobs for the current process id
|
||||||
*
|
*
|
||||||
|
@ -806,11 +834,11 @@ class Worker
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the next jobs that should be executed
|
* Returns the next jobs that should be executed
|
||||||
*
|
* @param int $limit
|
||||||
* @return array array with next jobs
|
* @return array array with next jobs
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
private static function nextProcess()
|
private static function nextProcess(int $limit)
|
||||||
{
|
{
|
||||||
$priority = self::nextPriority();
|
$priority = self::nextPriority();
|
||||||
if (empty($priority)) {
|
if (empty($priority)) {
|
||||||
|
@ -818,8 +846,6 @@ class Worker
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
$limit = DI::config()->get('system', 'worker_fetch_limit', 1);
|
|
||||||
|
|
||||||
$ids = [];
|
$ids = [];
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
|
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
|
||||||
|
@ -918,14 +944,29 @@ class Worker
|
||||||
*/
|
*/
|
||||||
private static function findWorkerProcesses()
|
private static function findWorkerProcesses()
|
||||||
{
|
{
|
||||||
$mypid = getmypid();
|
$fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1);
|
||||||
|
|
||||||
$ids = self::nextProcess();
|
if (DI::config()->get('system', 'worker_multiple_fetch')) {
|
||||||
|
$pids = [];
|
||||||
|
foreach (self::getWorkerPIDList() as $pid => $count) {
|
||||||
|
if ($count <= $fetch_limit) {
|
||||||
|
$pids[] = $pid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (empty($pids)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$limit = $fetch_limit * count($pids);
|
||||||
|
} else {
|
||||||
|
$pids = [getmypid()];
|
||||||
|
$limit = $fetch_limit;
|
||||||
|
}
|
||||||
|
|
||||||
// If there is no result we check without priority limit
|
$ids = self::nextProcess($limit);
|
||||||
if (empty($ids)) {
|
$limit -= count($ids);
|
||||||
$limit = DI::config()->get('system', 'worker_fetch_limit', 1);
|
|
||||||
|
|
||||||
|
// If there is not enough results we check without priority limit
|
||||||
|
if ($limit > 0) {
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
|
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
|
||||||
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
|
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
|
||||||
|
@ -942,15 +983,28 @@ class Worker
|
||||||
DBA::close($tasks);
|
DBA::close($tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($ids)) {
|
if (empty($ids)) {
|
||||||
$stamp = (float)microtime(true);
|
return;
|
||||||
$condition = ['id' => $ids, 'done' => false, 'pid' => 0];
|
|
||||||
DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition);
|
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
|
||||||
self::$db_duration_write += (microtime(true) - $stamp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return !empty($ids);
|
// Assign the task ids to the workers
|
||||||
|
$worker = [];
|
||||||
|
foreach (array_unique($ids) as $id) {
|
||||||
|
$pid = next($pids);
|
||||||
|
if (!$pid) {
|
||||||
|
$pid = reset($pids);
|
||||||
|
}
|
||||||
|
$worker[$pid][] = $id;
|
||||||
|
}
|
||||||
|
|
||||||
|
$stamp = (float)microtime(true);
|
||||||
|
foreach ($worker as $worker_pid => $worker_ids) {
|
||||||
|
Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
|
||||||
|
DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
|
||||||
|
['id' => $worker_ids, 'done' => false, 'pid' => 0]);
|
||||||
|
}
|
||||||
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
|
self::$db_duration_write += (microtime(true) - $stamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -973,17 +1027,11 @@ class Worker
|
||||||
}
|
}
|
||||||
self::$lock_duration += (microtime(true) - $stamp);
|
self::$lock_duration += (microtime(true) - $stamp);
|
||||||
|
|
||||||
$found = self::findWorkerProcesses();
|
self::findWorkerProcesses();
|
||||||
|
|
||||||
DI::lock()->release(self::LOCK_PROCESS);
|
DI::lock()->release(self::LOCK_PROCESS);
|
||||||
|
|
||||||
if ($found) {
|
return self::getWaitingJobForPID();
|
||||||
$stamp = (float)microtime(true);
|
|
||||||
$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
|
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
|
||||||
return DBA::toArray($r);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -499,6 +499,11 @@ return [
|
||||||
// Setting 0 would allow maximum worker queues at all times, which is not recommended.
|
// Setting 0 would allow maximum worker queues at all times, which is not recommended.
|
||||||
'worker_load_exponent' => 3,
|
'worker_load_exponent' => 3,
|
||||||
|
|
||||||
|
// worker_multiple_fetch (Boolean)
|
||||||
|
// When activated, the worker fetches jobs for multiple workers (not only for itself).
|
||||||
|
// This is an experimental setting without knowing the performance impact.
|
||||||
|
'worker_multiple_fetch' => false,
|
||||||
|
|
||||||
// worker_defer_limit (Integer)
|
// worker_defer_limit (Integer)
|
||||||
// Per default the systems tries delivering for 15 times before dropping it.
|
// Per default the systems tries delivering for 15 times before dropping it.
|
||||||
'worker_defer_limit' => 15,
|
'worker_defer_limit' => 15,
|
||||||
|
|
Loading…
Reference in a new issue