Replaced "table lock" - it is not so good, it seems
This commit is contained in:
parent
2372ef7fd5
commit
1932a6d634
4
boot.php
4
boot.php
|
@ -1089,8 +1089,6 @@ function proc_run($cmd) {
|
||||||
$argv = $args;
|
$argv = $args;
|
||||||
array_shift($argv);
|
array_shift($argv);
|
||||||
|
|
||||||
dba::lock('workerqueue');
|
|
||||||
|
|
||||||
$parameters = json_encode($argv);
|
$parameters = json_encode($argv);
|
||||||
$found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1));
|
$found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1));
|
||||||
|
|
||||||
|
@ -1098,8 +1096,6 @@ function proc_run($cmd) {
|
||||||
dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority));
|
dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority));
|
||||||
}
|
}
|
||||||
|
|
||||||
dba::unlock();
|
|
||||||
|
|
||||||
// Should we quit and wait for the poller to be called as a cronjob?
|
// Should we quit and wait for the poller to be called as a cronjob?
|
||||||
if ($dont_fork) {
|
if ($dont_fork) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -87,7 +87,7 @@ function poller_run($argv, $argc){
|
||||||
|
|
||||||
// If we got that queue entry we claim it for us
|
// If we got that queue entry we claim it for us
|
||||||
if (!poller_claim_process($r[0])) {
|
if (!poller_claim_process($r[0])) {
|
||||||
dba::unlock();
|
Lock::remove('poller_fetch_worker');
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
// Fetch all workerqueue data while the table is still locked
|
// Fetch all workerqueue data while the table is still locked
|
||||||
|
@ -95,7 +95,7 @@ function poller_run($argv, $argc){
|
||||||
$entries = poller_total_entries();
|
$entries = poller_total_entries();
|
||||||
$top_priority = poller_highest_priority();
|
$top_priority = poller_highest_priority();
|
||||||
$high_running = poller_process_with_priority_active($top_priority);
|
$high_running = poller_process_with_priority_active($top_priority);
|
||||||
dba::unlock();
|
Lock::remove('poller_fetch_worker');
|
||||||
}
|
}
|
||||||
|
|
||||||
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
|
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
|
||||||
|
@ -616,9 +616,11 @@ function poller_worker_process() {
|
||||||
// Check if we should pass some low priority process
|
// Check if we should pass some low priority process
|
||||||
$highest_priority = 0;
|
$highest_priority = 0;
|
||||||
|
|
||||||
if (poller_passing_slow($highest_priority)) {
|
if (!Lock::set('poller_fetch_worker')) {
|
||||||
dba::lock('workerqueue');
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (poller_passing_slow($highest_priority)) {
|
||||||
// Are there waiting processes with a higher priority than the currently highest?
|
// Are there waiting processes with a higher priority than the currently highest?
|
||||||
$r = q("SELECT * FROM `workerqueue`
|
$r = q("SELECT * FROM `workerqueue`
|
||||||
WHERE `executed` <= '%s' AND `priority` < %d
|
WHERE `executed` <= '%s' AND `priority` < %d
|
||||||
|
@ -638,8 +640,6 @@ function poller_worker_process() {
|
||||||
if (dbm::is_result($r)) {
|
if (dbm::is_result($r)) {
|
||||||
return $r;
|
return $r;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
dba::lock('workerqueue');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
|
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
|
||||||
|
@ -649,7 +649,7 @@ function poller_worker_process() {
|
||||||
|
|
||||||
// We only unlock the tables here, when we got no data
|
// We only unlock the tables here, when we got no data
|
||||||
if (!dbm::is_result($r)) {
|
if (!dbm::is_result($r)) {
|
||||||
dba::unlock();
|
Lock::remove('poller_fetch_worker');
|
||||||
}
|
}
|
||||||
|
|
||||||
return $r;
|
return $r;
|
||||||
|
@ -800,6 +800,7 @@ if (array_search(__file__,get_included_files())===0){
|
||||||
get_app()->end_process();
|
get_app()->end_process();
|
||||||
|
|
||||||
Lock::remove('poller_worker');
|
Lock::remove('poller_worker');
|
||||||
|
Lock::remove('poller_fetch_worker');
|
||||||
|
|
||||||
killme();
|
killme();
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ function pubsubpublish_run(&$argv, &$argc){
|
||||||
logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
|
logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
|
||||||
proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true),
|
proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true),
|
||||||
'include/pubsubpublish.php', (int)$rr["id"]);
|
'include/pubsubpublish.php', (int)$rr["id"]);
|
||||||
logger("Publish feed to ".$rr["callback_url"].' - done', LOGGER_DEBUG);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue