From 259b99e6e94cd6714e65bb128af9cbe776fd3325 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 31 Dec 2022 12:19:34 +0000 Subject: [PATCH] Bulk delivery added for all protocols --- database.sql | 23 ++++- doc/database.md | 1 + doc/database/db_delivery-queue.md | 39 ++++++++ src/Core/Worker.php | 9 +- src/Core/Worker/Cron.php | 46 ++++++++- src/Model/GServer.php | 55 +++++++++++ src/Protocol/DFRN.php | 8 ++ src/Protocol/Delivery.php | 156 +++++++++++++++++++++++------- src/Protocol/Diaspora.php | 6 ++ src/Worker/BulkDelivery.php | 60 ++++++++++++ src/Worker/Notifier.php | 24 ++++- static/dbstructure.config.php | 20 +++- 12 files changed, 398 insertions(+), 49 deletions(-) create mode 100644 doc/database/db_delivery-queue.md create mode 100644 src/Worker/BulkDelivery.php diff --git a/database.sql b/database.sql index 2ae33689c1..02481bc007 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ -- Friendica 2023.03-dev (Giant Rhubarb) --- DB_UPDATE_VERSION 1505 +-- DB_UPDATE_VERSION 1506 -- ------------------------------------------ @@ -579,6 +579,27 @@ CREATE TABLE IF NOT EXISTS `delayed-post` ( FOREIGN KEY (`wid`) REFERENCES `workerqueue` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE ) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Posts that are about to be distributed at a later time'; +-- +-- TABLE delivery-queue +-- +CREATE TABLE IF NOT EXISTS `delivery-queue` ( + `gsid` int unsigned NOT NULL COMMENT 'Global Server ID', + `uri-id` int unsigned NOT NULL COMMENT 'Id of the item-uri table entry that contains the item uri', + `created` datetime COMMENT '', + `command` varbinary(32) COMMENT '', + `cid` int unsigned COMMENT 'contact_id (ID of the contact in contact table)', + `uid` mediumint unsigned COMMENT 'Delivering user', + `failed` tinyint DEFAULT 0 COMMENT 'Number of times the delivery has failed', + PRIMARY KEY(`uri-id`,`gsid`), + INDEX `gsid_created` (`gsid`,`created`), + INDEX `uid` (`uid`), + INDEX `cid` (`cid`), + FOREIGN KEY (`gsid`) REFERENCES `gserver` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT, + FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE, + FOREIGN KEY (`cid`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE, + FOREIGN KEY (`uid`) REFERENCES `user` (`uid`) ON UPDATE RESTRICT ON DELETE CASCADE +) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Delivery data for posts for the batch processing'; + -- -- TABLE diaspora-contact -- diff --git a/doc/database.md b/doc/database.md index 5759d29ec7..edfb7b8226 100644 --- a/doc/database.md +++ b/doc/database.md @@ -23,6 +23,7 @@ Database Tables | [contact-relation](help/database/db_contact-relation) | Contact relations | | [conv](help/database/db_conv) | private messages | | [delayed-post](help/database/db_delayed-post) | Posts that are about to be distributed at a later time | +| [delivery-queue](help/database/db_delivery-queue) | Delivery data for posts for the batch processing | | [diaspora-contact](help/database/db_diaspora-contact) | Diaspora compatible contacts - used in the Diaspora implementation | | [diaspora-interaction](help/database/db_diaspora-interaction) | Signed Diaspora Interaction | | [endpoint](help/database/db_endpoint) | ActivityPub endpoints - used in the ActivityPub implementation | diff --git a/doc/database/db_delivery-queue.md b/doc/database/db_delivery-queue.md new file mode 100644 index 0000000000..a46f7ac3a5 --- /dev/null +++ b/doc/database/db_delivery-queue.md @@ -0,0 +1,39 @@ +Table delivery-queue +=========== + +Delivery data for posts for the batch processing + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| ------- | --------------------------------------------------------- | ------------------ | ---- | --- | ------- | ----- | +| gsid | Global Server ID | int unsigned | NO | PRI | NULL | | +| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | | +| created | | datetime | YES | | NULL | | +| command | | varbinary(32) | YES | | NULL | | +| cid | contact_id (ID of the contact in contact table) | int unsigned | YES | | NULL | | +| uid | Delivering user | mediumint unsigned | YES | | NULL | | +| failed | Number of times the delivery has failed | tinyint | YES | | 0 | | + +Indexes +------------ + +| Name | Fields | +| ------------ | ------------- | +| PRIMARY | uri-id, gsid | +| gsid_created | gsid, created | +| uid | uid | +| cid | cid | + +Foreign Keys +------------ + +| Field | Target Table | Target Field | +|-------|--------------|--------------| +| gsid | [gserver](help/database/db_gserver) | id | +| uri-id | [item-uri](help/database/db_item-uri) | id | +| cid | [contact](help/database/db_contact) | id | +| uid | [user](help/database/db_user) | uid | + +Return to [database documentation](help/database) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index cbe6645294..3c5d38c09f 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1264,7 +1264,7 @@ class Worker $command = array_shift($args); $parameters = json_encode($args); - $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]); + $queue = DBA::selectFirst('workerqueue', [], ['command' => $command, 'parameter' => $parameters, 'done' => false]); $added = 0; if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) { @@ -1277,14 +1277,17 @@ class Worker return 0; } - if (!$found) { + if (empty($queue)) { if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created, 'priority' => $priority, 'next_try' => $delayed])) { return 0; } $added = DBA::lastInsertId(); } elseif ($force_priority) { - DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + if ($ret && ($priority != $queue['priority'])) { + $added = $queue['id']; + } } // Set the IPC flag to ensure an immediate process execution via daemon diff --git a/src/Core/Worker/Cron.php b/src/Core/Worker/Cron.php index f73edb816b..5fdda1396c 100644 --- a/src/Core/Worker/Cron.php +++ b/src/Core/Worker/Cron.php @@ -26,8 +26,10 @@ use Friendica\Core\Worker; use Friendica\Database\DBA; use Friendica\DI; use Friendica\Model\Contact; +use Friendica\Model\GServer; use Friendica\Model\Post; use Friendica\Protocol\ActivityPub; +use Friendica\Protocol\Delivery; use Friendica\Util\DateTimeFormat; use Friendica\Util\Strings; @@ -58,7 +60,10 @@ class Cron // Remove old entries from the workerqueue self::cleanWorkerQueue(); - // Directly deliver or requeue posts + // Directly deliver or requeue posts to ActivityPub systems + self::deliverAPPosts(); + + // Directly deliver or requeue posts to other systems self::deliverPosts(); } @@ -157,7 +162,7 @@ class Cron * * This function is placed here as a safeguard. Even when the worker queue is completely blocked, messages will be delivered. */ - private static function deliverPosts() + private static function deliverAPPosts() { $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox` ORDER BY RAND()"); while ($delivery = DBA::fetch($deliveries)) { @@ -181,7 +186,7 @@ class Cron } if (Worker::add(['priority' => $priority, 'force_priority' => true], 'APDelivery', '', 0, $delivery['inbox'], 0)) { - Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]); + Logger::info('Priority for APDelivery worker adjusted', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]); } } @@ -193,6 +198,41 @@ class Cron } } + /** + * Directly deliver messages or requeue them. + */ + private static function deliverPosts() + { + $deliveries = DBA::p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM `delivery-queue` GROUP BY `gsid` ORDER BY RAND()"); + while ($delivery = DBA::fetch($deliveries)) { + if ($delivery['failed'] > 0) { + Logger::info('Removing failed deliveries', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed']]); + Delivery::removeFailedQueue($delivery['gsid']); + } + + if (($delivery['failed'] < 3) || GServer::reachableById($delivery['gsid'])) { + $priority = Worker::PRIORITY_HIGH; + } elseif ($delivery['failed'] < 6) { + $priority = Worker::PRIORITY_MEDIUM; + } elseif ($delivery['failed'] < 8) { + $priority = Worker::PRIORITY_LOW; + } else { + $priority = Worker::PRIORITY_NEGLIGIBLE; + } + + if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery['gsid'])) { + Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed'], 'priority' => $priority]); + } + } + + // Optimizing this table only last seconds + if (DI::config()->get('system', 'optimize_tables')) { + Logger::info('Optimize start'); + DBA::e("OPTIMIZE TABLE `delivery-queue`"); + Logger::info('Optimize end'); + } + } + /** * Add missing "intro" records. * diff --git a/src/Model/GServer.php b/src/Model/GServer.php index 6945fab3e0..98acc9b67d 100644 --- a/src/Model/GServer.php +++ b/src/Model/GServer.php @@ -164,6 +164,25 @@ class GServer return DI::dba()->toArray($stmt); } + /** + * Checks if the given server id is reachable + * + * @param integer $gsid + * @return boolean + */ + public static function reachableById(int $gsid): bool + { + $gserver = DBA::selectFirst('gserver', ['url', 'next_contact', 'failed', 'network'], ['id' => $gsid]); + if (empty($gserver)) { + return true; + } else { + if (strtotime($gserver['next_contact']) < time()) { + Worker::add(Worker::PRIORITY_LOW, 'UpdateGServer', $gserver['url'], false); + } + return !$gserver['failed'] && in_array($gserver['network'], Protocol::FEDERATED); + } + } + /** * Checks if the given server is reachable * @@ -305,6 +324,42 @@ class GServer return self::detect($server_url, $network, $only_nodeinfo); } + /** + * Reset failed server status by gserver id + * + * @param int $gsid + */ + public static function setReachableById(int $gsid) + { + $gserver = DBA::selectFirst('gserver', ['url', 'failed', 'next_contact'], ['id' => $gsid]); + if (DBA::isResult($gserver) && $gserver['failed']) { + self::update(['failed' => false, 'last_contact' => DateTimeFormat::utcNow()], ['id' => $gsid]); + Logger::info('Reset failed status for server', ['url' => $gserver['url']]); + + if (strtotime($gserver['next_contact']) < time()) { + Worker::add(Worker::PRIORITY_LOW, 'UpdateGServer', $gserver['url'], false); + } + } + } + + /** + * Set failed server status by gserver id + * + * @param int $gsid + */ + public static function setFailureById(int $gsid) + { + $gserver = DBA::selectFirst('gserver', ['url', 'failed', 'next_contact'], ['id' => $gsid]); + if (DBA::isResult($gserver) && !$gserver['failed']) { + self::update(['failed' => true, 'last_failure' => DateTimeFormat::utcNow()], ['id' => $gsid]); + Logger::info('Set failed status for server', ['url' => $gserver['url']]); + + if (strtotime($gserver['next_contact']) < time()) { + Worker::add(Worker::PRIORITY_LOW, 'UpdateGServer', $gserver['url'], false); + } + } + } + /** * Set failed server status * diff --git a/src/Protocol/DFRN.php b/src/Protocol/DFRN.php index 52ae12f9c3..8bf299be68 100644 --- a/src/Protocol/DFRN.php +++ b/src/Protocol/DFRN.php @@ -1014,6 +1014,10 @@ class DFRN $xml = $postResult->getBody(); $curl_stat = $postResult->getReturnCode(); + if (!empty($contact['gsid']) && ($postResult->isTimeout() || empty($curl_stat))) { + GServer::setFailureById($contact['gsid']); + } + if (empty($curl_stat) || empty($xml)) { Logger::notice('Empty answer from ' . $contact['id'] . ' - ' . $dest_url); return -9; // timed out @@ -1035,6 +1039,10 @@ class DFRN return -23; } + if (!empty($contact['gsid'])) { + GServer::setReachableById($contact['gsid']); + } + if (!empty($res->message)) { Logger::info('Transmit to ' . $dest_url . ' returned status '.$res->status.' - '.$res->message); } diff --git a/src/Protocol/Delivery.php b/src/Protocol/Delivery.php index 79297e2282..338e75a0c1 100644 --- a/src/Protocol/Delivery.php +++ b/src/Protocol/Delivery.php @@ -26,6 +26,7 @@ use Friendica\Contact\FriendSuggest\Exception\FriendSuggestNotFoundException; use Friendica\Core\Logger; use Friendica\Core\Protocol; use Friendica\Core\Worker; +use Friendica\Database\Database; use Friendica\Database\DBA; use Friendica\DI; use Friendica\Model\Contact; @@ -45,7 +46,16 @@ class Delivery const REMOVAL = 'removeme'; const PROFILEUPDATE = 'profileupdate'; - public static function deliver(string $cmd, int $post_uriid, int $contact_id, int $sender_uid = 0) + /** + * Deliver posts to other systems + * + * @param string $cmd + * @param integer $post_uriid + * @param integer $contact_id + * @param integer $sender_uid + * @return bool "false" on remote system error. "true" when delivery was successful or we shouldn't retry. + */ + public static function deliver(string $cmd, int $post_uriid, int $contact_id, int $sender_uid = 0): bool { Logger::info('Invoked', ['cmd' => $cmd, 'target' => $post_uriid, 'sender_uid' => $sender_uid, 'contact' => $contact_id]); @@ -57,7 +67,7 @@ class Delivery if ($cmd == self::MAIL) { $target_item = DBA::selectFirst('mail', [], ['id' => $post_uriid]); if (!DBA::isResult($target_item)) { - return; + return true; } $uid = $target_item['uid']; } elseif ($cmd == self::SUGGESTION) { @@ -65,7 +75,7 @@ class Delivery $target_item = DI::fsuggest()->selectOneById($post_uriid)->toArray(); } catch (FriendSuggestNotFoundException $e) { DI::logger()->info('Cannot find FriendSuggestion', ['id' => $post_uriid]); - return; + return true; } $uid = $target_item['uid']; } elseif ($cmd == self::RELOCATION) { @@ -75,7 +85,7 @@ class Delivery $item = Post::selectFirst(['id', 'parent'], ['uri-id' => $post_uriid, 'uid' => $sender_uid]); if (!DBA::isResult($item) || empty($item['parent'])) { Logger::warning('Post not found', ['uri-id' => $post_uriid, 'uid' => $sender_uid]); - return; + return true; } $target_id = intval($item['id']); $parent_id = intval($item['parent']); @@ -101,13 +111,13 @@ class Delivery if (empty($target_item)) { Logger::warning("No target item data. Quitting here.", ['id' => $target_id]); - return; + return true; } if (empty($parent)) { Logger::warning('Parent ' . $parent_id . ' for item ' . $target_id . "wasn't found. Quitting here."); self::setFailedQueue($cmd, $target_item); - return; + return true; } if (!empty($target_item['contact-uid'])) { @@ -117,7 +127,7 @@ class Delivery } else { Logger::info('Only public users for item ' . $target_id); self::setFailedQueue($cmd, $target_item); - return; + return true; } $condition = ['uri' => $target_item['thr-parent'], 'uid' => $target_item['uid']]; @@ -131,7 +141,7 @@ class Delivery if (!empty($contact_id) && Contact::isArchived($contact_id)) { Logger::info('Contact is archived', ['id' => $contact_id, 'cmd' => $cmd, 'item' => $target_item['id']]); self::setFailedQueue($cmd, $target_item); - return; + return true; } // avoid race condition with deleting entries @@ -185,7 +195,7 @@ class Delivery $owner = User::getOwnerDataById($uid); if (!DBA::isResult($owner)) { self::setFailedQueue($cmd, $target_item); - return; + return true; } // We don't deliver our items to blocked, archived or pending contacts, and not to ourselves either @@ -194,12 +204,12 @@ class Delivery ); if (!DBA::isResult($contact)) { self::setFailedQueue($cmd, $target_item); - return; + return true; } if (Network::isUrlBlocked($contact['url'])) { self::setFailedQueue($cmd, $target_item); - return; + return true; } $protocol = GServer::getProtocol($contact['gsid'] ?? 0); @@ -217,22 +227,23 @@ class Delivery switch ($contact['network']) { case Protocol::DFRN: - self::deliverDFRN($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup, $protocol); + $success = self::deliverDFRN($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup, $protocol); break; case Protocol::DIASPORA: - self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); + $success = self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); break; case Protocol::MAIL: - self::deliverMail($cmd, $contact, $owner, $target_item, $thr_parent); + $success = self::deliverMail($cmd, $contact, $owner, $target_item, $thr_parent); break; default: + $success = true; break; } - return; + return $success; } /** @@ -265,19 +276,18 @@ class Delivery * @param boolean $followup Is it an answer to a remote post? * @param int|null $server_protocol The protocol of the server * - * @return void + * @return bool "false" on remote system error. "true" when delivery was successful or we shouldn't retry. * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - private static function deliverDFRN(string $cmd, array $contact, array $owner, array $items, array $target_item, bool $public_message, bool $top_level, bool $followup, int $server_protocol = null) + private static function deliverDFRN(string $cmd, array $contact, array $owner, array $items, array $target_item, bool $public_message, bool $top_level, bool $followup, int $server_protocol = null): bool { $target_item_id = $target_item['guid'] ?? '' ?: $target_item['id'] ?? null; // Transmit Diaspora reshares via Diaspora if the Friendica contact support Diaspora if (Diaspora::getReshareDetails($target_item) && Diaspora::isSupportedByContactUrl($contact['addr'])) { Logger::info('Reshare will be transmitted via Diaspora', ['url' => $contact['url'], 'guid' => $target_item_id]); - self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); - return; + return self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); } Logger::info('Deliver ' . ($target_item_id ?? 'relocation') . ' via DFRN to ' . ($contact['addr'] ?? '' ?: $contact['url'])); @@ -333,17 +343,18 @@ class Delivery Post\DeliveryData::incrementQueueDone($target_item['uri-id'], $protocol); GServer::setProtocol($contact['gsid'] ?? 0, $protocol); + $success = true; } else { Post\DeliveryData::incrementQueueFailed($target_item['uri-id']); + $success = false; } } - return; + return $success; } if ((($deliver_status < 200) || ($deliver_status > 299)) && (empty($server_protocol) || ($server_protocol == Post\DeliveryData::LEGACY_DFRN))) { // Transmit via Diaspora if not possible via Friendica - self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); - return; + return self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); } } else { // DFRN payload over Diaspora transport layer @@ -361,6 +372,7 @@ class Delivery if ($cmd == Delivery::POST) { Post\DeliveryData::incrementQueueDone($target_item['uri-id'], $protocol); } + $success = true; } else { // The message could not be delivered. We mark the contact as "dead" Contact::markForArchival($contact); @@ -369,7 +381,9 @@ class Delivery if (!Worker::defer() && $cmd == Delivery::POST) { Post\DeliveryData::incrementQueueFailed($target_item['uri-id']); } + $success = false; } + return $success; } /** @@ -384,11 +398,11 @@ class Delivery * @param boolean $top_level Is it a thread starter? * @param boolean $followup Is it an answer to a remote post? * - * @return void + * @return bool "false" on remote system error. "true" when delivery was successful or we shouldn't retry. * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - private static function deliverDiaspora(string $cmd, array $contact, array $owner, array $items, array $target_item, bool $public_message, bool $top_level, bool $followup) + private static function deliverDiaspora(string $cmd, array $contact, array $owner, array $items, array $target_item, bool $public_message, bool $top_level, bool $followup): bool { // We don't treat Forum posts as "wall-to-wall" to be able to post them via Diaspora $walltowall = $top_level && ($owner['id'] != $items[0]['contact-id']) & ($owner['account-type'] != User::ACCOUNT_TYPE_COMMUNITY); @@ -402,20 +416,20 @@ class Delivery Logger::notice('Deliver via Diaspora', ['target' => $target_item['id'], 'guid' => $target_item['guid'], 'to' => $loc]); if (!DI::config()->get('system', 'diaspora_enabled')) { - return; + return true; } if ($cmd == self::MAIL) { - Diaspora::sendMail($target_item, $owner, $contact); - return; + $deliver_status = Diaspora::sendMail($target_item, $owner, $contact); + return ($deliver_status >= 200) && ($deliver_status <= 299); } if ($cmd == self::SUGGESTION) { - return; + return true; } if (!$contact['pubkey'] && !$public_message) { - return; + return true; } if ($cmd == self::RELOCATION) { @@ -438,7 +452,7 @@ class Delivery $deliver_status = Diaspora::sendStatus($target_item, $owner, $contact, $public_message); } else { Logger::warning('Unknown mode', ['command' => $cmd, 'target' => $loc]); - return; + return true; } if (($deliver_status >= 200) && ($deliver_status <= 299)) { @@ -450,6 +464,7 @@ class Delivery if ($cmd == Delivery::POST) { Post\DeliveryData::incrementQueueDone($target_item['uri-id'], Post\DeliveryData::DIASPORA); } + $success = true; } else { // The message could not be delivered. We mark the contact as "dead" Contact::markForArchival($contact); @@ -468,7 +483,9 @@ class Delivery } elseif ($cmd == Delivery::POST) { Post\DeliveryData::incrementQueueFailed($target_item['uri-id']); } + $success = false; } + return $success; } /** @@ -480,27 +497,27 @@ class Delivery * @param array $target_item Item record of the content * @param array $thr_parent Item record of the direct parent in the thread * - * @return void + * @return bool "false" on remote system error. "true" when delivery was successful or we shouldn't retry. * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - private static function deliverMail(string $cmd, array $contact, array $owner, array $target_item, array $thr_parent) + private static function deliverMail(string $cmd, array $contact, array $owner, array $target_item, array $thr_parent): bool { if (DI::config()->get('system', 'imap_disabled')) { - return; + return true; } $addr = $contact['addr']; if (!strlen($addr)) { - return; + return true; } if ($cmd != self::POST) { - return; + return true; } if ($target_item['verb'] != Activity::POST) { - return; + return true; } if (!empty($thr_parent['object'])) { @@ -516,7 +533,7 @@ class Delivery $local_user = DBA::selectFirst('user', [], ['uid' => $owner['uid']]); if (!DBA::isResult($local_user)) { - return; + return true; } Logger::info('About to deliver via mail', ['guid' => $target_item['guid'], 'to' => $addr]); @@ -587,5 +604,70 @@ class Delivery // Failed Logger::warning('Delivery of mail has FAILED', ['to' => $addr, 'subject' => $subject, 'guid' => $target_item['guid']]); } + return $success; + } + + /** + * Add post for a server + * + * @param string $cmd + * @param integer $uri_id + * @param string $created + * @param integer $cid + * @param integer $gsid + * @param integer $uid + * @return bool + */ + public static function addQueue(string $cmd, int $uri_id, string $created, int $cid, int $gsid, int $uid): bool + { + $fields = ['uri-id' => $uri_id, 'uid' => $uid, 'cid' => $cid, 'gsid' => $gsid, 'created' => $created, 'command' => $cmd]; + + return DBA::insert('delivery-queue', $fields, Database::INSERT_IGNORE); + } + + /** + * Remove post by a server after delivery + * + * @param integer $uri_id + * @param integer $gsid + * @return bool + */ + public static function removeQueue(int $uri_id, int $gsid): bool + { + return DBA::delete('delivery-queue', ['uri-id' => $uri_id, 'gsid' => $gsid]); + } + + /** + * Remove failed posts for the given server + * + * @param integer $gsid + * @return bool + */ + public static function removeFailedQueue(int $gsid): bool + { + return DBA::delete('delivery-queue', ["`gsid` = ? AND `failed` >= ?", $gsid, DI::config()->get('system', 'worker_defer_limit')]); + } + + /** + * Increment "failed" counter for the given server and post + * + * @param integer $uri_id + * @param integer $gsid + * @return bool + */ + public static function incrementFailedQueue(int $uri_id, int $gsid): bool + { + return DBA::e('UPDATE `delivery-queue` SET `failed` = `failed` + 1 WHERE `uri-id` = ? AND `gsid` = ?', $uri_id, $gsid); + } + + /** + * Select queue entries for the given server + * + * @param integer $gsid + * @return array + */ + public static function selectQueueForServer(int $gsid): array + { + return DBA::selectToArray('delivery-queue', [], ["`gsid` = ? AND `failed` < ?", $gsid, DI::config()->get('system', 'worker_defer_limit')], ['order' => ['created']]); } } diff --git a/src/Protocol/Diaspora.php b/src/Protocol/Diaspora.php index c0fd4fd329..3ab91a0cb4 100644 --- a/src/Protocol/Diaspora.php +++ b/src/Protocol/Diaspora.php @@ -2955,6 +2955,12 @@ class Diaspora return 200; } + if (!empty($contact['gsid']) && (empty($return_code) || $postResult->isTimeout())) { + GServer::setFailureById($contact['gsid']); + } elseif (!empty($contact['gsid']) && ($return_code >= 200) && ($return_code <= 299)) { + GServer::setReachableById($contact['gsid']); + } + Logger::notice('transmit: ' . $logid . '-' . $guid . ' to ' . $dest_url . ' returns: ' . $return_code); return $return_code ? $return_code : -1; diff --git a/src/Worker/BulkDelivery.php b/src/Worker/BulkDelivery.php new file mode 100644 index 0000000000..e6b484b2cc --- /dev/null +++ b/src/Worker/BulkDelivery.php @@ -0,0 +1,60 @@ +. + * + */ + +namespace Friendica\Worker; + +use Friendica\Core\Logger; +use Friendica\Core\Worker; +use Friendica\Model\GServer; +use Friendica\Protocol\Delivery as ProtocolDelivery; + +class BulkDelivery +{ + public static function execute(int $gsid) + { + $server_failure = false; + $delivery_failure = false; + + $posts = ProtocolDelivery::selectQueueForServer($gsid); + foreach ($posts as $post) { + if (!$server_failure && ProtocolDelivery::deliver($post['command'], $post['uri-id'], $post['cid'], $post['uid'])) { + ProtocolDelivery::removeQueue($post['uri-id'], $post['gsid']); + Logger::debug('Delivery successful', $post); + } else { + ProtocolDelivery::incrementFailedQueue($post['uri-id'], $post['gsid']); + $delivery_failure = true; + + if (!$server_failure) { + $server_failure = !GServer::reachableById($gsid); + } + Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $post]); + } + } + + if ($server_failure) { + Worker::defer(); + } + + if ($delivery_failure) { + ProtocolDelivery::removeFailedQueue($gsid); + } + } +} diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 18ac579ccd..0dd144a485 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -565,7 +565,16 @@ class Notifier continue; } - if (!GServer::reachable($contact)) { + if (empty($contact['gsid'])) { + $reachable = !GServer::reachable($contact); + } elseif (!DI::config()->get('system', 'bulk_delivery')) { + $reachable = !GServer::reachableById($contact['gsid']); + } else { + // On bulk delivery we don't check the server status at this point + $reachable = true; + } + + if (!$reachable) { Logger::info('Server is not reachable', ['id' => $post_uriid, 'uid' => $sender_uid, 'contact' => $contact]); continue; } @@ -582,9 +591,16 @@ class Notifier $deliver_options = ['priority' => $a->getQueueValue('priority'), 'created' => $a->getQueueValue('created'), 'dont_fork' => true]; } - if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) { + if (!empty($contact['gsid']) && DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; + Delivery::addQueue($cmd, $post_uriid, $target_item['created'], $contact['id'], $contact['gsid'], $sender_uid); + Worker::add(['priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true], 'BulkDelivery', $contact['gsid']); + } else { + if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) { + $delivery_queue_count++; + } } + Worker::coolDown(); } return $delivery_queue_count; @@ -834,7 +850,7 @@ class Notifier if (DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd, $receivers); - Worker::add(Worker::PRIORITY_HIGH, 'APDelivery', '', 0, $inbox, 0); + Worker::add([Worker::PRIORITY_HIGH, 'dont_fork' => true], 'APDelivery', '', 0, $inbox, 0); } else { if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, $receivers, $target_item['uri-id'])) { @@ -851,7 +867,7 @@ class Notifier if (DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd, []); - Worker::add(Worker::PRIORITY_MEDIUM, 'APDelivery', '', 0, $inbox, 0); + Worker::add([Worker::PRIORITY_MEDIUM, 'dont_fork' => true], 'APDelivery', '', 0, $inbox, 0); } else { if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, [], $target_item['uri-id'])) { $delivery_queue_count++; diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index 25e416d508..c45daede80 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1505); + define('DB_UPDATE_VERSION', 1506); } return [ @@ -638,6 +638,24 @@ return [ "wid" => ["wid"], ] ], + "delivery-queue" => [ + "comment" => "Delivery data for posts for the batch processing", + "fields" => [ + "gsid" => ["type" => "int unsigned", "not null" => "1", "primary" => "1", "foreign" => ["gserver" => "id", "on delete" => "restrict"], "comment" => "Global Server ID"], + "uri-id" => ["type" => "int unsigned", "not null" => "1", "primary" => "1", "foreign" => ["item-uri" => "id"], "comment" => "Id of the item-uri table entry that contains the item uri"], + "created" => ["type" => "datetime", "comment" => ""], + "command" => ["type" => "varbinary(32)", "comment" => ""], + "cid" => ["type" => "int unsigned", "foreign" => ["contact" => "id"], "comment" => "contact_id (ID of the contact in contact table)"], + "uid" => ["type" => "mediumint unsigned", "foreign" => ["user" => "uid"], "comment" => "Delivering user"], + "failed" => ["type" => "tinyint", "default" => 0, "comment" => "Number of times the delivery has failed"], + ], + "indexes" => [ + "PRIMARY" => ["uri-id", "gsid"], + "gsid_created" => ["gsid", "created"], + "uid" => ["uid"], + "cid" => ["cid"], + ] + ], "diaspora-contact" => [ "comment" => "Diaspora compatible contacts - used in the Diaspora implementation", "fields" => [