Improved relay post processing

This commit is contained in:
Michael 2023-11-09 06:43:03 +00:00
parent a1b5ec94fb
commit 5afa4fa838
8 changed files with 153 additions and 100 deletions

View file

@ -328,7 +328,7 @@ class APContact
if (!empty($local_owner)) { if (!empty($local_owner)) {
$following = ActivityPub\Transmitter::getContacts($local_owner, [Contact::SHARING, Contact::FRIEND], 'following'); $following = ActivityPub\Transmitter::getContacts($local_owner, [Contact::SHARING, Contact::FRIEND], 'following');
} else { } else {
$following = ActivityPub::fetchContent($apcontact['following']); $following = HTTPSignature::fetch($apcontact['following']);
} }
if (!empty($following['totalItems'])) { if (!empty($following['totalItems'])) {
// Mastodon seriously allows for this condition? // Mastodon seriously allows for this condition?
@ -344,7 +344,7 @@ class APContact
if (!empty($local_owner)) { if (!empty($local_owner)) {
$followers = ActivityPub\Transmitter::getContacts($local_owner, [Contact::FOLLOWER, Contact::FRIEND], 'followers'); $followers = ActivityPub\Transmitter::getContacts($local_owner, [Contact::FOLLOWER, Contact::FRIEND], 'followers');
} else { } else {
$followers = ActivityPub::fetchContent($apcontact['followers']); $followers = HTTPSignature::fetch($apcontact['followers']);
} }
if (!empty($followers['totalItems'])) { if (!empty($followers['totalItems'])) {
// Mastodon seriously allows for this condition? // Mastodon seriously allows for this condition?
@ -360,7 +360,7 @@ class APContact
if (!empty($local_owner)) { if (!empty($local_owner)) {
$statuses_count = self::getStatusesCount($local_owner); $statuses_count = self::getStatusesCount($local_owner);
} else { } else {
$outbox = ActivityPub::fetchContent($apcontact['outbox']); $outbox = HTTPSignature::fetch($apcontact['outbox']);
$statuses_count = $outbox['totalItems'] ?? 0; $statuses_count = $outbox['totalItems'] ?? 0;
} }
if (!empty($statuses_count)) { if (!empty($statuses_count)) {
@ -388,11 +388,11 @@ class APContact
if (strlen($apcontact['photo'] ?? '') > 255) { if (strlen($apcontact['photo'] ?? '') > 255) {
$parts = parse_url($apcontact['photo']); $parts = parse_url($apcontact['photo']);
unset($parts['fragment']); unset($parts['fragment']);
$apcontact['photo'] = (string)Uri::fromParts($parts); $apcontact['photo'] = (string)Uri::fromParts((array)$parts);
if (strlen($apcontact['photo']) > 255) { if (strlen($apcontact['photo']) > 255) {
unset($parts['query']); unset($parts['query']);
$apcontact['photo'] = (string)Uri::fromParts($parts); $apcontact['photo'] = (string)Uri::fromParts((array)$parts);
} }
if (strlen($apcontact['photo']) > 255) { if (strlen($apcontact['photo']) > 255) {
@ -587,23 +587,20 @@ class APContact
*/ */
public static function isRelay(array $apcontact): bool public static function isRelay(array $apcontact): bool
{ {
if (in_array($apcontact['type'], ['Person', 'Organization'])) { if (!in_array($apcontact['type'] ?? '', ['Application', 'Group', 'Service'])) {
return false; return false;
} }
if (($apcontact['type'] == 'Service') && empty($apcontact['outbox']) && empty($apcontact['sharedinbox']) && empty($apcontact['following']) && empty($apcontact['followers']) && empty($apcontact['statuses_count'])) { $path = parse_url($apcontact['url'], PHP_URL_PATH);
if (($apcontact['type'] == 'Group') && !empty($apcontact['followers']) && ($apcontact['nick'] == 'relay') && ($path == '/actor')) {
return true; return true;
} }
if (empty($apcontact['nick']) || $apcontact['nick'] != 'relay') { if (in_array($apcontact['type'], ['Application', 'Service']) && empty($apcontact['following']) && empty($apcontact['followers'])) {
return false;
}
if (!empty($apcontact['type']) && $apcontact['type'] == 'Application') {
return true; return true;
} }
if (!empty($apcontact['type']) && in_array($apcontact['type'], ['Group', 'Service']) && is_null($apcontact['outbox'])) { if (($apcontact['type'] == 'Application') && ($apcontact['nick'] == 'relay') && in_array($path, ['/actor', '/relay'])) {
return true; return true;
} }

View file

@ -30,6 +30,7 @@ use Friendica\Database\DBA;
use Friendica\DI; use Friendica\DI;
use Friendica\Protocol\ActivityPub; use Friendica\Protocol\ActivityPub;
use Friendica\Util\DateTimeFormat; use Friendica\Util\DateTimeFormat;
use Friendica\Util\HTTPSignature;
use Friendica\Util\Strings; use Friendica\Util\Strings;
/** /**
@ -195,7 +196,7 @@ class Tag
$target = self::ACCOUNT; $target = self::ACCOUNT;
Logger::debug('URL is an account', ['url' => $url]); Logger::debug('URL is an account', ['url' => $url]);
} elseif ($fetch && ($target != self::GENERAL_COLLECTION)) { } elseif ($fetch && ($target != self::GENERAL_COLLECTION)) {
$content = ActivityPub::fetchContent($url); $content = HTTPSignature::fetch($url);
if (!empty($content['type']) && ($content['type'] == 'OrderedCollection')) { if (!empty($content['type']) && ($content['type'] == 'OrderedCollection')) {
$target = self::GENERAL_COLLECTION; $target = self::GENERAL_COLLECTION;
Logger::debug('URL is an ordered collection', ['url' => $url]); Logger::debug('URL is an ordered collection', ['url' => $url]);

View file

@ -43,6 +43,7 @@ use Friendica\Protocol\Feed;
use Friendica\Protocol\Salmon; use Friendica\Protocol\Salmon;
use Friendica\Util\Crypto; use Friendica\Util\Crypto;
use Friendica\Util\DateTimeFormat; use Friendica\Util\DateTimeFormat;
use Friendica\Util\HTTPSignature;
use Friendica\Util\Network; use Friendica\Util\Network;
use Friendica\Util\Strings; use Friendica\Util\Strings;
use Friendica\Util\XML; use Friendica\Util\XML;
@ -1860,7 +1861,7 @@ class Probe
unset($baseParts['query']); unset($baseParts['query']);
unset($baseParts['fragment']); unset($baseParts['fragment']);
return Network::unparseURL($baseParts); return Network::unparseURL((array)$baseParts);
} }
/** /**
@ -2132,7 +2133,7 @@ class Probe
*/ */
private static function updateFromOutbox(string $feed, array $data): string private static function updateFromOutbox(string $feed, array $data): string
{ {
$outbox = ActivityPub::fetchContent($feed); $outbox = HTTPSignature::fetch($feed);
if (empty($outbox)) { if (empty($outbox)) {
return ''; return '';
} }

View file

@ -105,19 +105,6 @@ class ActivityPub
return $isrequest; return $isrequest;
} }
/**
* Fetches ActivityPub content from the given url
*
* @param string $url content url
* @param integer $uid User ID for the signature
* @return array
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
public static function fetchContent(string $url, int $uid = 0): array
{
return HTTPSignature::fetch($url, $uid);
}
private static function getAccountType(array $apcontact): int private static function getAccountType(array $apcontact): int
{ {
$accounttype = -1; $accounttype = -1;
@ -216,7 +203,7 @@ class ActivityPub
*/ */
public static function fetchOutbox(string $url, int $uid) public static function fetchOutbox(string $url, int $uid)
{ {
$data = self::fetchContent($url, $uid); $data = HTTPSignature::fetch($url, $uid);
if (empty($data)) { if (empty($data)) {
return; return;
} }
@ -255,7 +242,7 @@ class ActivityPub
return []; return [];
} }
$data = self::fetchContent($url, $uid); $data = HTTPSignature::fetch($url, $uid);
if (empty($data)) { if (empty($data)) {
return []; return [];
} }

View file

@ -596,6 +596,10 @@ class Processor
*/ */
public static function isActivityGone(string $url): bool public static function isActivityGone(string $url): bool
{ {
if (Network::isUrlBlocked($url)) {
return true;
}
try { try {
$curlResult = HTTPSignature::fetchRaw($url, 0); $curlResult = HTTPSignature::fetchRaw($url, 0);
} catch (\Exception $exception) { } catch (\Exception $exception) {
@ -603,10 +607,6 @@ class Processor
return true; return true;
} }
if (Network::isUrlBlocked($url)) {
return true;
}
// @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON // @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON
if (in_array($curlResult->getReturnCode(), [401, 404])) { if (in_array($curlResult->getReturnCode(), [401, 404])) {
return true; return true;
@ -1493,7 +1493,7 @@ class Processor
return $object; return $object;
} }
$object = ActivityPub::fetchContent($url, $uid); $object = HTTPSignature::fetch($url, $uid);
if (empty($object)) { if (empty($object)) {
Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]); Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]);
// We perform negative caching. // We perform negative caching.
@ -1520,14 +1520,43 @@ class Processor
* @param string $relay_actor Relay actor * @param string $relay_actor Relay actor
* @param int $completion Completion mode, see Receiver::COMPLETION_* * @param int $completion Completion mode, see Receiver::COMPLETION_*
* @param int $uid User id that is used to fetch the activity * @param int $uid User id that is used to fetch the activity
* @return string fetched message URL * @return string fetched message URL. An empty string indicates a temporary error, null indicates a permament error,
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL, int $uid = 0): string public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL, int $uid = 0): ?string
{ {
$object = self::fetchCachedActivity($url, $uid); if (Network::isUrlBlocked($url)) {
if (empty($object)) { return null;
}
try {
$curlResult = HTTPSignature::fetchRaw($url, $uid);
} catch (\Exception $exception) {
Logger::notice('Error fetching url', ['url' => $url, 'exception' => $exception]);
return '';
}
if (empty($curlResult)) {
return '';
}
$body = $curlResult->getBody();
if (!$curlResult->isSuccess() || empty($body)) {
if (in_array($curlResult->getReturnCode(), [403, 404, 406, 410])) {
return null;
}
return '';
}
$object = json_decode($body, true);
if (empty($object) || !is_array($object)) {
$element = explode(';', $curlResult->getContentType());
if (!in_array($element[0], ['application/activity+json', 'application/ld+json', 'application/json'])) {
Logger::debug('Unexpected content-type', ['url' => $url, 'content-type' => $curlResult->getContentType()]);
return null;
}
Logger::notice('Invalid JSON data', ['url' => $url, 'content-type' => $curlResult->getContentType(), 'body' => $body]);
return ''; return '';
} }
@ -1560,27 +1589,27 @@ class Processor
$actor = $object_actor; $actor = $object_actor;
} }
if (!empty($object['published'])) { $ldobject = JsonLD::compact($object);
$published = $object['published'];
} elseif (!empty($child['published'])) { $type = JsonLD::fetchElement($ldobject, '@type');
$published = $child['published']; $object_id = JsonLD::fetchElement($ldobject, 'as:object', '@id');
} else {
$published = DateTimeFormat::utcNow(); if (!in_array($type, Receiver::CONTENT_TYPES) && !empty($object_id)) {
if (($type == 'as:Announce') && !empty($relay_actor) && ($completion = Receiver::COMPLETION_RELAY)) {
if (Item::searchByLink($object_id)) {
return $object_id;
} }
Logger::debug('Fetch announced activity', ['type' => $type, 'id' => $object_id, 'actor' => $relay_actor, 'signer' => $signer]);
$activity = []; return self::fetchMissingActivity($object_id, $child, $relay_actor, $completion, $uid);
$activity['@context'] = $object['@context'] ?? ActivityPub::CONTEXT; }
unset($object['@context']); $activity = $object;
$activity['id'] = $object['id']; $ldactivity = $ldobject;
$activity['to'] = $object['to'] ?? []; } else {
$activity['cc'] = $object['cc'] ?? []; $activity = self::getActivityForObject($object, $actor);
$activity['audience'] = $object['audience'] ?? [];
$activity['actor'] = $actor;
$activity['object'] = $object;
$activity['published'] = $published;
$activity['type'] = 'Create';
$ldactivity = JsonLD::compact($activity); $ldactivity = JsonLD::compact($activity);
$object_id = $object['id'];
}
$ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0; $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0;
@ -1600,8 +1629,8 @@ class Processor
if ($completion == Receiver::COMPLETION_RELAY) { if ($completion == Receiver::COMPLETION_RELAY) {
$ldactivity['from-relay'] = $ldactivity['thread-completion']; $ldactivity['from-relay'] = $ldactivity['thread-completion'];
if (!self::acceptIncomingMessage($ldactivity, $object['id'])) { if (in_array($type, Receiver::CONTENT_TYPES) && !self::acceptIncomingMessage($ldactivity, $object_id)) {
return ''; return null;
} }
} }
@ -1624,6 +1653,31 @@ class Processor
return $activity['id']; return $activity['id'];
} }
private static function getActivityForObject(array $object, string $actor): array
{
if (!empty($object['published'])) {
$published = $object['published'];
} elseif (!empty($child['published'])) {
$published = $child['published'];
} else {
$published = DateTimeFormat::utcNow();
}
$activity = [];
$activity['@context'] = $object['@context'] ?? ActivityPub::CONTEXT;
unset($object['@context']);
$activity['id'] = $object['id'];
$activity['to'] = $object['to'] ?? [];
$activity['cc'] = $object['cc'] ?? [];
$activity['audience'] = $object['audience'] ?? [];
$activity['actor'] = $actor;
$activity['object'] = $object;
$activity['published'] = $published;
$activity['type'] = 'Create';
return $activity;
}
/** /**
* Test if incoming relay messages should be accepted * Test if incoming relay messages should be accepted
* *

View file

@ -108,7 +108,7 @@ class Receiver
if (empty($apcontact)) { if (empty($apcontact)) {
Logger::notice('Unable to retrieve AP contact for actor - message is discarded', ['actor' => $actor]); Logger::notice('Unable to retrieve AP contact for actor - message is discarded', ['actor' => $actor]);
return; return;
} elseif (APContact::isRelay($apcontact)) { } elseif (APContact::isRelay($apcontact) && self::isRelayPost($ldactivity)) {
self::processRelayPost($ldactivity, $actor); self::processRelayPost($ldactivity, $actor);
return; return;
} else { } else {
@ -116,7 +116,7 @@ class Receiver
} }
$sig_contact = HTTPSignature::getKeyIdContact($header); $sig_contact = HTTPSignature::getKeyIdContact($header);
if (APContact::isRelay($sig_contact)) { if (APContact::isRelay($sig_contact) && self::isRelayPost($ldactivity)) {
Logger::info('Message from a relay', ['url' => $sig_contact['url']]); Logger::info('Message from a relay', ['url' => $sig_contact['url']]);
self::processRelayPost($ldactivity, $sig_contact['url']); self::processRelayPost($ldactivity, $sig_contact['url']);
return; return;
@ -169,6 +169,34 @@ class Receiver
self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer); self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
} }
/**
* Check if the activity is a post rhat can be send via a relay
*
* @param array $activity
* @return boolean
*/
private static function isRelayPost(array $activity): bool
{
$type = JsonLD::fetchElement($activity, '@type');
if (!$type) {
return false;
}
$object_type = JsonLD::fetchElement($activity, 'as:object', '@type') ?? '';
$object_id = JsonLD::fetchElement($activity, 'as:object', '@id');
if (empty($object_id)) {
return false;
}
$handle = ($type == 'as:Announce');
if (!$handle && in_array($type, ['as:Create', 'as:Update'])) {
$handle = in_array($object_type, self::CONTENT_TYPES);
}
return $handle;
}
/** /**
* Process incoming posts from relays * Process incoming posts from relays
* *
@ -192,34 +220,6 @@ class Receiver
return; return;
} }
$handle = ($type == 'as:Announce');
if (!$handle && in_array($type, ['as:Create', 'as:Update'])) {
$handle = in_array($object_type, self::CONTENT_TYPES);
}
if (!$handle) {
$trust_source = false;
$object_data = self::prepareObjectData($activity, 0, false, $trust_source);
if (!$trust_source) {
Logger::notice('Activity trust could not be achieved.', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
return;
}
if (empty($object_data)) {
Logger::notice('No object data found', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
return;
}
if (self::routeActivities($object_data, $type, true)) {
Logger::debug('Handled activity', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor]);
} else {
Logger::info('Unhandled activity', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
}
return;
}
$contact = Contact::getByURL($actor); $contact = Contact::getByURL($actor);
if (empty($contact)) { if (empty($contact)) {
Logger::info('Relay contact not found', ['actor' => $actor]); Logger::info('Relay contact not found', ['actor' => $actor]);
@ -231,7 +231,7 @@ class Receiver
return; return;
} }
Logger::debug('Got relayed message id', ['id' => $object_id, 'actor' => $actor]); Logger::debug('Process post from relay server', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor]);
$item_id = Item::searchByLink($object_id); $item_id = Item::searchByLink($object_id);
if ($item_id) { if ($item_id) {
@ -239,10 +239,21 @@ class Receiver
return; return;
} }
if (!DI::config()->get('system', 'decoupled_receiver')) {
$id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY); $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
if (empty($id)) { if (!empty($id)) {
Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]); Logger::notice('Relayed message is fetched', ['result' => $id, 'id' => $object_id, 'actor' => $actor]);
return; } else {
Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
}
} elseif (!Fetch::hasWorker($object_id)) {
Logger::notice('Fetching is done by worker.', ['id' => $object_id]);
Fetch::add($object_id);
$activity['recursion-depth'] = 0;
$wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $object_id, [], $actor, self::COMPLETION_RELAY);
Fetch::setWorkerId($object_id, $wid);
} else {
Logger::debug('Activity will already be fetched via a worker.', ['url' => $object_id]);
} }
} }

View file

@ -420,7 +420,7 @@ class HTTPSignature
* @return array JSON array * @return array JSON array
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/ */
public static function fetch(string $request, int $uid): array public static function fetch(string $request, int $uid = 0): array
{ {
try { try {
$curlResult = self::fetchRaw($request, $uid); $curlResult = self::fetchRaw($request, $uid);

View file

@ -44,8 +44,10 @@ class FetchMissingActivity
$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
if ($result) { if ($result) {
Logger::info('Successfully fetched missing activity', ['url' => $url]); Logger::info('Successfully fetched missing activity', ['url' => $url]);
} elseif (is_null($result)) {
Logger::info('Permament error, activity could not be fetched', ['url' => $url]);
} elseif (!Worker::defer(self::WORKER_DEFER_LIMIT)) { } elseif (!Worker::defer(self::WORKER_DEFER_LIMIT)) {
Logger::info('Activity could not be fetched', ['url' => $url]); Logger::info('Defer limit reached, activity could not be fetched', ['url' => $url]);
// recursively delete all entries that belong to this worker task // recursively delete all entries that belong to this worker task
$queue = DI::app()->getQueue(); $queue = DI::app()->getQueue();