diff --git a/boot.php b/boot.php index d30d978cad..b93810d774 100644 --- a/boot.php +++ b/boot.php @@ -1089,8 +1089,6 @@ function proc_run($cmd) { $argv = $args; array_shift($argv); - dba::lock('workerqueue'); - $parameters = json_encode($argv); $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1)); @@ -1098,8 +1096,6 @@ function proc_run($cmd) { dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority)); } - dba::unlock(); - // Should we quit and wait for the poller to be called as a cronjob? if ($dont_fork) { return; diff --git a/include/poller.php b/include/poller.php index 4b5d6287ec..53f8557853 100644 --- a/include/poller.php +++ b/include/poller.php @@ -87,7 +87,7 @@ function poller_run($argv, $argc){ // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { - dba::unlock(); + Lock::remove('poller_fetch_worker'); continue; } else { // Fetch all workerqueue data while the table is still locked @@ -95,7 +95,7 @@ function poller_run($argv, $argc){ $entries = poller_total_entries(); $top_priority = poller_highest_priority(); $high_running = poller_process_with_priority_active($top_priority); - dba::unlock(); + Lock::remove('poller_fetch_worker'); } // To avoid the quitting of multiple pollers only one poller at a time will execute the check @@ -616,9 +616,11 @@ function poller_worker_process() { // Check if we should pass some low priority process $highest_priority = 0; - if (poller_passing_slow($highest_priority)) { - dba::lock('workerqueue'); + if (!Lock::set('poller_fetch_worker')) { + return false; + } + if (poller_passing_slow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' AND `priority` < %d @@ -638,8 +640,6 @@ function poller_worker_process() { if (dbm::is_result($r)) { return $r; } - } else { - dba::lock('workerqueue'); } // If there is no result (or we shouldn't pass lower processes) we check without priority limit @@ -649,7 +649,7 @@ function poller_worker_process() { // We only unlock the tables here, when we got no data if (!dbm::is_result($r)) { - dba::unlock(); + Lock::remove('poller_fetch_worker'); } return $r; @@ -800,6 +800,7 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); Lock::remove('poller_worker'); + Lock::remove('poller_fetch_worker'); killme(); } diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 0cefa5531d..580e3ffce1 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -20,7 +20,6 @@ function pubsubpublish_run(&$argv, &$argc){ logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true), 'include/pubsubpublish.php', (int)$rr["id"]); - logger("Publish feed to ".$rr["callback_url"].' - done', LOGGER_DEBUG); } }