Fix delivery counter / archive relay contacts

This commit is contained in:
Michael 2019-09-02 03:25:05 +00:00
parent 3ee26ecd24
commit 6af4c90dff
7 changed files with 111 additions and 31 deletions

View file

@ -1104,7 +1104,7 @@ class Worker
* or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id); * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
* or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id); * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id);
* *
* @return boolean "false" if proc_run couldn't be executed * @return boolean "false" if worker queue entry already existed or there had been an error
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @note $cmd and string args are surrounded with "" * @note $cmd and string args are surrounded with ""
* *
@ -1154,6 +1154,7 @@ class Worker
$parameters = json_encode($args); $parameters = json_encode($args);
$found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]); $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]);
$added = false;
// Quit if there was a database error - a precaution for the update process to 3.5.3 // Quit if there was a database error - a precaution for the update process to 3.5.3
if (DBA::errorNo() != 0) { if (DBA::errorNo() != 0) {
@ -1161,19 +1162,22 @@ class Worker
} }
if (!$found) { if (!$found) {
DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]); $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
if (!$added) {
return false;
}
} elseif ($force_priority) { } elseif ($force_priority) {
DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]); DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]);
} }
// Should we quit and wait for the worker to be called as a cronjob? // Should we quit and wait for the worker to be called as a cronjob?
if ($dont_fork) { if ($dont_fork) {
return true; return $added;
} }
// If there is a lock then we don't have to check for too much worker // If there is a lock then we don't have to check for too much worker
if (!Lock::acquire('worker', 0)) { if (!Lock::acquire('worker', 0)) {
return true; return $added;
} }
// If there are already enough workers running, don't fork another one // If there are already enough workers running, don't fork another one
@ -1181,19 +1185,19 @@ class Worker
Lock::release('worker'); Lock::release('worker');
if ($quit) { if ($quit) {
return true; return $added;
} }
// We tell the daemon that a new job entry exists // We tell the daemon that a new job entry exists
if (Config::get('system', 'worker_daemon_mode', false)) { if (Config::get('system', 'worker_daemon_mode', false)) {
// We don't have to set the IPC flag - this is done in "tooMuchWorkers" // We don't have to set the IPC flag - this is done in "tooMuchWorkers"
return true; return $added;
} }
// Now call the worker to execute the jobs that we just added to the queue // Now call the worker to execute the jobs that we just added to the queue
self::spawnWorker(); self::spawnWorker();
return true; return $added;
} }
/** /**

View file

@ -906,7 +906,7 @@ class Contact extends BaseObject
// Always unarchive the relay contact entry // Always unarchive the relay contact entry
if (!empty($contact['batch']) && !empty($contact['term-date']) && ($contact['term-date'] > DBA::NULL_DATETIME)) { if (!empty($contact['batch']) && !empty($contact['term-date']) && ($contact['term-date'] > DBA::NULL_DATETIME)) {
$fields = ['term-date' => DBA::NULL_DATETIME, 'archive' => false]; $fields = ['term-date' => DBA::NULL_DATETIME, 'archive' => false];
$condition = ['batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY]; $condition = ['batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY, 'uid' => 0];
DBA::update('contact', $fields, $condition); DBA::update('contact', $fields, $condition);
} }
@ -1620,7 +1620,7 @@ class Contact extends BaseObject
// Check status of Diaspora endpoints // Check status of Diaspora endpoints
if (!empty($contact['batch'])) { if (!empty($contact['batch'])) {
return DBA::exists('contact', ['archive' => true, 'batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY]); return DBA::exists('contact', ['archive' => true, 'batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY, 'uid' => 0]);
} }
return false; return false;

View file

@ -103,6 +103,19 @@ class ItemDeliveryData
return DBA::e('UPDATE `item-delivery-data` SET `queue_failed` = `queue_failed` + 1 WHERE `iid` = ?', $item_id); return DBA::e('UPDATE `item-delivery-data` SET `queue_failed` = `queue_failed` + 1 WHERE `iid` = ?', $item_id);
} }
/**
* Increments the queue_count for the given item ID.
*
* @param integer $item_id
* @param integer $increment
* @return bool
* @throws \Exception
*/
public static function incrementQueueCount($item_id, $increment)
{
return DBA::e('UPDATE `item-delivery-data` SET `queue_count` = `queue_count` + ? WHERE `iid` = ?', $increment, $item_id);
}
/** /**
* Insert a new item delivery data entry * Insert a new item delivery data entry
* *

View file

@ -27,6 +27,7 @@ use Friendica\Model\Conversation;
use Friendica\Model\GContact; use Friendica\Model\GContact;
use Friendica\Model\Group; use Friendica\Model\Group;
use Friendica\Model\Item; use Friendica\Model\Item;
use Friendica\Model\ItemDeliveryData;
use Friendica\Model\Mail; use Friendica\Model\Mail;
use Friendica\Model\Profile; use Friendica\Model\Profile;
use Friendica\Model\User; use Friendica\Model\User;
@ -46,6 +47,35 @@ use SimpleXMLElement;
*/ */
class Diaspora class Diaspora
{ {
/**
* Mark the relay contact of the given contact for archival
* This is called whenever there is a communication issue with the server.
* It avoids sending stuff to servers who don't exist anymore.
* The relay contact is a technical contact entry that exists once per server.
*
* @param array $contact of the relay contact
*/
public static function markRelayForArchival($contact)
{
if (!empty($contact['contact-type']) && ($contact['contact-type'] == Contact::TYPE_RELAY)) {
// This is already the relay contact, we don't need to fetch it
$relay_contact = $contact;
} elseif (empty($contact['baseurl'])) {
if (!empty($contact['batch'])) {
$relay_contact = DBA::selectFirst('contact', [], ['batch' => $contact['batch'], 'contact-type' => Contact::TYPE_RELAY, 'uid' => 0]);
} else {
return;
}
} else {
$relay_contact = self::getRelayContact($contact['baseurl'], []);
}
if (!empty($relay_contact)) {
Logger::info('Relay contact will be marked for archival', ['id' => $relay_contact['id'], 'url' => $relay_contact['url']]);
Contact::markForArchival($relay_contact);
}
}
/** /**
* @brief Return a list of relay servers * @brief Return a list of relay servers
* *
@ -140,13 +170,12 @@ class Diaspora
* @brief Return a contact for a given server address or creates a dummy entry * @brief Return a contact for a given server address or creates a dummy entry
* *
* @param string $server_url The url of the server * @param string $server_url The url of the server
* @param array $fields Fieldlist
* @return array with the contact * @return array with the contact
* @throws \Exception * @throws \Exception
*/ */
private static function getRelayContact($server_url) private static function getRelayContact($server_url, $fields = ['batch', 'id', 'name', 'network', 'protocol', 'archive', 'blocked'])
{ {
$fields = ['batch', 'id', 'name', 'network', 'protocol', 'archive', 'blocked'];
// Fetch the relay contact // Fetch the relay contact
$condition = ['uid' => 0, 'nurl' => Strings::normaliseLink($server_url), $condition = ['uid' => 0, 'nurl' => Strings::normaliseLink($server_url),
'contact-type' => Contact::TYPE_RELAY]; 'contact-type' => Contact::TYPE_RELAY];
@ -2208,7 +2237,9 @@ class Diaspora
} }
Logger::info('Deliver participation', ['item' => $comment['id'], 'contact' => $contact_id]); Logger::info('Deliver participation', ['item' => $comment['id'], 'contact' => $contact_id]);
Worker::add(PRIORITY_HIGH, 'Delivery', Delivery::POST, $comment['id'], $contact_id); if (Worker::add(PRIORITY_HIGH, 'Delivery', Delivery::POST, $comment['id'], $contact_id)) {
ItemDeliveryData::incrementQueueCount($comment['id'], 1);
}
} }
DBA::close($comments); DBA::close($comments);
@ -3181,8 +3212,6 @@ class Diaspora
$logid = Strings::getRandomHex(4); $logid = Strings::getRandomHex(4);
$dest_url = ($public_batch ? $contact["batch"] : $contact["notify"]);
// We always try to use the data from the fcontact table. // We always try to use the data from the fcontact table.
// This is important for transmitting data to Friendica servers. // This is important for transmitting data to Friendica servers.
if (!empty($contact['addr'])) { if (!empty($contact['addr'])) {
@ -3192,6 +3221,10 @@ class Diaspora
} }
} }
if (empty($dest_url)) {
$dest_url = ($public_batch ? $contact["batch"] : $contact["notify"]);
}
if (!$dest_url) { if (!$dest_url) {
Logger::log("no url for contact: ".$contact["id"]." batch mode =".$public_batch); Logger::log("no url for contact: ".$contact["id"]." batch mode =".$public_batch);
return 0; return 0;

View file

@ -48,14 +48,13 @@ class APDelivery extends BaseObject
$data = ActivityPub\Transmitter::createCachedActivityFromItem($target_id); $data = ActivityPub\Transmitter::createCachedActivityFromItem($target_id);
if (!empty($data)) { if (!empty($data)) {
$success = HTTPSignature::transmit($data, $inbox, $uid); $success = HTTPSignature::transmit($data, $inbox, $uid);
if ($success && in_array($cmd, [Delivery::POST])) {
ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::ACTIVITYPUB);
}
} }
} }
if (!$success && !Worker::defer() && in_array($cmd, [Delivery::POST])) { if (!$success && !Worker::defer() && in_array($cmd, [Delivery::POST])) {
ItemDeliveryData::incrementQueueFailed($target_id); ItemDeliveryData::incrementQueueFailed($target_id);
} elseif ($success && in_array($cmd, [Delivery::POST])) {
ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::ACTIVITYPUB);
} }
} }
} }

View file

@ -43,12 +43,14 @@ class Delivery extends BaseObject
if ($cmd == self::MAIL) { if ($cmd == self::MAIL) {
$target_item = DBA::selectFirst('mail', [], ['id' => $target_id]); $target_item = DBA::selectFirst('mail', [], ['id' => $target_id]);
if (!DBA::isResult($target_item)) { if (!DBA::isResult($target_item)) {
self::setFailedQueue($cmd, $target_id);
return; return;
} }
$uid = $target_item['uid']; $uid = $target_item['uid'];
} elseif ($cmd == self::SUGGESTION) { } elseif ($cmd == self::SUGGESTION) {
$target_item = DBA::selectFirst('fsuggest', [], ['id' => $target_id]); $target_item = DBA::selectFirst('fsuggest', [], ['id' => $target_id]);
if (!DBA::isResult($target_item)) { if (!DBA::isResult($target_item)) {
self::setFailedQueue($cmd, $target_id);
return; return;
} }
$uid = $target_item['uid']; $uid = $target_item['uid'];
@ -58,6 +60,7 @@ class Delivery extends BaseObject
} else { } else {
$item = Model\Item::selectFirst(['parent'], ['id' => $target_id]); $item = Model\Item::selectFirst(['parent'], ['id' => $target_id]);
if (!DBA::isResult($item) || empty($item['parent'])) { if (!DBA::isResult($item) || empty($item['parent'])) {
self::setFailedQueue($cmd, $target_id);
return; return;
} }
$parent_id = intval($item['parent']); $parent_id = intval($item['parent']);
@ -79,11 +82,13 @@ class Delivery extends BaseObject
if (empty($target_item)) { if (empty($target_item)) {
Logger::log('Item ' . $target_id . "wasn't found. Quitting here."); Logger::log('Item ' . $target_id . "wasn't found. Quitting here.");
self::setFailedQueue($cmd, $target_id);
return; return;
} }
if (empty($parent)) { if (empty($parent)) {
Logger::log('Parent ' . $parent_id . ' for item ' . $target_id . "wasn't found. Quitting here."); Logger::log('Parent ' . $parent_id . ' for item ' . $target_id . "wasn't found. Quitting here.");
self::setFailedQueue($cmd, $target_id);
return; return;
} }
@ -93,14 +98,13 @@ class Delivery extends BaseObject
$uid = $target_item['uid']; $uid = $target_item['uid'];
} else { } else {
Logger::log('Only public users for item ' . $target_id, Logger::DEBUG); Logger::log('Only public users for item ' . $target_id, Logger::DEBUG);
self::setFailedQueue($cmd, $target_id);
return; return;
} }
if (!empty($contact_id) && Model\Contact::isArchived($contact_id)) { if (!empty($contact_id) && Model\Contact::isArchived($contact_id)) {
Logger::info('Contact is archived', ['id' => $contact_id, 'cmd' => $cmd, 'item' => $target_item['id']]); Logger::info('Contact is archived', ['id' => $contact_id, 'cmd' => $cmd, 'item' => $target_item['id']]);
if (in_array($cmd, [Delivery::POST, Delivery::POKE])) { self::setFailedQueue($cmd, $target_id);
Model\ItemDeliveryData::incrementQueueFailed($target_item['id']);
}
return; return;
} }
@ -160,6 +164,7 @@ class Delivery extends BaseObject
$owner = Model\User::getOwnerDataById($uid); $owner = Model\User::getOwnerDataById($uid);
if (!DBA::isResult($owner)) { if (!DBA::isResult($owner)) {
self::setFailedQueue($cmd, $target_id);
return; return;
} }
@ -168,10 +173,12 @@ class Delivery extends BaseObject
['id' => $contact_id, 'blocked' => false, 'pending' => false, 'self' => false] ['id' => $contact_id, 'blocked' => false, 'pending' => false, 'self' => false]
); );
if (!DBA::isResult($contact)) { if (!DBA::isResult($contact)) {
self::setFailedQueue($cmd, $target_id);
return; return;
} }
if (Network::isUrlBlocked($contact['url'])) { if (Network::isUrlBlocked($contact['url'])) {
self::setFailedQueue($cmd, $target_id);
return; return;
} }
@ -204,6 +211,15 @@ class Delivery extends BaseObject
return; return;
} }
private static function setFailedQueue($cmd, $id)
{
if (!in_array($cmd, [Delivery::POST, Delivery::POKE])) {
return;
}
Model\ItemDeliveryData::incrementQueueFailed($id);
}
/** /**
* @brief Deliver content via DFRN * @brief Deliver content via DFRN
* *
@ -300,6 +316,10 @@ class Delivery extends BaseObject
// We never spool failed relay deliveries // We never spool failed relay deliveries
if ($public_dfrn) { if ($public_dfrn) {
Logger::log('Relay delivery to ' . $contact["url"] . ' with guid ' . $target_item["guid"] . ' returns ' . $deliver_status); Logger::log('Relay delivery to ' . $contact["url"] . ' with guid ' . $target_item["guid"] . ' returns ' . $deliver_status);
if (in_array($cmd, [Delivery::POST, Delivery::POKE])) {
Model\ItemDeliveryData::incrementQueueDone($target_item['id'], $protocol);
}
return; return;
} }
@ -419,6 +439,11 @@ class Delivery extends BaseObject
// The message could not be delivered. We mark the contact as "dead" // The message could not be delivered. We mark the contact as "dead"
Model\Contact::markForArchival($contact); Model\Contact::markForArchival($contact);
// When it is delivered to the public endpoint, we do mark the relay contact for archival as well
if ($public_message) {
Diaspora::markRelayForArchival($contact);
}
if (empty($contact['contact-type']) || ($contact['contact-type'] != Model\Contact::TYPE_RELAY)) { if (empty($contact['contact-type']) || ($contact['contact-type'] != Model\Contact::TYPE_RELAY)) {
Logger::info('Delivery failed: defer message', ['id' => defaults($target_item, 'guid', $target_item['id'])]); Logger::info('Delivery failed: defer message', ['id' => defaults($target_item, 'guid', $target_item['id'])]);
// defer message for redelivery // defer message for redelivery

View file

@ -446,8 +446,6 @@ class Notifier
$conversants[] = $rr['id']; $conversants[] = $rr['id'];
$delivery_queue_count++;
Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG); Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG);
// Ensure that posts with our own protocol arrives before Diaspora posts arrive. // Ensure that posts with our own protocol arrives before Diaspora posts arrive.
@ -459,7 +457,10 @@ class Notifier
} else { } else {
$deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true]; $deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true];
} }
Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$rr['id']);
if (Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$rr['id'])) {
$delivery_queue_count++;
}
} }
} }
@ -495,8 +496,6 @@ class Notifier
continue; continue;
} }
$delivery_queue_count++;
Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG); Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG);
// Ensure that posts with our own protocol arrives before Diaspora posts arrive. // Ensure that posts with our own protocol arrives before Diaspora posts arrive.
@ -507,7 +506,10 @@ class Notifier
} else { } else {
$deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true]; $deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true];
} }
Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$contact['id']);
if (Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$contact['id'])) {
$delivery_queue_count++;
}
} }
DBA::close($delivery_contacts_stmt); DBA::close($delivery_contacts_stmt);
@ -515,11 +517,11 @@ class Notifier
// send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts // send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts
// They are especially used for notifications to OStatus users that don't follow us. // They are especially used for notifications to OStatus users that don't follow us.
if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) { if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) {
$delivery_queue_count += count($url_recipients);
$slap = OStatus::salmon($target_item, $owner); $slap = OStatus::salmon($target_item, $owner);
foreach ($url_recipients as $url) { foreach ($url_recipients as $url) {
Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url); Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url);
/// @TODO Redeliver/queue these items on failure, though there is no contact record /// @TODO Redeliver/queue these items on failure, though there is no contact record
$delivery_queue_count++;
Salmon::slapper($owner, $url, $slap); Salmon::slapper($owner, $url, $slap);
ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::OSTATUS); ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::OSTATUS);
} }
@ -677,13 +679,17 @@ class Notifier
// Fill the item cache // Fill the item cache
ActivityPub\Transmitter::createCachedActivityFromItem($target_item['id'], true); ActivityPub\Transmitter::createCachedActivityFromItem($target_item['id'], true);
$delivery_queue_count = 0;
foreach ($inboxes as $inbox) { foreach ($inboxes as $inbox) {
Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]);
Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true],
'APDelivery', $cmd, $target_item['id'], $inbox, $uid); 'APDelivery', $cmd, $target_item['id'], $inbox, $uid)) {
$delivery_queue_count++;
}
} }
return count($inboxes); return $delivery_queue_count;
} }
} }