From 6c7dfd6958885d21d3a1809dc1a0a215ea3bd600 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 1 Aug 2022 04:48:49 +0000 Subject: [PATCH] New table to control the fetching process --- src/Protocol/ActivityPub/Fetch.php | 84 ++++++++++++++++++++++++++ src/Protocol/ActivityPub/Processor.php | 10 ++- static/dbstructure.config.php | 16 ++++- 3 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 src/Protocol/ActivityPub/Fetch.php diff --git a/src/Protocol/ActivityPub/Fetch.php b/src/Protocol/ActivityPub/Fetch.php new file mode 100644 index 000000000..198202b1c --- /dev/null +++ b/src/Protocol/ActivityPub/Fetch.php @@ -0,0 +1,84 @@ +. + * + */ + +namespace Friendica\Protocol\ActivityPub; + +use Friendica\Core\Logger; +use Friendica\Database\Database; +use Friendica\Database\DBA; +use Friendica\Util\DateTimeFormat; + +/** + * This class handles the fetching of posts + */ +class Fetch +{ + public static function add(string $url): int + { + DBA::insert('fetch-entry', ['url' => $url, 'created' => DateTimeFormat::utcNow()], Database::INSERT_IGNORE); + + $fetch = DBA::selectFirst('fetch-entry', ['id'], ['url' => $url]); + Logger::debug('Added fetch entry', ['url' => $url, 'fetch' => $fetch]); + return $fetch['id'] ?? 0; + } + + /** + * Set the worker id for the queue entry + * + * @param array $activity + * @param int $wid + * @return void + */ + public static function setWorkerId(string $url, int $wid) + { + if (empty($url) || empty($wid)) { + return; + } + + DBA::update('fetch-entry', ['wid' => $wid], ['url' => $url]); + Logger::debug('Worker id set', ['url' => $url, 'wid' => $wid]); + } + + /** + * Check if there is an assigned worker task + * + * @param array $activity + * @return bool + */ + public static function hasWorker(string $url): bool + { + $fetch = DBA::selectFirst('fetch-entry', ['id', 'wid'], ['url' => $url]); + if (empty($fetch['id'])) { + Logger::debug('No entry found for url', ['url' => $url]); + return false; + } + + // We don't have a workerqueue id yet. So most likely is isn't assigned yet. + // To avoid the ramping up of another fetch request we simply claim that there is a waiting worker. + if (!empty($fetch['id']) && empty($fetch['wid'])) { + Logger::debug('Entry without worker found for url', ['url' => $url]); + return true; + } + + return DBA::exists('workerqueue', ['id' => $fetch['wid'], 'done' => false]); + } + +} diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 96308ca2c..6d36786a2 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -333,8 +333,14 @@ class Processor if ($fetch_by_worker) { Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); $activity['recursion-depth'] = 0; - $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - Queue::setWorkerId($activity, $wid); + if (!Fetch::hasWorker($activity['reply-to-id'])) { + Fetch::add($activity['reply-to-id']); + $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + Fetch::setWorkerId($activity['reply-to-id'], $wid); + Queue::setWorkerId($activity, $wid); + } else { + Logger::debug('Activity is already in the fetching process', ['url' => $activity['reply-to-id']]); + } if (!empty($conversation)) { return []; } diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index 79da57a2b..f72ec0802 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1476); + define('DB_UPDATE_VERSION', 1477); } return [ @@ -692,6 +692,20 @@ return [ "uri-id" => ["UNIQUE", "uri-id"], ] ], + "fetch-entry" => [ + "comment" => "", + "fields" => [ + "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"], + "url" => ["type" => "varbinary(255)", "comment" => "url that awaiting to be fetched"], + "created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Creation date of the fetch request"], + "wid" => ["type" => "int unsigned", "foreign" => ["workerqueue" => "id"], "comment" => "Workerqueue id"], ], + "indexes" => [ + "PRIMARY" => ["id"], + "url" => ["UNIQUE", "url"], + "created" => ["created"], + "wid" => ["wid"], + ] + ], "fsuggest" => [ "comment" => "friend suggestion stuff", "fields" => [