diff --git a/boot.php b/boot.php index b4f36eb84e..7d7d66ab3e 100644 --- a/boot.php +++ b/boot.php @@ -42,7 +42,7 @@ define ( 'FRIENDICA_PLATFORM', 'Friendica'); define ( 'FRIENDICA_CODENAME', 'Asparagus'); define ( 'FRIENDICA_VERSION', '3.5.3-dev' ); define ( 'DFRN_PROTOCOL_VERSION', '2.23' ); -define ( 'DB_UPDATE_VERSION', 1230 ); +define ( 'DB_UPDATE_VERSION', 1231 ); /** * @brief Constant with a HTML line break. diff --git a/database.sql b/database.sql index 7f7e975e72..a3f937587c 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ --- Friendica 3.5.3dev (Asparagus) --- DB_UPDATE_VERSION 1228 +-- Friendica 3.5.3-dev (Asparagus) +-- DB_UPDATE_VERSION 1231 -- ------------------------------------------ @@ -1114,9 +1114,11 @@ CREATE TABLE IF NOT EXISTS `workerqueue` ( `created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', `pid` int(11) NOT NULL DEFAULT 0, `executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', + `done` tinyint(1) NOT NULL DEFAULT 0, PRIMARY KEY(`id`), INDEX `pid` (`pid`), - INDEX `parameter` (`parameter`(192)), - INDEX `priority_created` (`priority`,`created`) + INDEX `parameter` (`parameter`(64)), + INDEX `priority_created` (`priority`,`created`), + INDEX `executed` (`executed`) ) DEFAULT COLLATE utf8mb4_general_ci; diff --git a/include/cron.php b/include/cron.php index 9c2e6aeaa3..0e58778440 100644 --- a/include/cron.php +++ b/include/cron.php @@ -84,7 +84,7 @@ function cron_run(&$argv, &$argc){ proc_run(PRIORITY_LOW, "include/cronjobs.php", "update_photo_albums"); // Delete all done workerqueue entries - dba::delete('workerqueue', array('done' => true)); + dba::e('DELETE FROM `workerqueue` WHERE `done` AND `executed` < UTC_TIMESTAMP() - INTERVAL 12 HOUR'); } // Poll contacts diff --git a/include/dbstructure.php b/include/dbstructure.php index 268bbbb669..32fd304f4b 100644 --- a/include/dbstructure.php +++ b/include/dbstructure.php @@ -1747,6 +1747,7 @@ function db_definition() { "pid" => array("pid"), "parameter" => array("parameter(64)"), "priority_created" => array("priority", "created"), + "executed" => array("executed"), ) ); diff --git a/include/poller.php b/include/poller.php index 379aadcd67..d9394b080b 100644 --- a/include/poller.php +++ b/include/poller.php @@ -85,6 +85,8 @@ function poller_run($argv, $argc){ poller_run_cron(); } + $refetched = false; + $starttime = time(); // We fetch the next queue entry that is about to be executed @@ -121,6 +123,13 @@ function poller_run($argv, $argc){ logger('Process lifetime reached, quitting.', LOGGER_DEBUG); return; } + + // If possible we will fetch new jobs for this worker + if (!$refetched && Lock::set('poller_worker_process', 0)) { + $refetched = find_worker_processes(); + Lock::remove('poller_worker_process'); + } + } logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } @@ -235,7 +244,7 @@ function poller_execute($queue) { * @param array $argv Array of values to be passed to the function */ function poller_exec_function($queue, $funcname, $argv) { - global $poller_up_start, $poller_db_duration; + global $poller_up_start, $poller_db_duration, $poller_lock_duration; $a = get_app(); @@ -284,7 +293,11 @@ function poller_exec_function($queue, $funcname, $argv) { * The execution time is the productive time. * By changing parameters like the maximum number of workers we can check the effectivness. */ - logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG); + logger('DB: '.number_format($poller_db_duration, 2). + ' - Lock: '.number_format($poller_lock_duration, 2). + ' - Rest: '.number_format($up_duration - $poller_db_duration - $poller_lock_duration, 2). + ' - Execution: '.number_format($duration, 2), LOGGER_DEBUG); + $poller_lock_duration = 0; if ($duration > 3600) { logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG); @@ -448,7 +461,7 @@ function poller_kill_stale_workers() { foreach ($r AS $pid) { if (!posix_kill($pid["pid"], 0)) { dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), - array('pid' => $pid["pid"])); + array('pid' => $pid["pid"], 'done' => false)); } else { // Kill long running processes @@ -475,7 +488,7 @@ function poller_kill_stale_workers() { // Additionally we are lowering the priority. dba::update('workerqueue', array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0), - array('pid' => $pid["pid"])); + array('pid' => $pid["pid"], 'done' => false)); } else { logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); } @@ -512,7 +525,9 @@ function poller_too_much_workers() { $listitem = array(); // Adding all processes with no workerqueue entry - $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)"); + $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS + (SELECT id FROM `workerqueue` + WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done` AND `pid` != ?)", getmypid()); if ($process = dba::fetch($processes)) { $listitem[0] = "0:".$process["running"]; } @@ -528,7 +543,16 @@ function poller_too_much_workers() { dba::close($processes); } dba::close($entries); - $processlist = ' ('.implode(', ', $listitem).')'; + + $jobs_per_minute = 0; + + $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE"); + if ($job = dba::fetch($jobs)) { + $jobs_per_minute = number_format($job['jobs'] / 10, 0); + } + dba::close($jobs); + + $processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')'; } $entries = poller_total_entries(); @@ -628,7 +652,10 @@ function find_worker_processes() { // Check if we should pass some low priority process $highest_priority = 0; $found = false; - $limit = Config::get('system', 'worker_fetch_limit', 5); + + // The higher the number of parallel workers, the more we prefetch to prevent concurring access + $limit = Config::get("system", "worker_queues", 4) * 2; + $limit = Config::get('system', 'worker_fetch_limit', $limit); if (poller_passing_slow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? @@ -644,7 +671,7 @@ function find_worker_processes() { // Give slower processes some processing time $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` > ? AND NOT `done` - ORDER BY `priority`, `created` LIMIT 1", + ORDER BY `priority`, `created` LIMIT ".intval($limit), datetime_convert(), getmypid(), NULL_DATE, $highest_priority); if ($result) { $found = (dba::affected_rows() > 0); @@ -669,14 +696,29 @@ function find_worker_processes() { * @return string SQL statement */ function poller_worker_process() { - global $poller_db_duration; + global $poller_db_duration, $poller_lock_duration; $stamp = (float)microtime(true); - $found = find_worker_processes(); + // There can already be jobs for us in the queue. + $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid())); + if (dbm::is_result($r)) { + $poller_db_duration += (microtime(true) - $stamp); + return $r; + } + $stamp = (float)microtime(true); + if (!Lock::set('poller_worker_process')) { + return false; + } + $poller_lock_duration = (microtime(true) - $stamp); + + $stamp = (float)microtime(true); + $found = find_worker_processes(); $poller_db_duration += (microtime(true) - $stamp); + Lock::remove('poller_worker_process'); + if ($found) { $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid())); } @@ -689,7 +731,7 @@ function poller_worker_process() { function poller_unclaim_process() { $mypid = getmypid(); - dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid)); + dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid, 'done' => false)); } /** @@ -793,6 +835,7 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); Lock::remove('poller_worker'); + Lock::remove('poller_worker_process'); killme(); } diff --git a/src/Util/Lock.php b/src/Util/Lock.php index 36f408cf32..a50faf2d90 100644 --- a/src/Util/Lock.php +++ b/src/Util/Lock.php @@ -17,6 +17,8 @@ use dbm; * @brief This class contain Functions for preventing parallel execution of functions */ class Lock { + private static $semaphore = array(); + /** * @brief Check for memcache and open a connection if configured * @@ -43,6 +45,25 @@ class Lock { return $memcache; } + /** + * @brief Creates a semaphore key + * + * @param string $fn_name Name of the lock + * + * @return ressource the semaphore key + */ + private static function semaphore_key($fn_name) { + $temp = get_temppath(); + + $file = $temp.'/'.$fn_name.'.sem'; + + if (!file_exists($file)) { + file_put_contents($file, $function); + } + + return ftok($file, 'f'); + } + /** * @brief Sets a lock for a given name * @@ -55,6 +76,13 @@ class Lock { $got_lock = false; $start = time(); + if (function_exists('sem_get')) { + self::$semaphore[$fn_name] = sem_get(self::semaphore_key($fn_name)); + if (self::$semaphore[$fn_name]) { + return sem_acquire(self::$semaphore[$fn_name], ($timeout == 0)); + } + } + $memcache = self::connectMemcache(); if (is_object($memcache)) { $cachekey = get_app()->get_hostname().";lock:".$fn_name; @@ -128,6 +156,11 @@ class Lock { * @param string $fn_name Name of the lock */ public static function remove($fn_name) { + if (function_exists('sem_get') && self::$semaphore[$fn_name]) { + sem_release(self::$semaphore[$fn_name]); + return; + } + $memcache = self::connectMemcache(); if (is_object($memcache)) { $cachekey = get_app()->get_hostname().";lock:".$fn_name; diff --git a/update.php b/update.php index 09f11918c5..7cfdb231e0 100644 --- a/update.php +++ b/update.php @@ -1,6 +1,6 @@