From 1d13574225ef977fa4f9d9a5e96ba7e81e7d22d3 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 21 Jul 2022 05:16:14 +0000 Subject: [PATCH] Fetching of missing posts is reworked --- src/Model/Item.php | 4 +- src/Module/Debug/ActivityPubConversion.php | 2 +- src/Protocol/ActivityPub.php | 7 +- src/Protocol/ActivityPub/FetchQueue.php | 57 ------ src/Protocol/ActivityPub/FetchQueueItem.php | 62 ------- src/Protocol/ActivityPub/Processor.php | 69 ++++---- src/Protocol/ActivityPub/Queue.php | 114 ++++++++++++ src/Protocol/ActivityPub/Receiver.php | 182 +++++++++----------- src/Worker/FetchMissingActivity.php | 40 +++++ static/dbstructure.config.php | 2 + 10 files changed, 275 insertions(+), 264 deletions(-) delete mode 100644 src/Protocol/ActivityPub/FetchQueue.php delete mode 100644 src/Protocol/ActivityPub/FetchQueueItem.php create mode 100644 src/Protocol/ActivityPub/Queue.php create mode 100644 src/Worker/FetchMissingActivity.php diff --git a/src/Model/Item.php b/src/Model/Item.php index 01ea942c8..7c3eea9eb 100644 --- a/src/Model/Item.php +++ b/src/Model/Item.php @@ -3410,9 +3410,7 @@ class Item return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0; } - $fetchQueue = new ActivityPub\FetchQueue(); - $fetched_uri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $uri); - $fetchQueue->process(); + $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri); if ($fetched_uri) { $item_id = self::searchByLink($fetched_uri, $uid); diff --git a/src/Module/Debug/ActivityPubConversion.php b/src/Module/Debug/ActivityPubConversion.php index 5fa9a8b40..ec7fee3f4 100644 --- a/src/Module/Debug/ActivityPubConversion.php +++ b/src/Module/Debug/ActivityPubConversion.php @@ -123,7 +123,7 @@ class ActivityPubConversion extends BaseModule 'content' => visible_whitespace(var_export($object_data, true)) ]; - $item = ActivityPub\Processor::createItem(new ActivityPub\FetchQueue(), $object_data); + $item = ActivityPub\Processor::createItem($object_data); $results[] = [ 'title' => DI::l10n()->t('Result Item'), diff --git a/src/Protocol/ActivityPub.php b/src/Protocol/ActivityPub.php index 858f837e8..93204e81d 100644 --- a/src/Protocol/ActivityPub.php +++ b/src/Protocol/ActivityPub.php @@ -25,7 +25,6 @@ use Friendica\Core\Logger; use Friendica\Core\Protocol; use Friendica\Model\APContact; use Friendica\Model\User; -use Friendica\Protocol\ActivityPub\FetchQueue; use Friendica\Util\HTTPSignature; use Friendica\Util\JsonLD; @@ -224,14 +223,10 @@ class ActivityPub $items = []; } - $fetchQueue = new FetchQueue(); - foreach ($items as $activity) { $ldactivity = JsonLD::compact($activity); - ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, '', $uid, true); + ActivityPub\Receiver::processActivity($ldactivity, '', $uid, true); } - - $fetchQueue->process(); } /** diff --git a/src/Protocol/ActivityPub/FetchQueue.php b/src/Protocol/ActivityPub/FetchQueue.php deleted file mode 100644 index dfaa33836..000000000 --- a/src/Protocol/ActivityPub/FetchQueue.php +++ /dev/null @@ -1,57 +0,0 @@ -. - * - */ - -namespace Friendica\Protocol\ActivityPub; - -/** - * This class prevents maximum function nesting errors by flattening recursive calls to Processor::fetchMissingActivity - */ -class FetchQueue -{ - /** @var FetchQueueItem[] */ - protected $queue = []; - - public function push(FetchQueueItem $item) - { - array_push($this->queue, $item); - } - - /** - * Processes missing activities one by one. It is possible that a processing call will add additional missing - * activities, they will be processed in subsequent iterations of the loop. - * - * Since this process is self-contained, it isn't suitable to retrieve the URI of a single activity. - * - * The simplest way to get the URI of the first activity and ensures all the parents are fetched is this way: - * - * $fetchQueue = new ActivityPub\FetchQueue(); - * $fetchedUri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $activityUri); - * $fetchQueue->process(); - */ - public function process() - { - while (count($this->queue)) { - $fetchQueueItem = array_pop($this->queue); - - call_user_func_array([Processor::class, 'fetchMissingActivity'], array_merge([$this], $fetchQueueItem->toParameters())); - } - } -} diff --git a/src/Protocol/ActivityPub/FetchQueueItem.php b/src/Protocol/ActivityPub/FetchQueueItem.php deleted file mode 100644 index 716c231c9..000000000 --- a/src/Protocol/ActivityPub/FetchQueueItem.php +++ /dev/null @@ -1,62 +0,0 @@ -. - * - */ - -namespace Friendica\Protocol\ActivityPub; - -class FetchQueueItem -{ - /** @var string */ - private $url; - /** @var array */ - private $child; - /** @var string */ - private $relay_actor; - /** @var int */ - private $completion; - - /** - * This constructor matches the signature of Processor::fetchMissingActivity except for the default $completion value - * - * @param string $url - * @param array $child - * @param string $relay_actor - * @param int $completion - */ - public function __construct(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_AUTO) - { - $this->url = $url; - $this->child = $child; - $this->relay_actor = $relay_actor; - $this->completion = $completion; - } - - /** - * Array meant to be used in call_user_function_array([Processor::class, 'fetchMissingActivity']). Caller needs to - * provide an instance of a FetchQueue that isn't included in these parameters. - * - * @see FetchQueue::process() - * @return array - */ - public function toParameters(): array - { - return [$this->url, $this->child, $this->relay_actor, $this->completion]; - } -} diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index e6ff43f06..60793f802 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -189,17 +189,16 @@ class Processor /** * Updates a message * - * @param FetchQueue $fetchQueue * @param array $activity Activity array * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function updateItem(FetchQueue $fetchQueue, array $activity) + public static function updateItem(array $activity) { $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]); if (!DBA::isResult($item)) { Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); - $item = self::createItem($fetchQueue, $activity); + $item = self::createItem($activity); if (empty($item)) { return; } @@ -223,7 +222,7 @@ class Processor Post\History::add($item['uri-id'], $item); Item::update($item, ['uri' => $activity['id']]); - Receiver::removeFromQueue($activity); + Queue::remove($activity); if ($activity['object_type'] == 'as:Event') { $posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]); @@ -262,13 +261,12 @@ class Processor /** * Prepares data for a message * - * @param FetchQueue $fetchQueue * @param array $activity Activity array * @return array Internal item * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createItem(FetchQueue $fetchQueue, array $activity): array + public static function createItem(array $activity): array { $item = []; $item['verb'] = Activity::POST; @@ -283,13 +281,15 @@ class Processor } if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { - Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id']]); - /** - * Instead of calling recursively self::fetchMissingActivity which can hit PHP's default function nesting - * limit of 256 recursive calls, we push the parent activity fetch parameters in this queue. The initial - * caller is responsible for processing the remaining queue once the original activity has been processed. - */ - $fetchQueue->push(new FetchQueueItem($activity['reply-to-id'], $activity)); + $recursion_depth = $activity['recursion-depth'] ?? 0; + Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + if ($recursion_depth < 10) { + self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + } else { + Logger::notice('Recursion level is too high, fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + return []; + } } $item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? ''; @@ -428,7 +428,7 @@ class Processor Logger::info('Deleting item', ['object' => $activity['object_id'], 'owner' => $owner]); Item::markForDeletion(['uri' => $activity['object_id'], 'owner-id' => $owner]); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -464,15 +464,14 @@ class Processor /** * Prepare the item array for an activity * - * @param FetchQueue $fetchQueue * @param array $activity Activity array * @param string $verb Activity verb * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createActivity(FetchQueue $fetchQueue, array $activity, string $verb) + public static function createActivity(array $activity, string $verb) { - $item = self::createItem($fetchQueue, $activity); + $item = self::createItem($activity); if (empty($item)) { return; } @@ -546,7 +545,7 @@ class Processor Logger::debug('Add post to featured collection', ['uri-id' => $uriid]); Post\Collection::add($uriid, Post\Collection::FEATURED); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -564,7 +563,7 @@ class Processor Logger::debug('Remove post from featured collection', ['uri-id' => $uriid]); Post\Collection::remove($uriid, Post\Collection::FEATURED); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -652,13 +651,12 @@ class Processor $item['body'] = Item::improveSharedDataInBody($item); } else { if (empty($activity['directmessage']) && ($item['thr-parent'] != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) { - $item_private = !in_array(0, $activity['item_receiver']); $parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $item['thr-parent']]); if (!DBA::isResult($parent)) { Logger::warning('Unknown parent item.', ['uri' => $item['thr-parent']]); return false; } - if ($item_private && ($parent['private'] != Item::PRIVATE)) { + if (($parent['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) { Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]); return false; } @@ -783,6 +781,7 @@ class Processor } $stored = false; + $success = false; ksort($activity['receiver']); if (!self::isSolicitedMessage($activity, $item)) { @@ -895,7 +894,7 @@ class Processor $item_id = Item::insert($item); if ($item_id) { Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]); - Receiver::removeFromQueue($activity); + $success = true; } else { Logger::notice('Item insertion aborted', ['user' => $item['uid']]); } @@ -905,6 +904,11 @@ class Processor } } + if ($success) { + Queue::remove($activity); + Queue::processReplyByUri($item['uri']); + } + // Store send a follow request for every reshare - but only when the item had been stored if ($stored && ($item['private'] != Item::PRIVATE) && ($item['gravity'] == GRAVITY_PARENT) && !empty($item['author-link']) && ($item['author-link'] != $item['owner-link'])) { $author = APContact::getByURL($item['owner-link'], false); @@ -1121,7 +1125,6 @@ class Processor /** * Fetches missing posts * - * @param FetchQueue $fetchQueue * @param string $url message URL * @param array $child activity array with the child of this message * @param string $relay_actor Relay actor @@ -1130,7 +1133,7 @@ class Processor * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function fetchMissingActivity(FetchQueue $fetchQueue, string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string + public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string { if (!empty($child['receiver'])) { $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']); @@ -1140,7 +1143,7 @@ class Processor $object = ActivityPub::fetchContent($url, $uid); if (empty($object)) { - Logger::notice('Activity was not fetchable, aborting.', ['url' => $url]); + Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]); return ''; } @@ -1192,6 +1195,8 @@ class Processor $ldactivity = JsonLD::compact($activity); + $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1; + if (!empty($relay_actor)) { $ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor); $ldactivity['completion-mode'] = Receiver::COMPLETION_RELAY; @@ -1211,7 +1216,7 @@ class Processor return ''; } - ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, json_encode($activity), $uid, true, false, $signer); + ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer); Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]); @@ -1354,7 +1359,7 @@ class Processor Logger::info('Updating profile', ['object' => $activity['object_id']]); Contact::updateFromProbeByURL($activity['object_id']); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -1383,7 +1388,7 @@ class Processor DBA::close($contacts); Logger::info('Deleted contact', ['object' => $activity['object_id']]); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -1466,7 +1471,7 @@ class Processor $condition = ['id' => $cid]; Contact::update($fields, $condition); Logger::info('Accept contact request', ['contact' => $cid, 'user' => $uid]); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -1500,7 +1505,7 @@ class Processor } else { Logger::info('Rejected contact request', ['contact' => $cid, 'user' => $uid]); } - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -1526,7 +1531,7 @@ class Processor } Item::markForDeletion(['uri' => $activity['object_id'], 'author-id' => $author_id, 'gravity' => GRAVITY_ACTIVITY]); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** @@ -1563,7 +1568,7 @@ class Processor Contact::removeFollower($contact); Logger::info('Undo following request', ['contact' => $cid, 'user' => $uid]); - Receiver::removeFromQueue($activity); + Queue::remove($activity); } /** diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php new file mode 100644 index 000000000..0cb84727f --- /dev/null +++ b/src/Protocol/ActivityPub/Queue.php @@ -0,0 +1,114 @@ +. + * + */ + +namespace Friendica\Protocol\ActivityPub; + +use Friendica\Core\Logger; +use Friendica\Database\Database; +use Friendica\Database\DBA; +use Friendica\Util\DateTimeFormat; + +/** + * This class handles the processing of incoming posts + */ +class Queue +{ + public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array + { + $fields = [ + 'activity-id' => $activity['id'], + 'object-id' => $activity['object_id'], + 'type' => $type, + 'object-type' => $activity['object_type'], + 'activity' => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), + 'received' => DateTimeFormat::utcNow(), + 'push' => $push, + ]; + + if (!empty($activity['reply-to-id'])) { + $fields['in-reply-to-id'] = $activity['reply-to-id']; + } + + if (!empty($activity['object_object_type'])) { + $fields['object-object-type'] = $activity['object_object_type']; + } + + if (!empty($http_signer)) { + $fields['signer'] = $http_signer; + } + + DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE); + + $queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]); + if (!empty($queue['id'])) { + $activity['entry-id'] = $queue['id']; + DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE); + } + return $activity; + } + + public static function remove(array $activity = []) + { + if (empty($activity['entry-id'])) { + return; + } + DBA::delete('inbox-entry', ['id' => $activity['entry-id']]); + //echo "Delete ".$activity['entry-id']."\n"; + + } + + public static function process(int $id) + { + $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]); + if (empty($entry)) { + return; + } + + Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]); + + $activity = json_decode($entry['activity'], true); + $type = $entry['type']; + $push = $entry['push']; + + $activity['entry-id'] = $entry['id']; + + if (!Receiver::routeActivities($activity, $type, $push)) { + self::remove($activity); + } + } + + public static function processAll() + { + $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]); + while ($entry = DBA::fetch($entries)) { + echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n"; + self::process($entry['id']); + } + } + + public static function processReplyByUri(string $uri) + { + $entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]); + while ($entry = DBA::fetch($entries)) { + self::process($entry['id']); + } + } +} diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 4d7309001..cd583132e 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -28,7 +28,6 @@ use Friendica\Content\Text\Markdown; use Friendica\Core\Logger; use Friendica\Core\Protocol; use Friendica\Core\System; -use Friendica\Database\Database; use Friendica\DI; use Friendica\Model\Contact; use Friendica\Model\APContact; @@ -37,7 +36,6 @@ use Friendica\Model\Post; use Friendica\Model\User; use Friendica\Protocol\Activity; use Friendica\Protocol\ActivityPub; -use Friendica\Util\DateTimeFormat; use Friendica\Util\HTTPSignature; use Friendica\Util\JsonLD; use Friendica\Util\LDSignature; @@ -155,46 +153,7 @@ class Receiver $trust_source = false; } - $fetchQueue = new FetchQueue(); - self::processActivity($fetchQueue, $ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer); - $fetchQueue->process(); - } - - private static function enqueuePost(array $ldactivity = [], string $type, int $uid, string $http_signer): array - { - $fields = [ - 'activity-id' => $ldactivity['id'], - 'object-id' => $ldactivity['object_id'], - 'type' => $type, - 'object-type' => $ldactivity['object_type'], - 'activity' => json_encode($ldactivity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), - 'received' => DateTimeFormat::utcNow(), - ]; - - if (!empty($ldactivity['object_object_type'])) { - $fields['object-object-type'] = $ldactivity['object_object_type']; - } - - if (!empty($http_signer)) { - $fields['signer'] = $http_signer; - } - - DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE); - - $queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $ldactivity['id']]); - if (!empty($queue['id'])) { - $ldactivity['entry-id'] = $queue['id']; - DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE); - } - return $ldactivity; - } - - public static function removeFromQueue(array $activity = []) - { - if (empty($activity['entry-id'])) { - return; - } - DBA::delete('inbox-entry', ['id' => $activity['entry-id']]); + self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer); } /** @@ -242,16 +201,12 @@ class Receiver return; } - $fetchQueue = new FetchQueue(); - - $id = Processor::fetchMissingActivity($fetchQueue, $object_id, [], $actor, self::COMPLETION_RELAY); + $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY); if (empty($id)) { Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]); return; } - $fetchQueue->process(); - $item_id = Item::searchByLink($object_id); if ($item_id) { Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]); @@ -518,7 +473,6 @@ class Receiver /** * Processes the activity object * - * @param FetchQueue $fetchQueue * @param array $activity Array with activity data * @param string $body The unprocessed body * @param int|null $uid User ID @@ -528,7 +482,7 @@ class Receiver * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function processActivity(FetchQueue $fetchQueue, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '') + public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '') { $type = JsonLD::fetchElement($activity, '@type'); if (!$type) { @@ -588,7 +542,7 @@ class Receiver if (!empty($activity['thread-completion'])) { $object_data['thread-completion'] = $activity['thread-completion']; } - + if (!empty($activity['completion-mode'])) { $object_data['completion-mode'] = $activity['completion-mode']; } @@ -597,36 +551,56 @@ class Receiver $object_data['thread-children-type'] = $activity['thread-children-type']; } + if (!empty($activity['recursion-depth'])) { + $object_data['recursion-depth'] = $activity['recursion-depth']; + } + // Internal flag for posts that arrived via relay if (!empty($activity['from-relay'])) { $object_data['from-relay'] = $activity['from-relay']; } - $object_data = self::enqueuePost($object_data, $type, $uid, $http_signer); + if ($type == 'as:Announce') { + $object_data['object_activity'] = $activity; + } + + $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push); if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) { self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } + if (!self::routeActivities($object_data, $type, $push)) { + self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + //if (!DI::config()->get('debug', 'ap_log_unknown')) { + // Queue::remove($object_data); + //} + } + } + + public static function routeActivities($object_data, $type, $push) + { + $activity = $object_data['object_activity'] ?? []; + switch ($type) { case 'as:Create': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - $item = ActivityPub\Processor::createItem($fetchQueue, $object_data); + $item = ActivityPub\Processor::createItem($object_data); ActivityPub\Processor::postItem($object_data, $item); } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { // Unhandled Peertube activity - self::removeFromQueue($object_data); + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:Invite': if (in_array($object_data['object_type'], ['as:Event'])) { - $item = ActivityPub\Processor::createItem($fetchQueue, $object_data); + $item = ActivityPub\Processor::createItem($object_data); ActivityPub\Processor::postItem($object_data, $item); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -637,17 +611,19 @@ class Receiver ActivityPub\Processor::addToFeaturedCollection($object_data); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:Announce': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { + $actor = JsonLD::fetchElement($activity, 'as:actor', '@id'); $object_data['thread-completion'] = Contact::getIdForURL($actor); $object_data['completion-mode'] = self::COMPLETION_ANNOUCE; - $item = ActivityPub\Processor::createItem($fetchQueue, $object_data); + $item = ActivityPub\Processor::createItem($object_data); if (empty($item)) { return; } @@ -655,61 +631,64 @@ class Receiver $item['post-reason'] = Item::PR_ANNOUNCEMENT; ActivityPub\Processor::postItem($object_data, $item); - $announce_object_data = self::processObject($activity); - $announce_object_data['name'] = $type; - $announce_object_data['author'] = JsonLD::fetchElement($activity, 'as:actor', '@id'); - $announce_object_data['object_id'] = $object_data['object_id']; - $announce_object_data['object_type'] = $object_data['object_type']; - $announce_object_data['push'] = $push; + if (!empty($activity)) { + $announce_object_data = self::processObject($activity); + $announce_object_data['name'] = $type; + $announce_object_data['author'] = $actor; + $announce_object_data['object_id'] = $object_data['object_id']; + $announce_object_data['object_type'] = $object_data['object_type']; + $announce_object_data['push'] = $push; - if (!empty($body)) { - $announce_object_data['raw'] = $body; - } - - ActivityPub\Processor::createActivity($fetchQueue, $announce_object_data, Activity::ANNOUNCE); + if (!empty($object_data['raw'])) { + $announce_object_data['raw'] = $object_data['raw']; + } + ActivityPub\Processor::createActivity($announce_object_data, Activity::ANNOUNCE); + } else echo "\n***************************\n"; } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:Like': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::LIKE); + ActivityPub\Processor::createActivity($object_data, Activity::LIKE); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:Dislike': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::DISLIKE); + ActivityPub\Processor::createActivity($object_data, Activity::DISLIKE); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:TentativeAccept': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDMAYBE); + ActivityPub\Processor::createActivity($object_data, Activity::ATTENDMAYBE); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:Update': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::updateItem($fetchQueue, $object_data); + ActivityPub\Processor::updateItem($object_data); } elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) { ActivityPub\Processor::updatePerson($object_data); } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { // Unhandled Peertube activity - self::removeFromQueue($object_data); + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -720,8 +699,9 @@ class Receiver ActivityPub\Processor::deletePerson($object_data); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity. + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -729,7 +709,7 @@ class Receiver if (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) { ActivityPub\Processor::blockAccount($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -738,8 +718,9 @@ class Receiver ActivityPub\Processor::removeFromFeaturedCollection($object_data); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -748,9 +729,9 @@ class Receiver ActivityPub\Processor::followUser($object_data); } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { $object_data['reply-to-id'] = $object_data['object_id']; - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::FOLLOW); + ActivityPub\Processor::createActivity($object_data, Activity::FOLLOW); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -758,9 +739,9 @@ class Receiver if ($object_data['object_type'] == 'as:Follow') { ActivityPub\Processor::acceptFollowUser($object_data); } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTEND); + ActivityPub\Processor::createActivity($object_data, Activity::ATTEND); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -768,9 +749,9 @@ class Receiver if ($object_data['object_type'] == 'as:Follow') { ActivityPub\Processor::rejectFollowUser($object_data); } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDNO); + ActivityPub\Processor::createActivity($object_data, Activity::ATTENDNO); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; @@ -793,42 +774,42 @@ class Receiver } elseif (in_array($object_data['object_type'], array_merge(self::ACTIVITY_TYPES, ['as:Announce', 'as:Create', ''])) && empty($object_data['object_object_type'])) { // We cannot detect the target object. So we can ignore it. - self::removeFromQueue($object_data); } elseif (in_array($object_data['object_type'], ['as:Create']) && in_array($object_data['object_object_type'], ['pt:CacheFile'])) { // Unhandled Peertube activity - self::removeFromQueue($object_data); + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'as:View': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::VIEW); + ActivityPub\Processor::createActivity($object_data, Activity::VIEW); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity. - self::removeFromQueue($object_data); + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; case 'litepub:EmojiReact': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::EMOJIREACT); + ActivityPub\Processor::createActivity($object_data, Activity::EMOJIREACT); } elseif ($object_data['object_type'] == '') { // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. + Queue::remove($object_data); } else { - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); + return false; } break; default: Logger::info('Unknown activity: ' . $type . ' ' . $object_data['object_type']); - self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); - break; + return false; } + return true; } /** @@ -847,11 +828,6 @@ class Receiver */ private static function storeUnhandledActivity(bool $unknown, string $type, array $object_data, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = []) { - if (!DI::config()->get('debug', 'ap_log_unknown')) { - self::removeFromQueue($activity); - return; - } - $file = ($unknown ? 'unknown-' : 'unhandled-') . str_replace(':', '-', $type) . '-'; if (!empty($object_data['object_type'])) { diff --git a/src/Worker/FetchMissingActivity.php b/src/Worker/FetchMissingActivity.php new file mode 100644 index 000000000..5aabe89ce --- /dev/null +++ b/src/Worker/FetchMissingActivity.php @@ -0,0 +1,40 @@ +. + * + */ + +namespace Friendica\Worker; + +use Friendica\Core\Logger; +use Friendica\Protocol\ActivityPub; +use Friendica\Protocol\ActivityPub\Receiver; + +class FetchMissingActivity +{ + /** + * Fetch missing activities + * @param string $url Contact URL + */ + public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL) + { + Logger::info('Start fetching missing activity', ['url' => $url]); + $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); + Logger::info('Finished fetching missing activity', ['url' => $url, 'result' => $result]); + } +} diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index c1f1cfcbc..0db568187 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -790,12 +790,14 @@ return [ "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"], "activity-id" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"], "object-id" => ["type" => "varbinary(255)", "comment" => ""], + "in-reply-to-id" => ["type" => "varbinary(255)", "comment" => ""], "type" => ["type" => "varchar(64)", "comment" => "Type of the activity"], "object-type" => ["type" => "varchar(64)", "comment" => "Type of the object activity"], "object-object-type" => ["type" => "varchar(64)", "comment" => "Type of the object's object activity"], "received" => ["type" => "datetime", "comment" => "Receiving date"], "activity" => ["type" => "mediumtext", "comment" => "The JSON activity"], "signer" => ["type" => "varchar(255)", "comment" => ""], + "push" => ["type" => "boolean", "not null" => "1", "default" => "0", "comment" => ""], ], "indexes" => [ "PRIMARY" => ["id"],