From 1696ad962ea4fdae16a6bff29a6a98ee523108e5 Mon Sep 17 00:00:00 2001 From: Hypolite Petovan Date: Fri, 7 Dec 2018 00:52:14 -0500 Subject: [PATCH] Add delivery queue number manipulation - Add new ItemDeliveryData model class - Add queue_count initialization in Notifier - Add queue_done incrementation in various Delivery tasks --- src/Model/Item.php | 93 ++++++++------------------- src/Model/ItemDeliveryData.php | 114 +++++++++++++++++++++++++++++++++ src/Worker/APDelivery.php | 5 ++ src/Worker/Delivery.php | 13 ++++ src/Worker/Notifier.php | 22 ++++++- 5 files changed, 179 insertions(+), 68 deletions(-) create mode 100644 src/Model/ItemDeliveryData.php diff --git a/src/Model/Item.php b/src/Model/Item.php index 09de6cfd1f..122e4c431d 100644 --- a/src/Model/Item.php +++ b/src/Model/Item.php @@ -50,18 +50,21 @@ class Item extends BaseObject const PT_PERSONAL_NOTE = 128; // Field list that is used to display the items - const DISPLAY_FIELDLIST = ['uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid', 'network', - 'commented', 'created', 'edited', 'received', 'verb', 'object-type', 'postopts', 'plink', - 'wall', 'private', 'starred', 'origin', 'title', 'body', 'file', 'attach', 'language', - 'content-warning', 'location', 'coord', 'app', 'rendered-hash', 'rendered-html', 'object', - 'allow_cid', 'allow_gid', 'deny_cid', 'deny_gid', 'item_id', - 'author-id', 'author-link', 'author-name', 'author-avatar', 'author-network', - 'owner-id', 'owner-link', 'owner-name', 'owner-avatar', 'owner-network', - 'contact-id', 'contact-link', 'contact-name', 'contact-avatar', - 'writable', 'self', 'cid', 'alias', - 'event-id', 'event-created', 'event-edited', 'event-start', 'event-finish', - 'event-summary', 'event-desc', 'event-location', 'event-type', - 'event-nofinish', 'event-adjust', 'event-ignore', 'event-id']; + const DISPLAY_FIELDLIST = [ + 'uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid', 'network', + 'commented', 'created', 'edited', 'received', 'verb', 'object-type', 'postopts', 'plink', + 'wall', 'private', 'starred', 'origin', 'title', 'body', 'file', 'attach', 'language', + 'content-warning', 'location', 'coord', 'app', 'rendered-hash', 'rendered-html', 'object', + 'allow_cid', 'allow_gid', 'deny_cid', 'deny_gid', 'item_id', + 'author-id', 'author-link', 'author-name', 'author-avatar', 'author-network', + 'owner-id', 'owner-link', 'owner-name', 'owner-avatar', 'owner-network', + 'contact-id', 'contact-link', 'contact-name', 'contact-avatar', + 'writable', 'self', 'cid', 'alias', + 'event-id', 'event-created', 'event-edited', 'event-start', 'event-finish', + 'event-summary', 'event-desc', 'event-location', 'event-type', + 'event-nofinish', 'event-adjust', 'event-ignore', 'event-id', + 'delivery_queue_count', 'delivery_queue_done' + ]; // Field list that is used to deliver items via the protocols const DELIVER_FIELDLIST = ['uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid', @@ -80,9 +83,6 @@ class Item extends BaseObject // Field list for "item-content" table that is not present in the "item" table const CONTENT_FIELDLIST = ['language']; - // Field list for additional delivery data - const DELIVERY_DATA_FIELDLIST = ['postopts', 'inform']; - // All fields in the item table const ITEM_FIELDLIST = ['id', 'uid', 'parent', 'uri', 'parent-uri', 'thr-parent', 'guid', 'contact-id', 'type', 'wall', 'gravity', 'extid', 'icid', 'iaid', 'psid', @@ -190,7 +190,7 @@ class Item extends BaseObject // Fetch data from the item-content table whenever there is content there if (self::isLegacyMode()) { - $legacy_fields = array_merge(self::DELIVERY_DATA_FIELDLIST, self::MIXED_CONTENT_FIELDLIST); + $legacy_fields = array_merge(ItemDeliveryData::FIELD_LIST, self::MIXED_CONTENT_FIELDLIST); foreach ($legacy_fields as $field) { if (empty($row[$field]) && !empty($row['internal-item-' . $field])) { $row[$field] = $row['internal-item-' . $field]; @@ -551,7 +551,7 @@ class Item extends BaseObject $fields['item-content'] = array_merge(self::CONTENT_FIELDLIST, self::MIXED_CONTENT_FIELDLIST); - $fields['item-delivery-data'] = self::DELIVERY_DATA_FIELDLIST; + $fields['item-delivery-data'] = ItemDeliveryData::FIELD_LIST; $fields['permissionset'] = ['allow_cid', 'allow_gid', 'deny_cid', 'deny_gid']; @@ -730,7 +730,7 @@ class Item extends BaseObject foreach ($fields as $table => $table_fields) { foreach ($table_fields as $field => $select) { if (empty($selected) || in_array($select, $selected)) { - $legacy_fields = array_merge(self::DELIVERY_DATA_FIELDLIST, self::MIXED_CONTENT_FIELDLIST); + $legacy_fields = array_merge(ItemDeliveryData::FIELD_LIST, self::MIXED_CONTENT_FIELDLIST); if (self::isLegacyMode() && in_array($select, $legacy_fields)) { $selection[] = "`item`.`".$select."` AS `internal-item-" . $select . "`"; } @@ -810,7 +810,9 @@ class Item extends BaseObject } } - $clear_fields = ['bookmark', 'type', 'author-name', 'author-avatar', 'author-link', 'owner-name', 'owner-avatar', 'owner-link']; + $delivery_data = ItemDeliveryData::extractFields($fields); + + $clear_fields = ['bookmark', 'type', 'author-name', 'author-avatar', 'author-link', 'owner-name', 'owner-avatar', 'owner-link', 'postopts', 'inform']; foreach ($clear_fields as $field) { if (array_key_exists($field, $fields)) { $fields[$field] = null; @@ -831,12 +833,6 @@ class Item extends BaseObject $files = null; } - $delivery_data = ['postopts' => defaults($fields, 'postopts', ''), - 'inform' => defaults($fields, 'inform', '')]; - - $fields['postopts'] = null; - $fields['inform'] = null; - if (!empty($fields)) { $success = DBA::update('item', $fields, $condition); @@ -914,7 +910,7 @@ class Item extends BaseObject } } - self::updateDeliveryData($item['id'], $delivery_data); + ItemDeliveryData::update($item['id'], $delivery_data); self::updateThread($item['id']); @@ -1061,7 +1057,7 @@ class Item extends BaseObject self::delete(['uri' => $item['uri'], 'uid' => 0, 'deleted' => false], $priority); } - DBA::delete('item-delivery-data', ['iid' => $item['id']]); + ItemDeliveryData::delete($item['id']); // We don't delete the item-activity here, since we need some of the data for ActivityPub @@ -1238,6 +1234,8 @@ class Item extends BaseObject public static function insert($item, $force_parent = false, $notify = false, $dontcache = false) { + $orig_item = $item; + // If it is a posting where users should get notifications, then define it as wall posting if ($notify) { $item['wall'] = 1; @@ -1661,8 +1659,7 @@ class Item extends BaseObject self::insertContent($item); } - $delivery_data = ['postopts' => defaults($item, 'postopts', ''), - 'inform' => defaults($item, 'inform', '')]; + $delivery_data = ItemDeliveryData::extractFields($item); unset($item['postopts']); unset($item['inform']); @@ -1701,10 +1698,7 @@ class Item extends BaseObject if ($spoolpath != "") { $spool = $spoolpath.'/'.$file; - // Ensure to have the removed data from above again in the item array - $item = array_merge($item, $delivery_data); - - file_put_contents($spool, json_encode($item)); + file_put_contents($spool, json_encode($orig_item)); Logger::log("Item wasn't stored - Item was spooled into file ".$file, Logger::DEBUG); } return 0; @@ -1805,9 +1799,7 @@ class Item extends BaseObject self::updateThread($parent_id); } - $delivery_data['iid'] = $current_post; - - self::insertDeliveryData($delivery_data); + ItemDeliveryData::insert($item['id'], $delivery_data); DBA::commit(); @@ -1848,35 +1840,6 @@ class Item extends BaseObject return $current_post; } - /** - * @brief Insert a new item delivery data entry - * - * @param array $item The item fields that are to be inserted - */ - private static function insertDeliveryData($delivery_data) - { - if (empty($delivery_data['iid']) || (empty($delivery_data['postopts']) && empty($delivery_data['inform']))) { - return; - } - - DBA::insert('item-delivery-data', $delivery_data); - } - - /** - * @brief Update an existing item delivery data entry - * - * @param integer $id The item id that is to be updated - * @param array $item The item fields that are to be inserted - */ - private static function updateDeliveryData($id, $delivery_data) - { - if (empty($id) || (empty($delivery_data['postopts']) && empty($delivery_data['inform']))) { - return; - } - - DBA::update('item-delivery-data', $delivery_data, ['iid' => $id], true); - } - /** * @brief Insert a new item content entry * diff --git a/src/Model/ItemDeliveryData.php b/src/Model/ItemDeliveryData.php new file mode 100644 index 0000000000..e3be073e8e --- /dev/null +++ b/src/Model/ItemDeliveryData.php @@ -0,0 +1,114 @@ + 'delivery_queue_count', + 'queue_done' => 'delivery_queue_done', + ]; + + /** + * Extract delivery data from the provided item fields + * + * @param array $fields + * @return array + */ + public static function extractFields(array &$fields) + { + $delivery_data = []; + foreach (ItemDeliveryData::FIELD_LIST as $key => $field) { + if (is_int($key) && isset($fields[$field])) { + // Legacy field moved from item table + $delivery_data[$field] = $fields[$field]; + $fields[$field] = null; + } elseif (isset($fields[$field])) { + // New delivery field with virtual field name in item fields + $delivery_data[$key] = $fields[$field]; + unset($fields[$field]); + } + } + + return $delivery_data; + } + + /** + * Increments the queue_done for the given item ID. + * + * Avoids racing condition between multiple delivery threads. + * + * @param integer $item_id + * @return bool + */ + public static function incrementQueueDone($item_id) + { + return DBA::e('UPDATE `item-delivery-data` SET `queue_done` = `queue_done` + 1 WHERE `iid` = ?', $item_id); + } + + /** + * Insert a new item delivery data entry + * + * @param integer $item_id + * @param array $fields + * @return bool + */ + public static function insert($item_id, array $fields) + { + if (empty($item_id)) { + throw new \BadMethodCallException('Empty item_id'); + } + + $fields['iid'] = $item_id; + + return DBA::insert('item-delivery-data', $fields); + } + + /** + * Update/Insert item delivery data + * + * If you want to update queue_done, please use incrementQueueDone instead. + * + * @param integer $item_id + * @param array $fields + * @return bool + */ + public static function update($item_id, array $fields) + { + if (empty($item_id)) { + throw new \BadMethodCallException('Empty item_id'); + } + + if (empty($fields)) { + // Nothing to do, update successful + return true; + } + + return DBA::update('item-delivery-data', $fields, ['iid' => $item_id], true); + } + + /** + * Delete item delivery data + * + * @param integer $item_id + * @return bool + */ + public static function delete($item_id) + { + if (empty($item_id)) { + throw new \BadMethodCallException('Empty item_id'); + } + + return DBA::delete('item-delivery-data', ['iid' => $item_id]); + } +} diff --git a/src/Worker/APDelivery.php b/src/Worker/APDelivery.php index 268e45a4de..2fabb4a4eb 100644 --- a/src/Worker/APDelivery.php +++ b/src/Worker/APDelivery.php @@ -7,6 +7,7 @@ namespace Friendica\Worker; use Friendica\BaseObject; use Friendica\Core\Logger; use Friendica\Core\Worker; +use Friendica\Model\ItemDeliveryData; use Friendica\Protocol\ActivityPub; use Friendica\Model\Item; use Friendica\Util\HTTPSignature; @@ -40,6 +41,10 @@ class APDelivery extends BaseObject if (!empty($data)) { $success = HTTPSignature::transmit($data, $inbox, $uid); } + + if ($success && in_array($cmd, [Delivery::POST, Delivery::COMMENT])) { + ItemDeliveryData::incrementQueueDone($target_id); + } } if (!$success) { diff --git a/src/Worker/Delivery.php b/src/Worker/Delivery.php index 65cdd79e91..191692a906 100644 --- a/src/Worker/Delivery.php +++ b/src/Worker/Delivery.php @@ -13,6 +13,7 @@ use Friendica\Core\System; use Friendica\Database\DBA; use Friendica\Model\Contact; use Friendica\Model\Item; +use Friendica\Model\ItemDeliveryData; use Friendica\Model\Queue; use Friendica\Model\User; use Friendica\Protocol\DFRN; @@ -180,10 +181,18 @@ class Delivery extends BaseObject case Protocol::DFRN: self::deliverDFRN($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); + + if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) { + ItemDeliveryData::incrementQueueDone($target_id); + } break; case Protocol::DIASPORA: self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); + + if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) { + ItemDeliveryData::incrementQueueDone($target_id); + } break; case Protocol::OSTATUS: @@ -201,6 +210,10 @@ class Delivery extends BaseObject case Protocol::MAIL: self::deliverMail($cmd, $contact, $owner, $target_item); + + if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) { + ItemDeliveryData::incrementQueueDone($target_id); + } break; default: diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 859f92d990..f6b1d23cf0 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -15,6 +15,7 @@ use Friendica\Model\Contact; use Friendica\Model\Conversation; use Friendica\Model\Group; use Friendica\Model\Item; +use Friendica\Model\ItemDeliveryData; use Friendica\Model\PushSubscriber; use Friendica\Model\User; use Friendica\Network\Probe; @@ -65,6 +66,7 @@ class Notifier $delivery_contacts_stmt = null; $target_item = []; $items = []; + $delivery_queue_count = 0; if ($cmd == Delivery::MAIL) { $message = DBA::selectFirst('mail', ['uid', 'contact-id'], ['id' => $target_id]); @@ -150,7 +152,7 @@ class Notifier if (!empty($target_item) && !empty($items)) { $parent = $items[0]; - self::activityPubDelivery($cmd, $target_item, $parent, $a->queue['priority'], $a->query_string['created']); + $delivery_queue_count += self::activityPubDelivery($cmd, $target_item, $parent, $a->queue['priority'], $a->query_string['created']); $fields = ['network', 'author-id', 'owner-id']; $condition = ['uri' => $target_item["thr-parent"], 'uid' => $target_item["uid"]]; @@ -423,6 +425,9 @@ class Notifier if (DBA::isResult($r)) { foreach ($r as $rr) { $conversants[] = $rr['id']; + + $delivery_queue_count++; + Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG); // Ensure that posts with our own protocol arrives before Diaspora posts arrive. @@ -454,6 +459,8 @@ class Notifier continue; } + $delivery_queue_count++; + Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG); // Ensure that posts with our own protocol arrives before Diaspora posts arrive. @@ -473,11 +480,13 @@ class Notifier // send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts // They are especially used for notifications to OStatus users that don't follow us. if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) { + $delivery_queue_count += count($url_recipients); $slap = OStatus::salmon($target_item, $owner); foreach ($url_recipients as $url) { Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url); /// @TODO Redeliver/queue these items on failure, though there is no contact record Salmon::slapper($owner, $url, $slap); + ItemDeliveryData::incrementQueueDone($target_id); } } @@ -492,6 +501,10 @@ class Notifier if (!empty($target_item)) { Logger::log('Calling hooks for ' . $cmd . ' ' . $target_id, Logger::DEBUG); + if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) { + ItemDeliveryData::update($target_item['id'], ['queue_count' => $delivery_queue_count]); + } + Hook::fork($a->queue['priority'], 'notifier_normal', $target_item); Hook::callAll('notifier_end', $target_item); @@ -539,6 +552,7 @@ class Notifier * @param array $parent * @param int $priority The priority the Notifier queue item was created with * @param string $created The date the Notifier queue item was created on + * @return int The number of delivery tasks created */ private static function activityPubDelivery($cmd, array $target_item, array $parent, $priority, $created) { @@ -549,7 +563,7 @@ class Notifier Logger::log('Origin item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' will be distributed.', Logger::DEBUG); } elseif (!DBA::exists('conversation', ['item-uri' => $target_item['uri'], 'protocol' => Conversation::PARCEL_ACTIVITYPUB])) { Logger::log('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' is no AP post. It will not be distributed.', Logger::DEBUG); - return; + return 0; } elseif ($parent['origin']) { // Remote items are transmitted via the personal inboxes. // Doing so ensures that the dedicated receiver will get the message. @@ -559,7 +573,7 @@ class Notifier if (empty($inboxes)) { Logger::log('No inboxes found for item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . '. It will not be distributed.', Logger::DEBUG); - return; + return 0; } // Fill the item cache @@ -571,6 +585,8 @@ class Notifier Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $target_item['contact-uid']); } + + return count($inboxes); } private static function isForumPost(array $item, array $owner)