Worker: Fetch jobs for multiple workers

This commit is contained in:
Michael 2020-08-29 09:03:50 +00:00
parent d1bbe1dcbb
commit f9152ce140
2 changed files with 65 additions and 11 deletions

View file

@ -785,6 +785,29 @@ class Worker
return $count; return $count;
} }
/**
* Returns the number of active worker processes
*
* @return array List of worker process ids
* @throws \Exception
*/
private static function getWorkerPIDList()
{
$ids = [];
$stamp = (float)microtime(true);
$queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done`
GROUP BY `process`.`pid`");
while ($queue = DBA::fetch($queues)) {
$ids[$queue['pid']] = $queue['entries'];
}
DBA::close($queues);
self::$db_duration += (microtime(true) - $stamp);
return $ids;
}
/** /**
* Returns waiting jobs for the current process id * Returns waiting jobs for the current process id
* *
@ -806,11 +829,11 @@ class Worker
/** /**
* Returns the next jobs that should be executed * Returns the next jobs that should be executed
* * @param int $limit
* @return array array with next jobs * @return array array with next jobs
* @throws \Exception * @throws \Exception
*/ */
private static function nextProcess() private static function nextProcess(int $limit)
{ {
$priority = self::nextPriority(); $priority = self::nextPriority();
if (empty($priority)) { if (empty($priority)) {
@ -818,8 +841,6 @@ class Worker
return []; return [];
} }
$limit = DI::config()->get('system', 'worker_fetch_limit', 1);
$ids = []; $ids = [];
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
@ -918,14 +939,30 @@ class Worker
*/ */
private static function findWorkerProcesses() private static function findWorkerProcesses()
{ {
$mypid = getmypid(); $fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1);
$ids = self::nextProcess(); if (DI::config()->get('system', 'worker_multiple_fetch')) {
$pids = [];
$worker_pids = self::getWorkerPIDList();
foreach ($worker_pids as $pid => $count) {
if ($count <= $fetch_limit) {
$pids[] = $pid;
}
}
if (empty($pids)) {
return;
}
$limit = $fetch_limit * count($pids);
} else {
$pids = [getmypid()];
$limit = $fetch_limit;
}
// If there is no result we check without priority limit $ids = self::nextProcess($limit);
if (empty($ids)) { $limit -= count($ids);
$limit = DI::config()->get('system', 'worker_fetch_limit', 1);
// If there is not enough results we check without priority limit
if ($limit > 0) {
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]); $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
@ -943,9 +980,21 @@ class Worker
} }
if (!empty($ids)) { if (!empty($ids)) {
$worker = [];
foreach (array_unique($ids) as $id) {
$pid = next($pids);
if (!$pid) {
$pid = reset($pids);
}
$worker[$pid][] = $id;
}
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$condition = ['id' => $ids, 'done' => false, 'pid' => 0]; foreach ($worker as $worker_pid => $worker_ids) {
DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition); Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
['id' => $worker_ids, 'done' => false, 'pid' => 0]);
}
self::$db_duration += (microtime(true) - $stamp); self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp);
} }

View file

@ -499,6 +499,11 @@ return [
// Setting 0 would allow maximum worker queues at all times, which is not recommended. // Setting 0 would allow maximum worker queues at all times, which is not recommended.
'worker_load_exponent' => 3, 'worker_load_exponent' => 3,
// worker_multiple_fetch (Boolean)
// When activated, the worker fetches jobs for multiple workers (not only for itself).
// This is an experimental setting without knowing the performance impact.
'worker_multiple_fetch' => false,
// worker_defer_limit (Integer) // worker_defer_limit (Integer)
// Per default the systems tries delivering for 15 times before dropping it. // Per default the systems tries delivering for 15 times before dropping it.
'worker_defer_limit' => 15, 'worker_defer_limit' => 15,