Merge pull request #6014 from annando/deferred-tasks
Show deferred worker tasks in the admin interface and workerqueue logs
This commit is contained in:
commit
7538dd2084
4 changed files with 68 additions and 39 deletions
|
@ -181,32 +181,33 @@ function admin_content(App $a)
|
||||||
// array(url, name, extra css classes)
|
// array(url, name, extra css classes)
|
||||||
// not part of $aside to make the template more adjustable
|
// not part of $aside to make the template more adjustable
|
||||||
$aside_sub = [
|
$aside_sub = [
|
||||||
'information' => [ L10n::t('Information'), [
|
'information' => [L10n::t('Information'), [
|
||||||
"overview" => ["admin/", L10n::t("Overview"), "overview" ],
|
'overview' => ['admin/', L10n::t('Overview'), 'overview'],
|
||||||
'federation' => ["admin/federation/" , L10n::t('Federation Statistics'), "federation"] ]],
|
'federation' => ['admin/federation/' , L10n::t('Federation Statistics'), 'federation']]],
|
||||||
'configuration' => [ L10n::t('Configuration'), [
|
'configuration' => [L10n::t('Configuration'), [
|
||||||
'site' => ["admin/site/" , L10n::t("Site") , "site"],
|
'site' => ['admin/site/' , L10n::t('Site') , 'site'],
|
||||||
'users' => ["admin/users/" , L10n::t("Users") , "users"],
|
'users' => ['admin/users/' , L10n::t('Users') , 'users'],
|
||||||
'addons' => ["admin/addons/" , L10n::t("Addons") , "addons"],
|
'addons' => ['admin/addons/' , L10n::t('Addons') , 'addons'],
|
||||||
'themes' => ["admin/themes/" , L10n::t("Themes") , "themes"],
|
'themes' => ['admin/themes/' , L10n::t('Themes') , 'themes'],
|
||||||
'features' => ["admin/features/" , L10n::t("Additional features") , "features"],
|
'features' => ['admin/features/' , L10n::t('Additional features') , 'features'],
|
||||||
'tos' => ["admin/tos/" , L10n::t("Terms of Service") , "tos"] ]],
|
'tos' => ['admin/tos/' , L10n::t('Terms of Service') , 'tos']]],
|
||||||
'database' => [ L10n::t('Database'), [
|
'database' => [L10n::t('Database'), [
|
||||||
'dbsync' => ["admin/dbsync/" , L10n::t('DB updates') , "dbsync"],
|
'dbsync' => ['admin/dbsync/' , L10n::t('DB updates') , 'dbsync'],
|
||||||
'queue' => ["admin/queue/" , L10n::t('Inspect Queue') , "queue"],
|
'queue' => ['admin/queue/' , L10n::t('Inspect Queue') , 'queue'],
|
||||||
'workerqueue' => ["admin/workerqueue/" , L10n::t('Inspect worker Queue') , "workerqueue"] ]],
|
'deferred' => ['admin/deferred/' , L10n::t('Inspect Deferred Workers'), 'deferred'],
|
||||||
'tools' => [ L10n::t('Tools'), [
|
'workerqueue' => ['admin/workerqueue/' , L10n::t('Inspect worker Queue') , 'workerqueue']]],
|
||||||
'contactblock' => ["admin/contactblock/", L10n::t('Contact Blocklist') , "contactblock"],
|
'tools' => [L10n::t('Tools'), [
|
||||||
'blocklist' => ["admin/blocklist/" , L10n::t('Server Blocklist') , "blocklist"],
|
'contactblock' => ['admin/contactblock/', L10n::t('Contact Blocklist') , 'contactblock'],
|
||||||
'deleteitem' => ["admin/deleteitem/" , L10n::t('Delete Item') , 'deleteitem'], ]],
|
'blocklist' => ['admin/blocklist/' , L10n::t('Server Blocklist') , 'blocklist'],
|
||||||
"logs" => [ L10n::t("Logs"), [
|
'deleteitem' => ['admin/deleteitem/' , L10n::t('Delete Item') , 'deleteitem'],]],
|
||||||
"logsconfig" => ["admin/logs/", L10n::t("Logs"), "logs"],
|
'logs' => [L10n::t('Logs'), [
|
||||||
"logsview" => ["admin/viewlogs/", L10n::t("View Logs"), 'viewlogs']
|
'logsconfig' => ['admin/logs/', L10n::t('Logs'), 'logs'],
|
||||||
|
'logsview' => ['admin/viewlogs/', L10n::t('View Logs'), 'viewlogs']
|
||||||
]],
|
]],
|
||||||
"diagnostics" => [ L10n::t("Diagnostics"), [
|
'diagnostics' => [L10n::t('Diagnostics'), [
|
||||||
"phpinfo" => ['phpinfo/', L10n::t('PHP Info'), 'phpinfo'],
|
'phpinfo' => ['phpinfo/', L10n::t('PHP Info'), 'phpinfo'],
|
||||||
"probe" => ['probe/', L10n::t('probe address'), 'probe'],
|
'probe' => ['probe/', L10n::t('probe address'), 'probe'],
|
||||||
"webfinger" =>['webfinger/', L10n::t('check webfinger'), 'webfinger']
|
'webfinger' =>['webfinger/', L10n::t('check webfinger'), 'webfinger']
|
||||||
]]
|
]]
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@ -264,8 +265,11 @@ function admin_content(App $a)
|
||||||
case 'queue':
|
case 'queue':
|
||||||
$o = admin_page_queue($a);
|
$o = admin_page_queue($a);
|
||||||
break;
|
break;
|
||||||
|
case 'deferred':
|
||||||
|
$o = admin_page_workerqueue($a, true);
|
||||||
|
break;
|
||||||
case 'workerqueue':
|
case 'workerqueue':
|
||||||
$o = admin_page_workerqueue($a);
|
$o = admin_page_workerqueue($a, false);
|
||||||
break;
|
break;
|
||||||
case 'federation':
|
case 'federation':
|
||||||
$o = admin_page_federation($a);
|
$o = admin_page_federation($a);
|
||||||
|
@ -790,10 +794,20 @@ function admin_page_queue(App $a)
|
||||||
* @param App $a
|
* @param App $a
|
||||||
* @return string
|
* @return string
|
||||||
*/
|
*/
|
||||||
function admin_page_workerqueue(App $a)
|
function admin_page_workerqueue(App $a, $deferred)
|
||||||
{
|
{
|
||||||
// get jobs from the workerqueue table
|
// get jobs from the workerqueue table
|
||||||
$entries = DBA::select('workerqueue', ['id', 'parameter', 'created', 'priority'], ['done' => 0], ['order'=> ['priority']]);
|
if ($deferred) {
|
||||||
|
$condition = ["NOT `done` AND `next_try` > ?", DateTimeFormat::utcNow()];
|
||||||
|
$sub_title = L10n::t('Inspect Deferred Worker Queue');
|
||||||
|
$info = L10n::t("This page lists the deferred worker jobs. This are jobs that couldn't be executed at the first time.");
|
||||||
|
} else {
|
||||||
|
$condition = ["NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
|
||||||
|
$sub_title = L10n::t('Inspect Worker Queue');
|
||||||
|
$info = L10n::t('This page lists the currently queued worker jobs. These jobs are handled by the worker cronjob you\'ve set up during install.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$entries = DBA::select('workerqueue', ['id', 'parameter', 'created', 'priority'], $condition, ['order'=> ['priority']]);
|
||||||
|
|
||||||
$r = [];
|
$r = [];
|
||||||
while ($entry = DBA::fetch($entries)) {
|
while ($entry = DBA::fetch($entries)) {
|
||||||
|
@ -807,13 +821,13 @@ function admin_page_workerqueue(App $a)
|
||||||
$t = get_markup_template('admin/workerqueue.tpl');
|
$t = get_markup_template('admin/workerqueue.tpl');
|
||||||
return replace_macros($t, [
|
return replace_macros($t, [
|
||||||
'$title' => L10n::t('Administration'),
|
'$title' => L10n::t('Administration'),
|
||||||
'$page' => L10n::t('Inspect Worker Queue'),
|
'$page' => $sub_title,
|
||||||
'$count' => count($r),
|
'$count' => count($r),
|
||||||
'$id_header' => L10n::t('ID'),
|
'$id_header' => L10n::t('ID'),
|
||||||
'$param_header' => L10n::t('Job Parameters'),
|
'$param_header' => L10n::t('Job Parameters'),
|
||||||
'$created_header' => L10n::t('Created'),
|
'$created_header' => L10n::t('Created'),
|
||||||
'$prio_header' => L10n::t('Priority'),
|
'$prio_header' => L10n::t('Priority'),
|
||||||
'$info' => L10n::t('This page lists the currently queued worker jobs. These jobs are handled by the worker cronjob you\'ve set up during install.'),
|
'$info' => $info,
|
||||||
'$entries' => $r,
|
'$entries' => $r,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
@ -900,15 +914,17 @@ function admin_page_summary(App $a)
|
||||||
|
|
||||||
$pending = Register::getPendingCount();
|
$pending = Register::getPendingCount();
|
||||||
|
|
||||||
$r = q("SELECT COUNT(*) AS `total` FROM `queue` WHERE 1");
|
$queue = DBA::count('queue', []);
|
||||||
$queue = (($r) ? $r[0]['total'] : 0);
|
|
||||||
|
|
||||||
$r = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE NOT `done`");
|
$deferred = DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` > ?",
|
||||||
$workerqueue = (($r) ? $r[0]['total'] : 0);
|
DBA::NULL_DATETIME, DateTimeFormat::utcNow()]);
|
||||||
|
|
||||||
|
$workerqueue = DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
|
||||||
|
DBA::NULL_DATETIME, DateTimeFormat::utcNow()]);
|
||||||
|
|
||||||
// We can do better, but this is a quick queue status
|
// We can do better, but this is a quick queue status
|
||||||
|
|
||||||
$queues = ['label' => L10n::t('Message queues'), 'queue' => $queue, 'workerq' => $workerqueue];
|
$queues = ['label' => L10n::t('Message queues'), 'queue' => $queue, 'deferred' => $deferred, 'workerq' => $workerqueue];
|
||||||
|
|
||||||
|
|
||||||
$r = q("SHOW variables LIKE 'max_allowed_packet'");
|
$r = q("SHOW variables LIKE 'max_allowed_packet'");
|
||||||
|
|
|
@ -146,6 +146,17 @@ class Worker
|
||||||
logger("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", LOGGER_DEBUG);
|
logger("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", LOGGER_DEBUG);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Returns the number of deferred entries in the worker queue
|
||||||
|
*
|
||||||
|
* @return integer Number of deferred entries in the worker queue
|
||||||
|
*/
|
||||||
|
private static function deferredEntries()
|
||||||
|
{
|
||||||
|
return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` > ?",
|
||||||
|
DBA::NULL_DATETIME, DateTimeFormat::utcNow()]);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Returns the number of non executed entries in the worker queue
|
* @brief Returns the number of non executed entries in the worker queue
|
||||||
*
|
*
|
||||||
|
@ -679,6 +690,7 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
$entries = self::totalEntries();
|
$entries = self::totalEntries();
|
||||||
|
$deferred = self::deferredEntries();
|
||||||
|
|
||||||
if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
|
if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
|
||||||
$top_priority = self::highestPriority();
|
$top_priority = self::highestPriority();
|
||||||
|
@ -690,7 +702,7 @@ class Worker
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
|
logger("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $entries . $processlist . " - maximum: " . $queues . "/" . $maxqueues, LOGGER_DEBUG);
|
||||||
|
|
||||||
// Are there fewer workers running as possible? Then fork a new one.
|
// Are there fewer workers running as possible? Then fork a new one.
|
||||||
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
|
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
|
||||||
|
@ -797,13 +809,14 @@ class Worker
|
||||||
$queue_length = Config::get('system', 'worker_fetch_limit', 1);
|
$queue_length = Config::get('system', 'worker_fetch_limit', 1);
|
||||||
$lower_job_limit = $worker_queues * $queue_length * 2;
|
$lower_job_limit = $worker_queues * $queue_length * 2;
|
||||||
$jobs = self::totalEntries();
|
$jobs = self::totalEntries();
|
||||||
|
$deferred = self::deferredEntries();
|
||||||
|
|
||||||
// Now do some magic
|
// Now do some magic
|
||||||
$exponent = 2;
|
$exponent = 2;
|
||||||
$slope = $queue_length / pow($lower_job_limit, $exponent);
|
$slope = $queue_length / pow($lower_job_limit, $exponent);
|
||||||
$limit = min($queue_length, ceil($slope * pow($jobs, $exponent)));
|
$limit = min($queue_length, ceil($slope * pow($jobs, $exponent)));
|
||||||
|
|
||||||
logger('Total: '.$jobs.' - Maximum: '.$queue_length.' - jobs per queue: '.$limit, LOGGER_DEBUG);
|
logger('Deferred: ' . $deferred . ' - Total: ' . $jobs . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, LOGGER_DEBUG);
|
||||||
$ids = [];
|
$ids = [];
|
||||||
if (self::passingSlow($highest_priority)) {
|
if (self::passingSlow($highest_priority)) {
|
||||||
// Are there waiting processes with a higher priority than the currently highest?
|
// Are there waiting processes with a higher priority than the currently highest?
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
|
|
||||||
<dl>
|
<dl>
|
||||||
<dt>{{$queues.label}}</dt>
|
<dt>{{$queues.label}}</dt>
|
||||||
<dd><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></dd>
|
<dd><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/deferred">{{$queues.deferred}} - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></dd>
|
||||||
</dl>
|
</dl>
|
||||||
<dl>
|
<dl>
|
||||||
<dt>{{$pending.0}}</dt>
|
<dt>{{$pending.0}}</dt>
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
{{* The work queues short statistic. *}}
|
{{* The work queues short statistic. *}}
|
||||||
<div id="admin-summary-queues" class="col-lg-12 col-md-12 col-sm-12 col-xs-12 admin-summary">
|
<div id="admin-summary-queues" class="col-lg-12 col-md-12 col-sm-12 col-xs-12 admin-summary">
|
||||||
<div class="col-lg-4 col-md-4 col-sm-4 col-xs-12 admin-summary-label-name text-muted">{{$queues.label}}</div>
|
<div class="col-lg-4 col-md-4 col-sm-4 col-xs-12 admin-summary-label-name text-muted">{{$queues.label}}</div>
|
||||||
<div class="col-lg-8 col-md-8 col-sm-8 col-xs-12 admin-summary-entry"><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></div>
|
<div class="col-lg-8 col-md-8 col-sm-8 col-xs-12 admin-summary-entry"><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/deferred">{{$queues.deferred}} - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{{* Number of pending registrations. *}}
|
{{* Number of pending registrations. *}}
|
||||||
|
|
Loading…
Reference in a new issue