implement delivery queue in case notifier gets killed
This commit is contained in:
parent
f29f228463
commit
846c4cea7c
8 changed files with 74 additions and 20 deletions
2
boot.php
2
boot.php
|
@ -9,7 +9,7 @@ require_once("include/pgettext.php");
|
||||||
|
|
||||||
define ( 'FRIENDIKA_VERSION', '2.2.1086' );
|
define ( 'FRIENDIKA_VERSION', '2.2.1086' );
|
||||||
define ( 'DFRN_PROTOCOL_VERSION', '2.21' );
|
define ( 'DFRN_PROTOCOL_VERSION', '2.21' );
|
||||||
define ( 'DB_UPDATE_VERSION', 1083 );
|
define ( 'DB_UPDATE_VERSION', 1084 );
|
||||||
|
|
||||||
define ( 'EOL', "<br />\r\n" );
|
define ( 'EOL', "<br />\r\n" );
|
||||||
define ( 'ATOM_TIME', 'Y-m-d\TH:i:s\Z' );
|
define ( 'ATOM_TIME', 'Y-m-d\TH:i:s\Z' );
|
||||||
|
|
|
@ -606,3 +606,9 @@ INDEX ( `iid` )
|
||||||
) ENGINE = MyISAM DEFAULT CHARSET=utf8;
|
) ENGINE = MyISAM DEFAULT CHARSET=utf8;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS `deliverq` (
|
||||||
|
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY ,
|
||||||
|
`cmd` CHAR( 32 ) NOT NULL ,
|
||||||
|
`item` INT NOT NULL ,
|
||||||
|
`contact` INT NOT NULL
|
||||||
|
) ENGINE = MyISAM DEFAULT CHARSET=utf8;
|
||||||
|
|
|
@ -37,6 +37,12 @@ function delivery_run($argv, $argc){
|
||||||
$item_id = intval($argv[2]);
|
$item_id = intval($argv[2]);
|
||||||
$contact_id = intval($argv[3]);
|
$contact_id = intval($argv[3]);
|
||||||
|
|
||||||
|
q("delete from deliverq where cmd = '%s' and item = %d and contact = %d limit 1",
|
||||||
|
dbesc($cmd),
|
||||||
|
dbesc($item_id),
|
||||||
|
dbesc($contact_id)
|
||||||
|
);
|
||||||
|
|
||||||
if((! $item_id) || (! $contact_id))
|
if((! $item_id) || (! $contact_id))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
|
@ -140,10 +140,9 @@ EOT;
|
||||||
$encrypted_outer_key_bundle = '';
|
$encrypted_outer_key_bundle = '';
|
||||||
openssl_public_encrypt($outer_json,$encrypted_outer_key_bundle,$pubkey);
|
openssl_public_encrypt($outer_json,$encrypted_outer_key_bundle,$pubkey);
|
||||||
|
|
||||||
logger('outer_bundle_encrypt: ' . openssl_error_string());
|
|
||||||
$b64_encrypted_outer_key_bundle = base64_encode($encrypted_outer_key_bundle);
|
$b64_encrypted_outer_key_bundle = base64_encode($encrypted_outer_key_bundle);
|
||||||
|
|
||||||
logger('outer_bundle: ' . $b64_encrypted_outer_key_bundle . ' key: ' . $pubkey);
|
logger('outer_bundle: ' . $b64_encrypted_outer_key_bundle . ' key: ' . $pubkey, LOGGER_DATA);
|
||||||
|
|
||||||
$encrypted_header_json_object = json_encode(array('aes_key' => base64_encode($encrypted_outer_key_bundle),
|
$encrypted_header_json_object = json_encode(array('aes_key' => base64_encode($encrypted_outer_key_bundle),
|
||||||
'ciphertext' => base64_encode($ciphertext)));
|
'ciphertext' => base64_encode($ciphertext)));
|
||||||
|
@ -223,7 +222,7 @@ function diaspora_decode($importer,$xml) {
|
||||||
* </decrypted_header>
|
* </decrypted_header>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
logger('decrypted: ' . $decrypted);
|
logger('decrypted: ' . $decrypted, LOGGER_DEBUG);
|
||||||
$idom = parse_xml_string($decrypted,false);
|
$idom = parse_xml_string($decrypted,false);
|
||||||
|
|
||||||
$inner_iv = base64_decode($idom->iv);
|
$inner_iv = base64_decode($idom->iv);
|
||||||
|
|
|
@ -927,7 +927,7 @@ function dfrn_deliver($owner,$contact,$atom, $dissolve = false) {
|
||||||
if(! $curl_stat)
|
if(! $curl_stat)
|
||||||
return(-1); // timed out
|
return(-1); // timed out
|
||||||
|
|
||||||
logger('dfrn_deliver: ' . $xml);
|
logger('dfrn_deliver: ' . $xml, LOGGER_DATA);
|
||||||
|
|
||||||
if(! $xml)
|
if(! $xml)
|
||||||
return 3;
|
return 3;
|
||||||
|
@ -991,7 +991,7 @@ function dfrn_deliver($owner,$contact,$atom, $dissolve = false) {
|
||||||
$key = substr(random_string(),0,16);
|
$key = substr(random_string(),0,16);
|
||||||
$data = bin2hex(aes_encrypt($postvars['data'],$key));
|
$data = bin2hex(aes_encrypt($postvars['data'],$key));
|
||||||
$postvars['data'] = $data;
|
$postvars['data'] = $data;
|
||||||
logger('rino: sent key = ' . $key);
|
logger('rino: sent key = ' . $key, LOGGER_DEBUG);
|
||||||
|
|
||||||
|
|
||||||
if($dfrn_version >= 2.1) {
|
if($dfrn_version >= 2.1) {
|
||||||
|
|
|
@ -379,11 +379,27 @@ function notifier_run($argv, $argc){
|
||||||
dbesc($recip_str)
|
dbesc($recip_str)
|
||||||
);
|
);
|
||||||
|
|
||||||
// delivery loop
|
|
||||||
|
|
||||||
require_once('include/salmon.php');
|
require_once('include/salmon.php');
|
||||||
|
|
||||||
|
$interval = intval(get_config('system','delivery_interval'));
|
||||||
|
if(! $interval)
|
||||||
|
$interval = 2;
|
||||||
|
|
||||||
|
// delivery loop
|
||||||
|
|
||||||
if(count($r)) {
|
if(count($r)) {
|
||||||
|
|
||||||
|
foreach($r as $contact) {
|
||||||
|
if((! $mail) && (! $fsuggest) && (! $followup) && (! $contact['self'])) {
|
||||||
|
q("insert into deliverq ( `cmd`,`item`,`contact` ) values ('%s', %d, %d )",
|
||||||
|
dbesc($cmd),
|
||||||
|
intval($item_id),
|
||||||
|
intval($contact['id'])
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
foreach($r as $contact) {
|
foreach($r as $contact) {
|
||||||
if($contact['self'])
|
if($contact['self'])
|
||||||
continue;
|
continue;
|
||||||
|
@ -392,13 +408,8 @@ function notifier_run($argv, $argc){
|
||||||
// we will deliver single recipient types of message and email receipients here.
|
// we will deliver single recipient types of message and email receipients here.
|
||||||
|
|
||||||
if((! $mail) && (! $fsuggest) && (! $followup)) {
|
if((! $mail) && (! $fsuggest) && (! $followup)) {
|
||||||
$interval = intval(get_config('system','delivery_interval'));
|
|
||||||
if(! $interval)
|
|
||||||
$interval = 2;
|
|
||||||
|
|
||||||
proc_run('php','include/delivery.php',$cmd,$item_id,$contact['id']);
|
proc_run('php','include/delivery.php',$cmd,$item_id,$contact['id']);
|
||||||
sleep($interval);
|
@time_sleep_until(microtime(true) + (float) $interval);
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$deliver_status = 0;
|
$deliver_status = 0;
|
||||||
|
@ -624,6 +635,18 @@ function notifier_run($argv, $argc){
|
||||||
if(count($r)) {
|
if(count($r)) {
|
||||||
logger('pubdeliver: ' . print_r($r,true));
|
logger('pubdeliver: ' . print_r($r,true));
|
||||||
|
|
||||||
|
// throw everything into the queue in case we get killed
|
||||||
|
|
||||||
|
foreach($r as $rr) {
|
||||||
|
if((! $mail) && (! $fsuggest) && (! $followup)) {
|
||||||
|
q("insert into deliverq ( `cmd`,`item`,`contact` ) values ('%s', %d, %d )",
|
||||||
|
dbesc($cmd),
|
||||||
|
intval($item_id),
|
||||||
|
intval($rr['id'])
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
foreach($r as $rr) {
|
foreach($r as $rr) {
|
||||||
|
|
||||||
/* Don't deliver to folks who have already been delivered to */
|
/* Don't deliver to folks who have already been delivered to */
|
||||||
|
@ -634,13 +657,9 @@ function notifier_run($argv, $argc){
|
||||||
}
|
}
|
||||||
|
|
||||||
if((! $mail) && (! $fsuggest) && (! $followup)) {
|
if((! $mail) && (! $fsuggest) && (! $followup)) {
|
||||||
$interval = intval(get_config('system','delivery_interval'));
|
logger('notifier: delivery agent: ' . $rr['name'] . ' ' . $rr['id']);
|
||||||
if(! $interval)
|
|
||||||
$interval = 2;
|
|
||||||
|
|
||||||
proc_run('php','include/delivery.php',$cmd,$item_id,$rr['id']);
|
proc_run('php','include/delivery.php',$cmd,$item_id,$rr['id']);
|
||||||
sleep($interval);
|
@time_sleep_until(microtime(true) + (float) $interval);
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,20 @@ function queue_run($argv, $argc){
|
||||||
|
|
||||||
logger('queue: start');
|
logger('queue: start');
|
||||||
|
|
||||||
|
$interval = intval(get_config('system','delivery_interval'));
|
||||||
|
if(! $interval)
|
||||||
|
$interval = 2;
|
||||||
|
|
||||||
|
|
||||||
|
$r = q("select * from deliverq where 1");
|
||||||
|
if(count($r)) {
|
||||||
|
foreach($r as $rr) {
|
||||||
|
logger('queue: deliverq');
|
||||||
|
proc_run('php','include/delivery.php',$rr['cmd'],$rr['item'],$rr['contact']);
|
||||||
|
@time_sleep_until(microtime(true) + (float) $interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue`
|
$r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue`
|
||||||
LEFT JOIN `contact` ON `queue`.`cid` = `contact`.`id`
|
LEFT JOIN `contact` ON `queue`.`cid` = `contact`.`id`
|
||||||
WHERE `queue`.`created` < UTC_TIMESTAMP() - INTERVAL 3 DAY");
|
WHERE `queue`.`created` < UTC_TIMESTAMP() - INTERVAL 3 DAY");
|
||||||
|
|
12
update.php
12
update.php
|
@ -1,6 +1,6 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
define( 'UPDATE_VERSION' , 1083 );
|
define( 'UPDATE_VERSION' , 1084 );
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -697,3 +697,13 @@ function update_1082() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function update_1083() {
|
||||||
|
q("CREATE TABLE IF NOT EXISTS `deliverq` (
|
||||||
|
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY ,
|
||||||
|
`cmd` CHAR( 32 ) NOT NULL ,
|
||||||
|
`item` INT NOT NULL ,
|
||||||
|
`contact` INT NOT NULL
|
||||||
|
) ENGINE = MYISAM ;");
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in a new issue