diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 03c68eac0..b896dfa14 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -281,6 +281,11 @@ class Processor } if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { + if (Queue::hasWorker($activity)) { + Logger::notice('There is already a worker task to dfetch the post.', ['parent' => $activity['reply-to-id']]); + return []; + } + $recursion_depth = $activity['recursion-depth'] ?? 0; Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); if ($recursion_depth < 10) { diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index f8f5666d1..d5a308b10 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -104,6 +104,20 @@ class Queue DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]); } + /** + * Check if there is an assigned worker task + * + * @param array $activity + * @return bool + */ + public static function hasWorker(array $activity = []): bool + { + if (empty($activity['worker-id'])) { + return false; + } + return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]); + } + /** * Process the activity with the given id * @@ -123,7 +137,8 @@ class Queue $type = $entry['type']; $push = $entry['push']; - $activity['entry-id'] = $entry['id']; + $activity['entry-id'] = $entry['id']; + $activity['worker-id'] = $entry['wid']; if (!Receiver::routeActivities($activity, $type, $push)) { self::remove($activity); @@ -150,7 +165,7 @@ class Queue */ public static function clear() { - DBA::delete('inbox-entry', ["`received` < ?", DateTimeFormat::utc('now - 2 days')]); + DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 4 hours')]); } /**