The worker can now call methods in the worker namespace

This commit is contained in:
Michael 2017-11-12 07:21:23 +00:00
parent be13332807
commit 06ce34eda1

View file

@ -211,6 +211,34 @@ class Worker {
// Check for existance and validity of the include file // Check for existance and validity of the include file
$include = $argv[0]; $include = $argv[0];
if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if (!isset(self::$last_update)) {
self::$last_update = strtotime($queue["executed"]);
}
$age = (time() - self::$last_update) / 60;
self::$last_update = time();
if ($age > 1) {
$stamp = (float)microtime(true);
dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false));
self::$db_duration += (microtime(true) - $stamp);
}
array_shift($argv);
self::execFunction($queue, $include, $argv, true);
$stamp = (float)microtime(true);
if (dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]))) {
Config::set('system', 'last_poller_execution', datetime_convert());
}
self::$db_duration = (microtime(true) - $stamp);
return true;
}
// The script could be provided as full path or only with the function name // The script could be provided as full path or only with the function name
if ($include == basename($include)) { if ($include == basename($include)) {
$include = "include/".$include.".php"; $include = "include/".$include.".php";
@ -242,7 +270,7 @@ class Worker {
self::$db_duration += (microtime(true) - $stamp); self::$db_duration += (microtime(true) - $stamp);
} }
self::execFunction($queue, $funcname, $argv); self::execFunction($queue, $funcname, $argv, false);
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
if (dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]))) { if (dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]))) {
@ -264,7 +292,7 @@ class Worker {
* @param string $funcname name of the function * @param string $funcname name of the function
* @param array $argv Array of values to be passed to the function * @param array $argv Array of values to be passed to the function
*/ */
private static function execFunction($queue, $funcname, $argv) { private static function execFunction($queue, $funcname, $argv, $method_call) {
$a = get_app(); $a = get_app();
$mypid = getmypid(); $mypid = getmypid();
@ -303,7 +331,11 @@ class Worker {
// Reset global data to avoid interferences // Reset global data to avoid interferences
unset($_SESSION); unset($_SESSION);
$funcname($argv, $argc); if ($method_call) {
call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
} else {
$funcname($argv, $argc);
}
$a->process_id = $old_process_id; $a->process_id = $old_process_id;
unset($a->queue); unset($a->queue);
@ -873,7 +905,7 @@ class Worker {
self::add(PRIORITY_MEDIUM, "cron"); self::add(PRIORITY_MEDIUM, "cron");
// Run the cronhooks job separately from cron for being able to use a different timing // Run the cronhooks job separately from cron for being able to use a different timing
self::add(PRIORITY_MEDIUM, "cronhooks"); self::add(PRIORITY_MEDIUM, "CronHooks");
// Cleaning dead processes // Cleaning dead processes
self::killStaleWorkers(); self::killStaleWorkers();