Show deferred worker tasks in the admin interface and workerqueue logs

This commit is contained in:
Michael 2018-10-23 20:38:28 +00:00
parent e6fd05d8e8
commit 95dc030926
4 changed files with 68 additions and 39 deletions

View File

@ -181,32 +181,33 @@ function admin_content(App $a)
// array(url, name, extra css classes) // array(url, name, extra css classes)
// not part of $aside to make the template more adjustable // not part of $aside to make the template more adjustable
$aside_sub = [ $aside_sub = [
'information' => [ L10n::t('Information'), [ 'information' => [L10n::t('Information'), [
"overview" => ["admin/", L10n::t("Overview"), "overview" ], 'overview' => ['admin/', L10n::t('Overview'), 'overview'],
'federation' => ["admin/federation/" , L10n::t('Federation Statistics'), "federation"] ]], 'federation' => ['admin/federation/' , L10n::t('Federation Statistics'), 'federation']]],
'configuration' => [ L10n::t('Configuration'), [ 'configuration' => [L10n::t('Configuration'), [
'site' => ["admin/site/" , L10n::t("Site") , "site"], 'site' => ['admin/site/' , L10n::t('Site') , 'site'],
'users' => ["admin/users/" , L10n::t("Users") , "users"], 'users' => ['admin/users/' , L10n::t('Users') , 'users'],
'addons' => ["admin/addons/" , L10n::t("Addons") , "addons"], 'addons' => ['admin/addons/' , L10n::t('Addons') , 'addons'],
'themes' => ["admin/themes/" , L10n::t("Themes") , "themes"], 'themes' => ['admin/themes/' , L10n::t('Themes') , 'themes'],
'features' => ["admin/features/" , L10n::t("Additional features") , "features"], 'features' => ['admin/features/' , L10n::t('Additional features') , 'features'],
'tos' => ["admin/tos/" , L10n::t("Terms of Service") , "tos"] ]], 'tos' => ['admin/tos/' , L10n::t('Terms of Service') , 'tos']]],
'database' => [ L10n::t('Database'), [ 'database' => [L10n::t('Database'), [
'dbsync' => ["admin/dbsync/" , L10n::t('DB updates') , "dbsync"], 'dbsync' => ['admin/dbsync/' , L10n::t('DB updates') , 'dbsync'],
'queue' => ["admin/queue/" , L10n::t('Inspect Queue') , "queue"], 'queue' => ['admin/queue/' , L10n::t('Inspect Queue') , 'queue'],
'workerqueue' => ["admin/workerqueue/" , L10n::t('Inspect worker Queue') , "workerqueue"] ]], 'deferred' => ['admin/deferred/' , L10n::t('Inspect Deferred Workers'), 'deferred'],
'tools' => [ L10n::t('Tools'), [ 'workerqueue' => ['admin/workerqueue/' , L10n::t('Inspect worker Queue') , 'workerqueue']]],
'contactblock' => ["admin/contactblock/", L10n::t('Contact Blocklist') , "contactblock"], 'tools' => [L10n::t('Tools'), [
'blocklist' => ["admin/blocklist/" , L10n::t('Server Blocklist') , "blocklist"], 'contactblock' => ['admin/contactblock/', L10n::t('Contact Blocklist') , 'contactblock'],
'deleteitem' => ["admin/deleteitem/" , L10n::t('Delete Item') , 'deleteitem'], ]], 'blocklist' => ['admin/blocklist/' , L10n::t('Server Blocklist') , 'blocklist'],
"logs" => [ L10n::t("Logs"), [ 'deleteitem' => ['admin/deleteitem/' , L10n::t('Delete Item') , 'deleteitem'],]],
"logsconfig" => ["admin/logs/", L10n::t("Logs"), "logs"], 'logs' => [L10n::t('Logs'), [
"logsview" => ["admin/viewlogs/", L10n::t("View Logs"), 'viewlogs'] 'logsconfig' => ['admin/logs/', L10n::t('Logs'), 'logs'],
'logsview' => ['admin/viewlogs/', L10n::t('View Logs'), 'viewlogs']
]], ]],
"diagnostics" => [ L10n::t("Diagnostics"), [ 'diagnostics' => [L10n::t('Diagnostics'), [
"phpinfo" => ['phpinfo/', L10n::t('PHP Info'), 'phpinfo'], 'phpinfo' => ['phpinfo/', L10n::t('PHP Info'), 'phpinfo'],
"probe" => ['probe/', L10n::t('probe address'), 'probe'], 'probe' => ['probe/', L10n::t('probe address'), 'probe'],
"webfinger" =>['webfinger/', L10n::t('check webfinger'), 'webfinger'] 'webfinger' =>['webfinger/', L10n::t('check webfinger'), 'webfinger']
]] ]]
]; ];
@ -264,8 +265,11 @@ function admin_content(App $a)
case 'queue': case 'queue':
$o = admin_page_queue($a); $o = admin_page_queue($a);
break; break;
case 'deferred':
$o = admin_page_workerqueue($a, true);
break;
case 'workerqueue': case 'workerqueue':
$o = admin_page_workerqueue($a); $o = admin_page_workerqueue($a, false);
break; break;
case 'federation': case 'federation':
$o = admin_page_federation($a); $o = admin_page_federation($a);
@ -790,10 +794,20 @@ function admin_page_queue(App $a)
* @param App $a * @param App $a
* @return string * @return string
*/ */
function admin_page_workerqueue(App $a) function admin_page_workerqueue(App $a, $deferred)
{ {
// get jobs from the workerqueue table // get jobs from the workerqueue table
$entries = DBA::select('workerqueue', ['id', 'parameter', 'created', 'priority'], ['done' => 0], ['order'=> ['priority']]); if ($deferred) {
$condition = ["NOT `done` AND `next_try` > ?", DateTimeFormat::utcNow()];
$sub_title = L10n::t('Inspect Deferred Worker Queue');
$info = L10n::t("This page lists the deferred worker jobs. This are jobs that couldn't be executed at the first time.");
} else {
$condition = ["NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
$sub_title = L10n::t('Inspect Worker Queue');
$info = L10n::t('This page lists the currently queued worker jobs. These jobs are handled by the worker cronjob you\'ve set up during install.');
}
$entries = DBA::select('workerqueue', ['id', 'parameter', 'created', 'priority'], $condition, ['order'=> ['priority']]);
$r = []; $r = [];
while ($entry = DBA::fetch($entries)) { while ($entry = DBA::fetch($entries)) {
@ -807,13 +821,13 @@ function admin_page_workerqueue(App $a)
$t = get_markup_template('admin/workerqueue.tpl'); $t = get_markup_template('admin/workerqueue.tpl');
return replace_macros($t, [ return replace_macros($t, [
'$title' => L10n::t('Administration'), '$title' => L10n::t('Administration'),
'$page' => L10n::t('Inspect Worker Queue'), '$page' => $sub_title,
'$count' => count($r), '$count' => count($r),
'$id_header' => L10n::t('ID'), '$id_header' => L10n::t('ID'),
'$param_header' => L10n::t('Job Parameters'), '$param_header' => L10n::t('Job Parameters'),
'$created_header' => L10n::t('Created'), '$created_header' => L10n::t('Created'),
'$prio_header' => L10n::t('Priority'), '$prio_header' => L10n::t('Priority'),
'$info' => L10n::t('This page lists the currently queued worker jobs. These jobs are handled by the worker cronjob you\'ve set up during install.'), '$info' => $info,
'$entries' => $r, '$entries' => $r,
]); ]);
} }
@ -900,15 +914,17 @@ function admin_page_summary(App $a)
$pending = Register::getPendingCount(); $pending = Register::getPendingCount();
$r = q("SELECT COUNT(*) AS `total` FROM `queue` WHERE 1"); $queue = DBA::count('queue', []);
$queue = (($r) ? $r[0]['total'] : 0);
$r = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE NOT `done`"); $deferred = DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` > ?",
$workerqueue = (($r) ? $r[0]['total'] : 0); DBA::NULL_DATETIME, DateTimeFormat::utcNow()]);
$workerqueue = DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
DBA::NULL_DATETIME, DateTimeFormat::utcNow()]);
// We can do better, but this is a quick queue status // We can do better, but this is a quick queue status
$queues = ['label' => L10n::t('Message queues'), 'queue' => $queue, 'workerq' => $workerqueue]; $queues = ['label' => L10n::t('Message queues'), 'queue' => $queue, 'deferred' => $deferred, 'workerq' => $workerqueue];
$r = q("SHOW variables LIKE 'max_allowed_packet'"); $r = q("SHOW variables LIKE 'max_allowed_packet'");

View File

@ -146,6 +146,17 @@ class Worker
logger("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", LOGGER_DEBUG); logger("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", LOGGER_DEBUG);
} }
/**
* @brief Returns the number of deferred entries in the worker queue
*
* @return integer Number of deferred entries in the worker queue
*/
private static function deferredEntries()
{
return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` > ?",
DBA::NULL_DATETIME, DateTimeFormat::utcNow()]);
}
/** /**
* @brief Returns the number of non executed entries in the worker queue * @brief Returns the number of non executed entries in the worker queue
* *
@ -679,6 +690,7 @@ class Worker
} }
$entries = self::totalEntries(); $entries = self::totalEntries();
$deferred = self::deferredEntries();
if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) { if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
$top_priority = self::highestPriority(); $top_priority = self::highestPriority();
@ -690,7 +702,7 @@ class Worker
} }
} }
logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG); logger("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $entries . $processlist . " - maximum: " . $queues . "/" . $maxqueues, LOGGER_DEBUG);
// Are there fewer workers running as possible? Then fork a new one. // Are there fewer workers running as possible? Then fork a new one.
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
@ -797,13 +809,14 @@ class Worker
$queue_length = Config::get('system', 'worker_fetch_limit', 1); $queue_length = Config::get('system', 'worker_fetch_limit', 1);
$lower_job_limit = $worker_queues * $queue_length * 2; $lower_job_limit = $worker_queues * $queue_length * 2;
$jobs = self::totalEntries(); $jobs = self::totalEntries();
$deferred = self::deferredEntries();
// Now do some magic // Now do some magic
$exponent = 2; $exponent = 2;
$slope = $queue_length / pow($lower_job_limit, $exponent); $slope = $queue_length / pow($lower_job_limit, $exponent);
$limit = min($queue_length, ceil($slope * pow($jobs, $exponent))); $limit = min($queue_length, ceil($slope * pow($jobs, $exponent)));
logger('Total: '.$jobs.' - Maximum: '.$queue_length.' - jobs per queue: '.$limit, LOGGER_DEBUG); logger('Deferred: ' . $deferred . ' - Total: ' . $jobs . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, LOGGER_DEBUG);
$ids = []; $ids = [];
if (self::passingSlow($highest_priority)) { if (self::passingSlow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest? // Are there waiting processes with a higher priority than the currently highest?

View File

@ -11,7 +11,7 @@
<dl> <dl>
<dt>{{$queues.label}}</dt> <dt>{{$queues.label}}</dt>
<dd><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></dd> <dd><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/deferred">{{$queues.deferred}} - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></dd>
</dl> </dl>
<dl> <dl>
<dt>{{$pending.0}}</dt> <dt>{{$pending.0}}</dt>

View File

@ -14,7 +14,7 @@
{{* The work queues short statistic. *}} {{* The work queues short statistic. *}}
<div id="admin-summary-queues" class="col-lg-12 col-md-12 col-sm-12 col-xs-12 admin-summary"> <div id="admin-summary-queues" class="col-lg-12 col-md-12 col-sm-12 col-xs-12 admin-summary">
<div class="col-lg-4 col-md-4 col-sm-4 col-xs-12 admin-summary-label-name text-muted">{{$queues.label}}</div> <div class="col-lg-4 col-md-4 col-sm-4 col-xs-12 admin-summary-label-name text-muted">{{$queues.label}}</div>
<div class="col-lg-8 col-md-8 col-sm-8 col-xs-12 admin-summary-entry"><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></div> <div class="col-lg-8 col-md-8 col-sm-8 col-xs-12 admin-summary-entry"><a href="{{$baseurl}}/admin/queue">{{$queues.queue}}</a> - <a href="{{$baseurl}}/admin/deferred">{{$queues.deferred}} - <a href="{{$baseurl}}/admin/workerqueue">{{$queues.workerq}}</a></div>
</div> </div>
{{* Number of pending registrations. *}} {{* Number of pending registrations. *}}