diff --git a/config/dbstructure.config.php b/config/dbstructure.config.php index fdc54b0bde..6ce9d69474 100644 --- a/config/dbstructure.config.php +++ b/config/dbstructure.config.php @@ -34,7 +34,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1301); + define('DB_UPDATE_VERSION', 1302); } return [ @@ -1377,12 +1377,12 @@ return [ "indexes" => [ "PRIMARY" => ["id"], "done_parameter" => ["done", "parameter(64)"], + "done_executed" => ["done", "executed"], + "done_priority" => ["done", "priority"], + "done_priority_created" => ["done", "priority", "created"], "done_pid" => ["done", "pid"], - "done_priority_created_next_try" => ["done", "priority", "created", "next_try"], - "done_priority_executed_next_try" => ["done", "priority", "executed", "next_try"], - "done_priority_next_try" => ["done", "priority", "next_try"], - "done_executed_next_try" => ["done", "executed", "next_try"], - "done_next_try" => ["done", "next_try"] + "done_pid_next_try" => ["done", "pid", "next_try"], + "done_pid_priority_created" => ["done", "pid", "priority", "created"] ] ], "storage" => [ diff --git a/src/Core/Worker.php b/src/Core/Worker.php index cd16272443..affcf8442d 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -103,11 +103,15 @@ class Worker } // If possible we will fetch new jobs for this worker - if (!$refetched && Lock::acquire('worker_process', 0)) { - $stamp = (float)microtime(true); - $refetched = self::findWorkerProcesses($passing_slow); - self::$db_duration += (microtime(true) - $stamp); - Lock::release('worker_process'); + if (!$refetched) { + $entries = self::totalEntries(); + $deferred = self::deferredEntries(); + if (Lock::acquire('worker_process', 0)) { + $stamp = (float)microtime(true); + $refetched = self::findWorkerProcesses($passing_slow, $entries, $deferred); + self::$db_duration += (microtime(true) - $stamp); + Lock::release('worker_process'); + } } } @@ -153,8 +157,7 @@ class Worker */ private static function deferredEntries() { - return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` > ?", - DBA::NULL_DATETIME, DateTimeFormat::utcNow()]); + return DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` > ?", DateTimeFormat::utcNow()]); } /** @@ -165,8 +168,7 @@ class Worker */ private static function totalEntries() { - return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` < ?", - DBA::NULL_DATETIME, DateTimeFormat::utcNow()]); + return DBA::count('workerqueue', ['done' => false, 'pid' => 0]); } /** @@ -177,7 +179,7 @@ class Worker */ private static function highestPriority() { - $condition = ["`executed` <= ? AND NOT `done` AND `next_try` < ?", DBA::NULL_DATETIME, DateTimeFormat::utcNow()]; + $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); if (DBA::isResult($workerqueue)) { return $workerqueue["priority"]; @@ -196,8 +198,7 @@ class Worker */ private static function processWithPriorityActive($priority) { - $condition = ["`priority` <= ? AND `executed` > ? AND NOT `done` AND `next_try` < ?", - $priority, DBA::NULL_DATETIME, DateTimeFormat::utcNow()]; + $condition = ["`priority` <= ? AND `pid` != 0 AND NOT `done`", $priority]; return DBA::exists('workerqueue', $condition); } @@ -573,7 +574,7 @@ class Worker $entries = DBA::select( 'workerqueue', ['id', 'pid', 'executed', 'priority', 'parameter'], - ['`executed` > ? AND NOT `done` AND `pid` != 0', DBA::NULL_DATETIME], + ['NOT `done` AND `pid` != 0'], ['order' => ['priority', 'created']] ); @@ -661,7 +662,7 @@ class Worker $intervals = [1, 10, 60]; $jobs_per_minute = []; foreach ($intervals as $interval) { - $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ".intval($interval)." MINUTE"); + $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); if ($job = DBA::fetch($jobs)) { $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); } @@ -672,38 +673,29 @@ class Worker if (Config::get('system', 'worker_debug')) { // Create a list of queue entries grouped by their priority - $listitem = []; + $listitem = [0 => '']; - // Adding all processes with no workerqueue entry - $processes = DBA::p( - "SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS - (SELECT id FROM `workerqueue` - WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done` AND `pid` != ?)", - getmypid() - ); - - if ($process = DBA::fetch($processes)) { - $listitem[0] = "0:".$process["running"]; - } - DBA::close($processes); + $idle_workers = $active; // Now adding all processes with workerqueue entries - $entries = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` AND `next_try` < ? GROUP BY `priority`", DateTimeFormat::utcNow()); + $entries = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`"); while ($entry = DBA::fetch($entries)) { - $processes = DBA::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `next_try` < ? AND `priority` = ?", - DateTimeFormat::utcNow(), $entry["priority"]); + $processes = DBA::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `priority` = ?", $entry["priority"]); if ($process = DBA::fetch($processes)) { + $idle_workers -= $process["running"]; $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; } DBA::close($processes); } DBA::close($entries); + $listitem[0] = "0:" . max(0, $idle_workers); + $processlist .= ' ('.implode(', ', $listitem).')'; } - $entries = self::totalEntries(); $deferred = self::deferredEntries(); + $entries = max(self::totalEntries() - $deferred, 0); if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) { $top_priority = self::highestPriority(); @@ -807,10 +799,12 @@ class Worker * @brief Find and claim the next worker process for us * * @param boolean $passing_slow Returns if we had passed low priority processes + * @param integer $entries Total number of queue entries + * @param integer $deferred Number of deferred queue entries * @return boolean Have we found something? * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - private static function findWorkerProcesses(&$passing_slow) + private static function findWorkerProcesses(&$passing_slow, $entries, $deferred) { $mypid = getmypid(); @@ -824,23 +818,22 @@ class Worker $worker_queues = Config::get("system", "worker_queues", 4); $queue_length = Config::get('system', 'worker_fetch_limit', 1); $lower_job_limit = $worker_queues * $queue_length * 2; - $jobs = self::totalEntries(); - $deferred = self::deferredEntries(); + $entries = max($entries - $deferred, 0); // Now do some magic $exponent = 2; $slope = $queue_length / pow($lower_job_limit, $exponent); - $limit = min($queue_length, ceil($slope * pow($jobs, $exponent))); + $limit = min($queue_length, ceil($slope * pow($entries, $exponent))); - Logger::log('Deferred: ' . $deferred . ' - Total: ' . $jobs . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); + Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG); $ids = []; if (self::passingSlow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? $result = DBA::select( 'workerqueue', ['id'], - ["`executed` <= ? AND `priority` < ? AND NOT `done` AND `next_try` < ?", - DBA::NULL_DATETIME, $highest_priority, DateTimeFormat::utcNow()], + ["`pid` = 0 AND `priority` < ? AND NOT `done` AND `next_try` < ?", + $highest_priority, DateTimeFormat::utcNow()], ['limit' => $limit, 'order' => ['priority', 'created']] ); @@ -856,8 +849,8 @@ class Worker $result = DBA::select( 'workerqueue', ['id'], - ["`executed` <= ? AND `priority` > ? AND NOT `done` AND `next_try` < ?", - DBA::NULL_DATETIME, $highest_priority, DateTimeFormat::utcNow()], + ["`pid` = 0 AND `priority` > ? AND NOT `done` AND `next_try` < ?", + $highest_priority, DateTimeFormat::utcNow()], ['limit' => $limit, 'order' => ['priority', 'created']] ); @@ -876,8 +869,8 @@ class Worker $result = DBA::select( 'workerqueue', ['id'], - ["`executed` <= ? AND NOT `done` AND `next_try` < ?", - DBA::NULL_DATETIME, DateTimeFormat::utcNow()], + ["`pid` = 0 AND NOT `done` AND `next_try` < ?", + DateTimeFormat::utcNow()], ['limit' => $limit, 'order' => ['priority', 'created']] ); @@ -918,13 +911,18 @@ class Worker DBA::close($r); $stamp = (float)microtime(true); + + // Counting the rows outside the lock reduces the lock time + $entries = self::totalEntries(); + $deferred = self::deferredEntries(); + if (!Lock::acquire('worker_process')) { return false; } self::$lock_duration = (microtime(true) - $stamp); $stamp = (float)microtime(true); - $found = self::findWorkerProcesses($passing_slow); + $found = self::findWorkerProcesses($passing_slow, $entries, $deferred); self::$db_duration += (microtime(true) - $stamp); Lock::release('worker_process'); @@ -1039,10 +1037,10 @@ class Worker Logger::log('Add cron entries', Logger::DEBUG); // Check for spooled items - self::add(PRIORITY_HIGH, "SpoolPost"); + self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost'); // Run the cron job that calls all other jobs - self::add(PRIORITY_MEDIUM, "Cron"); + self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron'); // Cleaning dead processes self::killStaleWorkers(); @@ -1103,6 +1101,7 @@ class Worker $priority = PRIORITY_MEDIUM; $dont_fork = Config::get("system", "worker_dont_fork", false); $created = DateTimeFormat::utcNow(); + $force_priority = false; $run_parameter = array_shift($args); @@ -1118,6 +1117,9 @@ class Worker if (isset($run_parameter['dont_fork'])) { $dont_fork = $run_parameter['dont_fork']; } + if (isset($run_parameter['force_priority'])) { + $force_priority = $run_parameter['force_priority']; + } } $parameters = json_encode($args); @@ -1130,6 +1132,8 @@ class Worker if (!$found) { DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]); + } elseif ($force_priority) { + DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]); } // Should we quit and wait for the worker to be called as a cronjob? @@ -1175,6 +1179,7 @@ class Worker $retrial = $queue['retrial']; $id = $queue['id']; + $priority = $queue['priority']; if ($retrial > 14) { Logger::log('Id ' . $id . ' had been tried 14 times. We stop now.', Logger::DEBUG); @@ -1185,9 +1190,17 @@ class Worker $delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1)); $next = DateTimeFormat::utc('now + ' . $delay . ' seconds'); - Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next, Logger::DEBUG); + if (($priority < PRIORITY_MEDIUM) && ($retrial > 2)) { + $priority = PRIORITY_MEDIUM; + } elseif (($priority < PRIORITY_LOW) && ($retrial > 5)) { + $priority = PRIORITY_LOW; + } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($retrial > 7)) { + $priority = PRIORITY_NEGLIGIBLE; + } - $fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0]; + Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue['priority'] . '/' . $priority, Logger::DEBUG); + + $fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority]; DBA::update('workerqueue', $fields, ['id' => $id]); } diff --git a/src/Worker/OnePoll.php b/src/Worker/OnePoll.php index 2d36ee8f47..8523ce2e0b 100644 --- a/src/Worker/OnePoll.php +++ b/src/Worker/OnePoll.php @@ -138,18 +138,27 @@ class OnePoll // We don't poll our followers if ($contact["rel"] == Contact::FOLLOWER) { Logger::log("Don't poll follower"); + + // set the last-update so we don't keep polling + DBA::update('contact', ['last-update' => DateTimeFormat::utcNow()], ['id' => $contact['id']]); return; } // Don't poll if polling is deactivated (But we poll feeds and mails anyway) if (!in_array($contact['network'], [Protocol::FEED, Protocol::MAIL]) && Config::get('system', 'disable_polling')) { Logger::log('Polling is disabled'); + + // set the last-update so we don't keep polling + DBA::update('contact', ['last-update' => DateTimeFormat::utcNow()], ['id' => $contact['id']]); return; } // We don't poll AP contacts by now if ($contact['network'] === Protocol::ACTIVITYPUB) { Logger::log("Don't poll AP contact"); + + // set the last-update so we don't keep polling + DBA::update('contact', ['last-update' => DateTimeFormat::utcNow()], ['id' => $contact['id']]); return; }