@ -85,6 +85,8 @@ function poller_run($argv, $argc){
poller_run_cron ();
}
$refetched = false ;
$starttime = time ();
// We fetch the next queue entry that is about to be executed
@ -121,6 +123,13 @@ function poller_run($argv, $argc){
logger ( 'Process lifetime reached, quitting.' , LOGGER_DEBUG );
return ;
}
// If possible we will fetch new jobs for this worker
if ( ! $refetched && Lock :: set ( 'poller_worker_process' , 0 )) {
$refetched = find_worker_processes ();
Lock :: remove ( 'poller_worker_process' );
}
}
logger ( " Couldn't select a workerqueue entry, quitting. " , LOGGER_DEBUG );
}
@ -235,7 +244,7 @@ function poller_execute($queue) {
* @ param array $argv Array of values to be passed to the function
*/
function poller_exec_function ( $queue , $funcname , $argv ) {
global $poller_up_start , $poller_db_duration ;
global $poller_up_start , $poller_db_duration , $poller_lock_duration ;
$a = get_app ();
@ -284,7 +293,11 @@ function poller_exec_function($queue, $funcname, $argv) {
* The execution time is the productive time .
* By changing parameters like the maximum number of workers we can check the effectivness .
*/
logger ( 'DB: ' . number_format ( $poller_db_duration , 2 ) . ' - Rest: ' . number_format ( $up_duration - $poller_db_duration , 2 ) . ' - Execution: ' . number_format ( $duration , 2 ), LOGGER_DEBUG );
logger ( 'DB: ' . number_format ( $poller_db_duration , 2 ) .
' - Lock: ' . number_format ( $poller_lock_duration , 2 ) .
' - Rest: ' . number_format ( $up_duration - $poller_db_duration - $poller_lock_duration , 2 ) .
' - Execution: ' . number_format ( $duration , 2 ), LOGGER_DEBUG );
$poller_lock_duration = 0 ;
if ( $duration > 3600 ) {
logger ( " Prio " . $queue [ " priority " ] . " : " . $queue [ " parameter " ] . " - longer than 1 hour ( " . round ( $duration / 60 , 3 ) . " ) " , LOGGER_DEBUG );
@ -448,7 +461,7 @@ function poller_kill_stale_workers() {
foreach ( $r AS $pid ) {
if ( ! posix_kill ( $pid [ " pid " ], 0 )) {
dba :: update ( 'workerqueue' , array ( 'executed' => NULL_DATE , 'pid' => 0 ),
array ( 'pid' => $pid [ " pid " ]));
array ( 'pid' => $pid [ " pid " ], 'done' => false ));
} else {
// Kill long running processes
@ -475,7 +488,7 @@ function poller_kill_stale_workers() {
// Additionally we are lowering the priority.
dba :: update ( 'workerqueue' ,
array ( 'executed' => NULL_DATE , 'created' => datetime_convert (), 'priority' => PRIORITY_NEGLIGIBLE , 'pid' => 0 ),
array ( 'pid' => $pid [ " pid " ]));
array ( 'pid' => $pid [ " pid " ], 'done' => false ));
} else {
logger ( " Worker process " . $pid [ " pid " ] . " ( " . implode ( " " , $argv ) . " ) now runs for " . round ( $duration ) . " of " . $max_duration . " allowed minutes. That's okay. " , LOGGER_DEBUG );
}
@ -512,7 +525,9 @@ function poller_too_much_workers() {
$listitem = array ();
// Adding all processes with no workerqueue entry
$processes = dba :: p ( " SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`) " );
$processes = dba :: p ( " SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS
( SELECT id FROM `workerqueue`
WHERE `workerqueue` . `pid` = `process` . `pid` AND NOT `done` AND `pid` != ? ) " , getmypid());
if ( $process = dba :: fetch ( $processes )) {
$listitem [ 0 ] = " 0: " . $process [ " running " ];
}
@ -528,7 +543,16 @@ function poller_too_much_workers() {
dba :: close ( $processes );
}
dba :: close ( $entries );
$processlist = ' (' . implode ( ', ' , $listitem ) . ')' ;
$jobs_per_minute = 0 ;
$jobs = dba :: p ( " SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE " );
if ( $job = dba :: fetch ( $jobs )) {
$jobs_per_minute = number_format ( $job [ 'jobs' ] / 10 , 0 );
}
dba :: close ( $jobs );
$processlist = ' - jpm: ' . $jobs_per_minute . ' (' . implode ( ', ' , $listitem ) . ')' ;
}
$entries = poller_total_entries ();
@ -628,7 +652,10 @@ function find_worker_processes() {
// Check if we should pass some low priority process
$highest_priority = 0 ;
$found = false ;
$limit = Config :: get ( 'system' , 'worker_fetch_limit' , 5 );
// The higher the number of parallel workers, the more we prefetch to prevent concurring access
$limit = Config :: get ( " system " , " worker_queues " , 4 ) * 2 ;
$limit = Config :: get ( 'system' , 'worker_fetch_limit' , $limit );
if ( poller_passing_slow ( $highest_priority )) {
// Are there waiting processes with a higher priority than the currently highest?
@ -644,7 +671,7 @@ function find_worker_processes() {
// Give slower processes some processing time
$result = dba :: e ( " UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
ORDER BY `priority` , `created` LIMIT 1 " ,
ORDER BY `priority` , `created` LIMIT " .intval( $limit ) ,
datetime_convert (), getmypid (), NULL_DATE , $highest_priority );
if ( $result ) {
$found = ( dba :: affected_rows () > 0 );
@ -669,14 +696,29 @@ function find_worker_processes() {
* @ return string SQL statement
*/
function poller_worker_process () {
global $poller_db_duration ;
global $poller_db_duration , $poller_lock_duration ;
$stamp = ( float ) microtime ( true );
$found = find_worker_processes ();
// There can already be jobs for us in the queue.
$r = q ( " SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done` " , intval ( getmypid ()));
if ( dbm :: is_result ( $r )) {
$poller_db_duration += ( microtime ( true ) - $stamp );
return $r ;
}
$stamp = ( float ) microtime ( true );
if ( ! Lock :: set ( 'poller_worker_process' )) {
return false ;
}
$poller_lock_duration = ( microtime ( true ) - $stamp );
$stamp = ( float ) microtime ( true );
$found = find_worker_processes ();
$poller_db_duration += ( microtime ( true ) - $stamp );
Lock :: remove ( 'poller_worker_process' );
if ( $found ) {
$r = q ( " SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done` " , intval ( getmypid ()));
}
@ -689,7 +731,7 @@ function poller_worker_process() {
function poller_unclaim_process () {
$mypid = getmypid ();
dba :: update ( 'workerqueue' , array ( 'executed' => NULL_DATE , 'pid' => 0 ), array ( 'pid' => $mypid ));
dba :: update ( 'workerqueue' , array ( 'executed' => NULL_DATE , 'pid' => 0 ), array ( 'pid' => $mypid , 'done' => false ));
}
/**
@ -793,6 +835,7 @@ if (array_search(__file__,get_included_files())===0){
get_app () -> end_process ();
Lock :: remove ( 'poller_worker' );
Lock :: remove ( 'poller_worker_process' );
killme ();
}