From f2eec66240f674ac56ef57350606aab6cfda8aa8 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Aug 2022 03:38:03 +0000 Subject: [PATCH] Hopefully fixes loops during message processing --- src/Model/Contact.php | 2 +- src/Protocol/ActivityPub/Processor.php | 136 +++++++++++++++---------- src/Protocol/ActivityPub/Queue.php | 44 ++++++-- src/Protocol/ActivityPub/Receiver.php | 28 +++-- src/Worker/Cron.php | 4 +- static/defaults.config.php | 4 + 6 files changed, 142 insertions(+), 76 deletions(-) diff --git a/src/Model/Contact.php b/src/Model/Contact.php index 2c1ca2d4c..c873c3941 100644 --- a/src/Model/Contact.php +++ b/src/Model/Contact.php @@ -2449,7 +2449,7 @@ class Contact $new_pubkey = $ret['pubkey'] ?? ''; - if ($uid == 0) { + if ($uid == 0 && DI::config()->get('system', 'fetch_featured_posts')) { if ($ret['network'] == Protocol::ACTIVITYPUB) { $apcontact = APContact::getByURL($ret['url'], false); if (!empty($apcontact['featured'])) { diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 653649aa2..a409fec11 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -306,58 +306,13 @@ class Processor } if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { - if (self::hasJustBeenFetched($activity['reply-to-id'])) { - Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); - $fetch_by_worker = false; - if (empty($conversation)) { - return []; - } - } else { - $recursion_depth = $activity['recursion-depth'] ?? 0; - Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { - $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - $fetch_by_worker = empty($result); - if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { - if (!empty($activity['entry-id'])) { - Queue::deleteById($activity['entry-id']); - } - if (empty($conversation)) { - return []; - } - } - } else { - Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - $fetch_by_worker = true; - } - } - - if ($fetch_by_worker && Queue::hasWorker($activity)) { - Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]); - $fetch_by_worker = false; - if (empty($conversation)) { - return []; - } - } - - if ($fetch_by_worker && DI::config()->get('system', 'fetch_by_worker')) { - Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - $activity['recursion-depth'] = 0; - if (!Fetch::hasWorker($activity['reply-to-id'])) { - Fetch::add($activity['reply-to-id']); - $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - Fetch::setWorkerId($activity['reply-to-id'], $wid); - Queue::setWorkerId($activity, $wid); - } else { - Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]); - } - if (empty($conversation)) { - return []; - } - } elseif (!empty($result)) { + $result = self::fetchParent($activity); + if (!empty($result)) { if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { $item['thr-parent'] = $result; } + } elseif (empty($conversation)) { + return []; } } @@ -482,6 +437,77 @@ class Processor return $item; } + /** + * Fetch and process parent posts for the given activity + * + * @param array $activity + * + * @return string + */ + private static function fetchParent(array $activity): string + { + if (self::hasJustBeenFetched($activity['reply-to-id'])) { + Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); + return ''; + } + + $recursion_depth = $activity['recursion-depth'] ?? 0; + + if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { + Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { + Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + return ''; + } elseif (!empty($result)) { + $exists = Post::exists(['uri' => [$result, $activity['reply-to-id']]]); + if ($exists) { + Logger::notice('The activity has been fetched and created.', ['parent' => $result]); + return $result; + } elseif (DI::config()->get('system', 'fetch_by_worker') || DI::config()->get('system', 'decoupled_receiver')) { + Logger::notice('The activity has been fetched and will hopefully be created later.', ['parent' => $result]); + } else { + Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + } + return ''; + } + if (empty($result) && !DI::config()->get('system', 'fetch_by_worker')) { + return ''; + } + } elseif (self::isActivityGone($activity['reply-to-id'])) { + Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + return ''; + } else { + Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + } + + if (Queue::hasWorker($activity['worker-id'] ?? 0)) { + Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]); + return ''; + } + + if (!Fetch::hasWorker($activity['reply-to-id'])) { + Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + Fetch::add($activity['reply-to-id']); + $activity['recursion-depth'] = 0; + $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + Fetch::setWorkerId($activity['reply-to-id'], $wid); + } else { + Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]); + } + + return ''; + } + /** * Check if a given activity has recently been fetched * @@ -1022,7 +1048,7 @@ class Processor Queue::remove($activity); if ($success && Queue::hasChildren($item['uri'])) { - Worker::add(PRIORITY_HIGH, 'ProcessReplyByUri', $item['uri']); + Queue::processReplyByUri($item['uri']); } // Store send a follow request for every reshare - but only when the item had been stored @@ -1366,9 +1392,13 @@ class Processor return ''; } - ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer); - - Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]); + if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) { + Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]); + } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) { + Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]); + } else { + Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]); + } return $activity['id']; } diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 321bd7f43..87d491bd9 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -84,6 +84,18 @@ class Queue return $activity; } + /** + * Checks if an entryy for a given url and type already exists + * + * @param string $url + * @param string $type + * @return boolean + */ + public static function exists(string $url, string $type): bool + { + return DBA::exists('inbox-entry', ['type' => $type, 'object-id' => $url]); + } + /** * Remove activity from the queue * @@ -132,30 +144,31 @@ class Queue /** * Set the worker id for the queue entry * - * @param array $activity - * @param int $wid + * @param int $entryid + * @param int $wid * @return void */ - public static function setWorkerId(array $activity, int $wid) + public static function setWorkerId(int $entryid, int $wid) { - if (empty($activity['entry-id']) || empty($wid)) { + if (empty($entryid) || empty($wid)) { return; } - DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]); + DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entryid]); } /** * Check if there is an assigned worker task * - * @param array $activity + * @param int $wid + * * @return bool */ - public static function hasWorker(array $activity = []): bool + public static function hasWorker(int $wid): bool { - if (empty($activity['worker-id'])) { + if (empty($wid)) { return false; } - return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]); + return DBA::exists('workerqueue', ['id' => $wid, 'done' => false]); } /** @@ -172,6 +185,18 @@ class Queue return false; } + if (!empty($entry['wid'])) { + $worker = DI::app()->getQueue(); + $wid = $worker['id'] ?? 0; + if ($entry['wid'] != $wid) { + $workerqueue = DBA::selectFirst('workerqueue', ['pid'], ['id' => $entry['wid'], 'done' => false]); + if (!empty($workerqueue['pid']) && posix_kill($workerqueue['pid'], 0)) { + Logger::notice('Entry is already processed via another process.', ['current' => $wid, 'processor' => $entry['wid']]); + return false; + } + } + } + Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]); $activity = json_decode($entry['activity'], true); @@ -314,6 +339,5 @@ class Queue } } DBA::close($entries); - } } diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 1d337f28a..b3d67b2af 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -507,26 +507,29 @@ class Receiver * @param boolean $trust_source Do we trust the source? * @param boolean $push Message had been pushed to our system * @param array $signer The signer of the post + * + * @return bool + * * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '') + public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '', int $completion = Receiver::COMPLETION_AUTO): bool { $type = JsonLD::fetchElement($activity, '@type'); if (!$type) { Logger::info('Empty type', ['activity' => $activity]); - return; + return true; } if (!JsonLD::fetchElement($activity, 'as:object', '@id')) { Logger::info('Empty object', ['activity' => $activity]); - return; + return true; } $actor = JsonLD::fetchElement($activity, 'as:actor', '@id'); if (empty($actor)) { Logger::info('Empty actor', ['activity' => $activity]); - return; + return true; } if (is_array($activity['as:object'])) { @@ -548,7 +551,7 @@ class Receiver $object_data = self::prepareObjectData($activity, $uid, $push, $trust_source); if (empty($object_data)) { Logger::info('No object data found', ['activity' => $activity]); - return; + return true; } // Lemmy is announcing activities. @@ -583,21 +586,27 @@ class Receiver $object_data['object_activity'] = $activity; } + if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) { + Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); + return true; + } + if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) { $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source); } if (!$trust_source) { Logger::info('Activity trust could not be achieved.', ['id' => $object_data['object_id'], 'type' => $type, 'signer' => $signer, 'actor' => $actor, 'attributedTo' => $attributed_to]); - return; + return true; } - if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($activity['completion-mode'] == self::COMPLETION_RELAY))) { + if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) { // We delay by 5 seconds to allow to accumulate all receivers $delayed = date(DateTimeFormat::MYSQL, time() + 5); Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]); - Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); - return; + $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); + Queue::setWorkerId($object_data['entry-id'], $wid); + return false; } if (!empty($activity['recursion-depth'])) { @@ -612,6 +621,7 @@ class Receiver self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); Queue::remove($object_data); } + return true; } /** diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index 2c724366d..12ea6e60a 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -93,9 +93,7 @@ class Cron Queue::clear(); // Process all unprocessed entries - if (DI::config()->get('system', 'decoupled_receiver')) { - Queue::processAll(); - } + Queue::processAll(); // Search for new contacts in the directory if (DI::config()->get('system', 'synchronize_directory')) { diff --git a/static/defaults.config.php b/static/defaults.config.php index ee15ce826..44c80b7b0 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -286,6 +286,10 @@ return [ // Fetch missing posts via a background process 'fetch_by_worker' => false, + // fetch_featured_posts (Boolean) + // Fetch featured posts from all contacts + 'fetch_featured_posts' => false, + // free_crawls (Integer) // Number of "free" searches when system => permit_crawling is enabled. 'free_crawls' => 10,