workerqueue now has a "command" field

This commit is contained in:
Michael 2020-12-03 15:47:50 +00:00
parent aa4372447a
commit edbdfbae6b
7 changed files with 145 additions and 55 deletions

View file

@ -300,7 +300,11 @@ class Worker
return false; return false;
} }
$argv = json_decode($queue["parameter"], true); $argv = json_decode($queue['parameter'], true);
if (!empty($queue['command'])) {
array_unshift($argv, $queue['command']);
}
if (empty($argv)) { if (empty($argv)) {
Logger::warning('Parameter is empty', ['queue' => $queue]); Logger::warning('Parameter is empty', ['queue' => $queue]);
return false; return false;
@ -576,7 +580,7 @@ class Worker
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$entries = DBA::select( $entries = DBA::select(
'workerqueue', 'workerqueue',
['id', 'pid', 'executed', 'priority', 'parameter'], ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
['NOT `done` AND `pid` != 0'], ['NOT `done` AND `pid` != 0'],
['order' => ['priority', 'retrial', 'created']] ['order' => ['priority', 'retrial', 'created']]
); );
@ -603,17 +607,21 @@ class Worker
$max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
$max_duration = $max_duration_defaults[$entry["priority"]]; $max_duration = $max_duration_defaults[$entry["priority"]];
$argv = json_decode($entry["parameter"], true); $argv = json_decode($entry['parameter'], true);
if (empty($argv)) { if (!empty($entry['command'])) {
$command = $entry['command'];
} elseif (!empty($argv)) {
$command = array_shift($argv);
} else {
return; return;
} }
$argv[0] = basename($argv[0]); $command = basename($command);
// How long is the process already running? // How long is the process already running?
$duration = (time() - strtotime($entry["executed"])) / 60; $duration = (time() - strtotime($entry["executed"])) / 60;
if ($duration > $max_duration) { if ($duration > $max_duration) {
Logger::notice("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now."); Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
posix_kill($entry["pid"], SIGTERM); posix_kill($entry["pid"], SIGTERM);
// We killed the stale process. // We killed the stale process.
@ -636,7 +644,7 @@ class Worker
self::$db_duration += (microtime(true) - $stamp); self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp);
} else { } else {
Logger::info('Process runtime is okay', ['pid' => $entry["pid"], 'duration' => $duration, 'max' => $max_duration, 'command' => substr(json_encode($argv), 0, 50)]); Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
} }
} }
} }
@ -848,12 +856,17 @@ class Worker
$ids = []; $ids = [];
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]); $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
self::$db_duration += (microtime(true) - $stamp); self::$db_duration += (microtime(true) - $stamp);
while ($task = DBA::fetch($tasks)) { while ($task = DBA::fetch($tasks)) {
$ids[] = $task['id']; $ids[] = $task['id'];
// Only continue that loop while we are storing commands that can be processed quickly // Only continue that loop while we are storing commands that can be processed quickly
$command = json_decode($task['parameter'])[0]; if (!empty($task['command'])) {
$command = $task['command'];
} else {
$command = json_decode($task['parameter'])[0];
}
if (!in_array($command, self::FAST_COMMANDS)) { if (!in_array($command, self::FAST_COMMANDS)) {
break; break;
} }
@ -968,13 +981,17 @@ class Worker
if ($limit > 0) { if ($limit > 0) {
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]); $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
self::$db_duration += (microtime(true) - $stamp); self::$db_duration += (microtime(true) - $stamp);
while ($task = DBA::fetch($tasks)) { while ($task = DBA::fetch($tasks)) {
$ids[] = $task['id']; $ids[] = $task['id'];
// Only continue that loop while we are storing commands that can be processed quickly // Only continue that loop while we are storing commands that can be processed quickly
$command = json_decode($task['parameter'])[0]; if (!empty($task['command'])) {
$command = $task['command'];
} else {
$command = json_decode($task['parameter'])[0];
}
if (!in_array($command, self::FAST_COMMANDS)) { if (!in_array($command, self::FAST_COMMANDS)) {
break; break;
} }
@ -1242,8 +1259,9 @@ class Worker
} }
} }
$command = array_shift($args);
$parameters = json_encode($args); $parameters = json_encode($args);
$found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]); $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = false; $added = false;
// Quit if there was a database error - a precaution for the update process to 3.5.3 // Quit if there was a database error - a precaution for the update process to 3.5.3
@ -1252,13 +1270,13 @@ class Worker
} }
if (!$found) { if (!$found) {
$added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, $added = DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
'priority' => $priority, 'next_try' => $delayed]); 'priority' => $priority, 'next_try' => $delayed]);
if (!$added) { if (!$added) {
return false; return false;
} }
} elseif ($force_priority) { } elseif ($force_priority) {
DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]); DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
} }
// Set the IPC flag to ensure an immediate process execution via daemon // Set the IPC flag to ensure an immediate process execution via daemon
@ -1297,6 +1315,11 @@ class Worker
return $added; return $added;
} }
public static function countWorkersByCommand(string $command)
{
return DBA::count('workerqueue', ['done' => false, 'pid' => 0, 'command' => $command]);
}
/** /**
* Returns the next retrial level for worker jobs. * Returns the next retrial level for worker jobs.
* This function will skip levels when jobs are older. * This function will skip levels when jobs are older.

View file

@ -124,6 +124,55 @@ class GServer
return self::check($server, $network, $force); return self::check($server, $network, $force);
} }
public static function getNextUpdateDate(bool $success, string $created = '', string $last_contact = '')
{
// On successful contact process check again next week
if ($success) {
return DateTimeFormat::utc('now +7 day');
}
$now = strtotime(DateTimeFormat::utcNow());
if ($created > $last_contact) {
$contact_time = strtotime($created);
} else {
$contact_time = strtotime($last_contact);
}
// If the last contact was less than 6 hours before then try again in 6 hours
if (($now - $contact_time) < (60 * 60 * 6)) {
return DateTimeFormat::utc('now +6 hour');
}
// If the last contact was less than 12 hours before then try again in 12 hours
if (($now - $contact_time) < (60 * 60 * 12)) {
return DateTimeFormat::utc('now +12 hour');
}
// If the last contact was less than 24 hours before then try tomorrow again
if (($now - $contact_time) < (60 * 60 * 24)) {
return DateTimeFormat::utc('now +1 day');
}
// If the last contact was less than a week before then try again in a week
if (($now - $contact_time) < (60 * 60 * 24 * 7)) {
return DateTimeFormat::utc('now +1 week');
}
// If the last contact was less than two weeks before then try again in two week
if (($now - $contact_time) < (60 * 60 * 24 * 14)) {
return DateTimeFormat::utc('now +2 week');
}
// If the last contact was less than a month before then try again in a month
if (($now - $contact_time) < (60 * 60 * 24 * 30)) {
return DateTimeFormat::utc('now +1 month');
}
// The system hadn't been successul contacted for more than a month, so try again in three months
return DateTimeFormat::utc('now +3 month');
}
/** /**
* Decides if a server needs to be updated, based upon several date fields * Decides if a server needs to be updated, based upon several date fields
* *
@ -235,10 +284,13 @@ class GServer
* *
* @param string $url * @param string $url
*/ */
private static function setFailure(string $url) public static function setFailure(string $url)
{ {
if (DBA::exists('gserver', ['nurl' => Strings::normaliseLink($url)])) { $gserver = DBA::selectFirst('gserver', [], ['nurl' => Strings::normaliseLink($url)]);
DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow(), 'detection-method' => null], if (DBA::isResult($gserver)) {
$next_update = self::getNextUpdateDate(false, $gserver['created'], $gserver['last_contact']);
DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow(),
'next_contact' => $next_update, 'detection-method' => null],
['nurl' => Strings::normaliseLink($url)]); ['nurl' => Strings::normaliseLink($url)]);
Logger::info('Set failed status for existing server', ['url' => $url]); Logger::info('Set failed status for existing server', ['url' => $url]);
return; return;
@ -306,6 +358,7 @@ class GServer
// If the URL missmatches, then we mark the old entry as failure // If the URL missmatches, then we mark the old entry as failure
if ($url != $original_url) { if ($url != $original_url) {
/// @todo What to do with "next_contact" here?
DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow()], DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow()],
['nurl' => Strings::normaliseLink($original_url)]); ['nurl' => Strings::normaliseLink($original_url)]);
} }
@ -452,6 +505,8 @@ class GServer
$serverdata = self::detectNetworkViaContacts($url, $serverdata); $serverdata = self::detectNetworkViaContacts($url, $serverdata);
} }
$serverdata['next_contact'] = self::getNextUpdateDate(true);
$serverdata['last_contact'] = DateTimeFormat::utcNow(); $serverdata['last_contact'] = DateTimeFormat::utcNow();
$serverdata['failed'] = false; $serverdata['failed'] = false;
@ -1593,13 +1648,6 @@ class GServer
); );
while ($gserver = DBA::fetch($gservers)) { while ($gserver = DBA::fetch($gservers)) {
if (!GServer::check($gserver['url'], $gserver['network'])) {
// The server is not reachable? Okay, then we will try it later
$fields = ['last_poco_query' => DateTimeFormat::utcNow()];
DBA::update('gserver', $fields, ['nurl' => $gserver['nurl']]);
continue;
}
Logger::info('Update peer list', ['server' => $gserver['url'], 'id' => $gserver['id']]); Logger::info('Update peer list', ['server' => $gserver['url'], 'id' => $gserver['id']]);
Worker::add(PRIORITY_LOW, 'UpdateServerPeers', $gserver['url']); Worker::add(PRIORITY_LOW, 'UpdateServerPeers', $gserver['url']);

View file

@ -63,6 +63,9 @@ class Cron
// Update contact information // Update contact information
Worker::add(PRIORITY_LOW, 'UpdatePublicContacts'); Worker::add(PRIORITY_LOW, 'UpdatePublicContacts');
// Update server information
Worker::add(PRIORITY_LOW, 'UpdateGServers');
// run the process to update server directories in the background // run the process to update server directories in the background
Worker::add(PRIORITY_LOW, 'UpdateServerDirectories'); Worker::add(PRIORITY_LOW, 'UpdateServerDirectories');
@ -103,8 +106,6 @@ class Cron
// update nodeinfo data // update nodeinfo data
Worker::add(PRIORITY_LOW, 'NodeInfo'); Worker::add(PRIORITY_LOW, 'NodeInfo');
Worker::add(PRIORITY_LOW, 'UpdateGServers');
// Repair entries in the database // Repair entries in the database
Worker::add(PRIORITY_LOW, 'RepairDatabase'); Worker::add(PRIORITY_LOW, 'RepairDatabase');

View file

@ -32,18 +32,19 @@ class UpdateGServer
* @param string $server_url Server URL * @param string $server_url Server URL
* @param boolean $only_nodeinfo Only use nodeinfo for server detection * @param boolean $only_nodeinfo Only use nodeinfo for server detection
*/ */
public static function execute(string $server_url, bool $only_nodeinfo = false) public static function execute(string $server_url, bool $only_nodeinfo = false, bool $force = false)
{ {
if (empty($server_url)) { if (empty($server_url)) {
return; return;
} }
$server_url = filter_var($server_url, FILTER_SANITIZE_URL); $filtered = filter_var($server_url, FILTER_SANITIZE_URL);
if (substr(Strings::normaliseLink($server_url), 0, 7) != 'http://') { if (substr(Strings::normaliseLink($filtered), 0, 7) != 'http://') {
GServer::setFailure($filtered);
return; return;
} }
$ret = GServer::check($server_url, '', false, $only_nodeinfo); $ret = GServer::check($filtered, '', $force, $only_nodeinfo);
Logger::info('Updated gserver', ['url' => $server_url, 'result' => $ret]); Logger::info('Updated gserver', ['url' => $filtered, 'result' => $ret]);
} }
} }

View file

@ -24,34 +24,36 @@ namespace Friendica\Worker;
use Friendica\Core\Logger; use Friendica\Core\Logger;
use Friendica\Core\Worker; use Friendica\Core\Worker;
use Friendica\Database\DBA; use Friendica\Database\DBA;
use Friendica\Model\GServer;
class UpdateGServers class UpdateGServers
{ {
/** /**
* Updates the first 250 servers * Updates up to 100 servers
*/ */
public static function execute() public static function execute()
{ {
$gservers = DBA::p("SELECT `url`, `created`, `last_failure`, `last_contact` FROM `gserver` ORDER BY rand()"); $updating = Worker::countWorkersByCommand('UpdateGServer');
$limit = 100 - $updating;
if ($limit <= 0) {
Logger::info('The number of currently running jobs exceed the limit');
return;
}
$outdated = DBA::count('gserver', ["`next_contact` < UTC_TIMESTAMP()"]);
$total = DBA::count('gserver');
Logger::info('Server status', ['total' => $total, 'outdated' => $outdated, 'updating' => $limit]);
$gservers = DBA::select('gserver', ['url'], ["`next_contact` < UTC_TIMESTAMP()"], ['limit' => $limit]);
if (!DBA::isResult($gservers)) { if (!DBA::isResult($gservers)) {
return; return;
} }
$updated = 0; $count = 0;
while ($gserver = DBA::fetch($gservers)) { while ($gserver = DBA::fetch($gservers)) {
if (!GServer::updateNeeded($gserver['created'], '', $gserver['last_failure'], $gserver['last_contact'])) { Worker::add(PRIORITY_LOW, 'UpdateGServer', $gserver['url'], false, true);
continue; $count++;
}
Logger::info('Update server status', ['server' => $gserver['url']]);
Worker::add(PRIORITY_LOW, 'UpdateGServer', $gserver['url']);
if (++$updated > 250) {
return;
}
} }
DBA::close($gservers); DBA::close($gservers);
Logger::info('Updated servers', ['count' => $count]);
} }
} }

View file

@ -39,16 +39,27 @@ class UpdatePublicContacts
$ids = []; $ids = [];
$base_condition = ['network' => Protocol::FEDERATED, 'uid' => 0, 'self' => false]; $base_condition = ['network' => Protocol::FEDERATED, 'uid' => 0, 'self' => false];
$existing = Worker::countWorkersByCommand('UpdateContact');
Logger::info('Already existing jobs', ['existing' => $existing]);
if ($existing > 100) {
return;
}
$limit = 100 - $existing;
if (!DI::config()->get('system', 'update_active_contacts')) { if (!DI::config()->get('system', 'update_active_contacts')) {
$part = 3;
// Add every contact (mostly failed ones) that hadn't been updated for six months // Add every contact (mostly failed ones) that hadn't been updated for six months
$condition = DBA::mergeConditions($base_condition, $condition = DBA::mergeConditions($base_condition,
["`last-update` < ?", DateTimeFormat::utc('now - 6 month')]); ["`last-update` < ?", DateTimeFormat::utc('now - 6 month')]);
$ids = self::getContactsToUpdate($condition, $ids); $ids = self::getContactsToUpdate($condition, $ids, round($limit / $part));
// Add every non failed contact that hadn't been updated for a month // Add every non failed contact that hadn't been updated for a month
$condition = DBA::mergeConditions($base_condition, $condition = DBA::mergeConditions($base_condition,
["NOT `failed` AND `last-update` < ?", DateTimeFormat::utc('now - 1 month')]); ["NOT `failed` AND `last-update` < ?", DateTimeFormat::utc('now - 1 month')]);
$ids = self::getContactsToUpdate($condition, $ids); $ids = self::getContactsToUpdate($condition, $ids, round($limit / $part));
} else {
$part = 1;
} }
// Add every contact our system interacted with and hadn't been updated for a week // Add every contact our system interacted with and hadn't been updated for a week
@ -56,7 +67,7 @@ class UpdatePublicContacts
`id` IN (SELECT `owner-id` FROM `item`) OR `id` IN (SELECT `causer-id` FROM `item`) OR `id` IN (SELECT `owner-id` FROM `item`) OR `id` IN (SELECT `causer-id` FROM `item`) OR
`id` IN (SELECT `cid` FROM `post-tag`) OR `id` IN (SELECT `cid` FROM `user-contact`)) AND `id` IN (SELECT `cid` FROM `post-tag`) OR `id` IN (SELECT `cid` FROM `user-contact`)) AND
`last-update` < ?", DateTimeFormat::utc('now - 1 week')]); `last-update` < ?", DateTimeFormat::utc('now - 1 week')]);
$ids = self::getContactsToUpdate($condition, $ids); $ids = self::getContactsToUpdate($condition, $ids, round($limit / $part));
foreach ($ids as $id) { foreach ($ids as $id) {
Worker::add(PRIORITY_LOW, "UpdateContact", $id); Worker::add(PRIORITY_LOW, "UpdateContact", $id);
@ -73,9 +84,9 @@ class UpdatePublicContacts
* @param array $ids * @param array $ids
* @return array contact ids * @return array contact ids
*/ */
private static function getContactsToUpdate(array $condition, array $ids = []) private static function getContactsToUpdate(array $condition, array $ids = [], int $limit)
{ {
$contacts = DBA::select('contact', ['id'], $condition, ['limit' => 100, 'order' => ['last-update']]); $contacts = DBA::select('contact', ['id'], $condition, ['limit' => $limit, 'order' => ['last-update']]);
while ($contact = DBA::fetch($contacts)) { while ($contact = DBA::fetch($contacts)) {
$ids[] = $contact['id']; $ids[] = $contact['id'];
} }

View file

@ -81,13 +81,15 @@ return [
"detection-method" => ["type" => "tinyint unsigned", "comment" => "Method that had been used to detect that server"], "detection-method" => ["type" => "tinyint unsigned", "comment" => "Method that had been used to detect that server"],
"created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => ""], "created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => ""],
"last_poco_query" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""], "last_poco_query" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""],
"last_contact" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""], "last_contact" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => "Last successful connection request"],
"last_failure" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""], "last_failure" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => "Last failed connection request"],
"failed" => ["type" => "boolean", "comment" => "Connection failed"], "failed" => ["type" => "boolean", "comment" => "Connection failed"],
"next_contact" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => "Next connection request"],
], ],
"indexes" => [ "indexes" => [
"PRIMARY" => ["id"], "PRIMARY" => ["id"],
"nurl" => ["UNIQUE", "nurl(190)"], "nurl" => ["UNIQUE", "nurl(190)"],
"next_contact" => ["next_contact"],
] ]
], ],
"user" => [ "user" => [
@ -1496,7 +1498,8 @@ return [
"comment" => "Background tasks queue entries", "comment" => "Background tasks queue entries",
"fields" => [ "fields" => [
"id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "Auto incremented worker task id"], "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "Auto incremented worker task id"],
"parameter" => ["type" => "mediumtext", "comment" => "Task command"], "command" => ["type" => "varchar(100)", "comment" => "Task command"],
"parameter" => ["type" => "mediumtext", "comment" => "Task parameter"],
"priority" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => "Task priority"], "priority" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => "Task priority"],
"created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Creation date"], "created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Creation date"],
"pid" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "comment" => "Process id of the worker"], "pid" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "comment" => "Process id of the worker"],
@ -1507,7 +1510,8 @@ return [
], ],
"indexes" => [ "indexes" => [
"PRIMARY" => ["id"], "PRIMARY" => ["id"],
"done_parameter" => ["done", "parameter(64)"], "command" => ["command"],
"done_command_parameter" => ["done", "command", "parameter(64)"],
"done_executed" => ["done", "executed"], "done_executed" => ["done", "executed"],
"done_priority_retrial_created" => ["done", "priority", "retrial", "created"], "done_priority_retrial_created" => ["done", "priority", "retrial", "created"],
"done_priority_next_try" => ["done", "priority", "next_try"], "done_priority_next_try" => ["done", "priority", "next_try"],