Merge pull request #3551 from annando/semaphore-locking
Semaphore based locking and hopefully the fix for the workerqueue
This commit is contained in:
commit
f846233a6b
7 changed files with 102 additions and 18 deletions
2
boot.php
2
boot.php
|
@ -42,7 +42,7 @@ define ( 'FRIENDICA_PLATFORM', 'Friendica');
|
||||||
define ( 'FRIENDICA_CODENAME', 'Asparagus');
|
define ( 'FRIENDICA_CODENAME', 'Asparagus');
|
||||||
define ( 'FRIENDICA_VERSION', '3.5.3-dev' );
|
define ( 'FRIENDICA_VERSION', '3.5.3-dev' );
|
||||||
define ( 'DFRN_PROTOCOL_VERSION', '2.23' );
|
define ( 'DFRN_PROTOCOL_VERSION', '2.23' );
|
||||||
define ( 'DB_UPDATE_VERSION', 1230 );
|
define ( 'DB_UPDATE_VERSION', 1231 );
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Constant with a HTML line break.
|
* @brief Constant with a HTML line break.
|
||||||
|
|
10
database.sql
10
database.sql
|
@ -1,6 +1,6 @@
|
||||||
-- ------------------------------------------
|
-- ------------------------------------------
|
||||||
-- Friendica 3.5.3dev (Asparagus)
|
-- Friendica 3.5.3-dev (Asparagus)
|
||||||
-- DB_UPDATE_VERSION 1228
|
-- DB_UPDATE_VERSION 1231
|
||||||
-- ------------------------------------------
|
-- ------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@ -1114,9 +1114,11 @@ CREATE TABLE IF NOT EXISTS `workerqueue` (
|
||||||
`created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
|
`created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
|
||||||
`pid` int(11) NOT NULL DEFAULT 0,
|
`pid` int(11) NOT NULL DEFAULT 0,
|
||||||
`executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
|
`executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
|
||||||
|
`done` tinyint(1) NOT NULL DEFAULT 0,
|
||||||
PRIMARY KEY(`id`),
|
PRIMARY KEY(`id`),
|
||||||
INDEX `pid` (`pid`),
|
INDEX `pid` (`pid`),
|
||||||
INDEX `parameter` (`parameter`(192)),
|
INDEX `parameter` (`parameter`(64)),
|
||||||
INDEX `priority_created` (`priority`,`created`)
|
INDEX `priority_created` (`priority`,`created`),
|
||||||
|
INDEX `executed` (`executed`)
|
||||||
) DEFAULT COLLATE utf8mb4_general_ci;
|
) DEFAULT COLLATE utf8mb4_general_ci;
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ function cron_run(&$argv, &$argc){
|
||||||
proc_run(PRIORITY_LOW, "include/cronjobs.php", "update_photo_albums");
|
proc_run(PRIORITY_LOW, "include/cronjobs.php", "update_photo_albums");
|
||||||
|
|
||||||
// Delete all done workerqueue entries
|
// Delete all done workerqueue entries
|
||||||
dba::delete('workerqueue', array('done' => true));
|
dba::e('DELETE FROM `workerqueue` WHERE `done` AND `executed` < UTC_TIMESTAMP() - INTERVAL 12 HOUR');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poll contacts
|
// Poll contacts
|
||||||
|
|
|
@ -1747,6 +1747,7 @@ function db_definition() {
|
||||||
"pid" => array("pid"),
|
"pid" => array("pid"),
|
||||||
"parameter" => array("parameter(64)"),
|
"parameter" => array("parameter(64)"),
|
||||||
"priority_created" => array("priority", "created"),
|
"priority_created" => array("priority", "created"),
|
||||||
|
"executed" => array("executed"),
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -85,6 +85,8 @@ function poller_run($argv, $argc){
|
||||||
poller_run_cron();
|
poller_run_cron();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$refetched = false;
|
||||||
|
|
||||||
$starttime = time();
|
$starttime = time();
|
||||||
|
|
||||||
// We fetch the next queue entry that is about to be executed
|
// We fetch the next queue entry that is about to be executed
|
||||||
|
@ -121,6 +123,13 @@ function poller_run($argv, $argc){
|
||||||
logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
|
logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If possible we will fetch new jobs for this worker
|
||||||
|
if (!$refetched && Lock::set('poller_worker_process', 0)) {
|
||||||
|
$refetched = find_worker_processes();
|
||||||
|
Lock::remove('poller_worker_process');
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
|
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
|
||||||
}
|
}
|
||||||
|
@ -235,7 +244,7 @@ function poller_execute($queue) {
|
||||||
* @param array $argv Array of values to be passed to the function
|
* @param array $argv Array of values to be passed to the function
|
||||||
*/
|
*/
|
||||||
function poller_exec_function($queue, $funcname, $argv) {
|
function poller_exec_function($queue, $funcname, $argv) {
|
||||||
global $poller_up_start, $poller_db_duration;
|
global $poller_up_start, $poller_db_duration, $poller_lock_duration;
|
||||||
|
|
||||||
$a = get_app();
|
$a = get_app();
|
||||||
|
|
||||||
|
@ -284,7 +293,11 @@ function poller_exec_function($queue, $funcname, $argv) {
|
||||||
* The execution time is the productive time.
|
* The execution time is the productive time.
|
||||||
* By changing parameters like the maximum number of workers we can check the effectivness.
|
* By changing parameters like the maximum number of workers we can check the effectivness.
|
||||||
*/
|
*/
|
||||||
logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
|
logger('DB: '.number_format($poller_db_duration, 2).
|
||||||
|
' - Lock: '.number_format($poller_lock_duration, 2).
|
||||||
|
' - Rest: '.number_format($up_duration - $poller_db_duration - $poller_lock_duration, 2).
|
||||||
|
' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
|
||||||
|
$poller_lock_duration = 0;
|
||||||
|
|
||||||
if ($duration > 3600) {
|
if ($duration > 3600) {
|
||||||
logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
|
logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
|
||||||
|
@ -448,7 +461,7 @@ function poller_kill_stale_workers() {
|
||||||
foreach ($r AS $pid) {
|
foreach ($r AS $pid) {
|
||||||
if (!posix_kill($pid["pid"], 0)) {
|
if (!posix_kill($pid["pid"], 0)) {
|
||||||
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
|
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
|
||||||
array('pid' => $pid["pid"]));
|
array('pid' => $pid["pid"], 'done' => false));
|
||||||
} else {
|
} else {
|
||||||
// Kill long running processes
|
// Kill long running processes
|
||||||
|
|
||||||
|
@ -475,7 +488,7 @@ function poller_kill_stale_workers() {
|
||||||
// Additionally we are lowering the priority.
|
// Additionally we are lowering the priority.
|
||||||
dba::update('workerqueue',
|
dba::update('workerqueue',
|
||||||
array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
|
array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
|
||||||
array('pid' => $pid["pid"]));
|
array('pid' => $pid["pid"], 'done' => false));
|
||||||
} else {
|
} else {
|
||||||
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
|
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
|
||||||
}
|
}
|
||||||
|
@ -512,7 +525,9 @@ function poller_too_much_workers() {
|
||||||
$listitem = array();
|
$listitem = array();
|
||||||
|
|
||||||
// Adding all processes with no workerqueue entry
|
// Adding all processes with no workerqueue entry
|
||||||
$processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)");
|
$processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS
|
||||||
|
(SELECT id FROM `workerqueue`
|
||||||
|
WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done` AND `pid` != ?)", getmypid());
|
||||||
if ($process = dba::fetch($processes)) {
|
if ($process = dba::fetch($processes)) {
|
||||||
$listitem[0] = "0:".$process["running"];
|
$listitem[0] = "0:".$process["running"];
|
||||||
}
|
}
|
||||||
|
@ -528,7 +543,16 @@ function poller_too_much_workers() {
|
||||||
dba::close($processes);
|
dba::close($processes);
|
||||||
}
|
}
|
||||||
dba::close($entries);
|
dba::close($entries);
|
||||||
$processlist = ' ('.implode(', ', $listitem).')';
|
|
||||||
|
$jobs_per_minute = 0;
|
||||||
|
|
||||||
|
$jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE");
|
||||||
|
if ($job = dba::fetch($jobs)) {
|
||||||
|
$jobs_per_minute = number_format($job['jobs'] / 10, 0);
|
||||||
|
}
|
||||||
|
dba::close($jobs);
|
||||||
|
|
||||||
|
$processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
|
||||||
}
|
}
|
||||||
|
|
||||||
$entries = poller_total_entries();
|
$entries = poller_total_entries();
|
||||||
|
@ -628,7 +652,10 @@ function find_worker_processes() {
|
||||||
// Check if we should pass some low priority process
|
// Check if we should pass some low priority process
|
||||||
$highest_priority = 0;
|
$highest_priority = 0;
|
||||||
$found = false;
|
$found = false;
|
||||||
$limit = Config::get('system', 'worker_fetch_limit', 5);
|
|
||||||
|
// The higher the number of parallel workers, the more we prefetch to prevent concurring access
|
||||||
|
$limit = Config::get("system", "worker_queues", 4) * 2;
|
||||||
|
$limit = Config::get('system', 'worker_fetch_limit', $limit);
|
||||||
|
|
||||||
if (poller_passing_slow($highest_priority)) {
|
if (poller_passing_slow($highest_priority)) {
|
||||||
// Are there waiting processes with a higher priority than the currently highest?
|
// Are there waiting processes with a higher priority than the currently highest?
|
||||||
|
@ -644,7 +671,7 @@ function find_worker_processes() {
|
||||||
// Give slower processes some processing time
|
// Give slower processes some processing time
|
||||||
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
|
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
|
||||||
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
|
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
|
||||||
ORDER BY `priority`, `created` LIMIT 1",
|
ORDER BY `priority`, `created` LIMIT ".intval($limit),
|
||||||
datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
|
datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
|
||||||
if ($result) {
|
if ($result) {
|
||||||
$found = (dba::affected_rows() > 0);
|
$found = (dba::affected_rows() > 0);
|
||||||
|
@ -669,14 +696,29 @@ function find_worker_processes() {
|
||||||
* @return string SQL statement
|
* @return string SQL statement
|
||||||
*/
|
*/
|
||||||
function poller_worker_process() {
|
function poller_worker_process() {
|
||||||
global $poller_db_duration;
|
global $poller_db_duration, $poller_lock_duration;
|
||||||
|
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
|
|
||||||
$found = find_worker_processes();
|
// There can already be jobs for us in the queue.
|
||||||
|
$r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
|
||||||
|
if (dbm::is_result($r)) {
|
||||||
|
$poller_db_duration += (microtime(true) - $stamp);
|
||||||
|
return $r;
|
||||||
|
}
|
||||||
|
|
||||||
|
$stamp = (float)microtime(true);
|
||||||
|
if (!Lock::set('poller_worker_process')) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
$poller_lock_duration = (microtime(true) - $stamp);
|
||||||
|
|
||||||
|
$stamp = (float)microtime(true);
|
||||||
|
$found = find_worker_processes();
|
||||||
$poller_db_duration += (microtime(true) - $stamp);
|
$poller_db_duration += (microtime(true) - $stamp);
|
||||||
|
|
||||||
|
Lock::remove('poller_worker_process');
|
||||||
|
|
||||||
if ($found) {
|
if ($found) {
|
||||||
$r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
|
$r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
|
||||||
}
|
}
|
||||||
|
@ -689,7 +731,7 @@ function poller_worker_process() {
|
||||||
function poller_unclaim_process() {
|
function poller_unclaim_process() {
|
||||||
$mypid = getmypid();
|
$mypid = getmypid();
|
||||||
|
|
||||||
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid));
|
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid, 'done' => false));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -793,6 +835,7 @@ if (array_search(__file__,get_included_files())===0){
|
||||||
get_app()->end_process();
|
get_app()->end_process();
|
||||||
|
|
||||||
Lock::remove('poller_worker');
|
Lock::remove('poller_worker');
|
||||||
|
Lock::remove('poller_worker_process');
|
||||||
|
|
||||||
killme();
|
killme();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@ use dbm;
|
||||||
* @brief This class contain Functions for preventing parallel execution of functions
|
* @brief This class contain Functions for preventing parallel execution of functions
|
||||||
*/
|
*/
|
||||||
class Lock {
|
class Lock {
|
||||||
|
private static $semaphore = array();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Check for memcache and open a connection if configured
|
* @brief Check for memcache and open a connection if configured
|
||||||
*
|
*
|
||||||
|
@ -43,6 +45,25 @@ class Lock {
|
||||||
return $memcache;
|
return $memcache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Creates a semaphore key
|
||||||
|
*
|
||||||
|
* @param string $fn_name Name of the lock
|
||||||
|
*
|
||||||
|
* @return ressource the semaphore key
|
||||||
|
*/
|
||||||
|
private static function semaphore_key($fn_name) {
|
||||||
|
$temp = get_temppath();
|
||||||
|
|
||||||
|
$file = $temp.'/'.$fn_name.'.sem';
|
||||||
|
|
||||||
|
if (!file_exists($file)) {
|
||||||
|
file_put_contents($file, $function);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ftok($file, 'f');
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Sets a lock for a given name
|
* @brief Sets a lock for a given name
|
||||||
*
|
*
|
||||||
|
@ -55,6 +76,13 @@ class Lock {
|
||||||
$got_lock = false;
|
$got_lock = false;
|
||||||
$start = time();
|
$start = time();
|
||||||
|
|
||||||
|
if (function_exists('sem_get')) {
|
||||||
|
self::$semaphore[$fn_name] = sem_get(self::semaphore_key($fn_name));
|
||||||
|
if (self::$semaphore[$fn_name]) {
|
||||||
|
return sem_acquire(self::$semaphore[$fn_name], ($timeout == 0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$memcache = self::connectMemcache();
|
$memcache = self::connectMemcache();
|
||||||
if (is_object($memcache)) {
|
if (is_object($memcache)) {
|
||||||
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
|
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
|
||||||
|
@ -128,6 +156,11 @@ class Lock {
|
||||||
* @param string $fn_name Name of the lock
|
* @param string $fn_name Name of the lock
|
||||||
*/
|
*/
|
||||||
public static function remove($fn_name) {
|
public static function remove($fn_name) {
|
||||||
|
if (function_exists('sem_get') && self::$semaphore[$fn_name]) {
|
||||||
|
sem_release(self::$semaphore[$fn_name]);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
$memcache = self::connectMemcache();
|
$memcache = self::connectMemcache();
|
||||||
if (is_object($memcache)) {
|
if (is_object($memcache)) {
|
||||||
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
|
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
define('UPDATE_VERSION' , 1230);
|
define('UPDATE_VERSION' , 1231);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -1729,3 +1729,8 @@ function update_1202() {
|
||||||
$r = q("UPDATE `user` SET `account-type` = %d WHERE `page-flags` IN (%d, %d)",
|
$r = q("UPDATE `user` SET `account-type` = %d WHERE `page-flags` IN (%d, %d)",
|
||||||
dbesc(ACCOUNT_TYPE_COMMUNITY), dbesc(PAGE_COMMUNITY), dbesc(PAGE_PRVGROUP));
|
dbesc(ACCOUNT_TYPE_COMMUNITY), dbesc(PAGE_COMMUNITY), dbesc(PAGE_PRVGROUP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function update_1230() {
|
||||||
|
// For this special case we have to use the old update routine
|
||||||
|
$r = q("ALTER TABLE `workerqueue` ADD `done2` tinyint(1) NOT NULL DEFAULT 0");
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue