From 5afa4fa83862878b5fe8ebf11b2988c69e699acd Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 9 Nov 2023 06:43:03 +0000 Subject: [PATCH] Improved relay post processing --- src/Model/APContact.php | 23 +++-- src/Model/Tag.php | 3 +- src/Network/Probe.php | 5 +- src/Protocol/ActivityPub.php | 17 +--- src/Protocol/ActivityPub/Processor.php | 118 ++++++++++++++++++------- src/Protocol/ActivityPub/Receiver.php | 81 +++++++++-------- src/Util/HTTPSignature.php | 2 +- src/Worker/FetchMissingActivity.php | 4 +- 8 files changed, 153 insertions(+), 100 deletions(-) diff --git a/src/Model/APContact.php b/src/Model/APContact.php index ff151e469e..c77e208186 100644 --- a/src/Model/APContact.php +++ b/src/Model/APContact.php @@ -328,7 +328,7 @@ class APContact if (!empty($local_owner)) { $following = ActivityPub\Transmitter::getContacts($local_owner, [Contact::SHARING, Contact::FRIEND], 'following'); } else { - $following = ActivityPub::fetchContent($apcontact['following']); + $following = HTTPSignature::fetch($apcontact['following']); } if (!empty($following['totalItems'])) { // Mastodon seriously allows for this condition? @@ -344,7 +344,7 @@ class APContact if (!empty($local_owner)) { $followers = ActivityPub\Transmitter::getContacts($local_owner, [Contact::FOLLOWER, Contact::FRIEND], 'followers'); } else { - $followers = ActivityPub::fetchContent($apcontact['followers']); + $followers = HTTPSignature::fetch($apcontact['followers']); } if (!empty($followers['totalItems'])) { // Mastodon seriously allows for this condition? @@ -360,7 +360,7 @@ class APContact if (!empty($local_owner)) { $statuses_count = self::getStatusesCount($local_owner); } else { - $outbox = ActivityPub::fetchContent($apcontact['outbox']); + $outbox = HTTPSignature::fetch($apcontact['outbox']); $statuses_count = $outbox['totalItems'] ?? 0; } if (!empty($statuses_count)) { @@ -388,11 +388,11 @@ class APContact if (strlen($apcontact['photo'] ?? 
'') > 255) { $parts = parse_url($apcontact['photo']); unset($parts['fragment']); - $apcontact['photo'] = (string)Uri::fromParts($parts); + $apcontact['photo'] = (string)Uri::fromParts((array)$parts); if (strlen($apcontact['photo']) > 255) { unset($parts['query']); - $apcontact['photo'] = (string)Uri::fromParts($parts); + $apcontact['photo'] = (string)Uri::fromParts((array)$parts); } if (strlen($apcontact['photo']) > 255) { @@ -587,23 +587,20 @@ class APContact */ public static function isRelay(array $apcontact): bool { - if (in_array($apcontact['type'], ['Person', 'Organization'])) { + if (!in_array($apcontact['type'] ?? '', ['Application', 'Group', 'Service'])) { return false; } - if (($apcontact['type'] == 'Service') && empty($apcontact['outbox']) && empty($apcontact['sharedinbox']) && empty($apcontact['following']) && empty($apcontact['followers']) && empty($apcontact['statuses_count'])) { + $path = parse_url($apcontact['url'], PHP_URL_PATH); + if (($apcontact['type'] == 'Group') && !empty($apcontact['followers']) && ($apcontact['nick'] == 'relay') && ($path == '/actor')) { return true; } - if (empty($apcontact['nick']) || $apcontact['nick'] != 'relay') { - return false; - } - - if (!empty($apcontact['type']) && $apcontact['type'] == 'Application') { + if (in_array($apcontact['type'], ['Application', 'Service']) && empty($apcontact['following']) && empty($apcontact['followers'])) { return true; } - if (!empty($apcontact['type']) && in_array($apcontact['type'], ['Group', 'Service']) && is_null($apcontact['outbox'])) { + if (($apcontact['type'] == 'Application') && ($apcontact['nick'] == 'relay') && in_array($path, ['/actor', '/relay'])) { return true; } diff --git a/src/Model/Tag.php b/src/Model/Tag.php index 1792e29b59..45824bbc1a 100644 --- a/src/Model/Tag.php +++ b/src/Model/Tag.php @@ -30,6 +30,7 @@ use Friendica\Database\DBA; use Friendica\DI; use Friendica\Protocol\ActivityPub; use Friendica\Util\DateTimeFormat; +use Friendica\Util\HTTPSignature; use 
Friendica\Util\Strings; /** @@ -195,7 +196,7 @@ class Tag $target = self::ACCOUNT; Logger::debug('URL is an account', ['url' => $url]); } elseif ($fetch && ($target != self::GENERAL_COLLECTION)) { - $content = ActivityPub::fetchContent($url); + $content = HTTPSignature::fetch($url); if (!empty($content['type']) && ($content['type'] == 'OrderedCollection')) { $target = self::GENERAL_COLLECTION; Logger::debug('URL is an ordered collection', ['url' => $url]); diff --git a/src/Network/Probe.php b/src/Network/Probe.php index d5a0e285d5..ba2ea4ab4c 100644 --- a/src/Network/Probe.php +++ b/src/Network/Probe.php @@ -43,6 +43,7 @@ use Friendica\Protocol\Feed; use Friendica\Protocol\Salmon; use Friendica\Util\Crypto; use Friendica\Util\DateTimeFormat; +use Friendica\Util\HTTPSignature; use Friendica\Util\Network; use Friendica\Util\Strings; use Friendica\Util\XML; @@ -1860,7 +1861,7 @@ class Probe unset($baseParts['query']); unset($baseParts['fragment']); - return Network::unparseURL($baseParts); + return Network::unparseURL((array)$baseParts); } /** @@ -2132,7 +2133,7 @@ class Probe */ private static function updateFromOutbox(string $feed, array $data): string { - $outbox = ActivityPub::fetchContent($feed); + $outbox = HTTPSignature::fetch($feed); if (empty($outbox)) { return ''; } diff --git a/src/Protocol/ActivityPub.php b/src/Protocol/ActivityPub.php index 522a874fe2..c27506ec1e 100644 --- a/src/Protocol/ActivityPub.php +++ b/src/Protocol/ActivityPub.php @@ -105,19 +105,6 @@ class ActivityPub return $isrequest; } - /** - * Fetches ActivityPub content from the given url - * - * @param string $url content url - * @param integer $uid User ID for the signature - * @return array - * @throws \Friendica\Network\HTTPException\InternalServerErrorException - */ - public static function fetchContent(string $url, int $uid = 0): array - { - return HTTPSignature::fetch($url, $uid); - } - private static function getAccountType(array $apcontact): int { $accounttype = -1; @@ -216,7 
+203,7 @@ class ActivityPub */ public static function fetchOutbox(string $url, int $uid) { - $data = self::fetchContent($url, $uid); + $data = HTTPSignature::fetch($url, $uid); if (empty($data)) { return; } @@ -255,7 +242,7 @@ class ActivityPub return []; } - $data = self::fetchContent($url, $uid); + $data = HTTPSignature::fetch($url, $uid); if (empty($data)) { return []; } diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 516ec38ac8..83c068e0ed 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -596,16 +596,16 @@ class Processor */ public static function isActivityGone(string $url): bool { + if (Network::isUrlBlocked($url)) { + return true; + } + try { $curlResult = HTTPSignature::fetchRaw($url, 0); } catch (\Exception $exception) { Logger::notice('Error fetching url', ['url' => $url, 'exception' => $exception]); return true; - } - - if (Network::isUrlBlocked($url)) { - return true; - } + } // @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON if (in_array($curlResult->getReturnCode(), [401, 404])) { @@ -1493,7 +1493,7 @@ class Processor return $object; } - $object = ActivityPub::fetchContent($url, $uid); + $object = HTTPSignature::fetch($url, $uid); if (empty($object)) { Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]); // We perform negative caching. @@ -1520,14 +1520,43 @@ class Processor * @param string $relay_actor Relay actor * @param int $completion Completion mode, see Receiver::COMPLETION_* * @param int $uid User id that is used to fetch the activity - * @return string fetched message URL + * @return string fetched message URL. 
An empty string indicates a temporary error, null indicates a permanent error. * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL, int $uid = 0): string + public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL, int $uid = 0): ?string { - $object = self::fetchCachedActivity($url, $uid); - if (empty($object)) { + if (Network::isUrlBlocked($url)) { + return null; + } + + try { + $curlResult = HTTPSignature::fetchRaw($url, $uid); + } catch (\Exception $exception) { + Logger::notice('Error fetching url', ['url' => $url, 'exception' => $exception]); + return ''; + } + + if (empty($curlResult)) { + return ''; + } + + $body = $curlResult->getBody(); + if (!$curlResult->isSuccess() || empty($body)) { + if (in_array($curlResult->getReturnCode(), [403, 404, 406, 410])) { + return null; + } + return ''; + } + + $object = json_decode($body, true); + if (empty($object) || !is_array($object)) { + $element = explode(';', $curlResult->getContentType()); + if (!in_array($element[0], ['application/activity+json', 'application/ld+json', 'application/json'])) { + Logger::debug('Unexpected content-type', ['url' => $url, 'content-type' => $curlResult->getContentType()]); + return null; + } + Logger::notice('Invalid JSON data', ['url' => $url, 'content-type' => $curlResult->getContentType(), 'body' => $body]); return ''; } @@ -1560,28 +1589,28 @@ class Processor $actor = $object_actor; } - if (!empty($object['published'])) { - $published = $object['published']; - } elseif (!empty($child['published'])) { - $published = $child['published']; + $ldobject = JsonLD::compact($object); + + $type = JsonLD::fetchElement($ldobject, '@type'); + $object_id = JsonLD::fetchElement($ldobject, 'as:object', '@id'); 
+ if (!in_array($type, Receiver::CONTENT_TYPES) && !empty($object_id)) { + if (($type == 'as:Announce') && !empty($relay_actor) && ($completion = Receiver::COMPLETION_RELAY)) { + if (Item::searchByLink($object_id)) { + return $object_id; + } + Logger::debug('Fetch announced activity', ['type' => $type, 'id' => $object_id, 'actor' => $relay_actor, 'signer' => $signer]); + + return self::fetchMissingActivity($object_id, $child, $relay_actor, $completion, $uid); + } + $activity = $object; + $ldactivity = $ldobject; } else { - $published = DateTimeFormat::utcNow(); + $activity = self::getActivityForObject($object, $actor); + $ldactivity = JsonLD::compact($activity); + $object_id = $object['id']; } - - $activity = []; - $activity['@context'] = $object['@context'] ?? ActivityPub::CONTEXT; - unset($object['@context']); - $activity['id'] = $object['id']; - $activity['to'] = $object['to'] ?? []; - $activity['cc'] = $object['cc'] ?? []; - $activity['audience'] = $object['audience'] ?? []; - $activity['actor'] = $actor; - $activity['object'] = $object; - $activity['published'] = $published; - $activity['type'] = 'Create'; - - $ldactivity = JsonLD::compact($activity); - + $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? 
$child['recursion-depth'] + 1 : 0; if ($object_actor != $actor) { @@ -1600,8 +1629,8 @@ class Processor if ($completion == Receiver::COMPLETION_RELAY) { $ldactivity['from-relay'] = $ldactivity['thread-completion']; - if (!self::acceptIncomingMessage($ldactivity, $object['id'])) { - return ''; + if (in_array($type, Receiver::CONTENT_TYPES) && !self::acceptIncomingMessage($ldactivity, $object_id)) { + return null; } } @@ -1624,6 +1653,31 @@ class Processor return $activity['id']; } + private static function getActivityForObject(array $object, string $actor): array + { + if (!empty($object['published'])) { + $published = $object['published']; + } elseif (!empty($child['published'])) { + $published = $child['published']; + } else { + $published = DateTimeFormat::utcNow(); + } + + $activity = []; + $activity['@context'] = $object['@context'] ?? ActivityPub::CONTEXT; + unset($object['@context']); + $activity['id'] = $object['id']; + $activity['to'] = $object['to'] ?? []; + $activity['cc'] = $object['cc'] ?? []; + $activity['audience'] = $object['audience'] ?? 
[]; + $activity['actor'] = $actor; + $activity['object'] = $object; + $activity['published'] = $published; + $activity['type'] = 'Create'; + + return $activity; + } + /** * Test if incoming relay messages should be accepted * diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 992b428c35..bb671003da 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -108,7 +108,7 @@ class Receiver if (empty($apcontact)) { Logger::notice('Unable to retrieve AP contact for actor - message is discarded', ['actor' => $actor]); return; - } elseif (APContact::isRelay($apcontact)) { + } elseif (APContact::isRelay($apcontact) && self::isRelayPost($ldactivity)) { self::processRelayPost($ldactivity, $actor); return; } else { @@ -116,7 +116,7 @@ class Receiver } $sig_contact = HTTPSignature::getKeyIdContact($header); - if (APContact::isRelay($sig_contact)) { + if (APContact::isRelay($sig_contact) && self::isRelayPost($ldactivity)) { Logger::info('Message from a relay', ['url' => $sig_contact['url']]); self::processRelayPost($ldactivity, $sig_contact['url']); return; @@ -169,6 +169,34 @@ class Receiver self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer); } + /** + * Check if the activity is a post that can be sent via a relay + * + * @param array $activity + * @return boolean + */ + private static function isRelayPost(array $activity): bool + { + $type = JsonLD::fetchElement($activity, '@type'); + if (!$type) { + return false; + } + + $object_type = JsonLD::fetchElement($activity, 'as:object', '@type') ?? 
''; + + $object_id = JsonLD::fetchElement($activity, 'as:object', '@id'); + if (empty($object_id)) { + return false; + } + + $handle = ($type == 'as:Announce'); + + if (!$handle && in_array($type, ['as:Create', 'as:Update'])) { + $handle = in_array($object_type, self::CONTENT_TYPES); + } + return $handle; + } + /** * Process incoming posts from relays * @@ -192,34 +220,6 @@ class Receiver return; } - $handle = ($type == 'as:Announce'); - - if (!$handle && in_array($type, ['as:Create', 'as:Update'])) { - $handle = in_array($object_type, self::CONTENT_TYPES); - } - - if (!$handle) { - $trust_source = false; - $object_data = self::prepareObjectData($activity, 0, false, $trust_source); - - if (!$trust_source) { - Logger::notice('Activity trust could not be achieved.', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]); - return; - } - - if (empty($object_data)) { - Logger::notice('No object data found', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]); - return; - } - - if (self::routeActivities($object_data, $type, true)) { - Logger::debug('Handled activity', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor]); - } else { - Logger::info('Unhandled activity', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]); - } - return; - } - $contact = Contact::getByURL($actor); if (empty($contact)) { Logger::info('Relay contact not found', ['actor' => $actor]); @@ -231,7 +231,7 @@ class Receiver return; } - Logger::debug('Got relayed message id', ['id' => $object_id, 'actor' => $actor]); + Logger::debug('Process post from relay server', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor]); $item_id = Item::searchByLink($object_id); if ($item_id) { @@ -239,10 +239,21 @@ class Receiver 
return; } - $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY); - if (empty($id)) { - Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]); - return; + if (!DI::config()->get('system', 'decoupled_receiver')) { + $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY); + if (!empty($id)) { + Logger::notice('Relayed message is fetched', ['result' => $id, 'id' => $object_id, 'actor' => $actor]); + } else { + Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor, 'activity' => $activity]); + } + } elseif (!Fetch::hasWorker($object_id)) { + Logger::notice('Fetching is done by worker.', ['id' => $object_id]); + Fetch::add($object_id); + $activity['recursion-depth'] = 0; + $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $object_id, [], $actor, self::COMPLETION_RELAY); + Fetch::setWorkerId($object_id, $wid); + } else { + Logger::debug('Activity will already be fetched via a worker.', ['url' => $object_id]); } } diff --git a/src/Util/HTTPSignature.php b/src/Util/HTTPSignature.php index bc701d064e..9e130bc916 100644 --- a/src/Util/HTTPSignature.php +++ b/src/Util/HTTPSignature.php @@ -420,7 +420,7 @@ class HTTPSignature * @return array JSON array * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function fetch(string $request, int $uid): array + public static function fetch(string $request, int $uid = 0): array { try { $curlResult = self::fetchRaw($request, $uid); diff --git a/src/Worker/FetchMissingActivity.php b/src/Worker/FetchMissingActivity.php index 4d1b23b068..d2ebc34f96 100644 --- a/src/Worker/FetchMissingActivity.php +++ b/src/Worker/FetchMissingActivity.php @@ -44,8 +44,10 @@ class FetchMissingActivity $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); if ($result) { Logger::info('Successfully fetched missing 
activity', ['url' => $url]); + } elseif (is_null($result)) { + Logger::info('Permament error, activity could not be fetched', ['url' => $url]); } elseif (!Worker::defer(self::WORKER_DEFER_LIMIT)) { - Logger::info('Activity could not be fetched', ['url' => $url]); + Logger::info('Defer limit reached, activity could not be fetched', ['url' => $url]); // recursively delete all entries that belong to this worker task $queue = DI::app()->getQueue();