queue api + queue limits
This commit is contained in:
parent
d1833cabf6
commit
6e76c86ad2
5 changed files with 53 additions and 62 deletions
|
@ -1,5 +1,6 @@
|
||||||
<?php
|
<?php
|
||||||
require_once("boot.php");
|
require_once("boot.php");
|
||||||
|
require_once('include/queue_fn.php');
|
||||||
|
|
||||||
function delivery_run($argv, $argc){
|
function delivery_run($argv, $argc){
|
||||||
global $a, $db;
|
global $a, $db;
|
||||||
|
@ -323,14 +324,7 @@ function delivery_run($argv, $argc){
|
||||||
|
|
||||||
if($deliver_status == (-1)) {
|
if($deliver_status == (-1)) {
|
||||||
logger('notifier: delivery failed: queuing message');
|
logger('notifier: delivery failed: queuing message');
|
||||||
// queue message for redelivery
|
add_to_queue($contact['id'],NETWORK_DFRN,$atom);
|
||||||
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
|
|
||||||
VALUES ( %d, '%s', '%s', '%s') ",
|
|
||||||
intval($contact['id']),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($atom)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -370,13 +364,7 @@ function delivery_run($argv, $argc){
|
||||||
$deliver_status = slapper($owner,$contact['notify'],$slappy);
|
$deliver_status = slapper($owner,$contact['notify'],$slappy);
|
||||||
if($deliver_status == (-1)) {
|
if($deliver_status == (-1)) {
|
||||||
// queue message for redelivery
|
// queue message for redelivery
|
||||||
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
|
add_to_queue($contact['id'],NETWORK_OSTATUS,$slappy);
|
||||||
VALUES ( %d, '%s', '%s', '%s') ",
|
|
||||||
intval($contact['id']),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($slappy)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ require_once('include/crypto.php');
|
||||||
require_once('include/items.php');
|
require_once('include/items.php');
|
||||||
require_once('include/bb2diaspora.php');
|
require_once('include/bb2diaspora.php');
|
||||||
require_once('include/contact_selectors.php');
|
require_once('include/contact_selectors.php');
|
||||||
|
require_once('include/queue_fn.php');
|
||||||
|
|
||||||
|
|
||||||
function diaspora_dispatch_public($msg) {
|
function diaspora_dispatch_public($msg) {
|
||||||
|
@ -957,7 +958,7 @@ function diaspora_comment($importer,$xml,$msg) {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(! $parent_author_signature) {
|
if(($parent_item['origin']) && (! $parent_author_signature)) {
|
||||||
q("insert into sign (`iid`,`signed_text`,`signature`,`signer`) values (%d,'%s','%s','%s') ",
|
q("insert into sign (`iid`,`signed_text`,`signature`,`signer`) values (%d,'%s','%s','%s') ",
|
||||||
intval($message_id),
|
intval($message_id),
|
||||||
dbesc($author_signed_data),
|
dbesc($author_signed_data),
|
||||||
|
@ -1162,9 +1163,9 @@ EOT;
|
||||||
$arr['parent'] = $parent_item['id'];
|
$arr['parent'] = $parent_item['id'];
|
||||||
$arr['parent-uri'] = $parent_item['uri'];
|
$arr['parent-uri'] = $parent_item['uri'];
|
||||||
|
|
||||||
$arr['owner-name'] = $contact['name'];
|
$arr['owner-name'] = $parent_item['name'];
|
||||||
$arr['owner-link'] = $contact['url'];
|
$arr['owner-link'] = $parent_item['url'];
|
||||||
$arr['owner-avatar'] = $contact['thumb'];
|
$arr['owner-avatar'] = $parent_item['thumb'];
|
||||||
|
|
||||||
$arr['author-name'] = $person['name'];
|
$arr['author-name'] = $person['name'];
|
||||||
$arr['author-link'] = $person['url'];
|
$arr['author-link'] = $person['url'];
|
||||||
|
@ -1206,9 +1207,9 @@ EOT;
|
||||||
|
|
||||||
// if the message isn't already being relayed, notify others
|
// if the message isn't already being relayed, notify others
|
||||||
// the existence of parent_author_signature means the parent_author or owner
|
// the existence of parent_author_signature means the parent_author or owner
|
||||||
// is already relaying.
|
// is already relaying. The parent_item['origin'] indicates the message was created on our system
|
||||||
|
|
||||||
if(! $parent_author_signature)
|
if(($parent_item['origin']) && (! $parent_author_signature))
|
||||||
proc_run('php','include/notifier.php','comment',$message_id);
|
proc_run('php','include/notifier.php','comment',$message_id);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -1647,14 +1648,7 @@ function diaspora_transmit($owner,$contact,$slap,$public_batch) {
|
||||||
if((! $return_code) || (($curl_stat == 503) && (stristr($a->get_curl_headers(),'retry-after')))) {
|
if((! $return_code) || (($curl_stat == 503) && (stristr($a->get_curl_headers(),'retry-after')))) {
|
||||||
logger('diaspora_transmit: queue message');
|
logger('diaspora_transmit: queue message');
|
||||||
// queue message for redelivery
|
// queue message for redelivery
|
||||||
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`,`batch`)
|
add_to_queue($contact['id'],NETWORK_DIASPORA,$slap,$public_batch);
|
||||||
VALUES ( %d, '%s', '%s', '%s', %d) ",
|
|
||||||
intval($contact['id']),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($slap),
|
|
||||||
intval($public_batch)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
require_once("boot.php");
|
require_once("boot.php");
|
||||||
|
require_once('include/queue_fn.php');
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This file was at one time responsible for doing all deliveries, but this caused
|
* This file was at one time responsible for doing all deliveries, but this caused
|
||||||
|
@ -519,13 +520,7 @@ function notifier_run($argv, $argc){
|
||||||
if($deliver_status == (-1)) {
|
if($deliver_status == (-1)) {
|
||||||
logger('notifier: delivery failed: queuing message');
|
logger('notifier: delivery failed: queuing message');
|
||||||
// queue message for redelivery
|
// queue message for redelivery
|
||||||
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
|
add_to_queue($contact['id'],NETWORK_DFRN,$atom);
|
||||||
VALUES ( %d, '%s', '%s', '%s') ",
|
|
||||||
intval($contact['id']),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($atom)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case NETWORK_OSTATUS:
|
case NETWORK_OSTATUS:
|
||||||
|
@ -542,14 +537,7 @@ function notifier_run($argv, $argc){
|
||||||
|
|
||||||
if($deliver_status == (-1)) {
|
if($deliver_status == (-1)) {
|
||||||
// queue message for redelivery
|
// queue message for redelivery
|
||||||
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
|
add_to_queue($contact['id'],NETWORK_OSTATUS,$slap);
|
||||||
VALUES ( %d, '%s', '%s', '%s') ",
|
|
||||||
intval($contact['id']),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($slap)
|
|
||||||
);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -564,13 +552,7 @@ function notifier_run($argv, $argc){
|
||||||
$deliver_status = slapper($owner,$contact['notify'],$slappy);
|
$deliver_status = slapper($owner,$contact['notify'],$slappy);
|
||||||
if($deliver_status == (-1)) {
|
if($deliver_status == (-1)) {
|
||||||
// queue message for redelivery
|
// queue message for redelivery
|
||||||
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
|
add_to_queue($contact['id'],NETWORK_OSTATUS,$slappy);
|
||||||
VALUES ( %d, '%s', '%s', '%s') ",
|
|
||||||
intval($contact['id']),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($slappy)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
require_once('include/datetime.php');
|
require_once('include/datetime.php');
|
||||||
require_once('include/diaspora.php');
|
require_once('include/diaspora.php');
|
||||||
|
require_once('include/queue_fn.php');
|
||||||
|
|
||||||
function profile_change() {
|
function profile_change() {
|
||||||
|
|
||||||
|
@ -81,7 +82,6 @@ function profile_change() {
|
||||||
|
|
||||||
$tpl = get_markup_template('diaspora_profile.tpl');
|
$tpl = get_markup_template('diaspora_profile.tpl');
|
||||||
|
|
||||||
|
|
||||||
$msg = replace_macros($tpl,array(
|
$msg = replace_macros($tpl,array(
|
||||||
'$handle' => $handle,
|
'$handle' => $handle,
|
||||||
'$first' => $first,
|
'$first' => $first,
|
||||||
|
@ -100,14 +100,6 @@ function profile_change() {
|
||||||
|
|
||||||
$msgtosend = diaspora_msg_build($msg,$a->user,null,$a->user['prvkey'],null,true);
|
$msgtosend = diaspora_msg_build($msg,$a->user,null,$a->user['prvkey'],null,true);
|
||||||
foreach($recips as $recip) {
|
foreach($recips as $recip) {
|
||||||
q("insert into queue (`cid`,`network`,`created`,`last`,`content`,`batch`)
|
add_to_queue($recip['id'],NETWORK_DIASPORA,$msgtosend,true);
|
||||||
values(%d,'%s','%s','%s','%s',%d)",
|
|
||||||
intval($recip['id']),
|
|
||||||
dbesc(NETWORK_DIASPORA),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc(datetime_convert()),
|
|
||||||
dbesc($msgtosend),
|
|
||||||
intval(1)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -14,3 +14,38 @@ function remove_queue_item($id) {
|
||||||
intval($id)
|
intval($id)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function add_to_queue($cid,$network,$msg,$batch = false) {
|
||||||
|
|
||||||
|
$max_queue = get_config('system','max_contact_queue');
|
||||||
|
if($max_queue < 1)
|
||||||
|
$max_queue = 500;
|
||||||
|
|
||||||
|
$batch_queue = get_config('system','max_batch_queue');
|
||||||
|
if($batch_queue < 1)
|
||||||
|
$batch_queue = 1000;
|
||||||
|
|
||||||
|
$r = q("SELECT COUNT(*) AS `total` FROM `queue` left join `contact` WHERE ``queue`.`cid` = %d AND `contact`.`self` = 0 ",
|
||||||
|
intval($cid)
|
||||||
|
);
|
||||||
|
if($r && count($r)) {
|
||||||
|
if($batch && ($r[0]['total'] > $batch_queue)) {
|
||||||
|
logger('add_to_queue: too many queued items for batch server ' . $cid . ' - discarding message');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
elseif((! $batch) && ($r[0]['total'] > $max_queue)) {
|
||||||
|
logger('add_to_queue: too many queued items for contact ' . $cid . ' - discarding message');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
q("INSERT INTO `queue` ( `cid`, `network`, `created`, `last`, `content`, `batch`)
|
||||||
|
VALUES ( %d, '%s', '%s', '%s', '%s', %d) ",
|
||||||
|
intval($cid),
|
||||||
|
dbesc(datetime_convert()),
|
||||||
|
dbesc(datetime_convert()),
|
||||||
|
dbesc($msg),
|
||||||
|
intval(($batch) ? 1: 0)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue