From 505191dec5b66b86f704d34ee6b5c927edb3b047 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 27 Jul 2022 20:59:42 +0000 Subject: [PATCH] Decouple the processor from the receiver --- src/Protocol/ActivityPub/Processor.php | 1 - src/Protocol/ActivityPub/Queue.php | 9 ++++-- src/Protocol/ActivityPub/Receiver.php | 9 ++++++ src/Worker/ProcessQueue.php | 42 ++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 src/Worker/ProcessQueue.php diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 30f6627ba3..2025aba04e 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -982,7 +982,6 @@ class Processor Queue::remove($activity); if (Queue::hasChildren($item['uri'])) { - //Queue::processReplyByUri($item['uri']); Worker::add(PRIORITY_HIGH, 'ProcessReplyByUri', $item['uri']); } } diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index b389626a3e..b852132581 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -167,13 +167,14 @@ class Queue * Process the activity with the given id * * @param integer $id - * @return void + * + * @return bool */ - public static function process(int $id) + public static function process(int $id): bool { $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]); if (empty($entry)) { - return; + return false; } Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]); @@ -197,6 +198,8 @@ class Queue if (!Receiver::routeActivities($activity, $type, $push)) { self::remove($activity); } + + return true; } /** diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 9e79216996..0515b24e7d 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -28,6 +28,7 @@ use Friendica\Content\Text\Markdown; use Friendica\Core\Logger; use Friendica\Core\Protocol; use Friendica\Core\System; +use Friendica\Core\Worker; use Friendica\DI; use Friendica\Model\Contact; use Friendica\Model\APContact; @@ -36,6 +37,7 @@ use Friendica\Model\Post; use Friendica\Model\User; use Friendica\Protocol\Activity; use Friendica\Protocol\ActivityPub; +use Friendica\Util\DateTimeFormat; use Friendica\Util\HTTPSignature; use Friendica\Util\JsonLD; use Friendica\Util\LDSignature; @@ -597,6 +599,13 @@ class Receiver return; } + if ($push) { + // We delay by 5 seconds to allow to accumulate all receivers + $delayed = date(DateTimeFormat::MYSQL, time() + 5); + Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); + return; + } + if (!empty($activity['recursion-depth'])) { $object_data['recursion-depth'] = $activity['recursion-depth']; } diff --git a/src/Worker/ProcessQueue.php b/src/Worker/ProcessQueue.php new file mode 100644 index 0000000000..dee24ee4c0 --- /dev/null +++ b/src/Worker/ProcessQueue.php @@ -0,0 +1,42 @@ +. + * + */ + +namespace Friendica\Worker; + +use Friendica\Core\Logger; +use Friendica\Protocol\ActivityPub\Queue; + +class ProcessQueue +{ + /** + * Process queue entry + * + * @param int $id queue id + * + * @return void + */ + public static function execute(int $id) + { + Logger::info('Start processing queue entry', ['id' => $id]); + $result = Queue::process($id); + Logger::info('Successfully processed queue entry', ['result' => $result, 'id' => $id]); + } +}