diff --git a/include/poller.php b/include/poller.php index d9394b080b..017c4bc5d3 100644 --- a/include/poller.php +++ b/include/poller.php @@ -85,12 +85,13 @@ function poller_run($argv, $argc){ poller_run_cron(); } - $refetched = false; - $starttime = time(); // We fetch the next queue entry that is about to be executed while ($r = poller_worker_process()) { + + $refetched = false; + foreach ($r AS $entry) { // Assure that the priority is an integer value $entry['priority'] = (int)$entry['priority']; @@ -100,6 +101,13 @@ function poller_run($argv, $argc){ logger('Process execution failed, quitting.', LOGGER_DEBUG); return; } + + // If possible we will fetch new jobs for this worker + if (!$refetched && Lock::set('poller_worker_process', 0)) { + logger('Blubb: a'); + $refetched = find_worker_processes(); + Lock::remove('poller_worker_process'); + } } // To avoid the quitting of multiple pollers only one poller at a time will execute the check @@ -123,13 +131,6 @@ function poller_run($argv, $argc){ logger('Process lifetime reached, quitting.', LOGGER_DEBUG); return; } - - // If possible we will fetch new jobs for this worker - if (!$refetched && Lock::set('poller_worker_process', 0)) { - $refetched = find_worker_processes(); - Lock::remove('poller_worker_process'); - } - } logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } @@ -573,8 +574,7 @@ function poller_too_much_workers() { if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) { logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG); $args = array("include/poller.php", "no_cron"); - $a = get_app(); - $a->proc_run($args); + get_app()->proc_run($args); } } @@ -648,7 +648,12 @@ function poller_passing_slow(&$highest_priority) { * * @return boolean Have we found something? */ -function find_worker_processes() { +function find_worker_processes($mypid = 0) { + + if ($mypid == 0) { + $mypid = getmypid(); + } + // Check if we should pass some low priority process $highest_priority = 0; $found = false; @@ -662,7 +667,7 @@ function find_worker_processes() { $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` < ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + datetime_convert(), $mypid, NULL_DATE, $highest_priority); if ($result) { $found = (dba::affected_rows() > 0); } @@ -672,7 +677,7 @@ function find_worker_processes() { $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` > ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + datetime_convert(), $mypid, NULL_DATE, $highest_priority); if ($result) { $found = (dba::affected_rows() > 0); } @@ -682,7 +687,7 @@ function find_worker_processes() { // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), getmypid(), NULL_DATE); + datetime_convert(), $mypid, NULL_DATE); if ($result) { $found = (dba::affected_rows() > 0); } @@ -778,8 +783,7 @@ function call_worker_if_idle() { logger('Call poller', LOGGER_DEBUG); $args = array("include/poller.php", "no_cron"); - $a = get_app(); - $a->proc_run($args); + get_app()->proc_run($args); return; } @@ -834,8 +838,5 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); - Lock::remove('poller_worker'); - Lock::remove('poller_worker_process'); - killme(); }