From da32fa8fa63b9158f74f919968fcc1e4b58c957b Mon Sep 17 00:00:00 2001 From: Hypolite Petovan Date: Fri, 24 Jun 2022 23:48:49 -0400 Subject: [PATCH] Create ActivityPub\FetchQueue and ActivityPub\FetchQueueItem classes - These classes are used to flatten the recursive missing activity fetch that can hit PHP's maximum function nesting limit - The original caller is responsible for processing the remaining queue once the original activity has been fetched --- src/Model/Item.php | 6 +- src/Module/Debug/ActivityPubConversion.php | 2 +- src/Protocol/ActivityPub.php | 7 ++- src/Protocol/ActivityPub/FetchQueue.php | 57 +++++++++++++++++++ src/Protocol/ActivityPub/FetchQueueItem.php | 62 +++++++++++++++++++++ src/Protocol/ActivityPub/Processor.php | 43 ++++++++------ src/Protocol/ActivityPub/Receiver.php | 54 ++++++++++-------- 7 files changed, 189 insertions(+), 42 deletions(-) create mode 100644 src/Protocol/ActivityPub/FetchQueue.php create mode 100644 src/Protocol/ActivityPub/FetchQueueItem.php diff --git a/src/Model/Item.php b/src/Model/Item.php index 4013dc2ce1..bf3ca49dbc 100644 --- a/src/Model/Item.php +++ b/src/Model/Item.php @@ -3411,7 +3411,11 @@ class Item return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0; } - if ($fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri)) { + $fetchQueue = new ActivityPub\FetchQueue(); + $fetched_uri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $uri); + $fetchQueue->process(); + + if ($fetched_uri) { $item_id = self::searchByLink($fetched_uri, $uid); } else { $item_id = Diaspora::fetchByURL($uri); diff --git a/src/Module/Debug/ActivityPubConversion.php b/src/Module/Debug/ActivityPubConversion.php index ec7fee3f46..5fa9a8b409 100644 --- a/src/Module/Debug/ActivityPubConversion.php +++ b/src/Module/Debug/ActivityPubConversion.php @@ -123,7 +123,7 @@ class ActivityPubConversion extends BaseModule 'content' => visible_whitespace(var_export($object_data, true)) ]; - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem(new ActivityPub\FetchQueue(), $object_data); $results[] = [ 'title' => DI::l10n()->t('Result Item'), diff --git a/src/Protocol/ActivityPub.php b/src/Protocol/ActivityPub.php index 93204e81d3..858f837e8f 100644 --- a/src/Protocol/ActivityPub.php +++ b/src/Protocol/ActivityPub.php @@ -25,6 +25,7 @@ use Friendica\Core\Logger; use Friendica\Core\Protocol; use Friendica\Model\APContact; use Friendica\Model\User; +use Friendica\Protocol\ActivityPub\FetchQueue; use Friendica\Util\HTTPSignature; use Friendica\Util\JsonLD; @@ -223,10 +224,14 @@ class ActivityPub $items = []; } + $fetchQueue = new FetchQueue(); + foreach ($items as $activity) { $ldactivity = JsonLD::compact($activity); - ActivityPub\Receiver::processActivity($ldactivity, '', $uid, true); + ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, '', $uid, true); } + + $fetchQueue->process(); } /** diff --git a/src/Protocol/ActivityPub/FetchQueue.php b/src/Protocol/ActivityPub/FetchQueue.php new file mode 100644 index 0000000000..dfaa338361 --- /dev/null +++ b/src/Protocol/ActivityPub/FetchQueue.php @@ -0,0 +1,57 @@ +. + * + */ + +namespace Friendica\Protocol\ActivityPub; + +/** + * This class prevents maximum function nesting errors by flattening recursive calls to Processor::fetchMissingActivity + */ +class FetchQueue +{ + /** @var FetchQueueItem[] */ + protected $queue = []; + + public function push(FetchQueueItem $item) + { + array_push($this->queue, $item); + } + + /** + * Processes missing activities one by one. It is possible that a processing call will add additional missing + * activities, they will be processed in subsequent iterations of the loop. + * + * Since this process is self-contained, it isn't suitable to retrieve the URI of a single activity. + * + * The simplest way to get the URI of the first activity and ensures all the parents are fetched is this way: + * + * $fetchQueue = new ActivityPub\FetchQueue(); + * $fetchedUri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $activityUri); + * $fetchQueue->process(); + */ + public function process() + { + while (count($this->queue)) { + $fetchQueueItem = array_pop($this->queue); + + call_user_func_array([Processor::class, 'fetchMissingActivity'], array_merge([$this], $fetchQueueItem->toParameters())); + } + } +} diff --git a/src/Protocol/ActivityPub/FetchQueueItem.php b/src/Protocol/ActivityPub/FetchQueueItem.php new file mode 100644 index 0000000000..716c231c99 --- /dev/null +++ b/src/Protocol/ActivityPub/FetchQueueItem.php @@ -0,0 +1,62 @@ +. + * + */ + +namespace Friendica\Protocol\ActivityPub; + +class FetchQueueItem +{ + /** @var string */ + private $url; + /** @var array */ + private $child; + /** @var string */ + private $relay_actor; + /** @var int */ + private $completion; + + /** + * This constructor matches the signature of Processor::fetchMissingActivity except for the default $completion value + * + * @param string $url + * @param array $child + * @param string $relay_actor + * @param int $completion + */ + public function __construct(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_AUTO) + { + $this->url = $url; + $this->child = $child; + $this->relay_actor = $relay_actor; + $this->completion = $completion; + } + + /** + * Array meant to be used in call_user_function_array([Processor::class, 'fetchMissingActivity']). Caller needs to + * provide an instance of a FetchQueue that isn't included in these parameters. + * + * @see FetchQueue::process() + * @return array + */ + public function toParameters(): array + { + return [$this->url, $this->child, $this->relay_actor, $this->completion]; + } +} diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index c6b711ab23..cc869e00ee 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -189,15 +189,17 @@ class Processor /** * Updates a message * - * @param array $activity Activity array + * @param FetchQueue $fetchQueue + * @param array $activity Activity array * @throws \Friendica\Network\HTTPException\InternalServerErrorException + * @throws \ImagickException */ - public static function updateItem(array $activity) + public static function updateItem(FetchQueue $fetchQueue, array $activity) { $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]); if (!DBA::isResult($item)) { Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); - $item = self::createItem($activity); + $item = self::createItem($fetchQueue, $activity); if (empty($item)) { return; } @@ -258,12 +260,13 @@ class Processor /** * Prepares data for a message * - * @param array $activity Activity array + * @param FetchQueue $fetchQueue + * @param array $activity Activity array * @return array Internal item * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createItem(array $activity): array + public static function createItem(FetchQueue $fetchQueue, array $activity): array { $item = []; $item['verb'] = Activity::POST; @@ -279,7 +282,12 @@ class Processor if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id']]); - self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + /** + * Instead of calling recursively self::fetchMissingActivity which can hit PHP's default function nesting + * limit of 256 recursive calls, we push the parent activity fetch parameters in this queue. The initial + * caller is responsible for processing the remaining queue once the original activity has been processed. + */ + $fetchQueue->push(new FetchQueueItem($activity['reply-to-id'], $activity)); } $item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? ''; @@ -453,14 +461,15 @@ class Processor /** * Prepare the item array for an activity * - * @param array $activity Activity array - * @param string $verb Activity verb + * @param FetchQueue $fetchQueue + * @param array $activity Activity array + * @param string $verb Activity verb * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createActivity(array $activity, string $verb) + public static function createActivity(FetchQueue $fetchQueue, array $activity, string $verb) { - $item = self::createItem($activity); + $item = self::createItem($fetchQueue, $activity); if (empty($item)) { return; } @@ -1106,14 +1115,16 @@ class Processor /** * Fetches missing posts * - * @param string $url message URL - * @param array $child activity array with the child of this message - * @param string $relay_actor Relay actor - * @param int $completion Completion mode, see Receiver::COMPLETION_* + * @param FetchQueue $fetchQueue + * @param string $url message URL + * @param array $child activity array with the child of this message + * @param string $relay_actor Relay actor + * @param int $completion Completion mode, see Receiver::COMPLETION_* * @return string fetched message URL * @throws \Friendica\Network\HTTPException\InternalServerErrorException + * @throws \ImagickException */ - public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string + public static function fetchMissingActivity(FetchQueue $fetchQueue, string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string { if (!empty($child['receiver'])) { $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']); @@ -1194,7 +1205,7 @@ class Processor return ''; } - ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer); + ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, json_encode($activity), $uid, true, false, $signer); Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]); diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 33c027933c..2a56e9e5e7 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -152,7 +152,9 @@ class Receiver $trust_source = false; } - self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer); + $fetchQueue = new FetchQueue(); + self::processActivity($fetchQueue, $ldactivity, $body, $uid, $trust_source, true, $signer); + $fetchQueue->process(); } /** @@ -200,12 +202,16 @@ class Receiver return; } - $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY); + $fetchQueue = new FetchQueue(); + + $id = Processor::fetchMissingActivity($fetchQueue, $object_id, [], $actor, self::COMPLETION_RELAY); if (empty($id)) { Logger::notice('Relayed message had not been fetched', ['id' => $object_id]); return; } + $fetchQueue->process(); + $item_id = Item::searchByLink($object_id); if ($item_id) { Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id]); @@ -472,15 +478,17 @@ class Receiver /** * Processes the activity object * - * @param array $activity Array with activity data - * @param string $body The unprocessed body - * @param integer $uid User ID - * @param boolean $trust_source Do we trust the source? - * @param boolean $push Message had been pushed to our system - * @param array $signer The signer of the post - * @throws \Exception + * @param FetchQueue $fetchQueue + * @param array $activity Array with activity data + * @param string $body The unprocessed body + * @param int|null $uid User ID + * @param boolean $trust_source Do we trust the source? + * @param boolean $push Message had been pushed to our system + * @param array $signer The signer of the post + * @throws \Friendica\Network\HTTPException\InternalServerErrorException + * @throws \ImagickException */ - public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = []) + public static function processActivity(FetchQueue $fetchQueue, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = []) { $type = JsonLD::fetchElement($activity, '@type'); if (!$type) { @@ -561,7 +569,7 @@ class Receiver switch ($type) { case 'as:Create': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($fetchQueue, $object_data); ActivityPub\Processor::postItem($object_data, $item); } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { // Unhandled Peertube activity @@ -572,7 +580,7 @@ class Receiver case 'as:Invite': if (in_array($object_data['object_type'], ['as:Event'])) { - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($fetchQueue, $object_data); ActivityPub\Processor::postItem($object_data, $item); } else { self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); @@ -596,7 +604,7 @@ class Receiver $object_data['thread-completion'] = Contact::getIdForURL($actor); $object_data['completion-mode'] = self::COMPLETION_ANNOUCE; - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($fetchQueue, $object_data); if (empty($item)) { return; } @@ -615,7 +623,7 @@ class Receiver $announce_object_data['raw'] = $body; } - ActivityPub\Processor::createActivity($announce_object_data, Activity::ANNOUNCE); + ActivityPub\Processor::createActivity($fetchQueue, $announce_object_data, Activity::ANNOUNCE); } else { self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } @@ -623,7 +631,7 @@ class Receiver case 'as:Like': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::LIKE); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::LIKE); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. } else { @@ -633,7 +641,7 @@ class Receiver case 'as:Dislike': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::DISLIKE); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::DISLIKE); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. } else { @@ -643,7 +651,7 @@ class Receiver case 'as:TentativeAccept': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::ATTENDMAYBE); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDMAYBE); } else { self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } @@ -651,7 +659,7 @@ class Receiver case 'as:Update': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::updateItem($object_data); + ActivityPub\Processor::updateItem($fetchQueue, $object_data); } elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) { ActivityPub\Processor::updatePerson($object_data); } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { @@ -696,7 +704,7 @@ class Receiver ActivityPub\Processor::followUser($object_data); } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { $object_data['reply-to-id'] = $object_data['object_id']; - ActivityPub\Processor::createActivity($object_data, Activity::FOLLOW); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::FOLLOW); } else { self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } @@ -706,7 +714,7 @@ class Receiver if ($object_data['object_type'] == 'as:Follow') { ActivityPub\Processor::acceptFollowUser($object_data); } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::ATTEND); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTEND); } else { self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } @@ -716,7 +724,7 @@ class Receiver if ($object_data['object_type'] == 'as:Follow') { ActivityPub\Processor::rejectFollowUser($object_data); } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::ATTENDNO); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDNO); } else { self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } @@ -751,7 +759,7 @@ class Receiver case 'as:View': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::VIEW); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::VIEW); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity. } else { @@ -761,7 +769,7 @@ class Receiver case 'litepub:EmojiReact': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($object_data, Activity::EMOJIREACT); + ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::EMOJIREACT); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. } else {