From 5a1e1c1ec9a59412eab33234f4b88b5be4f1baae Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 17 May 2018 22:17:03 +0000 Subject: [PATCH 1/5] Fix to OStatus delivery to be not so blocking to other tasks --- boot.php | 2 +- src/Database/DBStructure.php | 6 ++- src/Worker/Notifier.php | 4 +- src/Worker/PubSubPublish.php | 77 ++++++++++++++++++++---------------- src/Worker/Queue.php | 2 +- 5 files changed, 52 insertions(+), 39 deletions(-) diff --git a/boot.php b/boot.php index bbc4e9d298..ad162705be 100644 --- a/boot.php +++ b/boot.php @@ -41,7 +41,7 @@ define('FRIENDICA_PLATFORM', 'Friendica'); define('FRIENDICA_CODENAME', 'The Tazmans Flax-lily'); define('FRIENDICA_VERSION', '2018.05-rc'); define('DFRN_PROTOCOL_VERSION', '2.23'); -define('DB_UPDATE_VERSION', 1262); +define('DB_UPDATE_VERSION', 1263); define('NEW_UPDATE_ROUTINE_VERSION', 1170); /** diff --git a/src/Database/DBStructure.php b/src/Database/DBStructure.php index d13ba19df5..c94690a4c8 100644 --- a/src/Database/DBStructure.php +++ b/src/Database/DBStructure.php @@ -1553,12 +1553,14 @@ class DBStructure "callback_url" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], "topic" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], "nickname" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], - "push" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => ""], - "last_update" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => ""], + "push" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => "Retrial counter"], + "last_update" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Date of last successful trial"], + "next_try" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Next retrial date"], "secret" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], ], "indexes" => [ "PRIMARY" => ["id"], + "next_try" => ["next_try"], ] ]; $database["queue"] = [ diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index e22e8a1cd1..f8b37c2f90 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -501,12 +501,12 @@ class Notifier { // Set push flag for PuSH subscribers to this topic, // they will be notified in queue.php $condition = ['push' => false, 'nickname' => $owner['nickname']]; - dba::update('push_subscriber', ['push' => true], $condition); + dba::update('push_subscriber', ['push' => true, 'next_try' => NULL_DATE], $condition); logger('Activating internal PuSH for item '.$item_id, LOGGER_DEBUG); // Handling the pubsubhubbub requests - Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true], + Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], 'PubSubPublish'); } diff --git a/src/Worker/PubSubPublish.php b/src/Worker/PubSubPublish.php index bd4aa0390b..1b2dbbef9f 100644 --- a/src/Worker/PubSubPublish.php +++ b/src/Worker/PubSubPublish.php @@ -12,6 +12,7 @@ use Friendica\Core\Worker; use Friendica\Database\DBM; use Friendica\Protocol\OStatus; use Friendica\Util\Network; +use Friendica\Util\DateTimeFormat; use dba; require_once 'include/items.php'; @@ -21,76 +22,86 @@ class PubSubPublish { { global $a; - if ($pubsubpublish_id == 0) { - // We'll push to each subscriber that has push > 0, - // i.e. there has been an update (set in notifier.php). - $r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0 ORDER BY `last_update` DESC"); - - foreach ($r as $rr) { - logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); - Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true], - 'PubSubPublish', (int)$rr["id"]); - } + if ($pubsubpublish_id != 0) { + self::publish($pubsubpublish_id); + return; } - self::publish($pubsubpublish_id); + // We'll push to each subscriber that has push > 0, + // i.e. there has been an update (set in notifier.php). + $subscribers = dba::select('push_subscriber', ['id', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]); - return; + while ($subscriber = dba::fetch($subscribers)) { + logger("Publish feed to " . $subscriber["callback_url"], LOGGER_DEBUG); + Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], + 'PubSubPublish', (int)$subscriber["id"]); + } + + dba::close($subscribers); } private static function publish($id) { global $a; - $r = q("SELECT * FROM `push_subscriber` WHERE `id` = %d", intval($id)); - if (!DBM::is_result($r)) { + $subscriber = dba::selectFirst('push_subscriber', [], ['id' => $id]); + if (!DBM::is_result($subscriber)) { return; } - $rr = $r[0]; - /// @todo Check server status with PortableContact::checkServer() // Before this can be done we need a way to safely detect the server url. - logger("Generate feed of user ".$rr['nickname']." to ".$rr['callback_url']." - last updated ".$rr['last_update'], LOGGER_DEBUG); + logger("Generate feed of user " . $subscriber['nickname']. " to " . $subscriber['callback_url']. " - last updated " . $subscriber['last_update'], LOGGER_DEBUG); - $last_update = $rr['last_update']; - $params = OStatus::feed($rr['nickname'], $last_update); + $last_update = $subscriber['last_update']; + $params = OStatus::feed($subscriber['nickname'], $last_update); if (!$params) { return; } - $hmac_sig = hash_hmac("sha1", $params, $rr['secret']); + $hmac_sig = hash_hmac("sha1", $params, $subscriber['secret']); $headers = ["Content-type: application/atom+xml", sprintf("Link: <%s>;rel=hub,<%s>;rel=self", - System::baseUrl().'/pubsubhubbub/'.$rr['nickname'], - $rr['topic']), - "X-Hub-Signature: sha1=".$hmac_sig]; + System::baseUrl() . '/pubsubhubbub/' . $subscriber['nickname'], + $subscriber['topic']), + "X-Hub-Signature: sha1=" . $hmac_sig]; - logger('POST '.print_r($headers, true)."\n".$params, LOGGER_DATA); + logger('POST ' . print_r($headers, true) . "\n" . $params, LOGGER_DATA); - Network::post($rr['callback_url'], $params, $headers); + Network::post($subscriber['callback_url'], $params, $headers); $ret = $a->get_curl_code(); + $condition = ['id' => $subscriber['id']]; + if ($ret >= 200 && $ret <= 299) { - logger('successfully pushed to '.$rr['callback_url']); + logger('Successfully pushed to ' . $subscriber['callback_url']); // set last_update to the "created" date of the last item, and reset push=0 - $fields = ['push' => 0, 'last_update' => $last_update]; - dba::update('push_subscriber', $fields, ['id' => $rr['id']]); + $fields = ['push' => 0, 'next_try' => NULL_DATE, 'last_update' => $last_update]; + dba::update('push_subscriber', $fields, $condition); } else { - logger('error when pushing to '.$rr['callback_url'].' HTTP: '.$ret); + logger('Delivery error when pushing to ' . $subscriber['callback_url'] . ' HTTP: ' . $ret); // we use the push variable also as a counter, if we failed we // increment this until some upper limit where we give up - $new_push = intval($rr['push']) + 1; + $retrial = $subscriber['push']; - if ($new_push > 30) // OK, let's give up - $new_push = 0; + if ($retrial > 14) { + dba::update('push_subscriber', ['push' => 0, 'next_try' => NULL_DATE], $condition); + logger('Delivery error: Giving up for ' . $subscriber['callback_url'], LOGGER_DEBUG); + } else { + // Calculate the delay until the next trial + $delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1)); + $next = DateTimeFormat::utc('now + ' . $delay . ' seconds'); - dba::update('push_subscriber', ['push' => $new_push], ['id' => $rr['id']]); + $retrial = $retrial + 1; + + dba::update('push_subscriber', ['push' => $retrial, 'next_try' => $next], $condition); + logger('Delivery error: Next try (' . $retrial . ') for ' . $subscriber['callback_url'] . ' at ' . $next, LOGGER_DEBUG); + } } } } diff --git a/src/Worker/Queue.php b/src/Worker/Queue.php index 345da39abb..20ade6dfae 100644 --- a/src/Worker/Queue.php +++ b/src/Worker/Queue.php @@ -34,7 +34,7 @@ class Queue logger('filling queue jobs - start'); // Handling the pubsubhubbub requests - Worker::add(['priority' => PRIORITY_HIGH, 'dont_fork' => true], 'PubSubPublish'); + Worker::add(['priority' => PRIORITY_LOW, 'dont_fork' => true], 'PubSubPublish'); $r = dba::inArray(dba::p("SELECT `id` FROM `queue` WHERE `next` < UTC_TIMESTAMP() ORDER BY `batch`, `cid`")); From 61824119e41c514951106dd525714e264d1435de Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 17 May 2018 23:30:49 +0000 Subject: [PATCH 2/5] Moved functionality in new model class --- src/Model/PushSubscriber.php | 30 ++++++++++++++++++++++++++++++ src/Worker/Notifier.php | 4 ++-- src/Worker/PubSubPublish.php | 17 ++--------------- src/Worker/Queue.php | 3 ++- 4 files changed, 36 insertions(+), 18 deletions(-) create mode 100644 src/Model/PushSubscriber.php diff --git a/src/Model/PushSubscriber.php b/src/Model/PushSubscriber.php new file mode 100644 index 0000000000..2794a0b612 --- /dev/null +++ b/src/Model/PushSubscriber.php @@ -0,0 +1,30 @@ + 0, + // i.e. there has been an update (set in notifier.php). + $subscribers = dba::select('push_subscriber', ['id', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]); + + while ($subscriber = dba::fetch($subscribers)) { + logger("Publish feed to " . $subscriber["callback_url"], LOGGER_DEBUG); + Worker::add($priority, 'PubSubPublish', (int)$subscriber["id"]); + } + + dba::close($subscribers); + } +} diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index f8b37c2f90..f7b7aeae14 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -11,6 +11,7 @@ use Friendica\Database\DBM; use Friendica\Model\Contact; use Friendica\Model\Group; use Friendica\Model\User; +use Friendica\Model\PushSubscriber; use Friendica\Network\Probe; use Friendica\Protocol\Diaspora; use Friendica\Protocol\OStatus; @@ -506,8 +507,7 @@ class Notifier { logger('Activating internal PuSH for item '.$item_id, LOGGER_DEBUG); // Handling the pubsubhubbub requests - Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], - 'PubSubPublish'); + PushSubscriber::publishFeed($a->queue['priority']); } logger('notifier: calling hooks for ' . $cmd . ' ' . $item_id, LOGGER_DEBUG); diff --git a/src/Worker/PubSubPublish.php b/src/Worker/PubSubPublish.php index 1b2dbbef9f..27b6f3d9a1 100644 --- a/src/Worker/PubSubPublish.php +++ b/src/Worker/PubSubPublish.php @@ -20,24 +20,11 @@ require_once 'include/items.php'; class PubSubPublish { public static function execute($pubsubpublish_id = 0) { - global $a; - - if ($pubsubpublish_id != 0) { - self::publish($pubsubpublish_id); + if ($pubsubpublish_id == 0) { return; } - // We'll push to each subscriber that has push > 0, - // i.e. there has been an update (set in notifier.php). - $subscribers = dba::select('push_subscriber', ['id', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]); - - while ($subscriber = dba::fetch($subscribers)) { - logger("Publish feed to " . $subscriber["callback_url"], LOGGER_DEBUG); - Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], - 'PubSubPublish', (int)$subscriber["id"]); - } - - dba::close($subscribers); + self::publish($pubsubpublish_id); } private static function publish($id) { diff --git a/src/Worker/Queue.php b/src/Worker/Queue.php index 20ade6dfae..50a9a5c91a 100644 --- a/src/Worker/Queue.php +++ b/src/Worker/Queue.php @@ -10,6 +10,7 @@ use Friendica\Core\Config; use Friendica\Core\Worker; use Friendica\Database\DBM; use Friendica\Model\Queue as QueueModel; +use Friendica\Model\PushSubscriber; use Friendica\Protocol\DFRN; use Friendica\Protocol\Diaspora; use Friendica\Protocol\PortableContact; @@ -34,7 +35,7 @@ class Queue logger('filling queue jobs - start'); // Handling the pubsubhubbub requests - Worker::add(['priority' => PRIORITY_LOW, 'dont_fork' => true], 'PubSubPublish'); + PushSubscriber::publishFeed(PRIORITY_LOW); $r = dba::inArray(dba::p("SELECT `id` FROM `queue` WHERE `next` < UTC_TIMESTAMP() ORDER BY `batch`, `cid`")); From 6a8c85fac4a53a9fb59b4f764faea25175c78053 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 17 May 2018 23:35:24 +0000 Subject: [PATCH 3/5] Changed database structure --- database.sql | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/database.sql b/database.sql index aa87247db3..7169847ba3 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ --- Friendica 2018-05-dev (The Tazmans Flax-lily) --- DB_UPDATE_VERSION 1259 +-- Friendica 2018.05-rc (The Tazmans Flax-lily) +-- DB_UPDATE_VERSION 1263 -- ------------------------------------------ @@ -855,10 +855,12 @@ CREATE TABLE IF NOT EXISTS `push_subscriber` ( `callback_url` varchar(255) NOT NULL DEFAULT '' COMMENT '', `topic` varchar(255) NOT NULL DEFAULT '' COMMENT '', `nickname` varchar(255) NOT NULL DEFAULT '' COMMENT '', - `push` tinyint unsigned NOT NULL DEFAULT 0 COMMENT '', - `last_update` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT '', + `push` tinyint unsigned NOT NULL DEFAULT 0 COMMENT 'Retrial counter', + `last_update` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Date of last successful trial', + `next_try` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Next retrial date', `secret` varchar(255) NOT NULL DEFAULT '' COMMENT '', - PRIMARY KEY(`id`) + PRIMARY KEY(`id`), + INDEX `next_try` (`next_try`) ) DEFAULT COLLATE utf8mb4_general_ci; -- @@ -1088,7 +1090,7 @@ CREATE TABLE IF NOT EXISTS `workerqueue` ( INDEX `pid` (`pid`), INDEX `parameter` (`parameter`(64)), INDEX `priority_created` (`priority`,`created`), - INDEX `executed` (`executed`) + INDEX `done_executed` (`done`,`executed`) ) DEFAULT COLLATE utf8mb4_general_ci; From f4a71b76dd62a19aa8ee1c413932f48750e3794c Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 17 May 2018 23:43:44 +0000 Subject: [PATCH 4/5] Dynamic priority handling --- src/Model/PushSubscriber.php | 8 ++++++-- src/Worker/Queue.php | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Model/PushSubscriber.php b/src/Model/PushSubscriber.php index 2794a0b612..e982c14442 100644 --- a/src/Model/PushSubscriber.php +++ b/src/Model/PushSubscriber.php @@ -18,10 +18,14 @@ class PushSubscriber { // We'll push to each subscriber that has push > 0, // i.e. there has been an update (set in notifier.php). - $subscribers = dba::select('push_subscriber', ['id', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]); + $subscribers = dba::select('push_subscriber', ['id', 'push', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]); while ($subscriber = dba::fetch($subscribers)) { - logger("Publish feed to " . $subscriber["callback_url"], LOGGER_DEBUG); + // We always handle retries with low priority + if ($subscriber["push"] > 1) { + $priority = PRIORITY_LOW; + } + logger("Publish feed to " . $subscriber["callback_url"] . " with priority " . $priority, LOGGER_DEBUG); Worker::add($priority, 'PubSubPublish', (int)$subscriber["id"]); } diff --git a/src/Worker/Queue.php b/src/Worker/Queue.php index 50a9a5c91a..256227fb62 100644 --- a/src/Worker/Queue.php +++ b/src/Worker/Queue.php @@ -35,7 +35,7 @@ class Queue logger('filling queue jobs - start'); // Handling the pubsubhubbub requests - PushSubscriber::publishFeed(PRIORITY_LOW); + PushSubscriber::publishFeed(); $r = dba::inArray(dba::p("SELECT `id` FROM `queue` WHERE `next` < UTC_TIMESTAMP() ORDER BY `batch`, `cid`")); From 435501449213958b41d3dbeeacafb4f534aef374 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 17 May 2018 23:47:15 +0000 Subject: [PATCH 5/5] Now it should work ... --- src/Model/PushSubscriber.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Model/PushSubscriber.php b/src/Model/PushSubscriber.php index e982c14442..76a7e06312 100644 --- a/src/Model/PushSubscriber.php +++ b/src/Model/PushSubscriber.php @@ -14,7 +14,7 @@ class PushSubscriber /** * @param string $priority Priority for push workers */ - public static function publishFeed($priority = PRIORITY_HIGH) + public static function publishFeed($default_priority = PRIORITY_HIGH) { // We'll push to each subscriber that has push > 0, // i.e. there has been an update (set in notifier.php). @@ -24,7 +24,10 @@ class PushSubscriber // We always handle retries with low priority if ($subscriber["push"] > 1) { $priority = PRIORITY_LOW; + } else { + $priority = $default_priority; } + logger("Publish feed to " . $subscriber["callback_url"] . " with priority " . $priority, LOGGER_DEBUG); Worker::add($priority, 'PubSubPublish', (int)$subscriber["id"]); }