From 87a945b2956b793cc8d287c3a2fd4b2b6fab68cb Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 6 Aug 2022 17:06:55 +0000 Subject: [PATCH 1/2] More prevention of double processing of the same content --- database.sql | 20 +++++++- doc/database.md | 2 + doc/database/db_arrived-activity.md | 22 +++++++++ doc/database/db_processed-activity.md | 22 +++++++++ src/Module/Debug/ActivityPubConversion.php | 7 +-- src/Protocol/ActivityPub/Processor.php | 55 ++++++---------------- src/Protocol/ActivityPub/Receiver.php | 41 ++++++++++++++-- static/dbstructure.config.php | 24 +++++++++- 8 files changed, 141 insertions(+), 52 deletions(-) create mode 100644 doc/database/db_arrived-activity.md create mode 100644 doc/database/db_processed-activity.md diff --git a/database.sql b/database.sql index 7663b71590..6126e58a87 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ -- Friendica 2022.09-dev (Giant Rhubarb) --- DB_UPDATE_VERSION 1477 +-- DB_UPDATE_VERSION 1478 -- ------------------------------------------ @@ -1716,6 +1716,24 @@ CREATE TABLE IF NOT EXISTS `user-contact` ( FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE ) DEFAULT COLLATE utf8mb4_general_ci COMMENT='User specific public contact data'; +-- +-- TABLE arrived-activity +-- +CREATE TABLE IF NOT EXISTS `arrived-activity` ( + `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity', + `received` datetime COMMENT 'Receiving date', + PRIMARY KEY(`object-id`) +) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities'; + +-- +-- TABLE processed-activity +-- +CREATE TABLE IF NOT EXISTS `processed-activity` ( + `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity', + `received` datetime COMMENT 'Receiving date', + PRIMARY KEY(`object-id`) +) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities'; + -- -- TABLE worker-ipc -- diff --git a/doc/database.md b/doc/database.md index 2bd2fa5f49..fe505ce18b 100644 --- a/doc/database.md +++ b/doc/database.md @@ -13,6 +13,7 @@ Database Tables | [application](help/database/db_application) | OAuth application | | [application-marker](help/database/db_application-marker) | Timeline marker | | [application-token](help/database/db_application-token) | OAuth user token | +| [arrived-activity](help/database/db_arrived-activity) | Id of arrived activities | | [attach](help/database/db_attach) | file attachments | | [cache](help/database/db_cache) | Stores temporary data | | [config](help/database/db_config) | main configuration storage | @@ -67,6 +68,7 @@ Database Tables | [post-user](help/database/db_post-user) | User specific post data | | [post-user-notification](help/database/db_post-user-notification) | User post notifications | | [process](help/database/db_process) | Currently running system processes | +| [processed-activity](help/database/db_processed-activity) | Id of processed activities | | [profile](help/database/db_profile) | user profiles data | | [profile_field](help/database/db_profile_field) | Custom profile fields | | [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers | diff --git a/doc/database/db_arrived-activity.md b/doc/database/db_arrived-activity.md new file mode 100644 index 0000000000..1587c6e312 --- /dev/null +++ b/doc/database/db_arrived-activity.md @@ -0,0 +1,22 @@ +Table arrived-activity +=========== + +Id of arrived activities + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- | +| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | | +| received | Receiving date | datetime | YES | | NULL | | + +Indexes +------------ + +| Name | Fields | +| ------- | --------- | +| PRIMARY | object-id | + + +Return to [database documentation](help/database) diff --git a/doc/database/db_processed-activity.md b/doc/database/db_processed-activity.md new file mode 100644 index 0000000000..618b46f972 --- /dev/null +++ b/doc/database/db_processed-activity.md @@ -0,0 +1,22 @@ +Table processed-activity +=========== + +Id of processed activities + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- | +| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | | +| received | Receiving date | datetime | YES | | NULL | | + +Indexes +------------ + +| Name | Fields | +| ------- | --------- | +| PRIMARY | object-id | + + +Return to [database documentation](help/database) diff --git a/src/Module/Debug/ActivityPubConversion.php b/src/Module/Debug/ActivityPubConversion.php index ec7fee3f46..8593de81c0 100644 --- a/src/Module/Debug/ActivityPubConversion.php +++ b/src/Module/Debug/ActivityPubConversion.php @@ -22,15 +22,10 @@ namespace Friendica\Module\Debug; use Friendica\BaseModule; -use Friendica\Content\Text; -use Friendica\Core\Logger; use Friendica\Core\Renderer; use Friendica\DI; -use Friendica\Model\Item; -use Friendica\Model\Tag; use Friendica\Protocol\ActivityPub; use Friendica\Util\JsonLD; -use Friendica\Util\XML; class ActivityPubConversion extends BaseModule { @@ -123,7 +118,7 @@ class ActivityPubConversion extends BaseModule 'content' => visible_whitespace(var_export($object_data, true)) ]; - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($object_data, true); $results[] = [ 'title' => DI::l10n()->t('Result Item'), diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index d345ba78cf..a8d51da43a 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -60,33 +60,29 @@ class Processor const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:'; const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:'; - static $processed = []; - /** - * Add an activity id to the list of processed ids + * Add an object id to the list of processed ids * * @param string $id * * @return void */ - public static function addActivityId(string $id) + private static function addActivityId(string $id) { - self::$processed[] = $id; - if (count(self::$processed) > 100) { - self::$processed = array_slice(self::$processed, 1); - } + DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); + DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); } /** - * Checks if the given has just been processed + * Checks if the given object id has just been processed * * @param string $id * * @return boolean */ - public static function isProcessed(string $id): bool + private static function isProcessed(string $id): bool { - return in_array($id, self::$processed); + return DBA::exists('processed-activity', ['object-id' => $id]); } /** @@ -233,7 +229,7 @@ class Processor $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]); if (!DBA::isResult($item)) { Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); - $item = self::createItem($activity); + $item = self::createItem($activity, false); if (empty($item)) { return; } @@ -303,7 +299,7 @@ class Processor * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createItem(array $activity, bool $fetch_parents = true): array + public static function createItem(array $activity, bool $fetch_parents): array { if (self::isProcessed($activity['id'])) { Logger::info('Id is already processed', ['id' => $activity['id']]); @@ -324,10 +320,10 @@ class Processor $item['object-type'] = Activity\ObjectType::COMMENT; } - if (!empty($activity['context'])) { - $item['conversation'] = $activity['context']; - } elseif (!empty($activity['conversation'])) { + if (!empty($activity['conversation'])) { $item['conversation'] = $activity['conversation']; + } elseif (!empty($activity['context'])) { + $item['conversation'] = $activity['context']; } if (!empty($item['conversation'])) { @@ -340,6 +336,7 @@ class Processor $conversation = []; } + Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']); if (empty($activity['author']) && empty($activity['actor'])) { Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]); return []; @@ -490,11 +487,6 @@ class Processor */ private static function fetchParent(array $activity): string { - if (self::hasJustBeenFetched($activity['reply-to-id'])) { - Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); - return ''; - } - $recursion_depth = $activity['recursion-depth'] ?? 0; if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { @@ -552,23 +544,6 @@ class Processor return ''; } - /** - * Check if a given activity has recently been fetched - * - * @param string $url - * @return boolean - */ - private static function hasJustBeenFetched(string $url): bool - { - $cachekey = self::CACHEKEY_JUST_FETCHED . $url; - $time = DI::cache()->get($cachekey); - if (is_null($time)) { - DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES); - return false; - } - return ($time + 300) > time(); - } - /** * Check if a given activity is no longer available * @@ -656,7 +631,7 @@ class Processor public static function createActivity(array $activity, string $verb) { $activity['reply-to-id'] = $activity['object_id']; - $item = self::createItem($activity); + $item = self::createItem($activity, false); if (empty($item)) { return; } @@ -1419,7 +1394,7 @@ class Processor $ldactivity = JsonLD::compact($activity); - $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1; + $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0; if (!empty($relay_actor)) { $ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor); diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 1f7946af22..e8267b0eaa 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -586,11 +586,19 @@ class Receiver $object_data['object_activity'] = $activity; } - if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) { - Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); - return true; - } + if (($type == 'as:Create') && $trust_source) { + if (self::hasArrived($object_data['object_id'])) { + Logger::info('The activity already arrived.', ['id' => $object_data['object_id']]); + return true; + } + self::addArrivedId($object_data['object_id']); + if (Queue::exists($object_data['object_id'], $type)) { + Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); + return true; + } + } + if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) { $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source); } @@ -1883,4 +1891,29 @@ class Receiver return $object_data; } + + /** + * Add an object id to the list of arrived activities + * + * @param string $id + * + * @return void + */ + private static function addArrivedId(string $id) + { + DBA::delete('arrived-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); + DBA::insert('arrived-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); + } + + /** + * Checks if the given object already arrived before + * + * @param string $id + * + * @return boolean + */ + private static function hasArrived(string $id): bool + { + return DBA::exists('arrived-activity', ['object-id' => $id]); + } } diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index f72ec08020..df5115dcb4 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1477); + define('DB_UPDATE_VERSION', 1478); } return [ @@ -1723,6 +1723,28 @@ return [ "uri-id_uid" => ["UNIQUE", "uri-id", "uid"], ] ], + "arrived-activity" => [ + "comment" => "Id of arrived activities", + "fields" => [ + "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"], + "received" => ["type" => "datetime", "comment" => "Receiving date"], + ], + "indexes" => [ + "PRIMARY" => ["object-id"], + ], + "engine" => "MEMORY", + ], + "processed-activity" => [ + "comment" => "Id of processed activities", + "fields" => [ + "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"], + "received" => ["type" => "datetime", "comment" => "Receiving date"], + ], + "indexes" => [ + "PRIMARY" => ["object-id"], + ], + "engine" => "MEMORY", + ], "worker-ipc" => [ "comment" => "Inter process communication between the frontend and the worker", "fields" => [ From d60d2caef6abca991d5f81b4882a8957918ed0c4 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 7 Aug 2022 19:24:50 +0000 Subject: [PATCH 2/2] Improved queue processing --- src/Model/Item.php | 35 +++++--- src/Model/ItemURI.php | 10 ++- src/Protocol/ActivityPub/Processor.php | 113 +++++++++++++------------ src/Protocol/ActivityPub/Queue.php | 52 +++++++++++- src/Protocol/ActivityPub/Receiver.php | 33 +++++--- src/Util/ParseUrl.php | 4 + 6 files changed, 169 insertions(+), 78 deletions(-) diff --git a/src/Model/Item.php b/src/Model/Item.php index 3bacd9cc6a..8566643ed2 100644 --- a/src/Model/Item.php +++ b/src/Model/Item.php @@ -905,14 +905,7 @@ class Item 'photo' => $item['owner-avatar'], 'network' => $item['network']]; $item['owner-id'] = ($item['owner-id'] ?? 0) ?: Contact::getIdForURL($item['owner-link'], 0, null, $default); - $actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id']; - if (!$item['origin'] && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) { - $item['post-reason'] = self::PR_FOLLOWER; - } - - if ($item['origin'] && empty($item['post-reason'])) { - $item['post-reason'] = self::PR_LOCAL; - } + $item['post-reason'] = self::getPostReason($item); // Ensure that there is an avatar cache Contact::checkAvatarCache($item['author-id']); @@ -1291,6 +1284,27 @@ class Item return $post_user_id; } + /** + * Fetch the post reason for a given item array + * + * @param array $item + * + * @return integer + */ + public static function getPostReason(array $item): int + { + $actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id']; + if (empty($item['origin']) && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) { + return self::PR_FOLLOWER; + } + + if (!empty($item['origin']) && empty($item['post-reason'])) { + return self::PR_LOCAL; + } + + return $item['post-reason'] ?? self::PR_NONE; + } + /** * Update the display cache * @@ -1495,12 +1509,13 @@ class Item $item = array_merge($item, $fields); + $item['post-reason'] = self::getPostReason($item); + $is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE); if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) && DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE && - !Contact::isSharingByURL($item['author-link'], $uid) && - !Contact::isSharingByURL($item['owner-link'], $uid)) { + !in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) { Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]); return 0; } diff --git a/src/Model/ItemURI.php b/src/Model/ItemURI.php index 020c468d23..86d23ed541 100644 --- a/src/Model/ItemURI.php +++ b/src/Model/ItemURI.php @@ -60,10 +60,13 @@ class ItemURI * Searched for an id of a given uri. Adds it, if not existing yet. * * @param string $uri + * @param bool $insert + * * @return integer item-uri id + * * @throws \Exception */ - public static function getIdByURI(string $uri): int + public static function getIdByURI(string $uri, bool $insert = true): int { if (empty($uri)) { return 0; @@ -74,12 +77,13 @@ class ItemURI $itemuri = DBA::selectFirst('item-uri', ['id'], ['uri' => $uri]); - if (!DBA::isResult($itemuri)) { + if (!DBA::isResult($itemuri) && $insert) { return self::insert(['uri' => $uri]); } - return $itemuri['id']; + return $itemuri['id'] ?? 0; } + /** * Searched for an id of a given guid. * diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 1f336b214d..7b2f2ab0c8 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -231,6 +231,7 @@ class Processor Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); $item = self::createItem($activity, false); if (empty($item)) { + Queue::remove($activity); return; } @@ -243,6 +244,7 @@ class Processor $item = self::processContent($activity, $item); if (empty($item)) { + Queue::remove($activity); return; } @@ -301,7 +303,7 @@ class Processor */ public static function createItem(array $activity, bool $fetch_parents): array { - if (self::isProcessed($activity['id'])) { + if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) { Logger::info('Id is already processed', ['id' => $activity['id']]); return []; } @@ -339,6 +341,7 @@ class Processor Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']); if (empty($activity['author']) && empty($activity['actor'])) { Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]); + Queue::remove($activity); return []; } @@ -361,6 +364,9 @@ class Processor if (empty($conversation) && empty($activity['directmessage']) && ($item['gravity'] != GRAVITY_PARENT) && !Post::exists(['uri' => $item['thr-parent']])) { Logger::info('Parent not found, message will be discarded.', ['thr-parent' => $item['thr-parent']]); + if (!$fetch_parents) { + Queue::remove($activity); + } return []; } @@ -454,6 +460,7 @@ class Processor $item = self::processContent($activity, $item); if (empty($item)) { Logger::info('Message was not processed'); + Queue::remove($activity); return []; } @@ -560,16 +567,23 @@ class Processor } // @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON - if (in_array($curlResult->getReturnCode(), [404])) { + if (in_array($curlResult->getReturnCode(), [401, 404])) { return true; } - $object = json_decode($curlResult->getBody(), true); - if (!empty($object)) { - $activity = JsonLD::compact($object); - if (JsonLD::fetchElement($activity, '@type') == 'as:Tombstone') { + if ($curlResult->isSuccess()) { + $object = json_decode($curlResult->getBody(), true); + if (!empty($object)) { + $activity = JsonLD::compact($object); + if (JsonLD::fetchElement($activity, '@type') == 'as:Tombstone') { + return true; + } + } + } elseif ($curlResult->getReturnCode() == 0) { + $host = parse_url($url, PHP_URL_HOST); + if (!(filter_var($host, FILTER_VALIDATE_IP) || @dns_get_record($host . '.', DNS_A + DNS_AAAA))) { return true; - } + } } return false; @@ -814,7 +828,7 @@ class Processor Logger::warning('Unknown parent item.', ['uri' => $parent_uri]); return false; } - if (($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) { + if (!empty($activity['type']) && in_array($activity['type'], Receiver::CONTENT_TYPES) && ($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) { Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]); return false; } @@ -944,6 +958,9 @@ class Processor if (!self::isSolicitedMessage($activity, $item)) { DBA::delete('item-uri', ['id' => $item['uri-id']]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } return; } @@ -981,12 +998,16 @@ class Processor $item['post-reason'] = Item::PR_NONE; } - if (!empty($activity['from-relay'])) { - $item['post-reason'] = Item::PR_RELAY; - } elseif (!empty($activity['thread-completion'])) { - $item['post-reason'] = Item::PR_FETCHED; - } elseif (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE]) && !empty($activity['push'])) { - $item['post-reason'] = Item::PR_PUSHED; + $item['post-reason'] = Item::getPostReason($item); + + if (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE])) { + if (!empty($activity['from-relay'])) { + $item['post-reason'] = Item::PR_RELAY; + } elseif (!empty($activity['thread-completion'])) { + $item['post-reason'] = Item::PR_FETCHED; + } elseif (!empty($activity['push'])) { + $item['post-reason'] = Item::PR_PUSHED; + } } if ($item['isForum'] ?? false) { @@ -1004,41 +1025,31 @@ class Processor continue; } - if (!($item['isForum'] ?? false) && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !Contact::isSharingByURL($activity['author'], $receiver)) { - if ($item['post-reason'] == Item::PR_BCC) { - Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id']]); - continue; + if (($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !in_array($item['post-reason'], [Item::PR_FOLLOWER, Item::PR_TAG, item::PR_TO, Item::PR_CC])) { + if (!($item['isForum'] ?? false)) { + if ($item['post-reason'] == Item::PR_BCC) { + Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id'], 'url' => $item['uri']]); + continue; + } + + if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE) + && in_array($activity['thread-children-type'] ?? '', Receiver::ACTIVITY_TYPES)) { + Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring', + ['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]); + continue; + } } - if ( - !empty($activity['thread-children-type']) - && in_array($activity['thread-children-type'], Receiver::ACTIVITY_TYPES) - && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE - ) { - Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring', - ['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]); - continue; - } - } - - $is_forum = false; - - if ($receiver != 0) { + $is_forum = false; $user = User::getById($receiver, ['account-type']); if (!empty($user['account-type'])) { $is_forum = ($user['account-type'] == User::ACCOUNT_TYPE_COMMUNITY); } - } - if (!$is_forum && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT)) { - $skip = !Contact::isSharingByURL($activity['author'], $receiver); - - if ($skip && (($activity['type'] == 'as:Announce') || ($item['isForum'] ?? false))) { - $skip = !Contact::isSharingByURL($activity['actor'], $receiver); - } - - if ($skip) { - Logger::info('Skipping post', ['uid' => $receiver, 'url' => $item['uri']]); + if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE) + && ((!$is_forum && !($item['isForum'] ?? false) && ($activity['type'] != 'as:Announce')) + || !Contact::isSharingByURL($activity['actor'], $receiver))) { + Logger::info('Actor is a non sharer, is no forum or it is no announce', ['uid' => $receiver, 'actor' => $activity['actor'], 'url' => $item['uri'], 'type' => $activity['type']]); continue; } @@ -1333,13 +1344,7 @@ class Processor */ public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string { - if (!empty($child['receiver'])) { - $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']); - } else { - $uid = 0; - } - - $object = self::fetchCachedActivity($url, $uid); + $object = self::fetchCachedActivity($url, 0); if (empty($object)) { return ''; } @@ -1352,7 +1357,7 @@ class Processor $compacted = JsonLD::compact($object); $attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id'); } - $signer[] = $attributed_to; + $signer[] = $attributed_to; } if (!empty($object['actor'])) { @@ -1407,8 +1412,12 @@ class Processor $ldactivity['completion-mode'] = $completion; } - if (!empty($child['type'])) { + if (!empty($child['thread-children-type'])) { + $ldactivity['thread-children-type'] = $child['thread-children-type']; + } elseif (!empty($child['type'])) { $ldactivity['thread-children-type'] = $child['type']; + } else { + $ldactivity['thread-children-type'] = 'as:Create'; } if (!empty($relay_actor) && !self::acceptIncomingMessage($ldactivity, $object['id'])) { @@ -1417,7 +1426,7 @@ class Processor if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) { Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]); - } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) { + } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), 0, true, false, $signer, '', $completion)) { Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]); } else { Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]); diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index df4236b5bd..a95226d10d 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -26,6 +26,7 @@ use Friendica\Core\System; use Friendica\Database\Database; use Friendica\Database\DBA; use Friendica\DI; +use Friendica\Model\ItemURI; use Friendica\Model\Post; use Friendica\Util\DateTimeFormat; use Friendica\Util\JsonLD; @@ -140,7 +141,15 @@ class Queue return; } + Logger::debug('Delete inbox-entry', ['id' => $entry['id']]); + DBA::delete('inbox-entry', ['id' => $entry['id']]); + + $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]); + while ($child = DBA::fetch($children)) { + self::deleteById($child['id']); + } + DBA::close($children); } /** @@ -188,6 +197,11 @@ class Queue return false; } + if (!self::isProcessable($id)) { + Logger::debug('Other queue entries need to be processed first.', ['id' => $id]); + return false; + } + if (!empty($entry['wid'])) { $worker = DI::app()->getQueue(); $wid = $worker['id'] ?? 0; @@ -210,6 +224,10 @@ class Queue $activity['worker-id'] = $entry['wid']; $activity['recursion-depth'] = 0; + if (empty($activity['thread-children-type'])) { + $activity['thread-children-type'] = $type; + } + $receivers = DBA::select('inbox-entry-receiver', ['uid'], ["`queue-id` = ? AND `uid` != ?", $entry['id'], 0]); while ($receiver = DBA::fetch($receivers)) { if (!in_array($receiver['uid'], $activity['receiver'])) { @@ -248,6 +266,34 @@ class Queue DBA::close($entries); } + public static function isProcessable(int $id): bool + { + $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]); + if (empty($entry)) { + return false; + } + + if (!empty($entry['object-id']) && Post::exists(['uri' => $entry['object-id']])) { + // The object already exists, so processing can be done + return true; + } + + if (!empty($entry['conversation'])) { + $conv_id = ItemURI::getIdByURI($entry['conversation'], false); + if (DBA::exists('post-thread', ['conversation-id' => $conv_id])) { + // We have got the conversation in the system, so the post can be processed + return true; + } + } + + if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) { + // This entry belongs to some other entry that should be processed first + return false; + } + + return true; + } + /** * Clear old activities * @@ -257,7 +303,11 @@ class Queue { // We delete all entries that aren't associated with a worker entry after seven days. // The other entries are deleted when the worker deferred for too long. - DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]); + $entries = DBA::select('inbox-entry', ['id'], ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]); + while ($entry = DBA::fetch($entries)) { + self::deleteById($entry['id']); + } + DBA::close($entries); // Optimizing this table only last seconds if (DI::config()->get('system', 'optimize_tables')) { diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index e8267b0eaa..b1ed57b26a 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -598,7 +598,7 @@ class Receiver return true; } } - + if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) { $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source); } @@ -609,11 +609,15 @@ class Receiver } if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) { - // We delay by 5 seconds to allow to accumulate all receivers - $delayed = date(DateTimeFormat::MYSQL, time() + 5); - Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]); - $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); - Queue::setWorkerId($object_data['entry-id'], $wid); + if (Queue::isProcessable($object_data['entry-id'])) { + // We delay by 5 seconds to allow to accumulate all receivers + $delayed = date(DateTimeFormat::MYSQL, time() + 5); + Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]); + $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); + Queue::setWorkerId($object_data['entry-id'], $wid); + } else { + Logger::debug('Other queue entries need to be processed first.', ['id' => $object_data['entry-id']]); + } return false; } @@ -687,13 +691,18 @@ class Receiver $object_data['thread-completion'] = Contact::getIdForURL($actor); $object_data['completion-mode'] = self::COMPLETION_ANNOUCE; - $item = ActivityPub\Processor::createItem($object_data, $fetch_parents); - if (empty($item)) { - return false; - } + if (!Post::exists(['uri' => $object_data['id'], 'uid' => 0])) { + $item = ActivityPub\Processor::createItem($object_data, $fetch_parents); + if (empty($item)) { + return false; + } - $item['post-reason'] = Item::PR_ANNOUNCEMENT; - ActivityPub\Processor::postItem($object_data, $item); + $item['post-reason'] = Item::PR_ANNOUNCEMENT; + ActivityPub\Processor::postItem($object_data, $item); + } else { + Logger::info('Announced id already exists', ['id' => $object_data['id']]); + Queue::remove($object_data); + } if (!empty($activity)) { $announce_object_data = self::processObject($activity); diff --git a/src/Util/ParseUrl.php b/src/Util/ParseUrl.php index 0183d6b14a..1fba29ec3b 100644 --- a/src/Util/ParseUrl.php +++ b/src/Util/ParseUrl.php @@ -315,6 +315,10 @@ class ParseUrl $body = mb_convert_encoding($body, 'HTML-ENTITIES', 'UTF-8'); + if (empty($body)) { + return $siteinfo; + } + $doc = new DOMDocument(); @$doc->loadHTML($body);