We got rid of two workerqueue queries, yeah!
This commit is contained in:
parent
06815f1a38
commit
7d0a7f6be9
4 changed files with 12 additions and 28 deletions
|
@ -55,19 +55,6 @@ function notifier_run(&$argv, &$argc){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inherit the priority
|
|
||||||
$queue = dba::select('workerqueue', array('priority', 'created'), array('pid' => getmypid()), array('limit' => 1));
|
|
||||||
if (dbm::is_result($queue)) {
|
|
||||||
$priority = (int)$queue['priority'];
|
|
||||||
$process_created = $queue['created'];
|
|
||||||
logger('inherited priority: '.$priority);
|
|
||||||
} else {
|
|
||||||
// Normally this shouldn't happen.
|
|
||||||
$priority = PRIORITY_HIGH;
|
|
||||||
$process_created = datetime_convert();
|
|
||||||
logger('no inherited priority! Something is wrong.');
|
|
||||||
}
|
|
||||||
|
|
||||||
logger('notifier: invoked: ' . print_r($argv,true), LOGGER_DEBUG);
|
logger('notifier: invoked: ' . print_r($argv,true), LOGGER_DEBUG);
|
||||||
|
|
||||||
$cmd = $argv[1];
|
$cmd = $argv[1];
|
||||||
|
@ -361,7 +348,7 @@ function notifier_run(&$argv, &$argc){
|
||||||
// a delivery fork. private groups (forum_mode == 2) do not uplink
|
// a delivery fork. private groups (forum_mode == 2) do not uplink
|
||||||
|
|
||||||
if ((intval($parent['forum_mode']) == 1) && (! $top_level) && ($cmd !== 'uplink')) {
|
if ((intval($parent['forum_mode']) == 1) && (! $top_level) && ($cmd !== 'uplink')) {
|
||||||
proc_run($priority, 'include/notifier.php', 'uplink', $item_id);
|
proc_run($a->queue['priority'], 'include/notifier.php', 'uplink', $item_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
$conversants = array();
|
$conversants = array();
|
||||||
|
@ -500,7 +487,7 @@ function notifier_run(&$argv, &$argc){
|
||||||
}
|
}
|
||||||
logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
|
logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
|
||||||
|
|
||||||
proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true),
|
proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
|
||||||
'include/delivery.php', $cmd, $item_id, $contact['id']);
|
'include/delivery.php', $cmd, $item_id, $contact['id']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -566,7 +553,7 @@ function notifier_run(&$argv, &$argc){
|
||||||
|
|
||||||
if ((! $mail) && (! $fsuggest) && (! $followup)) {
|
if ((! $mail) && (! $fsuggest) && (! $followup)) {
|
||||||
logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
|
logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
|
||||||
proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true),
|
proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
|
||||||
'include/delivery.php', $cmd, $item_id, $rr['id']);
|
'include/delivery.php', $cmd, $item_id, $rr['id']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -607,7 +594,7 @@ function notifier_run(&$argv, &$argc){
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handling the pubsubhubbub requests
|
// Handling the pubsubhubbub requests
|
||||||
proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true),
|
proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true),
|
||||||
'include/pubsubpublish.php');
|
'include/pubsubpublish.php');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,6 +84,9 @@ function poller_run($argv, $argc){
|
||||||
// We fetch the next queue entry that is about to be executed
|
// We fetch the next queue entry that is about to be executed
|
||||||
while ($r = poller_worker_process()) {
|
while ($r = poller_worker_process()) {
|
||||||
|
|
||||||
|
// Assure that the priority is an integer value
|
||||||
|
$r[0]['priority'] = (int)$r[0]['priority'];
|
||||||
|
|
||||||
// If we got that queue entry we claim it for us
|
// If we got that queue entry we claim it for us
|
||||||
if (!poller_claim_process($r[0])) {
|
if (!poller_claim_process($r[0])) {
|
||||||
dba::unlock();
|
dba::unlock();
|
||||||
|
@ -255,10 +258,12 @@ function poller_exec_function($queue, $funcname, $argv) {
|
||||||
// But preserve the old one for the worker
|
// But preserve the old one for the worker
|
||||||
$old_process_id = $a->process_id;
|
$old_process_id = $a->process_id;
|
||||||
$a->process_id = uniqid("wrk", true);
|
$a->process_id = uniqid("wrk", true);
|
||||||
|
$a->queue = $queue;
|
||||||
|
|
||||||
$funcname($argv, $argc);
|
$funcname($argv, $argc);
|
||||||
|
|
||||||
$a->process_id = $old_process_id;
|
$a->process_id = $old_process_id;
|
||||||
|
unset($a->queue);
|
||||||
|
|
||||||
$duration = number_format(microtime(true) - $stamp, 3);
|
$duration = number_format(microtime(true) - $stamp, 3);
|
||||||
|
|
||||||
|
|
|
@ -7,27 +7,18 @@ require_once('include/items.php');
|
||||||
require_once('include/ostatus.php');
|
require_once('include/ostatus.php');
|
||||||
|
|
||||||
function pubsubpublish_run(&$argv, &$argc){
|
function pubsubpublish_run(&$argv, &$argc){
|
||||||
|
global $a;
|
||||||
|
|
||||||
if ($argc > 1) {
|
if ($argc > 1) {
|
||||||
$pubsubpublish_id = intval($argv[1]);
|
$pubsubpublish_id = intval($argv[1]);
|
||||||
} else {
|
} else {
|
||||||
// Inherit the creation time
|
|
||||||
$queue = dba::select('workerqueue', array('created'), array('pid' => getmypid()), array('limit' => 1));
|
|
||||||
if (dbm::is_result($queue)) {
|
|
||||||
$process_created = $queue['created'];
|
|
||||||
} else {
|
|
||||||
// Normally this shouldn't happen.
|
|
||||||
$process_created = datetime_convert();
|
|
||||||
logger('no inherited priority! Something is wrong.');
|
|
||||||
}
|
|
||||||
|
|
||||||
// We'll push to each subscriber that has push > 0,
|
// We'll push to each subscriber that has push > 0,
|
||||||
// i.e. there has been an update (set in notifier.php).
|
// i.e. there has been an update (set in notifier.php).
|
||||||
$r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0");
|
$r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0");
|
||||||
|
|
||||||
foreach ($r as $rr) {
|
foreach ($r as $rr) {
|
||||||
logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
|
logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
|
||||||
proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true),
|
proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true),
|
||||||
'include/pubsubpublish.php', $rr["id"]);
|
'include/pubsubpublish.php', $rr["id"]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,6 +100,7 @@ class App {
|
||||||
*/
|
*/
|
||||||
public $template_engine_instance = array();
|
public $template_engine_instance = array();
|
||||||
public $process_id;
|
public $process_id;
|
||||||
|
public $queue;
|
||||||
private $ldelim = array(
|
private $ldelim = array(
|
||||||
'internal' => '',
|
'internal' => '',
|
||||||
'smarty3' => '{{'
|
'smarty3' => '{{'
|
||||||
|
|
Loading…
Reference in a new issue