From 4a1fb1da12ba357a7695c4dcf9dbae7e67fedd7e Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 21 Jul 2022 05:33:01 +0000 Subject: [PATCH] Hourly process pending queue entries --- src/Protocol/ActivityPub/Queue.php | 40 +++++++++++++++++++++++++++--- src/Worker/Cron.php | 4 +++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 0cb84727f..12bf7e268 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -31,6 +31,16 @@ use Friendica\Util\DateTimeFormat; */ class Queue { + /** + * Add activity to the queue + * + * @param array $activity + * @param string $type + * @param integer $uid + * @param string $http_signer + * @param boolean $push + * @return array + */ public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array { $fields = [ @@ -65,23 +75,33 @@ class Queue return $activity; } + /** + * Remove activity from the queue + * + * @param array $activity + * @return void + */ public static function remove(array $activity = []) { if (empty($activity['entry-id'])) { return; } DBA::delete('inbox-entry', ['id' => $activity['entry-id']]); - //echo "Delete ".$activity['entry-id']."\n"; - } + /** + * Process the activity with the given id + * + * @param integer $id + * @return void + */ public static function process(int $id) { $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]); if (empty($entry)) { return; } - + Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]); $activity = json_decode($entry['activity'], true); @@ -95,15 +115,27 @@ class Queue } } + /** + * Process all activities + * + * @return void + */ public static function processAll() { $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]); while ($entry = DBA::fetch($entries)) { - echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n"; self::process($entry['id']); } + + DBA::delete('inbox-entry', ["`received` < ?", DateTimeFormat::utc('now - 1 days')]); } + /** + * Process all activities that are children of a given post url + * + * @param string $uri + * @return void + */ public static function processReplyByUri(string $uri) { $entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]); diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index 30efc61db..ee3fafe4b 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -27,6 +27,7 @@ use Friendica\Core\Worker; use Friendica\Database\DBA; use Friendica\DI; use Friendica\Model\Tag; +use Friendica\Protocol\ActivityPub\Queue; use Friendica\Protocol\Relay; class Cron @@ -88,6 +89,9 @@ class Cron Tag::setLocalTrendingHashtags(24, 20); Tag::setGlobalTrendingHashtags(24, 20); + // Process pending posts in the queue + Queue::processAll(); + // Search for new contacts in the directory if (DI::config()->get('system', 'synchronize_directory')) { Worker::add(PRIORITY_LOW, 'PullDirectory');