Merge pull request #6942 from annando/worker-fast-commands
Command based pre fetching of worker tasks
This commit is contained in:
commit
ef0545645a
1 changed files with 22 additions and 11 deletions
|
@ -27,6 +27,9 @@ class Worker
|
||||||
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
|
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
|
||||||
const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
|
const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
|
||||||
|
|
||||||
|
const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
|
||||||
|
|
||||||
|
|
||||||
private static $up_start;
|
private static $up_start;
|
||||||
private static $db_duration = 0;
|
private static $db_duration = 0;
|
||||||
private static $db_duration_count = 0;
|
private static $db_duration_count = 0;
|
||||||
|
@ -783,23 +786,24 @@ class Worker
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($priority <= PRIORITY_MEDIUM) {
|
|
||||||
$limit = Config::get('system', 'worker_fetch_limit', 1);
|
$limit = Config::get('system', 'worker_fetch_limit', 1);
|
||||||
} else {
|
|
||||||
$limit = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
$ids = [];
|
$ids = [];
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
|
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
|
||||||
$tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]);
|
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]);
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
while ($task = DBA::fetch($tasks)) {
|
while ($task = DBA::fetch($tasks)) {
|
||||||
$ids[] = $task['id'];
|
$ids[] = $task['id'];
|
||||||
|
// Only continue that loop while we are storing commands that can be processed quickly
|
||||||
|
$command = json_decode($task['parameter'])[0];
|
||||||
|
if (!in_array($command, self::FAST_COMMANDS)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
DBA::close($tasks);
|
DBA::close($tasks);
|
||||||
|
|
||||||
Logger::info('Found:', ['id' => $ids, 'priority' => $priority]);
|
Logger::info('Found:', ['priority' => $priority, 'id' => $ids]);
|
||||||
return $ids;
|
return $ids;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -890,15 +894,22 @@ class Worker
|
||||||
|
|
||||||
// If there is no result we check without priority limit
|
// If there is no result we check without priority limit
|
||||||
if (empty($ids)) {
|
if (empty($ids)) {
|
||||||
|
$limit = Config::get('system', 'worker_fetch_limit', 1);
|
||||||
|
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
|
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
|
||||||
$result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]);
|
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
|
|
||||||
while ($id = DBA::fetch($result)) {
|
while ($task = DBA::fetch($tasks)) {
|
||||||
$ids[] = $id["id"];
|
$ids[] = $task['id'];
|
||||||
|
// Only continue that loop while we are storing commands that can be processed quickly
|
||||||
|
$command = json_decode($task['parameter'])[0];
|
||||||
|
if (!in_array($command, self::FAST_COMMANDS)) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
DBA::close($result);
|
}
|
||||||
|
DBA::close($tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($ids)) {
|
if (!empty($ids)) {
|
||||||
|
|
Loading…
Reference in a new issue