. * */

namespace Friendica\Protocol\ActivityPub;

use Friendica\Core\Logger;
use Friendica\Database\Database;
use Friendica\Database\DBA;
use Friendica\Util\DateTimeFormat;

/**
 * This class handles the processing of incoming posts
 *
 * Incoming activities are persisted in the `inbox-entry` table (one row per
 * activity id) together with their receivers in `inbox-entry-receiver`, and
 * are later read back and handed to Receiver::routeActivities().
 */
class Queue
{
	/**
	 * Add activity to the queue
	 *
	 * Inserts the activity into `inbox-entry` (the insert is ignored when it
	 * conflicts with an existing row), looks the row up again by its
	 * activity id and registers $uid as a receiver of that row.
	 *
	 * @param array   $activity    Activity data; `id`, `object_id` and `object_type` are read unconditionally
	 * @param string  $type        Stored in the `type` column
	 * @param integer $uid         User id stored as receiver of the entry
	 * @param string  $http_signer Stored in the `signer` column when not empty
	 * @param boolean $push        Stored in the `push` column
	 * @return array The activity, extended by `entry-id` when the queue row could be found
	 */
	public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
	{
		$fields = [
			'activity-id' => $activity['id'],
			'object-id'   => $activity['object_id'],
			'type'        => $type,
			'object-type' => $activity['object_type'],
			'activity'    => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
			'received'    => DateTimeFormat::utcNow(),
			'push'        => $push,
		];

		if (!empty($activity['reply-to-id'])) {
			$fields['in-reply-to-id'] = $activity['reply-to-id'];
		}

		// "context" takes precedence over "conversation" when both are present.
		if (!empty($activity['context'])) {
			$fields['conversation'] = $activity['context'];
		} elseif (!empty($activity['conversation'])) {
			$fields['conversation'] = $activity['conversation'];
		}

		if (!empty($activity['object_object_type'])) {
			$fields['object-object-type'] = $activity['object_object_type'];
		}

		if (!empty($http_signer)) {
			$fields['signer'] = $http_signer;
		}

		DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);

		// The row is fetched by activity id, so an already existing entry is
		// reused when the insert above was ignored.
		$queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
		if (!empty($queue['id'])) {
			$activity['entry-id'] = $queue['id'];
			DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
		}

		return $activity;
	}

	/**
	 * Remove activity from the queue
	 *
	 * Does nothing when the activity carries no `entry-id`.
	 *
	 * @param array $activity
	 * @return void
	 */
	public static function remove(array $activity = [])
	{
		if (empty($activity['entry-id'])) {
			return;
		}

		DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
	}

	/**
	 * Set the worker id for the queue entry
	 *
	 * Does nothing when the activity carries no `entry-id` or $wid is empty.
	 *
	 * @param array $activity
	 * @param int   $wid
	 * @return void
	 */
	public static function setWorkerId(array $activity, int $wid)
	{
		if (empty($activity['entry-id']) || empty($wid)) {
			return;
		}

		DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]);
	}

	/**
	 * Check if there is an assigned worker task
	 *
	 * @param array $activity
	 * @return bool true when a `workerqueue` row with the activity's `worker-id` exists and is not done
	 */
	public static function hasWorker(array $activity = []): bool
	{
		if (empty($activity['worker-id'])) {
			return false;
		}

		return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]);
	}

	/**
	 * Process the activity with the given id
	 *
	 * Loads the queue entry, restores the stored activity, merges the stored
	 * receivers into it and passes it to Receiver::routeActivities(). The
	 * entry is removed when that call returns a falsy value.
	 *
	 * Entries whose stored payload does not decode to an array are logged
	 * and deleted, since they can never be routed.
	 *
	 * @param integer $id
	 * @return void
	 */
	public static function process(int $id)
	{
		$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
		if (empty($entry)) {
			return;
		}

		Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);

		$activity = json_decode($entry['activity'], true);
		if (!is_array($activity)) {
			// json_decode() yields null on invalid JSON and a scalar for scalar
			// documents; neither can be used as an activity below.
			Logger::notice('Content is no array', ['id' => $entry['id']]);
			DBA::delete('inbox-entry', ['id' => $entry['id']]);
			return;
		}

		$type = $entry['type'];
		$push = $entry['push'];

		$activity['entry-id']  = $entry['id'];
		$activity['worker-id'] = $entry['wid'];

		// in_array() requires an array haystack; the stored activity is not
		// guaranteed to contain a usable "receiver" list.
		if (!isset($activity['receiver']) || !is_array($activity['receiver'])) {
			$activity['receiver'] = [];
		}

		$receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]);
		while ($receiver = DBA::fetch($receivers)) {
			if (!in_array($receiver['uid'], $activity['receiver'])) {
				$activity['receiver'][] = $receiver['uid'];
			}
		}
		DBA::close($receivers);

		if (!Receiver::routeActivities($activity, $type, $push)) {
			self::remove($activity);
		}
	}

	/**
	 * Process all activities
	 *
	 * @return void
	 */
	public static function processAll()
	{
		// NOTE(review): "true" presumably requests descending order by id - confirm against DBA::select().
		$entries = DBA::select('inbox-entry', ['id'], [], ['order' => ['id' => true]]);
		self::processEntries($entries);
	}

	/**
	 * Clear old activities
	 *
	 * @return void
	 */
	public static function clear()
	{
		// We delete all entries that aren't associated with a worker entry after seven days.
		// The other entries are deleted when the worker deferred for too long.
		DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
	}

	/**
	 * Process all activities that are children of a given post url
	 *
	 * Entries whose `object-id` equals the given uri are skipped.
	 *
	 * @param string $uri
	 * @return void
	 */
	public static function processReplyByUri(string $uri)
	{
		$entries = DBA::select('inbox-entry', ['id'], ["`in-reply-to-id` = ? AND `object-id` != ?", $uri, $uri]);
		self::processEntries($entries);
	}

	/**
	 * Process every entry of a result set and release the result set afterwards
	 *
	 * @param mixed $entries Result of a DBA::select() call that contains at least the `id` column
	 * @return void
	 */
	private static function processEntries($entries)
	{
		try {
			while ($entry = DBA::fetch($entries)) {
				self::process($entry['id']);
			}
		} finally {
			// Release the result set even when processing an entry fails.
			DBA::close($entries);
		}
	}
}