Remove the queue from the core
This commit is contained in:
parent
bd13a73b2b
commit
46a99a05a1
12 changed files with 46 additions and 381 deletions
|
@ -65,8 +65,6 @@ HELP;
|
|||
throw new RuntimeException(L10n::t('Could not find any unarchived contact entry for this URL (%s)', $nurl));
|
||||
}
|
||||
if (DBA::update('contact', ['archive' => true], ['nurl' => $nurl])) {
|
||||
$condition = ["`cid` IN (SELECT `id` FROM `contact` WHERE `archive`)"];
|
||||
DBA::delete('queue', $condition);
|
||||
$this->out(L10n::t('The contact entries have been archived'));
|
||||
} else {
|
||||
throw new RuntimeException('The contact archival failed.');
|
||||
|
|
|
@ -1,126 +0,0 @@
|
|||
<?php
|
||||
/**
|
||||
* @file src/Model/Queue.php
|
||||
*/
|
||||
namespace Friendica\Model;
|
||||
|
||||
use Friendica\Core\Config;
|
||||
use Friendica\Core\Logger;
|
||||
use Friendica\Database\DBA;
|
||||
use Friendica\Util\DateTimeFormat;
|
||||
|
||||
class Queue
|
||||
{
|
||||
/**
|
||||
* @param string $id id
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function updateTime($id)
|
||||
{
|
||||
Logger::log('queue: requeue item ' . $id);
|
||||
$queue = DBA::selectFirst('queue', ['retrial'], ['id' => $id]);
|
||||
if (!DBA::isResult($queue)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$retrial = $queue['retrial'];
|
||||
|
||||
if ($retrial > 14) {
|
||||
self::removeItem($id);
|
||||
}
|
||||
|
||||
// Calculate the delay until the next trial
|
||||
$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
|
||||
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
|
||||
|
||||
DBA::update('queue', ['last' => DateTimeFormat::utcNow(), 'retrial' => $retrial + 1, 'next' => $next], ['id' => $id]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $id id
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function removeItem($id)
|
||||
{
|
||||
Logger::log('queue: remove queue item ' . $id);
|
||||
DBA::delete('queue', ['id' => $id]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Checks if the communication with a given contact had problems recently
|
||||
*
|
||||
* @param int $cid Contact id
|
||||
*
|
||||
* @return bool The communication with this contact has currently problems
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function wasDelayed($cid)
|
||||
{
|
||||
// Are there queue entries that were recently added?
|
||||
$r = q("SELECT `id` FROM `queue` WHERE `cid` = %d
|
||||
AND `last` > UTC_TIMESTAMP() - INTERVAL 15 MINUTE LIMIT 1",
|
||||
intval($cid)
|
||||
);
|
||||
|
||||
$was_delayed = DBA::isResult($r);
|
||||
|
||||
// We set "term-date" to a current date if the communication has problems.
|
||||
// If the communication works again we reset this value.
|
||||
if ($was_delayed) {
|
||||
$r = q("SELECT `term-date` FROM `contact` WHERE `id` = %d AND `term-date` <= '1000-01-01' LIMIT 1",
|
||||
intval($cid)
|
||||
);
|
||||
$was_delayed = !DBA::isResult($r);
|
||||
}
|
||||
|
||||
return $was_delayed;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $cid cid
|
||||
* @param string $network network
|
||||
* @param string $msg message
|
||||
* @param boolean $batch batch, default false
|
||||
* @param string $guid
|
||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
*/
|
||||
public static function add($cid, $network, $msg, $batch = false, $guid = '')
|
||||
{
|
||||
|
||||
$max_queue = Config::get('system', 'max_contact_queue');
|
||||
if ($max_queue < 1) {
|
||||
$max_queue = 500;
|
||||
}
|
||||
|
||||
$batch_queue = Config::get('system', 'max_batch_queue');
|
||||
if ($batch_queue < 1) {
|
||||
$batch_queue = 1000;
|
||||
}
|
||||
|
||||
$r = q("SELECT COUNT(*) AS `total` FROM `queue` INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id`
|
||||
WHERE `queue`.`cid` = %d AND `contact`.`self` = 0 ",
|
||||
intval($cid)
|
||||
);
|
||||
|
||||
if (DBA::isResult($r)) {
|
||||
if ($batch && ($r[0]['total'] > $batch_queue)) {
|
||||
Logger::log('too many queued items for batch server ' . $cid . ' - discarding message');
|
||||
return;
|
||||
} elseif ((! $batch) && ($r[0]['total'] > $max_queue)) {
|
||||
Logger::log('too many queued items for contact ' . $cid . ' - discarding message');
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
DBA::insert('queue', [
|
||||
'cid' => $cid,
|
||||
'network' => $network,
|
||||
'guid' => $guid,
|
||||
'created' => DateTimeFormat::utcNow(),
|
||||
'last' => DateTimeFormat::utcNow(),
|
||||
'content' => $msg,
|
||||
'batch' =>($batch) ? 1 : 0
|
||||
]);
|
||||
Logger::log('Added item ' . $guid . ' for ' . $cid);
|
||||
}
|
||||
}
|
|
@ -28,7 +28,6 @@ use Friendica\Model\GContact;
|
|||
use Friendica\Model\Group;
|
||||
use Friendica\Model\Item;
|
||||
use Friendica\Model\Profile;
|
||||
use Friendica\Model\Queue;
|
||||
use Friendica\Model\User;
|
||||
use Friendica\Network\Probe;
|
||||
use Friendica\Util\Crypto;
|
||||
|
@ -3170,15 +3169,14 @@ class Diaspora
|
|||
* @param array $contact Target of the communication
|
||||
* @param string $envelope The message that is to be transmitted
|
||||
* @param bool $public_batch Is it a public post?
|
||||
* @param bool $queue_run Is the transmission called from the queue?
|
||||
* @param string $guid message guid
|
||||
* @param bool $no_queue
|
||||
* @param bool $no_defer Don't defer a failing delivery
|
||||
*
|
||||
* @return int Result of the transmission
|
||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function transmit(array $owner, array $contact, $envelope, $public_batch, $queue_run = false, $guid = "", $no_queue = false)
|
||||
public static function transmit(array $owner, array $contact, $envelope, $public_batch, $guid = "", $no_defer = false)
|
||||
{
|
||||
$enabled = intval(Config::get("system", "diaspora_enabled"));
|
||||
if (!$enabled) {
|
||||
|
@ -3205,24 +3203,20 @@ class Diaspora
|
|||
|
||||
Logger::log("transmit: ".$logid."-".$guid." ".$dest_url);
|
||||
|
||||
if (!$queue_run && Queue::wasDelayed($contact["id"])) {
|
||||
$return_code = 0;
|
||||
} else {
|
||||
if (!intval(Config::get("system", "diaspora_test"))) {
|
||||
$content_type = (($public_batch) ? "application/magic-envelope+xml" : "application/json");
|
||||
if (!intval(Config::get("system", "diaspora_test"))) {
|
||||
$content_type = (($public_batch) ? "application/magic-envelope+xml" : "application/json");
|
||||
|
||||
$postResult = Network::post($dest_url."/", $envelope, ["Content-Type: ".$content_type]);
|
||||
$return_code = $postResult->getReturnCode();
|
||||
} else {
|
||||
Logger::log("test_mode");
|
||||
return 200;
|
||||
}
|
||||
$postResult = Network::post($dest_url."/", $envelope, ["Content-Type: ".$content_type]);
|
||||
$return_code = $postResult->getReturnCode();
|
||||
} else {
|
||||
Logger::log("test_mode");
|
||||
return 200;
|
||||
}
|
||||
|
||||
Logger::log("transmit: ".$logid."-".$guid." to ".$dest_url." returns: ".$return_code);
|
||||
|
||||
if (!$return_code || (($return_code == 503) && (stristr($postResult->getHeader(), "retry-after")))) {
|
||||
if (!$no_queue && !empty($contact['contact-type']) && ($contact['contact-type'] != Contact::TYPE_RELAY)) {
|
||||
if (!$no_defer && !empty($contact['contact-type']) && ($contact['contact-type'] != Contact::TYPE_RELAY)) {
|
||||
Logger::info('defer message', ['log' => $logid, 'guid' => $guid, 'destination' => $dest_url]);
|
||||
// defer message for redelivery
|
||||
Worker::defer();
|
||||
|
@ -3263,13 +3257,13 @@ class Diaspora
|
|||
* @param array $message The message data
|
||||
* @param bool $public_batch Is it a public post?
|
||||
* @param string $guid message guid
|
||||
* @param bool $no_queue
|
||||
* @param bool $no_defer Don't defer a failing delivery
|
||||
*
|
||||
* @return int Result of the transmission
|
||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
private static function buildAndTransmit(array $owner, array $contact, $type, $message, $public_batch = false, $guid = "", $no_queue = false)
|
||||
private static function buildAndTransmit(array $owner, array $contact, $type, $message, $public_batch = false, $guid = "", $no_defer = false)
|
||||
{
|
||||
$msg = self::buildPostXml($type, $message);
|
||||
|
||||
|
@ -3283,7 +3277,7 @@ class Diaspora
|
|||
|
||||
$envelope = self::buildMessage($msg, $owner, $contact, $owner['uprvkey'], $contact['pubkey'], $public_batch);
|
||||
|
||||
$return_code = self::transmit($owner, $contact, $envelope, $public_batch, false, $guid, $no_queue);
|
||||
$return_code = self::transmit($owner, $contact, $envelope, $public_batch, $guid, $no_defer);
|
||||
|
||||
Logger::log("guid: ".$guid." result ".$return_code, Logger::DEBUG);
|
||||
|
||||
|
|
|
@ -47,9 +47,6 @@ class Cron
|
|||
// Fork the cron jobs in separate parts to avoid problems when one of them is crashing
|
||||
Hook::fork($a->queue['priority'], "cron");
|
||||
|
||||
// run queue delivery process in the background
|
||||
Worker::add(PRIORITY_NEGLIGIBLE, "Queue");
|
||||
|
||||
// run the process to discover global contacts in the background
|
||||
Worker::add(PRIORITY_LOW, "DiscoverPoCo");
|
||||
|
||||
|
|
|
@ -1,164 +0,0 @@
|
|||
<?php
|
||||
/**
|
||||
* @file src/Worker/Queue.php
|
||||
*/
|
||||
namespace Friendica\Worker;
|
||||
|
||||
use Friendica\Core\Cache;
|
||||
use Friendica\Core\Config;
|
||||
use Friendica\Core\Hook;
|
||||
use Friendica\Core\Logger;
|
||||
use Friendica\Core\Protocol;
|
||||
use Friendica\Core\Worker;
|
||||
use Friendica\Database\DBA;
|
||||
use Friendica\Model\Contact;
|
||||
use Friendica\Model\PushSubscriber;
|
||||
use Friendica\Model\Queue as QueueModel;
|
||||
use Friendica\Model\User;
|
||||
use Friendica\Protocol\DFRN;
|
||||
use Friendica\Protocol\Diaspora;
|
||||
use Friendica\Protocol\PortableContact;
|
||||
use Friendica\Protocol\Salmon;
|
||||
|
||||
class Queue
|
||||
{
|
||||
public static function execute($queue_id = 0)
|
||||
{
|
||||
$cachekey_deadguy = 'queue_run:deadguy:';
|
||||
$cachekey_server = 'queue_run:server:';
|
||||
|
||||
$no_dead_check = Config::get('system', 'queue_no_dead_check', false);
|
||||
|
||||
if (!$queue_id) {
|
||||
Logger::log('filling queue jobs - start');
|
||||
|
||||
// Handling the pubsubhubbub requests
|
||||
PushSubscriber::requeue();
|
||||
|
||||
$r = DBA::toArray(DBA::p("SELECT `id` FROM `queue` WHERE `next` < UTC_TIMESTAMP() ORDER BY `batch`, `cid`"));
|
||||
|
||||
Hook::callAll('queue_predeliver', $r);
|
||||
|
||||
if (DBA::isResult($r)) {
|
||||
foreach ($r as $q_item) {
|
||||
Logger::log('Call queue for id ' . $q_item['id']);
|
||||
Worker::add(['priority' => PRIORITY_LOW, 'dont_fork' => true], "Queue", (int) $q_item['id']);
|
||||
}
|
||||
}
|
||||
Logger::log('filling queue jobs - end');
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// delivering
|
||||
$q_item = DBA::selectFirst('queue', [], ['id' => $queue_id]);
|
||||
if (!DBA::isResult($q_item)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$contact = DBA::selectFirst('contact', [], ['id' => $q_item['cid']]);
|
||||
if (!DBA::isResult($contact)) {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
return;
|
||||
}
|
||||
|
||||
if (empty($contact['notify']) || $contact['archive']) {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
return;
|
||||
}
|
||||
|
||||
$dead = Cache::get($cachekey_deadguy . $contact['notify']);
|
||||
|
||||
if (!is_null($dead) && $dead && !$no_dead_check) {
|
||||
Logger::log('queue: skipping known dead url: ' . $contact['notify']);
|
||||
QueueModel::updateTime($q_item['id']);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$no_dead_check) {
|
||||
$server = PortableContact::detectServer($contact['url']);
|
||||
|
||||
if ($server != "") {
|
||||
$vital = Cache::get($cachekey_server . $server);
|
||||
|
||||
if (is_null($vital)) {
|
||||
Logger::log("Check server " . $server . " (" . $contact["network"] . ")");
|
||||
|
||||
$vital = PortableContact::checkServer($server, $contact["network"], true);
|
||||
Cache::set($cachekey_server . $server, $vital, Cache::MINUTE);
|
||||
}
|
||||
|
||||
if (!is_null($vital) && !$vital) {
|
||||
Logger::log('queue: skipping dead server: ' . $server);
|
||||
QueueModel::updateTime($q_item['id']);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$user = DBA::selectFirst('user', [], ['uid' => $contact['uid']]);
|
||||
if (!DBA::isResult($user)) {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
return;
|
||||
}
|
||||
|
||||
$data = $q_item['content'];
|
||||
$public = $q_item['batch'];
|
||||
$owner = User::getOwnerDataById($user['uid']);
|
||||
|
||||
$deliver_status = 0;
|
||||
|
||||
switch ($contact['network']) {
|
||||
case Protocol::DFRN:
|
||||
Logger::log('queue: dfrndelivery: item ' . $q_item['id'] . ' for ' . $contact['name'] . ' <' . $contact['url'] . '>');
|
||||
$deliver_status = DFRN::deliver($owner, $contact, $data);
|
||||
|
||||
if (($deliver_status >= 200) && ($deliver_status <= 299)) {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
} else {
|
||||
QueueModel::updateTime($q_item['id']);
|
||||
Cache::set($cachekey_deadguy . $contact['notify'], true, Cache::MINUTE);
|
||||
}
|
||||
break;
|
||||
|
||||
case Protocol::OSTATUS:
|
||||
Logger::log('queue: slapdelivery: item ' . $q_item['id'] . ' for ' . $contact['name'] . ' <' . $contact['url'] . '>');
|
||||
$deliver_status = Salmon::slapper($owner, $contact['notify'], $data);
|
||||
|
||||
if ($deliver_status == -1) {
|
||||
QueueModel::updateTime($q_item['id']);
|
||||
Cache::set($cachekey_deadguy . $contact['notify'], true, Cache::MINUTE);
|
||||
} else {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
}
|
||||
break;
|
||||
|
||||
case Protocol::DIASPORA:
|
||||
Logger::log('queue: diaspora_delivery: item ' . $q_item['id'] . ' for ' . $contact['name'] . ' <' . $contact['url'] . '>');
|
||||
$deliver_status = Diaspora::transmit($owner, $contact, $data, $public, true, 'Queue:' . $q_item['id'], true);
|
||||
|
||||
if ((($deliver_status >= 200) && ($deliver_status <= 299)) ||
|
||||
($contact['contact-type'] == Contact::TYPE_RELAY)) {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
} else {
|
||||
QueueModel::updateTime($q_item['id']);
|
||||
Cache::set($cachekey_deadguy . $contact['notify'], true, Cache::MINUTE);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
$params = ['owner' => $owner, 'contact' => $contact, 'queue' => $q_item, 'result' => false];
|
||||
Hook::callAll('queue_deliver', $params);
|
||||
|
||||
if ($params['result']) {
|
||||
QueueModel::removeItem($q_item['id']);
|
||||
} else {
|
||||
QueueModel::updateTime($q_item['id']);
|
||||
}
|
||||
break;
|
||||
}
|
||||
Logger::log('Deliver status ' . (int)$deliver_status . ' for item ' . $q_item['id'] . ' to ' . $contact['name'] . ' <' . $contact['url'] . '>');
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue