2017-11-05 11:33:46 +01:00
< ? php
2017-11-19 23:04:40 +01:00
/**
* @ file src / Core / Worker . php
*/
2017-11-05 11:33:46 +01:00
namespace Friendica\Core ;
2018-10-21 07:53:47 +02:00
use Friendica\BaseObject ;
2018-07-20 14:19:26 +02:00
use Friendica\Database\DBA ;
2018-01-16 01:08:28 +01:00
use Friendica\Model\Process ;
2018-01-27 03:38:34 +01:00
use Friendica\Util\DateTimeFormat ;
2019-02-22 20:10:27 +01:00
use Friendica\Util\Logger\WorkerLogger ;
2018-06-26 23:42:26 +02:00
use Friendica\Util\Network ;
2017-11-05 11:33:46 +01:00
/**
2017-11-05 13:15:53 +01:00
* @ file src / Core / Worker . php
2017-11-05 11:33:46 +01:00
*
2017-11-06 16:32:34 +01:00
* @ brief Contains the class for the worker background job processing
2017-11-05 11:33:46 +01:00
*/
/**
* @ brief Worker methods
*/
2017-11-19 23:04:40 +01:00
class Worker
{
2019-02-27 10:49:26 +01:00
const STATE_STARTUP = 1 ; // Worker is in startup. This takes most time.
2019-03-08 21:39:58 +01:00
const STATE_LONG_LOOP = 2 ; // Worker is processing the whole - long - loop.
2019-02-27 10:49:26 +01:00
const STATE_REFETCH = 3 ; // Worker had refetched jobs in the execution loop.
2019-03-08 21:39:58 +01:00
const STATE_SHORT_LOOP = 4 ; // Worker is processing preassigned jobs, thus saving much time.
2019-02-27 07:55:04 +01:00
2019-03-23 07:08:18 +01:00
const FAST_COMMANDS = [ 'APDelivery' , 'Delivery' , 'CreateShadowEntry' ];
2017-11-05 11:33:46 +01:00
private static $up_start ;
2019-02-10 00:10:15 +01:00
private static $db_duration = 0 ;
private static $db_duration_count = 0 ;
private static $db_duration_write = 0 ;
private static $db_duration_stat = 0 ;
private static $lock_duration = 0 ;
2017-11-05 11:33:46 +01:00
private static $last_update ;
2019-02-27 07:55:04 +01:00
private static $state ;
2017-11-05 11:33:46 +01:00
/**
* @ brief Processes the tasks that are in the workerqueue table
*
* @ param boolean $run_cron Should the cron processes be executed ?
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
public static function processQueue ( $run_cron = true )
{
2018-12-28 01:22:35 +01:00
$a = \get_app ();
2017-11-05 11:33:46 +01:00
2019-02-12 07:42:45 +01:00
// Ensure that all "strtotime" operations do run timezone independent
date_default_timezone_set ( 'UTC' );
2017-11-05 11:33:46 +01:00
self :: $up_start = microtime ( true );
2017-11-05 16:28:55 +01:00
// At first check the maximum load. We shouldn't continue with a high load
2018-06-30 20:07:01 +02:00
if ( $a -> isMaxLoadReached ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Pre check: maximum load reached, quitting.' , Logger :: DEBUG );
2017-11-05 16:28:55 +01:00
return ;
}
// We now start the process. This is done after the load check since this could increase the load.
2018-01-16 01:08:28 +01:00
self :: startProcess ();
2017-11-05 16:28:55 +01:00
2017-11-05 11:33:46 +01:00
// Kill stale processes every 5 minutes
2018-06-02 00:09:27 +02:00
$last_cleanup = Config :: get ( 'system' , 'worker_last_cleaned' , 0 );
2017-11-05 11:33:46 +01:00
if ( time () > ( $last_cleanup + 300 )) {
2018-06-02 00:09:27 +02:00
Config :: set ( 'system' , 'worker_last_cleaned' , time ());
2017-11-05 11:33:46 +01:00
self :: killStaleWorkers ();
}
// Count active workers and compare them with a maximum value that depends on the load
if ( self :: tooMuchWorkers ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Pre check: Active worker limit reached, quitting.' , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return ;
}
// Do we have too few memory?
2018-10-09 19:58:58 +02:00
if ( $a -> isMinMemoryReached ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Pre check: Memory limit reached, quitting.' , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return ;
}
// Possibly there are too much database connections
if ( self :: maxConnectionsReached ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Pre check: maximum connections reached, quitting.' , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return ;
}
// Possibly there are too much database processes that block the system
2018-06-30 20:07:01 +02:00
if ( $a -> isMaxProcessesReached ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Pre check: maximum processes reached, quitting.' , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return ;
}
// Now we start additional cron processes if we should do so
if ( $run_cron ) {
self :: runCron ();
}
$starttime = time ();
2019-02-27 07:55:04 +01:00
self :: $state = self :: STATE_STARTUP ;
2017-11-05 11:33:46 +01:00
// We fetch the next queue entry that is about to be executed
2019-02-17 19:55:17 +01:00
while ( $r = self :: workerProcess ()) {
2019-03-08 21:39:58 +01:00
$refetched = false ;
2017-11-19 23:04:40 +01:00
foreach ( $r as $entry ) {
2017-11-05 11:33:46 +01:00
// Assure that the priority is an integer value
$entry [ 'priority' ] = ( int ) $entry [ 'priority' ];
// The work will be done
if ( ! self :: execute ( $entry )) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Process execution failed, quitting.' , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return ;
}
2019-03-08 21:39:58 +01:00
// Trying to fetch new processes - but only once when successful
if ( ! $refetched && Lock :: acquire ( 'worker_process' , 0 )) {
2019-02-17 19:55:17 +01:00
self :: findWorkerProcesses ();
Lock :: release ( 'worker_process' );
2019-02-27 07:55:04 +01:00
self :: $state = self :: STATE_REFETCH ;
2019-03-08 21:39:58 +01:00
$refetched = true ;
} else {
self :: $state = self :: STATE_SHORT_LOOP ;
2017-11-05 11:33:46 +01:00
}
}
2019-03-08 21:39:58 +01:00
// To avoid the quitting of multiple workers only one worker at a time will execute the check
if ( ! self :: getWaitingJobForPID ()) {
2019-02-27 07:55:04 +01:00
self :: $state = self :: STATE_LONG_LOOP ;
2019-02-27 07:36:19 +01:00
2019-03-08 21:39:58 +01:00
if ( Lock :: acquire ( 'worker' , 0 )) {
2017-11-05 11:33:46 +01:00
// Count active workers and compare them with a maximum value that depends on the load
2019-03-08 21:39:58 +01:00
if ( self :: tooMuchWorkers ()) {
Logger :: log ( 'Active worker limit reached, quitting.' , Logger :: DEBUG );
Lock :: release ( 'worker' );
return ;
}
// Check free memory
if ( $a -> isMinMemoryReached ()) {
Logger :: log ( 'Memory limit reached, quitting.' , Logger :: DEBUG );
Lock :: release ( 'worker' );
return ;
}
2018-09-06 08:11:18 +02:00
Lock :: release ( 'worker' );
2017-11-05 11:33:46 +01:00
}
}
2019-02-27 08:08:44 +01:00
// Quit the worker once every cron interval
2019-02-27 09:55:25 +01:00
if ( time () > ( $starttime + ( Config :: get ( 'system' , 'cron_interval' ) * 60 ))) {
2019-02-27 10:49:26 +01:00
Logger :: info ( 'Process lifetime reached, respawning.' );
2019-02-27 08:08:44 +01:00
self :: spawnWorker ();
return ;
}
2017-11-05 11:33:46 +01:00
}
2018-06-15 20:18:20 +02:00
// Cleaning up. Possibly not needed, but it doesn't harm anything.
2018-06-02 00:09:27 +02:00
if ( Config :: get ( 'system' , 'worker_daemon_mode' , false )) {
self :: IPCSetJobState ( false );
}
2018-10-30 14:58:45 +01:00
Logger :: log ( " Couldn't select a workerqueue entry, quitting process " . getmypid () . " . " , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
}
2019-02-10 00:10:15 +01:00
/**
* @ brief Check if non executed tasks do exist in the worker queue
*
* @ return boolean Returns " true " if tasks are existing
* @ throws \Exception
*/
private static function entriesExists ()
{
$stamp = ( float ) microtime ( true );
$exists = DBA :: exists ( 'workerqueue' , [ " NOT `done` AND `pid` = 0 AND `next_try` < ? " , DateTimeFormat :: utcNow ()]);
self :: $db_duration += ( microtime ( true ) - $stamp );
return $exists ;
}
2018-10-23 22:38:28 +02:00
/**
* @ brief Returns the number of deferred entries in the worker queue
*
* @ return integer Number of deferred entries in the worker queue
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2018-10-23 22:38:28 +02:00
*/
private static function deferredEntries ()
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
$count = DBA :: count ( 'workerqueue' , [ " NOT `done` AND `pid` = 0 AND `next_try` > ? " , DateTimeFormat :: utcNow ()]);
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_count += ( microtime ( true ) - $stamp );
return $count ;
2018-10-23 22:38:28 +02:00
}
2017-11-05 11:33:46 +01:00
/**
* @ brief Returns the number of non executed entries in the worker queue
*
* @ return integer Number of non executed entries in the worker queue
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function totalEntries ()
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
$count = DBA :: count ( 'workerqueue' , [ 'done' => false , 'pid' => 0 ]);
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_count += ( microtime ( true ) - $stamp );
return $count ;
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Returns the highest priority in the worker queue that isn ' t executed
*
2017-11-19 22:47:21 +01:00
* @ return integer Number of active worker processes
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function highestPriority ()
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2019-02-08 22:48:41 +01:00
$condition = [ " `pid` = 0 AND NOT `done` AND `next_try` < ? " , DateTimeFormat :: utcNow ()];
2018-07-20 14:19:26 +02:00
$workerqueue = DBA :: selectFirst ( 'workerqueue' , [ 'priority' ], $condition , [ 'order' => [ 'priority' ]]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-21 14:46:04 +02:00
if ( DBA :: isResult ( $workerqueue )) {
2018-01-11 09:26:30 +01:00
return $workerqueue [ " priority " ];
2017-11-05 11:33:46 +01:00
} else {
return 0 ;
}
}
/**
* @ brief Returns if a process with the given priority is running
*
* @ param integer $priority The priority that should be checked
*
* @ return integer Is there a process running with that priority ?
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function processWithPriorityActive ( $priority )
{
2019-02-08 22:48:41 +01:00
$condition = [ " `priority` <= ? AND `pid` != 0 AND NOT `done` " , $priority ];
2018-07-20 14:19:26 +02:00
return DBA :: exists ( 'workerqueue' , $condition );
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Execute a worker entry
*
* @ param array $queue Workerqueue entry
*
* @ return boolean " true " if further processing should be stopped
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
public static function execute ( $queue )
{
2018-12-28 01:22:35 +01:00
$a = \get_app ();
2017-11-05 11:33:46 +01:00
$mypid = getmypid ();
// Quit when in maintenance
2018-03-04 14:01:46 +01:00
if ( Config :: get ( 'system' , 'maintenance' , false , true )) {
2018-10-30 14:58:45 +01:00
Logger :: log ( " Maintenance mode - quit process " . $mypid , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return false ;
}
// Constantly check the number of parallel database processes
2018-06-30 20:07:01 +02:00
if ( $a -> isMaxProcessesReached ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( " Max processes reached for process " . $mypid , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return false ;
}
// Constantly check the number of available database connections to let the frontend be accessible at any time
if ( self :: maxConnectionsReached ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( " Max connection reached for process " . $mypid , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
return false ;
}
2018-02-11 17:18:39 +01:00
$argv = json_decode ( $queue [ " parameter " ], true );
2017-11-05 11:33:46 +01:00
// Check for existance and validity of the include file
$include = $argv [ 0 ];
2017-11-12 08:21:23 +01:00
if ( method_exists ( sprintf ( 'Friendica\Worker\%s' , $include ), 'execute' )) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if ( ! isset ( self :: $last_update )) {
self :: $last_update = strtotime ( $queue [ " executed " ]);
}
$age = ( time () - self :: $last_update ) / 60 ;
self :: $last_update = time ();
if ( $age > 1 ) {
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: update ( 'workerqueue' , [ 'executed' => DateTimeFormat :: utcNow ()], [ 'pid' => $mypid , 'done' => false ]);
2017-11-12 08:21:23 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-10 00:10:15 +01:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-12 08:21:23 +01:00
}
array_shift ( $argv );
self :: execFunction ( $queue , $include , $argv , true );
$stamp = ( float ) microtime ( true );
2018-10-23 05:54:18 +02:00
$condition = [ " `id` = ? AND `next_try` < ? " , $queue [ 'id' ], DateTimeFormat :: utcNow ()];
if ( DBA :: update ( 'workerqueue' , [ 'done' => true ], $condition )) {
2018-06-02 00:09:27 +02:00
Config :: set ( 'system' , 'last_worker_execution' , DateTimeFormat :: utcNow ());
2017-11-12 08:21:23 +01:00
}
self :: $db_duration = ( microtime ( true ) - $stamp );
2019-02-10 00:10:15 +01:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-12 08:21:23 +01:00
return true ;
}
2017-11-05 11:33:46 +01:00
// The script could be provided as full path or only with the function name
if ( $include == basename ( $include )) {
2017-11-14 23:13:33 +01:00
$include = " include/ " . $include . " .php " ;
2017-11-05 11:33:46 +01:00
}
if ( ! validate_include ( $include )) {
2018-10-29 22:20:46 +01:00
Logger :: log ( " Include file " . $argv [ 0 ] . " is not valid! " );
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: delete ( 'workerqueue' , [ 'id' => $queue [ " id " ]]);
2019-02-10 00:10:15 +01:00
self :: $db_duration = ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
return true ;
}
2017-11-19 23:04:40 +01:00
require_once $include ;
2017-11-05 11:33:46 +01:00
$funcname = str_replace ( " .php " , " " , basename ( $argv [ 0 ])) . " _run " ;
if ( function_exists ( $funcname )) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if ( ! isset ( self :: $last_update )) {
self :: $last_update = strtotime ( $queue [ " executed " ]);
}
$age = ( time () - self :: $last_update ) / 60 ;
self :: $last_update = time ();
if ( $age > 1 ) {
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: update ( 'workerqueue' , [ 'executed' => DateTimeFormat :: utcNow ()], [ 'pid' => $mypid , 'done' => false ]);
2017-11-05 11:33:46 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-10 00:10:15 +01:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
}
2017-11-12 08:21:23 +01:00
self :: execFunction ( $queue , $funcname , $argv , false );
2017-11-05 11:33:46 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
if ( DBA :: update ( 'workerqueue' , [ 'done' => true ], [ 'id' => $queue [ " id " ]])) {
2018-06-02 00:09:27 +02:00
Config :: set ( 'system' , 'last_worker_execution' , DateTimeFormat :: utcNow ());
2017-11-05 11:33:46 +01:00
}
self :: $db_duration = ( microtime ( true ) - $stamp );
2019-02-10 00:10:15 +01:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
} else {
2018-10-29 22:20:46 +01:00
Logger :: log ( " Function " . $funcname . " does not exist " );
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: delete ( 'workerqueue' , [ 'id' => $queue [ " id " ]]);
2019-02-10 00:10:15 +01:00
self :: $db_duration = ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
}
return true ;
}
/**
* @ brief Execute a function from the queue
*
2017-11-19 23:04:40 +01:00
* @ param array $queue Workerqueue entry
* @ param string $funcname name of the function
* @ param array $argv Array of values to be passed to the function
* @ param boolean $method_call boolean
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function execFunction ( $queue , $funcname , $argv , $method_call )
{
2018-12-28 01:22:35 +01:00
$a = \get_app ();
2017-11-05 11:33:46 +01:00
$argc = count ( $argv );
2019-02-22 20:10:27 +01:00
$logger = $a -> getLogger ();
$workerLogger = new WorkerLogger ( $logger , $funcname );
2017-11-05 11:33:46 +01:00
2019-02-22 20:41:13 +01:00
$workerLogger -> info ( " Process start. " , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ]]);
2017-11-05 11:33:46 +01:00
$stamp = ( float ) microtime ( true );
// We use the callstack here to analyze the performance of executed worker entries.
// For this reason the variables have to be initialized.
2019-02-16 23:11:30 +01:00
$a -> getProfiler () -> reset ();
2017-11-05 11:33:46 +01:00
$a -> queue = $queue ;
2019-02-10 00:10:15 +01:00
$up_duration = microtime ( true ) - self :: $up_start ;
2017-11-05 11:33:46 +01:00
// Reset global data to avoid interferences
unset ( $_SESSION );
2019-02-22 20:41:13 +01:00
// Set the workerLogger as new default logger
2019-02-22 20:10:27 +01:00
Logger :: init ( $workerLogger );
2017-11-12 08:21:23 +01:00
if ( $method_call ) {
call_user_func_array ( sprintf ( 'Friendica\Worker\%s::execute' , $funcname ), $argv );
} else {
$funcname ( $argv , $argc );
}
2019-02-22 20:10:27 +01:00
Logger :: init ( $logger );
2017-11-05 11:33:46 +01:00
unset ( $a -> queue );
2018-07-15 20:36:20 +02:00
$duration = ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
/* With these values we can analyze how effective the worker is .
* The database and rest time should be low since this is the unproductive time .
* The execution time is the productive time .
* By changing parameters like the maximum number of workers we can check the effectivness .
*/
2019-02-27 07:36:19 +01:00
$dbtotal = round ( self :: $db_duration , 2 );
$dbread = round ( self :: $db_duration - ( self :: $db_duration_count + self :: $db_duration_write + self :: $db_duration_stat ), 2 );
$dbcount = round ( self :: $db_duration_count , 2 );
$dbstat = round ( self :: $db_duration_stat , 2 );
$dbwrite = round ( self :: $db_duration_write , 2 );
$dblock = round ( self :: $lock_duration , 2 );
$rest = round ( max ( 0 , $up_duration - ( self :: $db_duration + self :: $lock_duration )), 2 );
$exec = round ( $duration , 2 );
2019-02-21 20:32:31 +01:00
2019-02-27 09:41:45 +01:00
$logger -> info ( 'Performance:' , [ 'state' => self :: $state , 'count' => $dbcount , 'stat' => $dbstat , 'write' => $dbwrite , 'lock' => $dblock , 'total' => $dbtotal , 'rest' => $rest , 'exec' => $exec ]);
2017-11-19 23:04:40 +01:00
2019-02-10 00:10:15 +01:00
self :: $up_start = microtime ( true );
self :: $db_duration = 0 ;
self :: $db_duration_count = 0 ;
self :: $db_duration_stat = 0 ;
self :: $db_duration_write = 0 ;
2017-11-05 11:33:46 +01:00
self :: $lock_duration = 0 ;
if ( $duration > 3600 ) {
2019-02-27 10:11:37 +01:00
$logger -> info ( 'Longer than 1 hour.' , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 11:33:46 +01:00
} elseif ( $duration > 600 ) {
2019-02-27 10:11:37 +01:00
$logger -> info ( 'Longer than 10 minutes.' , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 11:33:46 +01:00
} elseif ( $duration > 300 ) {
2019-02-27 10:11:37 +01:00
$logger -> info ( 'Longer than 5 minutes.' , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 11:33:46 +01:00
} elseif ( $duration > 120 ) {
2019-02-27 10:11:37 +01:00
$logger -> info ( 'Longer than 2 minutes.' , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 11:33:46 +01:00
}
2019-02-27 10:12:40 +01:00
$workerLogger -> info ( 'Process done.' , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ], 'duration' => round ( $duration , 3 )]);
2017-11-05 11:33:46 +01:00
2019-02-18 11:18:05 +01:00
$a -> getProfiler () -> saveLog ( $a -> getLogger (), " ID " . $queue [ " id " ] . " : " . $funcname );
2017-11-05 11:33:46 +01:00
$cooldown = Config :: get ( " system " , " worker_cooldown " , 0 );
if ( $cooldown > 0 ) {
2019-02-23 10:24:22 +01:00
$logger -> info ( 'Cooldown.' , [ 'priority' => $queue [ " priority " ], 'id' => $queue [ " id " ], 'cooldown' => $cooldown ]);
2017-11-05 11:33:46 +01:00
sleep ( $cooldown );
}
}
/**
* @ brief Checks if the number of database connections has reached a critical limit .
*
* @ return bool Are more than 3 / 4 of the maximum connections used ?
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function maxConnectionsReached ()
{
2017-11-05 11:33:46 +01:00
// Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
$max = Config :: get ( " system " , " max_connections " );
2017-11-19 22:47:21 +01:00
// Fetch the percentage level where the worker will get active
2017-11-05 11:33:46 +01:00
$maxlevel = Config :: get ( " system " , " max_connections_level " , 75 );
if ( $max == 0 ) {
// the maximum number of possible user connections can be a system variable
2018-07-21 04:01:53 +02:00
$r = DBA :: fetchFirst ( " SHOW VARIABLES WHERE `variable_name` = 'max_user_connections' " );
2018-07-21 14:46:04 +02:00
if ( DBA :: isResult ( $r )) {
2017-11-05 18:13:37 +01:00
$max = $r [ " Value " ];
2017-11-05 11:33:46 +01:00
}
// Or it can be granted. This overrides the system variable
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
$r = DBA :: p ( 'SHOW GRANTS' );
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-20 14:19:26 +02:00
while ( $grants = DBA :: fetch ( $r )) {
2017-11-05 18:13:37 +01:00
$grant = array_pop ( $grants );
if ( stristr ( $grant , " GRANT USAGE ON " )) {
if ( preg_match ( " /WITH MAX_USER_CONNECTIONS ( \ d*)/ " , $grant , $match )) {
$max = $match [ 1 ];
2017-11-05 11:33:46 +01:00
}
}
}
2018-07-20 14:19:26 +02:00
DBA :: close ( $r );
2017-11-05 11:33:46 +01:00
}
// If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user
if ( $max != 0 ) {
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
$r = DBA :: p ( 'SHOW PROCESSLIST' );
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-21 04:05:12 +02:00
$used = DBA :: numRows ( $r );
2018-07-20 14:19:26 +02:00
DBA :: close ( $r );
2017-11-05 11:33:46 +01:00
2018-10-30 14:58:45 +01:00
Logger :: log ( " Connection usage (user values): " . $used . " / " . $max , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
$level = ( $used / $max ) * 100 ;
if ( $level >= $maxlevel ) {
2018-10-29 22:20:46 +01:00
Logger :: log ( " Maximum level ( " . $maxlevel . " %) of user connections reached: " . $used . " / " . $max );
2017-11-05 11:33:46 +01:00
return true ;
}
}
// We will now check for the system values.
// This limit could be reached although the user limits are fine.
2018-07-21 04:01:53 +02:00
$r = DBA :: fetchFirst ( " SHOW VARIABLES WHERE `variable_name` = 'max_connections' " );
2018-07-21 14:46:04 +02:00
if ( ! DBA :: isResult ( $r )) {
2017-11-05 11:33:46 +01:00
return false ;
}
2017-11-05 18:13:37 +01:00
$max = intval ( $r [ " Value " ]);
2017-11-05 11:33:46 +01:00
if ( $max == 0 ) {
return false ;
}
2018-07-21 04:01:53 +02:00
$r = DBA :: fetchFirst ( " SHOW STATUS WHERE `variable_name` = 'Threads_connected' " );
2018-07-21 14:46:04 +02:00
if ( ! DBA :: isResult ( $r )) {
2017-11-05 11:33:46 +01:00
return false ;
}
2017-11-05 18:13:37 +01:00
$used = intval ( $r [ " Value " ]);
2017-11-05 11:33:46 +01:00
if ( $used == 0 ) {
return false ;
}
2018-10-30 14:58:45 +01:00
Logger :: log ( " Connection usage (system values): " . $used . " / " . $max , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
$level = $used / $max * 100 ;
if ( $level < $maxlevel ) {
return false ;
}
2018-10-29 22:20:46 +01:00
Logger :: log ( " Maximum level ( " . $level . " %) of system connections reached: " . $used . " / " . $max );
2017-11-05 11:33:46 +01:00
return true ;
}
/**
* @ brief fix the queue entry if the worker process died
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function killStaleWorkers ()
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
$entries = DBA :: select (
2017-11-19 23:04:40 +01:00
'workerqueue' ,
2018-01-15 14:05:12 +01:00
[ 'id' , 'pid' , 'executed' , 'priority' , 'parameter' ],
2019-02-08 22:48:41 +01:00
[ 'NOT `done` AND `pid` != 0' ],
2018-01-15 14:05:12 +01:00
[ 'order' => [ 'priority' , 'created' ]]
2017-11-19 23:04:40 +01:00
);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2017-11-19 23:04:40 +01:00
2018-07-20 14:19:26 +02:00
while ( $entry = DBA :: fetch ( $entries )) {
2017-11-05 11:33:46 +01:00
if ( ! posix_kill ( $entry [ " pid " ], 0 )) {
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: update (
2017-11-20 17:29:55 +01:00
'workerqueue' ,
2018-10-21 07:53:47 +02:00
[ 'executed' => DBA :: NULL_DATETIME , 'pid' => 0 ],
2018-01-15 14:05:12 +01:00
[ 'id' => $entry [ " id " ]]
2017-11-20 17:29:55 +01:00
);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
} else {
// Kill long running processes
// Check if the priority is in a valid range
2018-01-15 14:05:12 +01:00
if ( ! in_array ( $entry [ " priority " ], [ PRIORITY_CRITICAL , PRIORITY_HIGH , PRIORITY_MEDIUM , PRIORITY_LOW , PRIORITY_NEGLIGIBLE ])) {
2017-11-05 11:33:46 +01:00
$entry [ " priority " ] = PRIORITY_MEDIUM ;
}
// Define the maximum durations
2018-01-15 14:05:12 +01:00
$max_duration_defaults = [ PRIORITY_CRITICAL => 720 , PRIORITY_HIGH => 10 , PRIORITY_MEDIUM => 60 , PRIORITY_LOW => 180 , PRIORITY_NEGLIGIBLE => 720 ];
2017-11-05 11:33:46 +01:00
$max_duration = $max_duration_defaults [ $entry [ " priority " ]];
2018-02-11 17:18:39 +01:00
$argv = json_decode ( $entry [ " parameter " ], true );
2017-11-05 11:33:46 +01:00
$argv [ 0 ] = basename ( $argv [ 0 ]);
// How long is the process already running?
2019-02-12 07:42:45 +01:00
$duration = ( time () - strtotime ( $entry [ " executed " ])) / 60 ;
2017-11-05 11:33:46 +01:00
if ( $duration > $max_duration ) {
2018-10-29 22:20:46 +01:00
Logger :: log ( " Worker process " . $entry [ " pid " ] . " ( " . substr ( json_encode ( $argv ), 0 , 50 ) . " ) took more than " . $max_duration . " minutes. It will be killed now. " );
2017-11-05 11:33:46 +01:00
posix_kill ( $entry [ " pid " ], SIGTERM );
// We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
// Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
2018-02-14 06:05:00 +01:00
$new_priority = $entry [ " priority " ];
2017-11-05 11:33:46 +01:00
if ( $entry [ " priority " ] == PRIORITY_HIGH ) {
$new_priority = PRIORITY_MEDIUM ;
} elseif ( $entry [ " priority " ] == PRIORITY_MEDIUM ) {
$new_priority = PRIORITY_LOW ;
} elseif ( $entry [ " priority " ] != PRIORITY_CRITICAL ) {
$new_priority = PRIORITY_NEGLIGIBLE ;
}
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: update (
2017-11-19 23:04:40 +01:00
'workerqueue' ,
2018-10-21 07:53:47 +02:00
[ 'executed' => DBA :: NULL_DATETIME , 'created' => DateTimeFormat :: utcNow (), 'priority' => $new_priority , 'pid' => 0 ],
2018-01-15 14:05:12 +01:00
[ 'id' => $entry [ " id " ]]
2017-11-19 23:04:40 +01:00
);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
} else {
2018-10-30 14:58:45 +01:00
Logger :: log ( " Worker process " . $entry [ " pid " ] . " ( " . substr ( json_encode ( $argv ), 0 , 50 ) . " ) now runs for " . round ( $duration ) . " of " . $max_duration . " allowed minutes. That's okay. " , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
}
}
}
}
/**
* @ brief Checks if the number of active workers exceeds the given limits
*
* @ return bool Are there too much workers running ?
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2019-02-17 19:55:17 +01:00
private static function tooMuchWorkers ()
2017-11-19 23:04:40 +01:00
{
2019-03-02 19:41:12 +01:00
$queues = Config :: get ( " system " , " worker_queues " , 10 );
2017-11-05 11:33:46 +01:00
$maxqueues = $queues ;
$active = self :: activeWorkers ();
// Decrease the number of workers at higher load
2018-10-13 18:57:31 +02:00
$load = System :: currentLoad ();
2017-11-05 11:33:46 +01:00
if ( $load ) {
2019-03-02 19:41:12 +01:00
$maxsysload = intval ( Config :: get ( " system " , " maxloadavg " , 20 ));
2018-06-20 06:38:50 +02:00
2018-06-20 12:43:57 +02:00
/* Default exponent 3 causes queues to rapidly decrease as load increases .
* If you have 20 max queues at idle , then you get only 5 queues at 37.1 % of $maxsysload .
* For some environments , this rapid decrease is not needed .
* With exponent 1 , you could have 20 max queues at idle and 13 at 37 % of $maxsysload .
*/
2018-06-20 12:22:53 +02:00
$exponent = intval ( Config :: get ( 'system' , 'worker_load_exponent' , 3 ));
2018-06-20 12:06:20 +02:00
$slope = pow ( max ( 0 , $maxsysload - $load ) / $maxsysload , $exponent );
$queues = intval ( ceil ( $slope * $maxqueues ));
2018-01-01 21:51:02 +01:00
$processlist = '' ;
2017-11-05 11:33:46 +01:00
2019-02-06 08:37:45 +01:00
if ( Config :: get ( 'system' , 'worker_jpm' )) {
2019-02-10 00:10:15 +01:00
$intervals = explode ( ',' , Config :: get ( 'system' , 'worker_jpm_range' ));
2019-02-06 08:37:45 +01:00
$jobs_per_minute = [];
foreach ( $intervals as $interval ) {
2019-02-11 05:39:24 +01:00
if ( $interval == 0 ) {
continue ;
} else {
$interval = ( int ) $interval ;
}
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2019-02-08 22:48:41 +01:00
$jobs = DBA :: p ( " SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE " , $interval );
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-17 20:20:24 +01:00
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
2019-02-06 08:37:45 +01:00
if ( $job = DBA :: fetch ( $jobs )) {
$jobs_per_minute [ $interval ] = number_format ( $job [ 'jobs' ] / $interval , 0 );
}
DBA :: close ( $jobs );
}
$processlist = ' - jpm: ' . implode ( '/' , $jobs_per_minute );
}
2019-02-10 00:10:15 +01:00
// Create a list of queue entries grouped by their priority
$listitem = [ 0 => '' ];
2017-11-19 23:04:40 +01:00
2019-02-10 00:10:15 +01:00
$idle_workers = $active ;
2017-11-05 11:33:46 +01:00
2019-02-17 19:55:17 +01:00
$deferred = self :: deferredEntries ();
2019-02-11 09:59:14 +01:00
2019-02-10 00:10:15 +01:00
if ( Config :: get ( 'system' , 'worker_debug' )) {
2019-02-11 09:59:14 +01:00
$waiting_processes = 0 ;
2017-11-05 11:33:46 +01:00
// Now adding all processes with workerqueue entries
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
$jobs = DBA :: p ( " SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` AND `next_try` < ? GROUP BY `priority` " , DateTimeFormat :: utcNow ());
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
while ( $entry = DBA :: fetch ( $jobs )) {
$stamp = ( float ) microtime ( true );
2019-02-08 22:48:41 +01:00
$processes = DBA :: p ( " SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `priority` = ? " , $entry [ " priority " ]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
2018-07-20 14:19:26 +02:00
if ( $process = DBA :: fetch ( $processes )) {
2019-02-08 22:48:41 +01:00
$idle_workers -= $process [ " running " ];
2019-02-11 09:59:14 +01:00
$waiting_processes += $entry [ " entries " ];
2017-11-05 11:33:46 +01:00
$listitem [ $entry [ " priority " ]] = $entry [ " priority " ] . " : " . $process [ " running " ] . " / " . $entry [ " entries " ];
}
2018-07-20 14:19:26 +02:00
DBA :: close ( $processes );
2017-11-05 11:33:46 +01:00
}
2019-02-10 00:10:15 +01:00
DBA :: close ( $jobs );
2019-02-17 19:55:17 +01:00
$entries = $deferred + $waiting_processes ;
2019-02-10 00:10:15 +01:00
} else {
2019-02-17 19:55:17 +01:00
$entries = self :: totalEntries ();
$waiting_processes = max ( 0 , $entries - $deferred );
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
$jobs = DBA :: p ( " SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority` " );
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-17 19:55:17 +01:00
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
2019-02-08 22:48:41 +01:00
2019-02-10 00:10:15 +01:00
while ( $entry = DBA :: fetch ( $jobs )) {
$idle_workers -= $entry [ " running " ];
$listitem [ $entry [ " priority " ]] = $entry [ " priority " ] . " : " . $entry [ " running " ];
}
DBA :: close ( $jobs );
2017-11-05 11:33:46 +01:00
}
2019-02-10 00:10:15 +01:00
$listitem [ 0 ] = " 0: " . max ( 0 , $idle_workers );
$processlist .= ' (' . implode ( ', ' , $listitem ) . ')' ;
2017-11-05 11:33:46 +01:00
2019-03-08 06:53:36 +01:00
if ( Config :: get ( " system " , " worker_fastlane " , false ) && ( $queues > 0 ) && ( $active >= $queues ) && self :: entriesExists ()) {
2017-11-05 11:33:46 +01:00
$top_priority = self :: highestPriority ();
$high_running = self :: processWithPriorityActive ( $top_priority );
if ( ! $high_running && ( $top_priority > PRIORITY_UNDEFINED ) && ( $top_priority < PRIORITY_NEGLIGIBLE )) {
2018-10-30 14:58:45 +01:00
Logger :: log ( " There are jobs with priority " . $top_priority . " waiting but none is executed. Open a fastlane. " , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
$queues = $active + 1 ;
}
}
2019-02-11 09:59:14 +01:00
Logger :: log ( " Load: " . $load . " / " . $maxsysload . " - processes: " . $deferred . " / " . $active . " / " . $waiting_processes . $processlist . " - maximum: " . $queues . " / " . $maxqueues , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
// Are there fewer workers running as possible? Then fork a new one.
2019-03-08 06:53:36 +01:00
if ( ! Config :: get ( " system " , " worker_dont_fork " , false ) && ( $queues > ( $active + 1 )) && self :: entriesExists ()) {
2018-10-30 14:58:45 +01:00
Logger :: log ( " Active workers: " . $active . " / " . $queues . " Fork a new worker. " , Logger :: DEBUG );
2018-06-15 20:18:20 +02:00
if ( Config :: get ( 'system' , 'worker_daemon_mode' , false )) {
self :: IPCSetJobState ( true );
} else {
self :: spawnWorker ();
}
2017-11-05 11:33:46 +01:00
}
}
2018-06-20 00:53:02 +02:00
// if there are too much worker, we don't spawn a new one.
if ( Config :: get ( 'system' , 'worker_daemon_mode' , false ) && ( $active > $queues )) {
2018-06-15 20:18:20 +02:00
self :: IPCSetJobState ( false );
}
2018-06-20 00:53:02 +02:00
return $active > $queues ;
2017-11-05 11:33:46 +01:00
}
/**
2017-11-19 22:47:21 +01:00
* @ brief Returns the number of active worker processes
2017-11-05 11:33:46 +01:00
*
2017-11-19 22:47:21 +01:00
* @ return integer Number of active worker processes
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function activeWorkers ()
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
$count = DBA :: count ( 'process' , [ 'command' => 'Worker.php' ]);
self :: $db_duration += ( microtime ( true ) - $stamp );
return $count ;
2017-11-05 11:33:46 +01:00
}
2019-02-17 20:20:24 +01:00
/**
* @ brief Returns waiting jobs for the current process id
*
* @ return array waiting workerqueue jobs
* @ throws \Exception
*/
2019-02-17 19:55:17 +01:00
private static function getWaitingJobForPID ()
{
$stamp = ( float ) microtime ( true );
$r = DBA :: select ( 'workerqueue' , [], [ 'pid' => getmypid (), 'done' => false ]);
self :: $db_duration += ( microtime ( true ) - $stamp );
if ( DBA :: isResult ( $r )) {
return DBA :: toArray ( $r );
}
DBA :: close ( $r );
return false ;
}
2019-02-17 20:20:24 +01:00
/**
* @ brief Returns the next jobs that should be executed
*
* @ return array array with next jobs
* @ throws \Exception
*/
2019-02-17 19:55:17 +01:00
private static function nextProcess ()
2019-02-16 16:03:37 +01:00
{
$priority = self :: nextPriority ();
if ( empty ( $priority )) {
2019-02-21 20:32:31 +01:00
Logger :: info ( 'No tasks found' );
2019-02-16 16:03:37 +01:00
return [];
}
2019-03-23 07:08:18 +01:00
$limit = Config :: get ( 'system' , 'worker_fetch_limit' , 1 );
2019-02-16 16:03:37 +01:00
$ids = [];
$stamp = ( float ) microtime ( true );
$condition = [ " `priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ? " , $priority , DateTimeFormat :: utcNow ()];
2019-03-23 07:08:18 +01:00
$tasks = DBA :: select ( 'workerqueue' , [ 'id' , 'parameter' ], $condition , [ 'limit' => $limit , 'order' => [ 'created' ]]);
2019-02-16 16:03:37 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
while ( $task = DBA :: fetch ( $tasks )) {
$ids [] = $task [ 'id' ];
2019-03-23 07:08:18 +01:00
// Only continue that loop while we are storing commands that can be processed quickly
$command = json_decode ( $task [ 'parameter' ])[ 0 ];
if ( ! in_array ( $command , self :: FAST_COMMANDS )) {
break ;
}
2019-02-16 16:03:37 +01:00
}
DBA :: close ( $tasks );
2019-03-23 07:08:18 +01:00
Logger :: info ( 'Found:' , [ 'priority' => $priority , 'id' => $ids ]);
2019-02-16 16:03:37 +01:00
return $ids ;
}
2019-02-17 20:20:24 +01:00
/**
* @ brief Returns the priority of the next workerqueue job
*
* @ return string priority
* @ throws \Exception
*/
2019-02-17 19:55:17 +01:00
private static function nextPriority ()
2019-02-16 16:03:37 +01:00
{
$waiting = [];
$priorities = [ PRIORITY_CRITICAL , PRIORITY_HIGH , PRIORITY_MEDIUM , PRIORITY_LOW , PRIORITY_NEGLIGIBLE ];
foreach ( $priorities as $priority ) {
$stamp = ( float ) microtime ( true );
if ( DBA :: exists ( 'workerqueue' , [ " `priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ? " , $priority , DateTimeFormat :: utcNow ()])) {
$waiting [ $priority ] = true ;
}
self :: $db_duration += ( microtime ( true ) - $stamp );
}
if ( ! empty ( $waiting [ PRIORITY_CRITICAL ])) {
return PRIORITY_CRITICAL ;
}
$running = [];
2019-02-17 04:22:29 +01:00
$running_total = 0 ;
2019-02-16 16:03:37 +01:00
$stamp = ( float ) microtime ( true );
$processes = DBA :: p ( " SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
INNER JOIN `workerqueue` ON `workerqueue` . `pid` = `process` . `pid`
WHERE NOT `done` GROUP BY `priority` " );
self :: $db_duration += ( microtime ( true ) - $stamp );
while ( $process = DBA :: fetch ( $processes )) {
$running [ $process [ 'priority' ]] = $process [ 'running' ];
2019-02-17 04:22:29 +01:00
$running_total += $process [ 'running' ];
2019-02-16 16:03:37 +01:00
}
DBA :: close ( $processes );
foreach ( $priorities as $priority ) {
if ( ! empty ( $waiting [ $priority ]) && empty ( $running [ $priority ])) {
2019-02-21 20:32:31 +01:00
Logger :: info ( 'No running worker found with priority {priority} - assigning it.' , [ 'priority' => $priority ]);
2019-02-16 16:03:37 +01:00
return $priority ;
}
}
2019-02-17 04:22:29 +01:00
$active = max ( self :: activeWorkers (), $running_total );
$priorities = max ( count ( $waiting ), count ( $running ));
$exponent = 2 ;
$total = 0 ;
for ( $i = 1 ; $i <= $priorities ; ++ $i ) {
$total += pow ( $i , $exponent );
2019-02-16 16:03:37 +01:00
}
2019-02-17 04:22:29 +01:00
$limit = [];
for ( $i = 1 ; $i <= $priorities ; ++ $i ) {
$limit [ $priorities - $i ] = max ( 1 , round ( $active * ( pow ( $i , $exponent ) / $total )));
}
$i = 0 ;
foreach ( $running as $priority => $workers ) {
if ( $workers < $limit [ $i ++ ]) {
2019-02-21 20:32:31 +01:00
Logger :: info ( 'Priority {priority} has got {workers} workers out of a limit of {limit}' , [ 'priority' => $priority , 'workers' => $workers , 'limit' => $limit [ $i - 1 ]]);
2019-02-17 04:22:29 +01:00
return $priority ;
}
2019-02-16 16:03:37 +01:00
}
if ( ! empty ( $waiting )) {
2019-02-25 13:11:35 +01:00
$priority = array_keys ( $waiting )[ 0 ];
2019-02-21 20:32:31 +01:00
Logger :: info ( 'No underassigned priority found, now taking the highest priority.' , [ 'priority' => $priority ]);
2019-02-17 04:22:29 +01:00
return $priority ;
2019-02-16 16:03:37 +01:00
}
return false ;
}
2017-11-05 11:33:46 +01:00
/**
* @ brief Find and claim the next worker process for us
*
* @ return boolean Have we found something ?
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2019-02-17 19:55:17 +01:00
private static function findWorkerProcesses ()
2017-11-19 23:04:40 +01:00
{
2017-11-05 11:33:46 +01:00
$mypid = getmypid ();
2019-02-16 16:03:37 +01:00
$ids = self :: nextProcess ();
2019-02-17 19:55:17 +01:00
// If there is no result we check without priority limit
if ( empty ( $ids )) {
2019-03-23 07:08:18 +01:00
$limit = Config :: get ( 'system' , 'worker_fetch_limit' , 1 );
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2019-02-17 20:20:24 +01:00
$condition = [ " `pid` = 0 AND NOT `done` AND `next_try` < ? " , DateTimeFormat :: utcNow ()];
2019-03-23 07:08:18 +01:00
$tasks = DBA :: select ( 'workerqueue' , [ 'id' , 'parameter' ], $condition , [ 'limit' => $limit , 'order' => [ 'priority' , 'created' ]]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
2019-03-23 07:08:18 +01:00
while ( $task = DBA :: fetch ( $tasks )) {
$ids [] = $task [ 'id' ];
// Only continue that loop while we are storing commands that can be processed quickly
$command = json_decode ( $task [ 'parameter' ])[ 0 ];
if ( ! in_array ( $command , self :: FAST_COMMANDS )) {
break ;
}
2017-11-05 11:33:46 +01:00
}
2019-03-23 07:08:18 +01:00
DBA :: close ( $tasks );
2017-11-05 11:33:46 +01:00
}
2019-02-17 19:55:17 +01:00
if ( ! empty ( $ids )) {
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2019-02-17 20:20:24 +01:00
$condition = [ 'id' => $ids , 'done' => false , 'pid' => 0 ];
DBA :: update ( 'workerqueue' , [ 'executed' => DateTimeFormat :: utcNow (), 'pid' => $mypid ], $condition );
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
}
2019-02-17 19:55:17 +01:00
return ! empty ( $ids );
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Returns the next worker process
*
* @ return string SQL statement
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2019-02-17 19:55:17 +01:00
public static function workerProcess ()
2017-11-19 23:04:40 +01:00
{
2017-11-05 11:33:46 +01:00
// There can already be jobs for us in the queue.
2019-02-17 19:55:17 +01:00
$waiting = self :: getWaitingJobForPID ();
if ( ! empty ( $waiting )) {
return $waiting ;
2017-11-05 11:33:46 +01:00
}
2019-02-08 22:48:41 +01:00
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-05 20:57:31 +02:00
if ( ! Lock :: acquire ( 'worker_process' )) {
2017-11-05 11:33:46 +01:00
return false ;
}
2019-02-10 00:10:15 +01:00
self :: $lock_duration += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
2019-02-17 19:55:17 +01:00
$found = self :: findWorkerProcesses ();
2017-11-05 11:33:46 +01:00
2018-07-05 20:57:31 +02:00
Lock :: release ( 'worker_process' );
2017-11-05 11:33:46 +01:00
if ( $found ) {
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
$r = DBA :: select ( 'workerqueue' , [], [ 'pid' => getmypid (), 'done' => false ]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-21 04:03:40 +02:00
return DBA :: toArray ( $r );
2017-11-05 11:33:46 +01:00
}
2017-11-05 18:13:37 +01:00
return false ;
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Removes a workerqueue entry from the current process
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
public static function unclaimProcess ()
{
2017-11-05 11:33:46 +01:00
$mypid = getmypid ();
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-10-21 07:53:47 +02:00
DBA :: update ( 'workerqueue' , [ 'executed' => DBA :: NULL_DATETIME , 'pid' => 0 ], [ 'pid' => $mypid , 'done' => false ]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Call the front end worker
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
public static function callWorker ()
{
2017-11-05 11:33:46 +01:00
if ( ! Config :: get ( " system " , " frontend_worker " )) {
return ;
}
$url = System :: baseUrl () . " /worker " ;
2019-06-10 14:34:54 +02:00
Network :: fetchUrl ( $url , false , 1 );
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Call the front end worker if there aren ' t any active
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
public static function executeIfIdle ()
{
2017-11-05 11:33:46 +01:00
if ( ! Config :: get ( " system " , " frontend_worker " )) {
return ;
}
2017-11-19 22:47:21 +01:00
// Do we have "proc_open"? Then we can fork the worker
2017-11-05 11:33:46 +01:00
if ( function_exists ( " proc_open " )) {
// When was the last time that we called the worker?
// Less than one minute? Then we quit
if (( time () - Config :: get ( " system " , " worker_started " )) < 60 ) {
return ;
}
2017-11-07 03:22:52 +01:00
Config :: set ( " system " , " worker_started " , time ());
2017-11-05 11:33:46 +01:00
// Do we have enough running workers? Then we quit here.
if ( self :: tooMuchWorkers ()) {
// Cleaning dead processes
self :: killStaleWorkers ();
2018-01-16 01:08:28 +01:00
Process :: deleteInactive ();
2017-11-05 11:33:46 +01:00
return ;
}
self :: runCron ();
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Call worker' , Logger :: DEBUG );
2017-11-05 16:28:55 +01:00
self :: spawnWorker ();
2017-11-05 11:33:46 +01:00
return ;
}
// We cannot execute background processes.
// We now run the processes from the frontend.
// This won't work with long running processes.
self :: runCron ();
self :: clearProcesses ();
2018-06-19 23:33:07 +02:00
$workers = self :: activeWorkers ();
2017-11-05 11:33:46 +01:00
2018-06-19 23:33:07 +02:00
if ( $workers == 0 ) {
2017-11-05 11:33:46 +01:00
self :: callWorker ();
}
}
/**
* @ brief Removes long running worker processes
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
public static function clearProcesses ()
{
2017-11-05 11:33:46 +01:00
$timeout = Config :: get ( " system " , " frontend_worker_timeout " , 10 );
/// @todo We should clean up the corresponding workerqueue entries as well
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-01-15 14:05:12 +01:00
$condition = [ " `created` < ? AND `command` = 'worker.php' " ,
2018-01-27 03:38:34 +01:00
DateTimeFormat :: utc ( " now - " . $timeout . " minutes " )];
2018-07-20 14:19:26 +02:00
DBA :: delete ( 'process' , $condition );
2019-02-10 00:10:15 +01:00
self :: $db_duration = ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 11:33:46 +01:00
}
/**
* @ brief Runs the cron processes
2017-11-19 23:04:40 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
*/
2017-11-19 23:04:40 +01:00
private static function runCron ()
{
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Add cron entries' , Logger :: DEBUG );
2017-11-05 11:33:46 +01:00
// Check for spooled items
2019-02-08 22:48:41 +01:00
self :: add ([ 'priority' => PRIORITY_HIGH , 'force_priority' => true ], 'SpoolPost' );
2017-11-05 11:33:46 +01:00
// Run the cron job that calls all other jobs
2019-02-08 22:48:41 +01:00
self :: add ([ 'priority' => PRIORITY_MEDIUM , 'force_priority' => true ], 'Cron' );
2017-11-05 11:33:46 +01:00
// Cleaning dead processes
self :: killStaleWorkers ();
}
2017-11-19 23:33:07 +01:00
/**
2018-05-14 07:02:18 +02:00
* @ brief Spawns a new worker
2019-01-06 22:06:53 +01:00
* @ param bool $do_cron
2017-11-19 23:33:07 +01:00
* @ return void
2019-01-06 22:06:53 +01:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-19 23:33:07 +01:00
*/
2018-06-06 05:48:04 +02:00
public static function spawnWorker ( $do_cron = false )
2017-11-19 23:33:07 +01:00
{
2018-07-23 13:40:52 +02:00
$command = 'bin/worker.php' ;
2018-06-06 05:48:04 +02:00
2018-07-24 08:15:58 +02:00
$args = [ 'no_cron' => ! $do_cron ];
2018-06-06 05:48:04 +02:00
2018-07-23 13:40:52 +02:00
get_app () -> proc_run ( $command , $args );
2018-06-15 20:18:20 +02:00
// after spawning we have to remove the flag.
if ( Config :: get ( 'system' , 'worker_daemon_mode' , false )) {
self :: IPCSetJobState ( false );
}
2017-11-05 16:28:55 +01:00
}
2017-11-05 11:33:46 +01:00
/**
* @ brief Adds tasks to the worker queue
*
2017-11-06 16:38:15 +01:00
* @ param ( integer | array ) priority or parameter array , strings are deprecated and are ignored
2017-11-05 11:33:46 +01:00
*
* next args are passed as $cmd command line
2017-11-19 19:59:55 +01:00
* or : Worker :: add ( PRIORITY_HIGH , " Notifier " , " drop " , $drop_id );
2017-11-19 20:47:04 +01:00
* or : Worker :: add ( array ( 'priority' => PRIORITY_HIGH , 'dont_fork' => true ), " CreateShadowEntry " , $post_id );
2017-11-05 11:33:46 +01:00
*
2019-01-06 22:06:53 +01:00
* @ return boolean " false " if proc_run couldn ' t be executed
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 11:33:46 +01:00
* @ note $cmd and string args are surrounded with " "
*
* @ hooks 'proc_run'
2019-01-06 22:06:53 +01:00
* array $arr
2017-11-05 11:33:46 +01:00
*
*/
2017-11-19 23:04:40 +01:00
public static function add ( $cmd )
{
2018-02-11 17:18:39 +01:00
$args = func_get_args ();
2017-11-05 11:33:46 +01:00
2018-02-11 17:18:39 +01:00
if ( ! count ( $args )) {
2017-11-05 11:33:46 +01:00
return false ;
}
2018-01-15 14:05:12 +01:00
$arr = [ 'args' => $args , 'run_cmd' => true ];
2017-11-05 11:33:46 +01:00
2018-12-26 07:06:24 +01:00
Hook :: callAll ( " proc_run " , $arr );
2017-11-05 11:33:46 +01:00
if ( ! $arr [ 'run_cmd' ] || ! count ( $args )) {
return true ;
}
$priority = PRIORITY_MEDIUM ;
2018-05-13 04:40:52 +02:00
$dont_fork = Config :: get ( " system " , " worker_dont_fork " , false );
2018-01-27 03:38:34 +01:00
$created = DateTimeFormat :: utcNow ();
2019-02-08 22:48:41 +01:00
$force_priority = false ;
2017-11-05 11:33:46 +01:00
2018-02-13 03:26:35 +01:00
$run_parameter = array_shift ( $args );
2017-11-05 11:33:46 +01:00
if ( is_int ( $run_parameter )) {
$priority = $run_parameter ;
} elseif ( is_array ( $run_parameter )) {
if ( isset ( $run_parameter [ 'priority' ])) {
$priority = $run_parameter [ 'priority' ];
}
if ( isset ( $run_parameter [ 'created' ])) {
$created = $run_parameter [ 'created' ];
}
if ( isset ( $run_parameter [ 'dont_fork' ])) {
$dont_fork = $run_parameter [ 'dont_fork' ];
}
2019-02-08 22:48:41 +01:00
if ( isset ( $run_parameter [ 'force_priority' ])) {
$force_priority = $run_parameter [ 'force_priority' ];
}
2017-11-05 11:33:46 +01:00
}
2018-02-11 17:18:39 +01:00
$parameters = json_encode ( $args );
2018-07-20 14:19:26 +02:00
$found = DBA :: exists ( 'workerqueue' , [ 'parameter' => $parameters , 'done' => false ]);
2017-11-05 11:33:46 +01:00
// Quit if there was a database error - a precaution for the update process to 3.5.3
2018-07-20 14:19:26 +02:00
if ( DBA :: errorNo () != 0 ) {
2017-11-05 11:33:46 +01:00
return false ;
}
if ( ! $found ) {
2018-07-20 14:19:26 +02:00
DBA :: insert ( 'workerqueue' , [ 'parameter' => $parameters , 'created' => $created , 'priority' => $priority ]);
2019-02-08 22:48:41 +01:00
} elseif ( $force_priority ) {
DBA :: update ( 'workerqueue' , [ 'priority' => $priority ], [ 'parameter' => $parameters , 'done' => false , 'pid' => 0 ]);
2017-11-05 11:33:46 +01:00
}
2018-06-02 07:17:32 +02:00
// Should we quit and wait for the worker to be called as a cronjob?
if ( $dont_fork ) {
2018-06-02 00:09:27 +02:00
return true ;
}
2017-11-05 11:33:46 +01:00
// If there is a lock then we don't have to check for too much worker
2018-07-05 20:57:31 +02:00
if ( ! Lock :: acquire ( 'worker' , 0 )) {
2017-11-05 11:33:46 +01:00
return true ;
}
// If there are already enough workers running, don't fork another one
$quit = self :: tooMuchWorkers ();
2018-07-05 20:57:31 +02:00
Lock :: release ( 'worker' );
2017-11-05 11:33:46 +01:00
if ( $quit ) {
return true ;
}
2018-06-15 20:18:20 +02:00
// We tell the daemon that a new job entry exists
if ( Config :: get ( 'system' , 'worker_daemon_mode' , false )) {
// We don't have to set the IPC flag - this is done in "tooMuchWorkers"
return true ;
}
2017-11-19 22:47:21 +01:00
// Now call the worker to execute the jobs that we just added to the queue
2017-11-05 16:28:55 +01:00
self :: spawnWorker ();
2017-11-05 11:33:46 +01:00
return true ;
}
2018-01-16 01:08:28 +01:00
2018-10-15 07:19:35 +02:00
/**
* Defers the current worker entry
*/
public static function defer ()
{
if ( empty ( BaseObject :: getApp () -> queue )) {
return ;
}
$queue = BaseObject :: getApp () -> queue ;
$retrial = $queue [ 'retrial' ];
$id = $queue [ 'id' ];
2019-02-08 22:48:41 +01:00
$priority = $queue [ 'priority' ];
2018-10-15 07:19:35 +02:00
if ( $retrial > 14 ) {
2018-10-30 14:58:45 +01:00
Logger :: log ( 'Id ' . $id . ' had been tried 14 times. We stop now.' , Logger :: DEBUG );
2018-10-23 05:54:18 +02:00
return ;
2018-10-15 07:19:35 +02:00
}
// Calculate the delay until the next trial
$delay = (( $retrial + 3 ) ** 4 ) + ( rand ( 1 , 30 ) * ( $retrial + 1 ));
$next = DateTimeFormat :: utc ( 'now + ' . $delay . ' seconds' );
2019-02-08 22:48:41 +01:00
if (( $priority < PRIORITY_MEDIUM ) && ( $retrial > 2 )) {
$priority = PRIORITY_MEDIUM ;
} elseif (( $priority < PRIORITY_LOW ) && ( $retrial > 5 )) {
$priority = PRIORITY_LOW ;
} elseif (( $priority < PRIORITY_NEGLIGIBLE ) && ( $retrial > 7 )) {
$priority = PRIORITY_NEGLIGIBLE ;
}
Logger :: log ( 'Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue [ 'priority' ] . '/' . $priority , Logger :: DEBUG );
2018-10-15 07:19:35 +02:00
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2019-02-08 22:48:41 +01:00
$fields = [ 'retrial' => $retrial + 1 , 'next_try' => $next , 'executed' => DBA :: NULL_DATETIME , 'pid' => 0 , 'priority' => $priority ];
2018-10-15 07:19:35 +02:00
DBA :: update ( 'workerqueue' , $fields , [ 'id' => $id ]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2018-10-15 07:19:35 +02:00
}
2018-01-16 01:08:28 +01:00
/**
* Log active processes into the " process " table
*
* @ brief Log active processes into the " process " table
*/
public static function startProcess ()
{
$trace = debug_backtrace ( DEBUG_BACKTRACE_IGNORE_ARGS , 1 );
$command = basename ( $trace [ 0 ][ 'file' ]);
Process :: deleteInactive ();
Process :: insert ( $command );
}
/**
* Remove the active process from the " process " table
*
* @ brief Remove the active process from the " process " table
* @ return bool
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2018-01-16 01:08:28 +01:00
*/
public static function endProcess ()
{
return Process :: deleteByPid ();
}
2018-06-02 00:09:27 +02:00
2018-06-02 07:03:23 +02:00
/**
* Set the flag if some job is waiting
*
* @ brief Set the flag if some job is waiting
* @ param boolean $jobs Is there a waiting job ?
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2018-06-02 07:03:23 +02:00
*/
2018-06-02 00:09:27 +02:00
public static function IPCSetJobState ( $jobs )
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
DBA :: update ( 'worker-ipc' , [ 'jobs' => $jobs ], [ 'key' => 1 ], true );
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2018-06-02 00:09:27 +02:00
}
2018-06-02 07:03:23 +02:00
/**
* Checks if some worker job waits to be executed
*
* @ brief Checks if some worker job waits to be executed
* @ return bool
2019-01-06 22:06:53 +01:00
* @ throws \Exception
2018-06-02 07:03:23 +02:00
*/
2018-06-02 00:09:27 +02:00
public static function IPCJobsExists ()
{
2019-02-10 00:10:15 +01:00
$stamp = ( float ) microtime ( true );
2018-07-20 14:19:26 +02:00
$row = DBA :: selectFirst ( 'worker-ipc' , [ 'jobs' ], [ 'key' => 1 ]);
2019-02-10 00:10:15 +01:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-06-02 00:09:27 +02:00
// When we don't have a row, no job is running
2018-07-21 14:46:04 +02:00
if ( ! DBA :: isResult ( $row )) {
2018-06-02 00:09:27 +02:00
return false ;
}
return ( bool ) $row [ 'jobs' ];
}
2017-11-05 11:33:46 +01:00
}