From 6af4c90dff789eab9da78ac47ed1d29e107abdaf Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 2 Sep 2019 03:25:05 +0000 Subject: [PATCH] Fix delivery counter / archive relay contacts --- src/Core/Worker.php | 18 ++++++++------ src/Model/Contact.php | 4 +-- src/Model/ItemDeliveryData.php | 13 ++++++++++ src/Protocol/Diaspora.php | 45 +++++++++++++++++++++++++++++----- src/Worker/APDelivery.php | 5 ++-- src/Worker/Delivery.php | 31 ++++++++++++++++++++--- src/Worker/Notifier.php | 26 ++++++++++++-------- 7 files changed, 111 insertions(+), 31 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 3da17d4a6e..c64b0ebc6b 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1104,7 +1104,7 @@ class Worker * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id); * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id); * - * @return boolean "false" if proc_run couldn't be executed + * @return boolean "false" if worker queue entry already existed or there had been an error * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @note $cmd and string args are surrounded with "" * @@ -1154,6 +1154,7 @@ class Worker $parameters = json_encode($args); $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]); + $added = false; // Quit if there was a database error - a precaution for the update process to 3.5.3 if (DBA::errorNo() != 0) { @@ -1161,19 +1162,22 @@ class Worker } if (!$found) { - DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]); + $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]); + if (!$added) { + return false; + } } elseif ($force_priority) { DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]); } // Should we quit and wait for the worker to be called as a cronjob? if ($dont_fork) { - return true; + return $added; } // If there is a lock then we don't have to check for too much worker if (!Lock::acquire('worker', 0)) { - return true; + return $added; } // If there are already enough workers running, don't fork another one @@ -1181,19 +1185,19 @@ class Worker Lock::release('worker'); if ($quit) { - return true; + return $added; } // We tell the daemon that a new job entry exists if (Config::get('system', 'worker_daemon_mode', false)) { // We don't have to set the IPC flag - this is done in "tooMuchWorkers" - return true; + return $added; } // Now call the worker to execute the jobs that we just added to the queue self::spawnWorker(); - return true; + return $added; } /** diff --git a/src/Model/Contact.php b/src/Model/Contact.php index 9d9ebe95f1..4bdaa9c713 100644 --- a/src/Model/Contact.php +++ b/src/Model/Contact.php @@ -906,7 +906,7 @@ class Contact extends BaseObject // Always unarchive the relay contact entry if (!empty($contact['batch']) && !empty($contact['term-date']) && ($contact['term-date'] > DBA::NULL_DATETIME)) { $fields = ['term-date' => DBA::NULL_DATETIME, 'archive' => false]; - $condition = ['batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY]; + $condition = ['batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY, 'uid' => 0]; DBA::update('contact', $fields, $condition); } @@ -1620,7 +1620,7 @@ class Contact extends BaseObject // Check status of Diaspora endpoints if (!empty($contact['batch'])) { - return DBA::exists('contact', ['archive' => true, 'batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY]); + return DBA::exists('contact', ['archive' => true, 'batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY, 'uid' => 0]); } return false; diff --git a/src/Model/ItemDeliveryData.php b/src/Model/ItemDeliveryData.php index f007537f99..5a181eb1b1 100644 --- a/src/Model/ItemDeliveryData.php +++ b/src/Model/ItemDeliveryData.php @@ -103,6 +103,19 @@ class ItemDeliveryData return DBA::e('UPDATE `item-delivery-data` SET `queue_failed` = `queue_failed` + 1 WHERE `iid` = ?', $item_id); } + /** + * Increments the queue_count for the given item ID. + * + * @param integer $item_id + * @param integer $increment + * @return bool + * @throws \Exception + */ + public static function incrementQueueCount($item_id, $increment) + { + return DBA::e('UPDATE `item-delivery-data` SET `queue_count` = `queue_count` + ? WHERE `iid` = ?', $increment, $item_id); + } + /** * Insert a new item delivery data entry * diff --git a/src/Protocol/Diaspora.php b/src/Protocol/Diaspora.php index b694ab9172..a0bb0d9fa3 100644 --- a/src/Protocol/Diaspora.php +++ b/src/Protocol/Diaspora.php @@ -27,6 +27,7 @@ use Friendica\Model\Conversation; use Friendica\Model\GContact; use Friendica\Model\Group; use Friendica\Model\Item; +use Friendica\Model\ItemDeliveryData; use Friendica\Model\Mail; use Friendica\Model\Profile; use Friendica\Model\User; @@ -46,6 +47,35 @@ use SimpleXMLElement; */ class Diaspora { + /** + * Mark the relay contact of the given contact for archival + * This is called whenever there is a communication issue with the server. + * It avoids sending stuff to servers who don't exist anymore. + * The relay contact is a technical contact entry that exists once per server. + * + * @param array $contact of the relay contact + */ + public static function markRelayForArchival($contact) + { + if (!empty($contact['contact-type']) && ($contact['contact-type'] == Contact::TYPE_RELAY)) { + // This is already the relay contact, we don't need to fetch it + $relay_contact = $contact; + } elseif (empty($contact['baseurl'])) { + if (!empty($contact['batch'])) { + $relay_contact = DBA::selectFirst('contact', [], ['batch' => $contact['batch'], 'contact-type' => Contact::TYPE_RELAY, 'uid' => 0]); + } else { + return; + } + } else { + $relay_contact = self::getRelayContact($contact['baseurl'], []); + } + + if (!empty($relay_contact)) { + Logger::info('Relay contact will be marked for archival', ['id' => $relay_contact['id'], 'url' => $relay_contact['url']]); + Contact::markForArchival($relay_contact); + } + } + /** * @brief Return a list of relay servers * @@ -140,13 +170,12 @@ class Diaspora * @brief Return a contact for a given server address or creates a dummy entry * * @param string $server_url The url of the server + * @param array $fields Fieldlist * @return array with the contact * @throws \Exception */ - private static function getRelayContact($server_url) + private static function getRelayContact($server_url, $fields = ['batch', 'id', 'name', 'network', 'protocol', 'archive', 'blocked']) { - $fields = ['batch', 'id', 'name', 'network', 'protocol', 'archive', 'blocked']; - // Fetch the relay contact $condition = ['uid' => 0, 'nurl' => Strings::normaliseLink($server_url), 'contact-type' => Contact::TYPE_RELAY]; @@ -2208,7 +2237,9 @@ class Diaspora } Logger::info('Deliver participation', ['item' => $comment['id'], 'contact' => $contact_id]); - Worker::add(PRIORITY_HIGH, 'Delivery', Delivery::POST, $comment['id'], $contact_id); + if (Worker::add(PRIORITY_HIGH, 'Delivery', Delivery::POST, $comment['id'], $contact_id)) { + ItemDeliveryData::incrementQueueCount($comment['id'], 1); + } } DBA::close($comments); @@ -3181,8 +3212,6 @@ class Diaspora $logid = Strings::getRandomHex(4); - $dest_url = ($public_batch ? $contact["batch"] : $contact["notify"]); - // We always try to use the data from the fcontact table. // This is important for transmitting data to Friendica servers. if (!empty($contact['addr'])) { @@ -3192,6 +3221,10 @@ class Diaspora } } + if (empty($dest_url)) { + $dest_url = ($public_batch ? $contact["batch"] : $contact["notify"]); + } + if (!$dest_url) { Logger::log("no url for contact: ".$contact["id"]." batch mode =".$public_batch); return 0; diff --git a/src/Worker/APDelivery.php b/src/Worker/APDelivery.php index 3c12e45491..812db57013 100644 --- a/src/Worker/APDelivery.php +++ b/src/Worker/APDelivery.php @@ -48,14 +48,13 @@ class APDelivery extends BaseObject $data = ActivityPub\Transmitter::createCachedActivityFromItem($target_id); if (!empty($data)) { $success = HTTPSignature::transmit($data, $inbox, $uid); - if ($success && in_array($cmd, [Delivery::POST])) { - ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::ACTIVITYPUB); - } } } if (!$success && !Worker::defer() && in_array($cmd, [Delivery::POST])) { ItemDeliveryData::incrementQueueFailed($target_id); + } elseif ($success && in_array($cmd, [Delivery::POST])) { + ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::ACTIVITYPUB); } } } diff --git a/src/Worker/Delivery.php b/src/Worker/Delivery.php index eb86f910dd..9973154488 100644 --- a/src/Worker/Delivery.php +++ b/src/Worker/Delivery.php @@ -43,12 +43,14 @@ class Delivery extends BaseObject if ($cmd == self::MAIL) { $target_item = DBA::selectFirst('mail', [], ['id' => $target_id]); if (!DBA::isResult($target_item)) { + self::setFailedQueue($cmd, $target_id); return; } $uid = $target_item['uid']; } elseif ($cmd == self::SUGGESTION) { $target_item = DBA::selectFirst('fsuggest', [], ['id' => $target_id]); if (!DBA::isResult($target_item)) { + self::setFailedQueue($cmd, $target_id); return; } $uid = $target_item['uid']; @@ -58,6 +60,7 @@ class Delivery extends BaseObject } else { $item = Model\Item::selectFirst(['parent'], ['id' => $target_id]); if (!DBA::isResult($item) || empty($item['parent'])) { + self::setFailedQueue($cmd, $target_id); return; } $parent_id = intval($item['parent']); @@ -79,11 +82,13 @@ class Delivery extends BaseObject if (empty($target_item)) { Logger::log('Item ' . $target_id . "wasn't found. Quitting here."); + self::setFailedQueue($cmd, $target_id); return; } if (empty($parent)) { Logger::log('Parent ' . $parent_id . ' for item ' . $target_id . "wasn't found. Quitting here."); + self::setFailedQueue($cmd, $target_id); return; } @@ -93,14 +98,13 @@ class Delivery extends BaseObject $uid = $target_item['uid']; } else { Logger::log('Only public users for item ' . $target_id, Logger::DEBUG); + self::setFailedQueue($cmd, $target_id); return; } if (!empty($contact_id) && Model\Contact::isArchived($contact_id)) { Logger::info('Contact is archived', ['id' => $contact_id, 'cmd' => $cmd, 'item' => $target_item['id']]); - if (in_array($cmd, [Delivery::POST, Delivery::POKE])) { - Model\ItemDeliveryData::incrementQueueFailed($target_item['id']); - } + self::setFailedQueue($cmd, $target_id); return; } @@ -160,6 +164,7 @@ class Delivery extends BaseObject $owner = Model\User::getOwnerDataById($uid); if (!DBA::isResult($owner)) { + self::setFailedQueue($cmd, $target_id); return; } @@ -168,10 +173,12 @@ class Delivery extends BaseObject ['id' => $contact_id, 'blocked' => false, 'pending' => false, 'self' => false] ); if (!DBA::isResult($contact)) { + self::setFailedQueue($cmd, $target_id); return; } if (Network::isUrlBlocked($contact['url'])) { + self::setFailedQueue($cmd, $target_id); return; } @@ -204,6 +211,15 @@ class Delivery extends BaseObject return; } + private static function setFailedQueue($cmd, $id) + { + if (!in_array($cmd, [Delivery::POST, Delivery::POKE])) { + return; + } + + Model\ItemDeliveryData::incrementQueueFailed($id); + } + /** * @brief Deliver content via DFRN * @@ -300,6 +316,10 @@ class Delivery extends BaseObject // We never spool failed relay deliveries if ($public_dfrn) { Logger::log('Relay delivery to ' . $contact["url"] . ' with guid ' . $target_item["guid"] . ' returns ' . $deliver_status); + + if (in_array($cmd, [Delivery::POST, Delivery::POKE])) { + Model\ItemDeliveryData::incrementQueueDone($target_item['id'], $protocol); + } return; } @@ -419,6 +439,11 @@ class Delivery extends BaseObject // The message could not be delivered. We mark the contact as "dead" Model\Contact::markForArchival($contact); + // When it is delivered to the public endpoint, we do mark the relay contact for archival as well + if ($public_message) { + Diaspora::markRelayForArchival($contact); + } + if (empty($contact['contact-type']) || ($contact['contact-type'] != Model\Contact::TYPE_RELAY)) { Logger::info('Delivery failed: defer message', ['id' => defaults($target_item, 'guid', $target_item['id'])]); // defer message for redelivery diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 20023792d5..102b24d0ad 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -446,8 +446,6 @@ class Notifier $conversants[] = $rr['id']; - $delivery_queue_count++; - Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG); // Ensure that posts with our own protocol arrives before Diaspora posts arrive. @@ -459,7 +457,10 @@ class Notifier } else { $deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true]; } - Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$rr['id']); + + if (Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$rr['id'])) { + $delivery_queue_count++; + } } } @@ -495,8 +496,6 @@ class Notifier continue; } - $delivery_queue_count++; - Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG); // Ensure that posts with our own protocol arrives before Diaspora posts arrive. @@ -507,7 +506,10 @@ class Notifier } else { $deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true]; } - Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$contact['id']); + + if (Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$contact['id'])) { + $delivery_queue_count++; + } } DBA::close($delivery_contacts_stmt); @@ -515,11 +517,11 @@ class Notifier // send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts // They are especially used for notifications to OStatus users that don't follow us. if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) { - $delivery_queue_count += count($url_recipients); $slap = OStatus::salmon($target_item, $owner); foreach ($url_recipients as $url) { Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url); /// @TODO Redeliver/queue these items on failure, though there is no contact record + $delivery_queue_count++; Salmon::slapper($owner, $url, $slap); ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::OSTATUS); } @@ -677,13 +679,17 @@ class Notifier // Fill the item cache ActivityPub\Transmitter::createCachedActivityFromItem($target_item['id'], true); + $delivery_queue_count = 0; + foreach ($inboxes as $inbox) { Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); - Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], - 'APDelivery', $cmd, $target_item['id'], $inbox, $uid); + if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], + 'APDelivery', $cmd, $target_item['id'], $inbox, $uid)) { + $delivery_queue_count++; + } } - return count($inboxes); + return $delivery_queue_count; } }