diff --git a/src/Console/FixAPDeliveryWorkerTaskParameters.php b/src/Console/FixAPDeliveryWorkerTaskParameters.php new file mode 100644 index 000000000..9023d84ac --- /dev/null +++ b/src/Console/FixAPDeliveryWorkerTaskParameters.php @@ -0,0 +1,163 @@ +. + * + */ + +namespace Friendica\Console; + +use Friendica\App; +use Friendica\Database\Database; +use Friendica\Database\DBA; +use Friendica\DI; +use Friendica\Model\Contact; +use Friendica\Util\Strings; +use RuntimeException; + +/** + * License: AGPLv3 or later, same as Friendica + */ +class FixAPDeliveryWorkerTaskParameters extends \Asika\SimpleConsole\Console +{ + protected $helpOptions = ['h', 'help', '?']; + + /** + * @var App\Mode + */ + private $appMode; + /** + * @var Database + */ + private $dba; + /** + * @var int + */ + private $examined; + /** + * @var int + */ + private $processed; + /** + * @var int + */ + private $errored; + + protected function getHelp() + { + $help = <<appMode = $appMode; + $this->dba = $dba; + $this->l10n = $l10n; + } + + protected function doExecute() + { + if ($this->getOption('v')) { + $this->out('Class: ' . __CLASS__); + $this->out('Arguments: ' . var_export($this->args, true)); + $this->out('Options: ' . var_export($this->options, true)); + } + + if (count($this->args) > 0) { + throw new \Asika\SimpleConsole\CommandArgsException('Too many arguments'); + } + + if ($this->appMode->isInstall()) { + throw new RuntimeException('Friendica isn\'t properly installed yet.'); + } + + $this->examined = 0; + $this->processed = 0; + $this->errored = 0; + + do { + $result = $this->dba->p('SELECT `id`, `parameter` FROM `workerqueue` WHERE `command` = "APDelivery" AND `parameter` LIKE "[\"%\",\"\",%" LIMIT ' . $this->examined . ', 100'); + while ($row = $this->dba->fetch($result)) { + $this->examined++; + $this->processRow($row); + } + } while ($this->dba->isResult($result)); + + if ($this->getOption('v')) { + $this->out('Examined: ' . $this->examined); + $this->out('Processed: ' . $this->processed); + $this->out('Errored: ' . $this->errored); + } + + return 0; + } + + private function processRow(array $workerqueueItem) + { + $parameters = json_decode($workerqueueItem['parameter'], true); + + if (!$parameters) { + $this->errored++; + if ($this->getOption('v')) { + $this->out('Unabled to parse parameter JSON of the row with id ' . $workerqueueItem['id']); + $this->out('JSON: ' . var_export($workerqueueItem['parameter'], true)); + } + } + + if ($parameters[1] !== '' && !is_array($parameters[2])) { + // Nothing to do, we save a write + return; + } + + if ($parameters[1] === '') { + $parameters[1] = 0; + } + + if (is_array($parameters[2])) { + $parameters[4] = $parameters[2]; + $contact = Contact::getById(current($parameters[2]), ['url']); + $parameters[2] = $contact['url']; + } + + $fields = ['parameter' => json_encode($parameters)]; + if ($this->dba->update('workerqueue', $fields, ['id' => $workerqueueItem['id']])) { + $this->processed++; + } else { + $this->errored++; + if ($this->getOption('v')) { + $this->out('Unabled to update the row with id ' . $workerqueueItem['id']); + $this->out('Fields: ' . var_export($fields, true)); + } + } + } +} diff --git a/src/Core/Console.php b/src/Core/Console.php index 4a4dc13ef..f43b89e9e 100644 --- a/src/Core/Console.php +++ b/src/Core/Console.php @@ -94,6 +94,7 @@ HELP; 'serverblock' => Friendica\Console\ServerBlock::class, 'storage' => Friendica\Console\Storage::class, 'relay' => Friendica\Console\Relay::class, + 'fixapdeliveryworkertaskparameters' => Friendica\Console\FixAPDeliveryWorkerTaskParameters::class, ]; /** diff --git a/src/Worker/APDelivery.php b/src/Worker/APDelivery.php index c98f1e399..0c2c0ca9c 100644 --- a/src/Worker/APDelivery.php +++ b/src/Worker/APDelivery.php @@ -34,58 +34,36 @@ class APDelivery /** * Delivers ActivityPub messages * - * @param string $cmd - * @param integer $target_id - * @param string|array $inboxes - * @param integer $uid - * @param array $receivers + * @param string $cmd One of the Worker\Delivery constant values + * @param integer $item_id 0 if no item is involved (like Delivery::REMOVAL and Delivery::PROFILEUPDATE) + * @param string $inbox The URL of the recipient profile + * @param integer $uid The ID of the user who triggered this delivery + * @param array $receivers The contact IDs related to the inbox URL for contact archival housekeeping * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function execute(string $cmd, int $target_id, $inboxes, int $uid, array $receivers = []) - { - if (is_string($inboxes)) { - $inboxes = [$inboxes]; - } - - foreach ($inboxes as $inbox) { - self::perform($cmd, $target_id, $inbox, $uid, $receivers); - } - } - - /** - * Delivers ActivityPub messages - * - * @param string $cmd - * @param integer $target_id - * @param string $inbox - * @param integer $uid - * @param array $receivers - * @throws \Friendica\Network\HTTPException\InternalServerErrorException - * @throws \ImagickException - */ - private static function perform(string $cmd, int $target_id, string $inbox, int $uid, array $receivers = []) + public static function execute(string $cmd, int $item_id, string $inbox, int $uid, array $receivers = []) { if (ActivityPub\Transmitter::archivedInbox($inbox)) { - Logger::info('Inbox is archived', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $target_id, 'uid' => $uid]); + Logger::info('Inbox is archived', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uid' => $uid]); if (in_array($cmd, [Delivery::POST])) { - $item = Item::selectFirst(['uri-id'], ['id' => $target_id]); + $item = Item::selectFirst(['uri-id'], ['id' => $item_id]); Post\DeliveryData::incrementQueueFailed($item['uri-id'] ?? 0); } return; } - Logger::info('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $target_id, 'uid' => $uid]); + Logger::info('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uid' => $uid]); $success = true; if ($cmd == Delivery::MAIL) { - $data = ActivityPub\Transmitter::createActivityFromMail($target_id); + $data = ActivityPub\Transmitter::createActivityFromMail($item_id); if (!empty($data)) { $success = HTTPSignature::transmit($data, $inbox, $uid); } } elseif ($cmd == Delivery::SUGGESTION) { - $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $target_id); + $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id); } elseif ($cmd == Delivery::RELOCATION) { // @todo Implementation pending } elseif ($cmd == Delivery::POKE) { @@ -95,14 +73,14 @@ class APDelivery } elseif ($cmd == Delivery::PROFILEUPDATE) { $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox); } else { - $data = ActivityPub\Transmitter::createCachedActivityFromItem($target_id); + $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id); if (!empty($data)) { $success = HTTPSignature::transmit($data, $inbox, $uid); } } // This should never fail and is temporariy (until the move to the "post" structure) - $item = Item::selectFirst(['uri-id'], ['id' => $target_id]); + $item = Item::selectFirst(['uri-id'], ['id' => $item_id]); $uriid = $item['uri-id'] ?? 0; foreach ($receivers as $receiver) { diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 02992898c..907037dce 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -81,10 +81,10 @@ class Notifier $mail = ActivityPub\Transmitter::ItemArrayFromMail($target_id); $inboxes = ActivityPub\Transmitter::fetchTargetInboxes($mail, $uid, true); - foreach ($inboxes as $inbox) { + foreach ($inboxes as $inbox => $receivers) { Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'target' => $target_id, 'inbox' => $inbox]); Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true], - 'APDelivery', $cmd, $target_id, $inbox, $uid); + 'APDelivery', $cmd, $target_id, $inbox, $uid, $receivers); } } elseif ($cmd == Delivery::SUGGESTION) { $suggest = DI::fsuggest()->getById($target_id); @@ -744,10 +744,10 @@ class Notifier DBA::close($contacts_stmt); $inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser(0); - foreach ($inboxes as $inbox) { + foreach ($inboxes as $inbox => $receivers) { Logger::info('Account removal via ActivityPub', ['uid' => $self_user_id, 'inbox' => $inbox]); Worker::add(['priority' => PRIORITY_NEGLIGIBLE, 'created' => $created, 'dont_fork' => true], - 'APDelivery', Delivery::REMOVAL, '', $inbox, $self_user_id); + 'APDelivery', Delivery::REMOVAL, 0, $inbox, $self_user_id, $receivers); } return true; @@ -834,10 +834,10 @@ class Notifier } // We deliver posts to relay servers slightly delayed to priorize the direct delivery - foreach ($relay_inboxes as $inbox => $receivers) { + foreach ($relay_inboxes as $inbox) { Logger::info('Delivery to relay servers via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); - if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, $receivers)) { + if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid)) { $delivery_queue_count++; } } diff --git a/src/Worker/ProfileUpdate.php b/src/Worker/ProfileUpdate.php index f33811ef6..7ced7aa2e 100644 --- a/src/Worker/ProfileUpdate.php +++ b/src/Worker/ProfileUpdate.php @@ -40,10 +40,10 @@ class ProfileUpdate { $inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser($uid); - foreach ($inboxes as $inbox) { + foreach ($inboxes as $inbox => $receivers) { Logger::log('Profile update for user ' . $uid . ' to ' . $inbox .' via ActivityPub', Logger::DEBUG); Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], - 'APDelivery', Delivery::PROFILEUPDATE, '', $inbox, $uid); + 'APDelivery', Delivery::PROFILEUPDATE, 0, $inbox, $uid, $receivers); } Diaspora::sendProfile($uid);