starting the big delivery shakeup

This commit is contained in:
Friendika 2011-09-22 04:11:39 -07:00
parent a6edf2f71c
commit d6b446d63f
9 changed files with 183 additions and 110 deletions

View file

@ -9,7 +9,7 @@ require_once("include/pgettext.php");
define ( 'FRIENDIKA_VERSION', '2.3.1111' );
define ( 'DFRN_PROTOCOL_VERSION', '2.21' );
define ( 'DB_UPDATE_VERSION', 1091 );
define ( 'DB_UPDATE_VERSION', 1092 );
define ( 'EOL', "<br />\r\n" );
define ( 'ATOM_TIME', 'Y-m-d\TH:i:s\Z' );

View file

@ -469,7 +469,8 @@ CREATE TABLE IF NOT EXISTS `queue` (
`network` CHAR( 32 ) NOT NULL,
`created` DATETIME NOT NULL ,
`last` DATETIME NOT NULL ,
`content` MEDIUMTEXT NOT NULL
`content` MEDIUMTEXT NOT NULL,
`batch` TINYINT( 1 ) NOT NULL DEFAULT '0'
) ENGINE = MyISAM DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `pconfig` (
@ -534,6 +535,7 @@ CREATE TABLE IF NOT EXISTS `fcontact` (
`request` CHAR( 255 ) NOT NULL,
`nick` CHAR( 255 ) NOT NULL ,
`addr` CHAR( 255 ) NOT NULL ,
`batch` CHAR( 255) NOT NULL,
`notify` CHAR( 255 ) NOT NULL ,
`poll` CHAR( 255 ) NOT NULL ,
`confirm` CHAR( 255 ) NOT NULL ,

View file

@ -31,7 +31,7 @@ function delivery_run($argv, $argc){
$a->set_baseurl(get_config('system','url'));
logger('delivery: invoked: ' . print_r($argv,true));
logger('delivery: invoked: ' . print_r($argv,true), LOGGER_DEBUG);
$cmd = $argv[1];
$item_id = intval($argv[2]);
@ -145,6 +145,7 @@ function delivery_run($argv, $argc){
$public_message = true;
// fill this in with a single salmon slap if applicable
$slap = '';
require_once('include/group.php');
@ -195,8 +196,6 @@ function delivery_run($argv, $argc){
$public_message = false; // private recipients, not public
}
$conversant_str = intval($contact_id);
$r = q("SELECT * FROM `contact` WHERE `id` = %d AND `blocked` = 0 AND `pending` = 0",
intval($contact_id)
);
@ -204,14 +203,27 @@ function delivery_run($argv, $argc){
if(count($r))
$contact = $r[0];
$hubxml = feed_hublinks();
logger('notifier: slaps: ' . print_r($slaps,true), LOGGER_DATA);
require_once('include/salmon.php');
if($contact['self'])
return;
$deliver_status = 0;
switch($contact['network']) {
case NETWORK_DFRN :
logger('notifier: dfrndelivery: ' . $contact['name']);
$feed_template = get_markup_template('atom_feed.tpl');
$mail_template = get_markup_template('atom_mail.tpl');
$atom = '';
$slaps = array();
$hubxml = feed_hublinks();
$birthday = feed_birthday($owner['uid'],$owner['timezone']);
@ -249,28 +261,12 @@ function delivery_run($argv, $argc){
$atom .= atom_entry($item,'text',$item_contact,$owner,true);
if(($top_level) && ($public_message) && ($item['author-link'] === $item['owner-link']) && (! $expire))
$slaps[] = atom_entry($item,'html',$item_contact,$owner,true);
}
$atom .= '</feed>' . "\r\n";
logger('notifier: ' . $atom, LOGGER_DATA);
logger('notifier: slaps: ' . print_r($slaps,true), LOGGER_DATA);
require_once('include/salmon.php');
if($contact['self'])
return;
$deliver_status = 0;
switch($contact['network']) {
case NETWORK_DFRN :
logger('notifier: dfrndelivery: ' . $contact['name']);
$deliver_status = dfrn_deliver($owner,$contact,$atom);
logger('notifier: dfrn_delivery returns ' . $deliver_status);
@ -299,7 +295,25 @@ function delivery_run($argv, $argc){
// only send salmon if public - e.g. if it's ok to notify
// a public hub, it's ok to send a salmon
if((count($slaps)) && ($public_message) && (! $expire)) {
if(($public_message) && (! $expire)) {
$slaps = array();
foreach($items as $item) {
if(! $item['parent'])
continue;
// private emails may be in included in public conversations. Filter them.
if(($public_message) && $item['private'])
continue;
$item_contact = get_item_contact($item,$icontacts);
if(! $item_contact)
continue;
if(($top_level) && ($public_message) && ($item['author-link'] === $item['owner-link']) && (! $expire))
$slaps[] = atom_entry($item,'html',$item_contact,$owner,true);
}
logger('notifier: slapdelivery: ' . $contact['name']);
foreach($slaps as $slappy) {
if($contact['notify']) {
@ -393,12 +407,17 @@ function delivery_run($argv, $argc){
break;
case NETWORK_DIASPORA :
logger('delivery: diaspora deliver: ' . $contact['name']);
if($public_message)
$loc = 'public batch ' . $contact['batch'];
else
$loc = $contact['name'];
logger('delivery: diaspora batch deliver: ' . $loc);
if(get_config('system','dfrn_only') || (! get_config('system','diaspora_enabled')) || (! $normal_mode))
break;
if(! $contact['pubkey'])
if((! $contact['pubkey']) && (! $public_message))
break;
if($target_item['verb'] === ACTIVITY_DISLIKE) {
@ -406,23 +425,23 @@ function delivery_run($argv, $argc){
break;
}
elseif(($target_item['deleted']) && ($target_item['verb'] !== ACTIVITY_LIKE)) {
logger('delivery: diaspora retract: ' . $contact['name']);
logger('delivery: diaspora retract: ' . $loc);
// diaspora delete,
diaspora_send_retraction($target_item,$owner,$contact);
diaspora_send_retraction($target_item,$owner,$contact,$public_message);
break;
}
elseif($target_item['parent'] != $target_item['id']) {
logger('delivery: diaspora relay: ' . $contact['name']);
logger('delivery: diaspora relay: ' . $loc);
// we are the relay - send comments, likes and unlikes to our conversants
diaspora_send_relay($target_item,$owner,$contact);
diaspora_send_relay($target_item,$owner,$contact,$public_message);
break;
}
elseif(($top_level) && (! $walltowall)) {
// currently no workable solution for sending walltowall
logger('delivery: diaspora status: ' . $contact['name']);
diaspora_send_status($target_item,$owner,$contact);
logger('delivery: diaspora status: ' . $loc);
diaspora_send_status($target_item,$owner,$contact,$public_message);
break;
}

View file

@ -108,8 +108,10 @@ function diaspora_pubmsg_build($msg,$user,$contact,$prvkey,$pubkey) {
$handle = $user['nickname'] . '@' . substr($a->get_baseurl(), strpos($a->get_baseurl(),'://') + 3);
$b64_data = base64_encode($msg);
$b64url_data = base64url_encode($b64_data);
// $b64_data = base64_encode($msg);
// $b64url_data = base64url_encode($b64_data);
$b64url_data = base64url_encode($msg);
$data = str_replace(array("\n","\r"," ","\t"),array('','','',''),$b64url_data);
@ -125,7 +127,7 @@ function diaspora_pubmsg_build($msg,$user,$contact,$prvkey,$pubkey) {
$magic_env = <<< EOT
<?xml version='1.0' encoding='UTF-8'?>
<diaspora xmlns="https://joindiaspora.org/protocol" xmlns:me="http://salmon-protocol.org/ns/magic-env" >
<diaspora xmlns="https://joindiaspora.com/protocol" xmlns:me="http://salmon-protocol.org/ns/magic-env" >
<header>
<author_id>$handle</author_id>
</header>
@ -146,9 +148,12 @@ EOT;
function diaspora_msg_build($msg,$user,$contact,$prvkey,$pubkey) {
function diaspora_msg_build($msg,$user,$contact,$prvkey,$pubkey,$public = false) {
$a = get_app();
if($public)
return diaspora_pubmsg_build($msg,$user,$contact,$prvkey,$pubkey);
logger('diaspora_msg_build: ' . $msg, LOGGER_DATA);
$inner_aes_key = random_string(32);
@ -211,7 +216,7 @@ EOT;
$magic_env = <<< EOT
<?xml version='1.0' encoding='UTF-8'?>
<diaspora xmlns="https://joindiaspora.org/protocol" xmlns:me="http://salmon-protocol.org/ns/magic-env" >
<diaspora xmlns="https://joindiaspora.com/protocol" xmlns:me="http://salmon-protocol.org/ns/magic-env" >
$encrypted_header
<me:env>
<me:encoding>base64url</me:encoding>
@ -289,7 +294,7 @@ function diaspora_decode($importer,$xml) {
***** CURRENT
* <author_id>acct:galaxor@diaspora.priateship.org</author_id>
* <author_id>galaxor@diaspora.priateship.org</author_id>
***** END DIFFS
@ -424,13 +429,16 @@ function diaspora_request($importer,$xml) {
return;
}
$r = q("INSERT INTO `contact` (`uid`, `network`,`addr`,`created`,`url`,`name`,`nick`,`photo`,`pubkey`,`notify`,`poll`,`blocked`,`priority`)
VALUES ( %d, '%s', '%s', '%s','%s','%s','%s','%s','%s','%s','%s',%d,%d) ",
$batch = (($ret['batch']) ? $ret['batch'] : implode('/', array_slice(explode('/',$ret['url']),0,3)) . '/receive/public');
$r = q("INSERT INTO `contact` (`uid`, `network`,`addr`,`created`,`url`,`batch`,`name`,`nick`,`photo`,`pubkey`,`notify`,`poll`,`blocked`,`priority`)
VALUES ( %d, '%s', '%s', '%s','%s','%s','%s','%s','%s','%s','%s','%s',%d,%d) ",
intval($importer['uid']),
dbesc($ret['network']),
dbesc($ret['addr']),
datetime_convert(),
dbesc($ret['url']),
dbesc($batch),
dbesc($ret['name']),
dbesc($ret['nick']),
dbesc($ret['photo']),
@ -980,7 +988,7 @@ function diaspora_unshare($me,$contact) {
function diaspora_send_status($item,$owner,$contact) {
function diaspora_send_status($item,$owner,$contact,$public_batch = false) {
$a = get_app();
$myaddr = $owner['nickname'] . '@' . substr($a->get_baseurl(), strpos($a->get_baseurl(),'://') + 3);
@ -1022,19 +1030,19 @@ function diaspora_send_status($item,$owner,$contact) {
logger('diaspora_send_status: ' . $owner['username'] . ' -> ' . $contact['name'] . ' base message: ' . $msg, LOGGER_DATA);
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'])));
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'],$public_batch)));
$return_code = diaspora_transmit($owner,$contact,$slap);
$return_code = diaspora_transmit($owner,$contact,$slap,$public_batch);
if(count($images)) {
diaspora_send_images($item,$owner,$contact,$images);
diaspora_send_images($item,$owner,$contact,$images,$public_batch);
}
return $return_code;
}
function diaspora_send_images($item,$owner,$contact,$images) {
function diaspora_send_images($item,$owner,$contact,$images,$public_batch = false) {
$a = get_app();
if(! count($images))
return;
@ -1066,14 +1074,14 @@ function diaspora_send_images($item,$owner,$contact,$images) {
logger('diaspora_send_photo: base message: ' . $msg, LOGGER_DATA);
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'])));
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'],$public_batch)));
diaspora_transmit($owner,$contact,$slap);
diaspora_transmit($owner,$contact,$slap,$public_batch);
}
}
function diaspora_send_followup($item,$owner,$contact) {
function diaspora_send_followup($item,$owner,$contact,$public_batch = false) {
$a = get_app();
$myaddr = $owner['nickname'] . '@' . substr($a->get_baseurl(), strpos($a->get_baseurl(),'://') + 3);
@ -1121,13 +1129,13 @@ function diaspora_send_followup($item,$owner,$contact) {
logger('diaspora_followup: base message: ' . $msg, LOGGER_DATA);
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'])));
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'],$public_batch)));
return(diaspora_transmit($owner,$contact,$slap));
return(diaspora_transmit($owner,$contact,$slap,$public_batch));
}
function diaspora_send_relay($item,$owner,$contact) {
function diaspora_send_relay($item,$owner,$contact,$public_batch = false) {
$a = get_app();
@ -1223,15 +1231,15 @@ function diaspora_send_relay($item,$owner,$contact) {
logger('diaspora_relay_comment: base message: ' . $msg, LOGGER_DATA);
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'])));
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'],$public_batch)));
return(diaspora_transmit($owner,$contact,$slap));
return(diaspora_transmit($owner,$contact,$slap,$public_batch));
}
function diaspora_send_retraction($item,$owner,$contact) {
function diaspora_send_retraction($item,$owner,$contact,$public_batch = false) {
$a = get_app();
$myaddr = $owner['nickname'] . '@' . substr($a->get_baseurl(), strpos($a->get_baseurl(),'://') + 3);
@ -1243,30 +1251,32 @@ function diaspora_send_retraction($item,$owner,$contact) {
'$handle' => $myaddr
));
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'])));
$slap = 'xml=' . urlencode(urlencode(diaspora_msg_build($msg,$owner,$contact,$owner['uprvkey'],$contact['pubkey'],$public_batch)));
return(diaspora_transmit($owner,$contact,$slap));
return(diaspora_transmit($owner,$contact,$slap,$public_batch));
}
function diaspora_transmit($owner,$contact,$slap) {
function diaspora_transmit($owner,$contact,$slap,$public_batch) {
$a = get_app();
post_url($contact['notify'] . '/',$slap);
$logid = random_string(4);
logger('diaspora_transmit: ' . $logid . ' ' . (($public_batch) ? $contact['batch'] : $contact['notify']));
post_url((($public_batch) ? $contact['batch'] : $contact['notify']) . '/',$slap);
$return_code = $a->get_curl_code();
logger('diaspora_transmit: returns: ' . $return_code);
logger('diaspora_transmit: ' . $logid . ' returns: ' . $return_code);
if(! $return_code) {
logger('diaspora_transmit: queue message');
// queue message for redelivery
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`,`batch`)
VALUES ( %d, '%s', '%s', '%s') ",
intval($contact['id']),
dbesc(datetime_convert()),
dbesc(datetime_convert()),
dbesc($slap)
dbesc($slap),
intval($public_batch)
);
}

View file

@ -1615,13 +1615,18 @@ function lose_follower($importer,$contact,$datarray,$item) {
}
function subscribe_to_hub($url,$importer,$contact) {
function subscribe_to_hub($url,$importer,$contact,$submode = 'subscribe') {
if(is_array($importer)) {
$r = q("SELECT `nickname` FROM `user` WHERE `uid` = %d LIMIT 1",
intval($importer['uid'])
);
}
// Diaspora has different message-ids in feeds than they do
// through the direct Diaspora protocol. If we try and use
// the feed, we'll get duplicates. So don't.
if((! count($r)) || $contact['network'] === NETWORK_DIASPORA)
return;
@ -1631,7 +1636,7 @@ function subscribe_to_hub($url,$importer,$contact) {
$verify_token = ((strlen($contact['hub-verify'])) ? $contact['hub-verify'] : random_string());
$params= 'hub.mode=subscribe&hub.callback=' . urlencode($push_url) . '&hub.topic=' . urlencode($contact['poll']) . '&hub.verify=async&hub.verify_token=' . $verify_token;
$params= 'hub.mode=' . $hubmode . '&hub.callback=' . urlencode($push_url) . '&hub.topic=' . urlencode($contact['poll']) . '&hub.verify=async&hub.verify_token=' . $verify_token;
logger('subscribe_to_hub: subscribing ' . $contact['name'] . ' to hub ' . $url . ' with verifier ' . $verify_token);

View file

@ -409,6 +409,8 @@ function notifier_run($argv, $argc){
foreach($r as $contact) {
if((! $mail) && (! $fsuggest) && (! $followup) && (! $contact['self'])) {
if(($contact['network'] === NETWORK_DIASPORA) && ($public_message))
continue;
q("insert into deliverq ( `cmd`,`item`,`contact` ) values ('%s', %d, %d )",
dbesc($cmd),
intval($item_id),
@ -583,9 +585,19 @@ function notifier_run($argv, $argc){
break;
case NETWORK_DIASPORA:
require_once('include/diaspora.php');
if(get_config('system','dfrn_only') || (! get_config('system','diaspora_enabled')) || (! $normal_mode))
break;
// special handling for followup to public post
// all other public posts processed as public batches further below
if($public_message) {
if($followup)
diaspora_send_followup($target_item,$owner,$contact, true);
break;
}
if(! $contact['pubkey'])
break;
@ -643,17 +655,25 @@ function notifier_run($argv, $argc){
if($public_message) {
$r = q("SELECT `id`, `name` FROM `contact`
WHERE `network` in ('%s','%s') AND `uid` = %d AND `blocked` = 0 AND `pending` = 0
AND `rel` != %d order by rand() ",
dbesc(NETWORK_DFRN),
$r1 = q("SELECT DISTINCT(`batch`), `id`, `name`,`network` FROM `contact` WHERE `network` = '%s'
AND `uid` = %d AND `rel` != %d ORDER BY rand() ",
dbesc(NETWORK_DIASPORA),
intval($owner['uid']),
intval(CONTACT_IS_SHARING)
);
$r2 = q("SELECT `id`, `name`,`network` FROM `contact`
WHERE `network` = '%s' AND `uid` = %d AND `blocked` = 0 AND `pending` = 0
AND `rel` != %d order by rand() ",
dbesc(NETWORK_DFRN),
intval($owner['uid']),
intval(CONTACT_IS_SHARING)
);
$r = array_merge($r2,$r1);
if(count($r)) {
logger('pubdeliver: ' . print_r($r,true));
logger('pubdeliver: ' . print_r($r,true), LOGGER_DEBUG);
// throw everything into the queue in case we get killed
@ -669,9 +689,10 @@ function notifier_run($argv, $argc){
foreach($r as $rr) {
/* Don't deliver to folks who have already been delivered to */
// except for Diaspora batch jobs
// Don't deliver to folks who have already been delivered to
if(in_array($rr['id'],$conversants)) {
if(($rr['network'] !== NETWORK_DIASPORA) && (in_array($rr['id'],$conversants))) {
logger('notifier: already delivered id=' . $rr['id']);
continue;
}

View file

@ -138,6 +138,8 @@ function poller_run($argv, $argc){
if((datetime_convert('UTC','UTC', 'now') > datetime_convert('UTC','UTC', $t . " + 1 day")) || $force)
$hub_update = true;
}
else
$hub_update = false;
/**
* Based on $contact['priority'], should we poll this site now? Or later?
@ -473,21 +475,25 @@ function poller_run($argv, $argc){
consume_feed($xml,$importer,$contact,$hub,1);
$hubmode = 'subscribe';
if($contact['network'] === NETWORK_DFRN || $contact['blocked'] || $contact['readonly'])
$hubmode = 'unsubscribe';
if((strlen($hub)) && ($hub_update) && (($contact['rel'] == CONTACT_IS_FRIEND) || (($contact['network'] === NETWORK_OSTATUS) && (! $contact['readonly'])))) {
logger('poller: subscribing to hub(s) : ' . $hub . ' contact name : ' . $contact['name'] . ' local user : ' . $importer['name']);
if((strlen($hub)) && ($hub_update) && ($contact['rel'] != CONTACT_IS_FOLLOWER)) {
logger('poller: hub ' . $hubmode . ' : ' . $hub . ' contact name : ' . $contact['name'] . ' local user : ' . $importer['name']);
$hubs = explode(',', $hub);
if(count($hubs)) {
foreach($hubs as $h) {
$h = trim($h);
if(! strlen($h))
continue;
subscribe_to_hub($h,$importer,$contact);
subscribe_to_hub($h,$importer,$contact,$hubmode);
}
}
}
}
$updated = datetime_convert();
$r = q("UPDATE `contact` SET `last-update` = '%s', `success_update` = '%s' WHERE `id` = %d LIMIT 1",

View file

@ -123,6 +123,7 @@ function queue_run($argv, $argc){
}
$data = $qi[0]['content'];
$public = $qi[0]['batch'];
$contact = $c[0];
$owner = $u[0];
@ -155,7 +156,7 @@ function queue_run($argv, $argc){
case NETWORK_DIASPORA:
if($contact['notify']) {
logger('queue: diaspora_delivery: item ' . $q_item['id'] . ' for ' . $contact['name']);
$deliver_status = diaspora_transmit($owner,$contact,$data);
$deliver_status = diaspora_transmit($owner,$contact,$data,$public);
if($deliver_status == (-1))
update_queue_time($q_item['id']);

View file

@ -1,6 +1,6 @@
<?php
define( 'UPDATE_VERSION' , 1091 );
define( 'UPDATE_VERSION' , 1092 );
/**
*
@ -761,3 +761,12 @@ function update_1090() {
}
function update_1091() {
// catch a few stragglers that may have crept in before we added this on remote connects
q("UPDATE `contact` SET `batch` = concat(substring_index(`url`,'/',3),'/receive/public') WHERE `network` = 'dspr' AND `batch` = '' ");
q("ALTER TABLE `queue` ADD `batch` TINYINT( 1 ) NOT NULL DEFAULT '0' ");
q("ALTER TABLE `fcontact` ADD `batch` char(255) NOT NULL AFTER `addr` ");
}