Merge pull request #4254 from zeroadam/Queue-#3878
Move queue_fn to src
This commit is contained in:
commit
b505f0d4d2
7 changed files with 121 additions and 109 deletions
|
@ -1,84 +0,0 @@
|
||||||
<?php
|
|
||||||
|
|
||||||
use Friendica\Core\Config;
|
|
||||||
use Friendica\Database\DBM;
|
|
||||||
|
|
||||||
function update_queue_time($id) {
|
|
||||||
logger('queue: requeue item ' . $id);
|
|
||||||
q("UPDATE `queue` SET `last` = '%s' WHERE `id` = %d",
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
intval($id)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
function remove_queue_item($id) {
|
|
||||||
logger('queue: remove queue item ' . $id);
|
|
||||||
dba::delete('queue', ['id' => $id]);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Checks if the communication with a given contact had problems recently
|
|
||||||
*
|
|
||||||
* @param int $cid Contact id
|
|
||||||
*
|
|
||||||
* @return bool The communication with this contact has currently problems
|
|
||||||
*/
|
|
||||||
function was_recently_delayed($cid) {
|
|
||||||
// Are there queue entries that were recently added?
|
|
||||||
$r = q("SELECT `id` FROM `queue` WHERE `cid` = %d
|
|
||||||
AND `last` > UTC_TIMESTAMP() - INTERVAL 15 MINUTE LIMIT 1",
|
|
||||||
intval($cid)
|
|
||||||
);
|
|
||||||
|
|
||||||
$was_delayed = DBM::is_result($r);
|
|
||||||
|
|
||||||
// We set "term-date" to a current date if the communication has problems.
|
|
||||||
// If the communication works again we reset this value.
|
|
||||||
if ($was_delayed) {
|
|
||||||
$r = q("SELECT `term-date` FROM `contact` WHERE `id` = %d AND `term-date` <= '1000-01-01' LIMIT 1",
|
|
||||||
intval($cid)
|
|
||||||
);
|
|
||||||
$was_delayed = !DBM::is_result($r);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $was_delayed;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
function add_to_queue($cid,$network,$msg,$batch = false) {
|
|
||||||
|
|
||||||
$max_queue = Config::get('system','max_contact_queue');
|
|
||||||
if ($max_queue < 1) {
|
|
||||||
$max_queue = 500;
|
|
||||||
}
|
|
||||||
|
|
||||||
$batch_queue = Config::get('system','max_batch_queue');
|
|
||||||
if ($batch_queue < 1) {
|
|
||||||
$batch_queue = 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
$r = q("SELECT COUNT(*) AS `total` FROM `queue` INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id`
|
|
||||||
WHERE `queue`.`cid` = %d AND `contact`.`self` = 0 ",
|
|
||||||
intval($cid)
|
|
||||||
);
|
|
||||||
if (DBM::is_result($r)) {
|
|
||||||
if ($batch && ($r[0]['total'] > $batch_queue)) {
|
|
||||||
logger('add_to_queue: too many queued items for batch server ' . $cid . ' - discarding message');
|
|
||||||
return;
|
|
||||||
} elseif ((! $batch) && ($r[0]['total'] > $max_queue)) {
|
|
||||||
logger('add_to_queue: too many queued items for contact ' . $cid . ' - discarding message');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
q("INSERT INTO `queue` ( `cid`, `network`, `created`, `last`, `content`, `batch`)
|
|
||||||
VALUES ( %d, '%s', '%s', '%s', '%s', %d) ",
|
|
||||||
intval($cid),
|
|
||||||
dbesc($network),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($msg),
|
|
||||||
intval(($batch) ? 1: 0)
|
|
||||||
);
|
|
||||||
|
|
||||||
}
|
|
99
src/Model/Queue.php
Normal file
99
src/Model/Queue.php
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* @file src/Model/Queue.php
|
||||||
|
*/
|
||||||
|
namespace Friendica\Model;
|
||||||
|
|
||||||
|
use Friendica\Core\Config;
|
||||||
|
use Friendica\Database\DBM;
|
||||||
|
use dba;
|
||||||
|
|
||||||
|
require_once 'include/dba.php';
|
||||||
|
require_once 'include/datetime.php';
|
||||||
|
|
||||||
|
class Queue
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @param string $id id
|
||||||
|
*/
|
||||||
|
public static function updateTime($id)
|
||||||
|
{
|
||||||
|
logger('queue: requeue item ' . $id);
|
||||||
|
dba::update('queue', ['last' => datetime_convert()], ['id' => $id]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $id id
|
||||||
|
*/
|
||||||
|
public static function removeItem($id)
|
||||||
|
{
|
||||||
|
logger('queue: remove queue item ' . $id);
|
||||||
|
dba::delete('queue', ['id' => $id]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Checks if the communication with a given contact had problems recently
|
||||||
|
*
|
||||||
|
* @param int $cid Contact id
|
||||||
|
*
|
||||||
|
* @return bool The communication with this contact has currently problems
|
||||||
|
*/
|
||||||
|
public static function wasDelayed($cid)
|
||||||
|
{
|
||||||
|
// Are there queue entries that were recently added?
|
||||||
|
$r = q("SELECT `id` FROM `queue` WHERE `cid` = %d
|
||||||
|
AND `last` > UTC_TIMESTAMP() - INTERVAL 15 MINUTE LIMIT 1",
|
||||||
|
intval($cid)
|
||||||
|
);
|
||||||
|
|
||||||
|
$was_delayed = DBM::is_result($r);
|
||||||
|
|
||||||
|
// We set "term-date" to a current date if the communication has problems.
|
||||||
|
// If the communication works again we reset this value.
|
||||||
|
if ($was_delayed) {
|
||||||
|
$r = q("SELECT `term-date` FROM `contact` WHERE `id` = %d AND `term-date` <= '1000-01-01' LIMIT 1",
|
||||||
|
intval($cid)
|
||||||
|
);
|
||||||
|
$was_delayed = !DBM::is_result($r);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $was_delayed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $cid cid
|
||||||
|
* @param string $network network
|
||||||
|
* @param string $msg message
|
||||||
|
* @param boolean $batch batch, default false
|
||||||
|
*/
|
||||||
|
public static function add($cid, $network, $msg, $batch = false)
|
||||||
|
{
|
||||||
|
|
||||||
|
$max_queue = Config::get('system', 'max_contact_queue');
|
||||||
|
if ($max_queue < 1) {
|
||||||
|
$max_queue = 500;
|
||||||
|
}
|
||||||
|
|
||||||
|
$batch_queue = Config::get('system', 'max_batch_queue');
|
||||||
|
if ($batch_queue < 1) {
|
||||||
|
$batch_queue = 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
$r = q("SELECT COUNT(*) AS `total` FROM `queue` INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id`
|
||||||
|
WHERE `queue`.`cid` = %d AND `contact`.`self` = 0 ",
|
||||||
|
intval($cid)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (DBM::is_result($r)) {
|
||||||
|
if ($batch && ($r[0]['total'] > $batch_queue)) {
|
||||||
|
logger('add_to_queue: too many queued items for batch server ' . $cid . ' - discarding message');
|
||||||
|
return;
|
||||||
|
} elseif ((! $batch) && ($r[0]['total'] > $max_queue)) {
|
||||||
|
logger('add_to_queue: too many queued items for contact ' . $cid . ' - discarding message');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dba::insert('queue', ['cid' => $cid, 'network' => $network, 'created' => datetime_convert(), 'last' => datetime_convert(), 'content' => $msg, 'batch' =>($batch) ? 1 : 0]);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ use Friendica\Model\Contact;
|
||||||
use Friendica\Model\GContact;
|
use Friendica\Model\GContact;
|
||||||
use Friendica\Model\Group;
|
use Friendica\Model\Group;
|
||||||
use Friendica\Model\Profile;
|
use Friendica\Model\Profile;
|
||||||
|
use Friendica\Model\Queue;
|
||||||
use Friendica\Model\User;
|
use Friendica\Model\User;
|
||||||
use Friendica\Network\Probe;
|
use Friendica\Network\Probe;
|
||||||
use Friendica\Util\Crypto;
|
use Friendica\Util\Crypto;
|
||||||
|
@ -32,7 +33,6 @@ require_once 'include/dba.php';
|
||||||
require_once 'include/items.php';
|
require_once 'include/items.php';
|
||||||
require_once 'include/bb2diaspora.php';
|
require_once 'include/bb2diaspora.php';
|
||||||
require_once 'include/datetime.php';
|
require_once 'include/datetime.php';
|
||||||
require_once 'include/queue_fn.php';
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief This class contain functions to create and send Diaspora XML files
|
* @brief This class contain functions to create and send Diaspora XML files
|
||||||
|
@ -3224,7 +3224,7 @@ class Diaspora
|
||||||
|
|
||||||
logger("transmit: ".$logid."-".$guid." ".$dest_url);
|
logger("transmit: ".$logid."-".$guid." ".$dest_url);
|
||||||
|
|
||||||
if (!$queue_run && was_recently_delayed($contact["id"])) {
|
if (!$queue_run && Queue::wasDelayed($contact["id"])) {
|
||||||
$return_code = 0;
|
$return_code = 0;
|
||||||
} else {
|
} else {
|
||||||
if (!intval(Config::get("system", "diaspora_test"))) {
|
if (!intval(Config::get("system", "diaspora_test"))) {
|
||||||
|
@ -3254,7 +3254,7 @@ class Diaspora
|
||||||
logger("add_to_queue ignored - identical item already in queue");
|
logger("add_to_queue ignored - identical item already in queue");
|
||||||
} else {
|
} else {
|
||||||
// queue message for redelivery
|
// queue message for redelivery
|
||||||
add_to_queue($contact["id"], NETWORK_DIASPORA, $envelope, $public_batch);
|
Queue::add($contact["id"], NETWORK_DIASPORA, $envelope, $public_batch);
|
||||||
|
|
||||||
// The message could not be delivered. We mark the contact as "dead"
|
// The message could not be delivered. We mark the contact as "dead"
|
||||||
Contact::markForArchival($contact);
|
Contact::markForArchival($contact);
|
||||||
|
@ -3311,7 +3311,7 @@ class Diaspora
|
||||||
$envelope = self::buildMessage($msg, $owner, $contact, $owner['uprvkey'], $contact['pubkey'], $public_batch);
|
$envelope = self::buildMessage($msg, $owner, $contact, $owner['uprvkey'], $contact['pubkey'], $public_batch);
|
||||||
|
|
||||||
if ($spool) {
|
if ($spool) {
|
||||||
add_to_queue($contact['id'], NETWORK_DIASPORA, $envelope, $public_batch);
|
Queue::add($contact['id'], NETWORK_DIASPORA, $envelope, $public_batch);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
$return_code = self::transmit($owner, $contact, $envelope, $public_batch, false, $guid);
|
$return_code = self::transmit($owner, $contact, $envelope, $public_batch, false, $guid);
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
/**
|
/**
|
||||||
* @file src/Worker/Delivery.php
|
* @file src/Worker/Delivery.php
|
||||||
*/
|
*/
|
||||||
|
|
||||||
namespace Friendica\Worker;
|
namespace Friendica\Worker;
|
||||||
|
|
||||||
use Friendica\App;
|
use Friendica\App;
|
||||||
|
@ -10,13 +9,13 @@ use Friendica\Core\System;
|
||||||
use Friendica\Core\Config;
|
use Friendica\Core\Config;
|
||||||
use Friendica\Database\DBM;
|
use Friendica\Database\DBM;
|
||||||
use Friendica\Model\Contact;
|
use Friendica\Model\Contact;
|
||||||
|
use Friendica\Model\Queue;
|
||||||
use Friendica\Model\User;
|
use Friendica\Model\User;
|
||||||
use Friendica\Protocol\Diaspora;
|
use Friendica\Protocol\Diaspora;
|
||||||
use Friendica\Protocol\DFRN;
|
use Friendica\Protocol\DFRN;
|
||||||
use Friendica\Protocol\Email;
|
use Friendica\Protocol\Email;
|
||||||
use dba;
|
use dba;
|
||||||
|
|
||||||
require_once 'include/queue_fn.php';
|
|
||||||
require_once 'include/html2plain.php';
|
require_once 'include/html2plain.php';
|
||||||
require_once 'include/datetime.php';
|
require_once 'include/datetime.php';
|
||||||
require_once 'include/items.php';
|
require_once 'include/items.php';
|
||||||
|
@ -335,7 +334,7 @@ class Delivery {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!was_recently_delayed($contact['id'])) {
|
if (!Queue::wasDelayed($contact['id'])) {
|
||||||
$deliver_status = DFRN::deliver($owner, $contact, $atom);
|
$deliver_status = DFRN::deliver($owner, $contact, $atom);
|
||||||
} else {
|
} else {
|
||||||
$deliver_status = (-1);
|
$deliver_status = (-1);
|
||||||
|
@ -345,7 +344,7 @@ class Delivery {
|
||||||
|
|
||||||
if ($deliver_status < 0) {
|
if ($deliver_status < 0) {
|
||||||
logger('notifier: delivery failed: queuing message');
|
logger('notifier: delivery failed: queuing message');
|
||||||
add_to_queue($contact['id'],NETWORK_DFRN,$atom);
|
Queue::add($contact['id'], NETWORK_DFRN, $atom);
|
||||||
|
|
||||||
// The message could not be delivered. We mark the contact as "dead"
|
// The message could not be delivered. We mark the contact as "dead"
|
||||||
Contact::markForArchival($contact);
|
Contact::markForArchival($contact);
|
||||||
|
|
|
@ -17,7 +17,6 @@ use Friendica\Protocol\Salmon;
|
||||||
use dba;
|
use dba;
|
||||||
|
|
||||||
require_once 'include/dba.php';
|
require_once 'include/dba.php';
|
||||||
require_once 'include/queue_fn.php';
|
|
||||||
require_once 'include/html2plain.php';
|
require_once 'include/html2plain.php';
|
||||||
require_once 'include/datetime.php';
|
require_once 'include/datetime.php';
|
||||||
require_once 'include/items.php';
|
require_once 'include/items.php';
|
||||||
|
|
|
@ -14,14 +14,13 @@ use dba;
|
||||||
|
|
||||||
require_once 'include/dba.php';
|
require_once 'include/dba.php';
|
||||||
|
|
||||||
Class OnePoll
|
class OnePoll
|
||||||
{
|
{
|
||||||
public static function execute($contact_id = 0, $command = '') {
|
public static function execute($contact_id = 0, $command = '') {
|
||||||
global $a;
|
global $a;
|
||||||
|
|
||||||
require_once 'include/datetime.php';
|
require_once 'include/datetime.php';
|
||||||
require_once 'include/items.php';
|
require_once 'include/items.php';
|
||||||
require_once 'include/queue_fn.php';
|
|
||||||
|
|
||||||
logger('start');
|
logger('start');
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ use Friendica\Core\Cache;
|
||||||
use Friendica\Core\Config;
|
use Friendica\Core\Config;
|
||||||
use Friendica\Core\Worker;
|
use Friendica\Core\Worker;
|
||||||
use Friendica\Database\DBM;
|
use Friendica\Database\DBM;
|
||||||
|
use Friendica\Model\Queue;
|
||||||
use Friendica\Protocol\Diaspora;
|
use Friendica\Protocol\Diaspora;
|
||||||
use Friendica\Protocol\DFRN;
|
use Friendica\Protocol\DFRN;
|
||||||
use Friendica\Protocol\PortableContact;
|
use Friendica\Protocol\PortableContact;
|
||||||
|
@ -15,7 +16,6 @@ use Friendica\Protocol\Salmon;
|
||||||
use dba;
|
use dba;
|
||||||
|
|
||||||
require_once 'include/dba.php';
|
require_once 'include/dba.php';
|
||||||
require_once 'include/queue_fn.php';
|
|
||||||
require_once 'include/datetime.php';
|
require_once 'include/datetime.php';
|
||||||
require_once 'include/items.php';
|
require_once 'include/items.php';
|
||||||
require_once 'include/bbcode.php';
|
require_once 'include/bbcode.php';
|
||||||
|
@ -75,7 +75,7 @@ class Queue
|
||||||
|
|
||||||
$contact = dba::selectFirst('contact', [], ['id' => $q_item['cid']]);
|
$contact = dba::selectFirst('contact', [], ['id' => $q_item['cid']]);
|
||||||
if (!DBM::is_result($contact)) {
|
if (!DBM::is_result($contact)) {
|
||||||
remove_queue_item($q_item['id']);
|
Queue::removeItem($q_item['id']);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ class Queue
|
||||||
|
|
||||||
if (!is_null($dead) && $dead) {
|
if (!is_null($dead) && $dead) {
|
||||||
logger('queue: skipping known dead url: ' . $contact['notify']);
|
logger('queue: skipping known dead url: ' . $contact['notify']);
|
||||||
update_queue_time($q_item['id']);
|
Queue::updateTime($q_item['id']);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,14 +101,14 @@ class Queue
|
||||||
|
|
||||||
if (!is_null($vital) && !$vital) {
|
if (!is_null($vital) && !$vital) {
|
||||||
logger('queue: skipping dead server: ' . $server);
|
logger('queue: skipping dead server: ' . $server);
|
||||||
update_queue_time($q_item['id']);
|
Queue::updateTime($q_item['id']);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$user = dba::selectFirst('user', [], ['uid' => $contact['uid']]);
|
$user = dba::selectFirst('user', [], ['uid' => $contact['uid']]);
|
||||||
if (!DBM::is_result($user)) {
|
if (!DBM::is_result($user)) {
|
||||||
remove_queue_item($q_item['id']);
|
Queue::removeItem($q_item['id']);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,10 +124,10 @@ class Queue
|
||||||
$deliver_status = DFRN::deliver($owner, $contact, $data);
|
$deliver_status = DFRN::deliver($owner, $contact, $data);
|
||||||
|
|
||||||
if ($deliver_status == (-1)) {
|
if ($deliver_status == (-1)) {
|
||||||
update_queue_time($q_item['id']);
|
Queue::updateTime($q_item['id']);
|
||||||
Cache::set($cachekey_deadguy . $contact['notify'], true, CACHE_QUARTER_HOUR);
|
Cache::set($cachekey_deadguy . $contact['notify'], true, CACHE_QUARTER_HOUR);
|
||||||
} else {
|
} else {
|
||||||
remove_queue_item($q_item['id']);
|
Queue::removeItem($q_item['id']);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case NETWORK_OSTATUS:
|
case NETWORK_OSTATUS:
|
||||||
|
@ -136,10 +136,10 @@ class Queue
|
||||||
$deliver_status = Salmon::slapper($owner, $contact['notify'], $data);
|
$deliver_status = Salmon::slapper($owner, $contact['notify'], $data);
|
||||||
|
|
||||||
if ($deliver_status == (-1)) {
|
if ($deliver_status == (-1)) {
|
||||||
update_queue_time($q_item['id']);
|
Queue::updateTime($q_item['id']);
|
||||||
Cache::set($cachekey_deadguy . $contact['notify'], true, CACHE_QUARTER_HOUR);
|
Cache::set($cachekey_deadguy . $contact['notify'], true, CACHE_QUARTER_HOUR);
|
||||||
} else {
|
} else {
|
||||||
remove_queue_item($q_item['id']);
|
Queue::removeItem($q_item['id']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -149,10 +149,10 @@ class Queue
|
||||||
$deliver_status = Diaspora::transmit($owner, $contact, $data, $public, true);
|
$deliver_status = Diaspora::transmit($owner, $contact, $data, $public, true);
|
||||||
|
|
||||||
if ($deliver_status == (-1)) {
|
if ($deliver_status == (-1)) {
|
||||||
update_queue_time($q_item['id']);
|
Queue::updateTime($q_item['id']);
|
||||||
Cache::set($cachekey_deadguy . $contact['notify'], true, CACHE_QUARTER_HOUR);
|
Cache::set($cachekey_deadguy . $contact['notify'], true, CACHE_QUARTER_HOUR);
|
||||||
} else {
|
} else {
|
||||||
remove_queue_item($q_item['id']);
|
Queue::removeItem($q_item['id']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -162,9 +162,9 @@ class Queue
|
||||||
call_hooks('queue_deliver', $params);
|
call_hooks('queue_deliver', $params);
|
||||||
|
|
||||||
if ($params['result']) {
|
if ($params['result']) {
|
||||||
remove_queue_item($q_item['id']);
|
Queue::removeItem($q_item['id']);
|
||||||
} else {
|
} else {
|
||||||
update_queue_time($q_item['id']);
|
Queue::updateTime($q_item['id']);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue