Merge pull request #3522 from annando/1706-lock

Some more poller speed and lock functionality
This commit is contained in:
Tobias Diekershoff 2017-06-07 09:29:54 +02:00 committed by GitHub
commit 66458d7313
15 changed files with 329 additions and 198 deletions

View file

@ -35,12 +35,13 @@ require_once 'include/features.php';
require_once 'include/identity.php'; require_once 'include/identity.php';
require_once 'update.php'; require_once 'update.php';
require_once 'include/dbstructure.php'; require_once 'include/dbstructure.php';
require_once 'include/poller.php';
define ( 'FRIENDICA_PLATFORM', 'Friendica'); define ( 'FRIENDICA_PLATFORM', 'Friendica');
define ( 'FRIENDICA_CODENAME', 'Asparagus'); define ( 'FRIENDICA_CODENAME', 'Asparagus');
define ( 'FRIENDICA_VERSION', '3.5.3-dev' ); define ( 'FRIENDICA_VERSION', '3.5.3-dev' );
define ( 'DFRN_PROTOCOL_VERSION', '2.23' ); define ( 'DFRN_PROTOCOL_VERSION', '2.23' );
define ( 'DB_UPDATE_VERSION', 1227 ); define ( 'DB_UPDATE_VERSION', 1228 );
/** /**
* @brief Constant with a HTML line break. * @brief Constant with a HTML line break.
@ -1095,18 +1096,8 @@ function proc_run($cmd) {
return; return;
} }
// Checking number of workers
$workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
// Get number of allowed number of worker threads
$queues = intval(get_config("system", "worker_queues"));
if ($queues == 0) {
$queues = 4;
}
// If there are already enough workers running, don't fork another one // If there are already enough workers running, don't fork another one
if ($workers[0]["workers"] >= $queues) { if (poller_too_much_workers()) {
return; return;
} }

View file

@ -1,6 +1,6 @@
-- ------------------------------------------ -- ------------------------------------------
-- Friendica 3.5.2-rc (Asparagus) -- Friendica 3.5.3dev (Asparagus)
-- DB_UPDATE_VERSION 1227 -- DB_UPDATE_VERSION 1228
-- ------------------------------------------ -- ------------------------------------------
@ -580,7 +580,7 @@ CREATE TABLE IF NOT EXISTS `locks` (
`id` int(11) NOT NULL auto_increment, `id` int(11) NOT NULL auto_increment,
`name` varchar(128) NOT NULL DEFAULT '', `name` varchar(128) NOT NULL DEFAULT '',
`locked` tinyint(1) NOT NULL DEFAULT 0, `locked` tinyint(1) NOT NULL DEFAULT 0,
`created` datetime DEFAULT '0001-01-01 00:00:00', `pid` int(10) unsigned NOT NULL DEFAULT 0,
PRIMARY KEY(`id`) PRIMARY KEY(`id`)
) DEFAULT COLLATE utf8mb4_general_ci; ) DEFAULT COLLATE utf8mb4_general_ci;
@ -1116,6 +1116,7 @@ CREATE TABLE IF NOT EXISTS `workerqueue` (
`executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', `executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
PRIMARY KEY(`id`), PRIMARY KEY(`id`),
INDEX `pid` (`pid`), INDEX `pid` (`pid`),
INDEX `parameter` (`parameter`(192)),
INDEX `priority_created` (`priority`,`created`) INDEX `priority_created` (`priority`,`created`)
) DEFAULT COLLATE utf8mb4_general_ci; ) DEFAULT COLLATE utf8mb4_general_ci;

View file

@ -2,10 +2,10 @@ Table locks
=========== ===========
| Field | Description | Type | Null | Key | Default | Extra | | Field | Description | Type | Null | Key | Default | Extra |
|---------|------------------|--------------|------|-----|---------------------|----------------| |---------|------------------|------------------|------|-----|---------------------|----------------|
| id | sequential ID | int(11) | NO | PRI | NULL | auto_increment | | id | sequential ID | int(11) | NO | PRI | NULL | auto_increment |
| name | | varchar(128) | NO | | | | | name | | varchar(128) | NO | | | |
| locked | | tinyint(1) | NO | | 0 | | | locked | | tinyint(1) | NO | | 0 | |
| created | | datetime | YES | | 0001-01-01 00:00:00 | | | pid | Process ID | int(10) unsigned | NO | | 0 | |
Return to [database documentation](help/database) Return to [database documentation](help/database)

View file

@ -246,10 +246,11 @@ function cron_poll_contacts($argc, $argv) {
logger("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact["nick"] . " " . $contact["name"]); logger("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact["nick"] . " " . $contact["name"]);
if (($contact['network'] == NETWORK_FEED) AND ($contact['priority'] <= 3)) { if (($contact['network'] == NETWORK_FEED) AND ($contact['priority'] <= 3)) {
proc_run(PRIORITY_MEDIUM, 'include/onepoll.php', intval($contact['id'])); $priority = PRIORITY_MEDIUM;
} else { } else {
proc_run(PRIORITY_LOW, 'include/onepoll.php', intval($contact['id'])); $priority = PRIORITY_LOW;
} }
proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', intval($contact['id']));
} }
} }
} }

View file

@ -806,6 +806,41 @@ class dba {
return self::e($sql, $param); return self::e($sql, $param);
} }
/**
* @brief Locks a table for exclusive write access
*
* This function can be extended in the future to accept a table array as well.
*
* @param string $table Table name
*
* @return boolean was the lock successful?
*/
static public function lock($table) {
// See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html
self::e("SET autocommit=0");
$success = self::e("LOCK TABLES `".self::$dbo->escape($table)."` WRITE");
if (!$success) {
self::e("SET autocommit=1");
} else {
self::$in_transaction = true;
}
return $success;
}
/**
* @brief Unlocks all locked tables
*
* @return boolean was the unlock successful?
*/
static public function unlock() {
// See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html
self::e("COMMIT");
$success = self::e("UNLOCK TABLES");
self::e("SET autocommit=1");
self::$in_transaction = false;
return $success;
}
/** /**
* @brief Starts a transaction * @brief Starts a transaction
* *

View file

@ -1205,7 +1205,7 @@ function db_definition() {
"id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"), "id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"),
"name" => array("type" => "varchar(128)", "not null" => "1", "default" => ""), "name" => array("type" => "varchar(128)", "not null" => "1", "default" => ""),
"locked" => array("type" => "tinyint(1)", "not null" => "1", "default" => "0"), "locked" => array("type" => "tinyint(1)", "not null" => "1", "default" => "0"),
"created" => array("type" => "datetime", "default" => NULL_DATE), "pid" => array("type" => "int(10) unsigned", "not null" => "1", "default" => "0"),
), ),
"indexes" => array( "indexes" => array(
"PRIMARY" => array("id"), "PRIMARY" => array("id"),
@ -1743,6 +1743,7 @@ function db_definition() {
"indexes" => array( "indexes" => array(
"PRIMARY" => array("id"), "PRIMARY" => array("id"),
"pid" => array("pid"), "pid" => array("pid"),
"parameter" => array("parameter(192)"),
"priority_created" => array("priority", "created"), "priority_created" => array("priority", "created"),
) )
); );

View file

@ -1,80 +0,0 @@
<?php
// Provide some ability to lock a PHP function so that multiple processes
// can't run the function concurrently
if (! function_exists('lock_function')) {
function lock_function($fn_name, $block = true, $wait_sec = 2, $timeout = 30) {
if ( $wait_sec == 0 )
$wait_sec = 2; // don't let the user pick a value that's likely to crash the system
$got_lock = false;
$start = time();
do {
q("LOCK TABLE `locks` WRITE");
$r = q("SELECT `locked`, `created` FROM `locks` WHERE `name` = '%s' LIMIT 1",
dbesc($fn_name)
);
if ((dbm::is_result($r)) AND (!$r[0]['locked'] OR (strtotime($r[0]['created']) < time() - 3600))) {
q("UPDATE `locks` SET `locked` = 1, `created` = '%s' WHERE `name` = '%s'",
dbesc(datetime_convert()),
dbesc($fn_name)
);
$got_lock = true;
}
elseif (! dbm::is_result($r)) {
/// @TODO the Boolean value for count($r) should be equivalent to the Boolean value of $r
q("INSERT INTO `locks` (`name`, `created`, `locked`) VALUES ('%s', '%s', 1)",
dbesc($fn_name),
dbesc(datetime_convert())
);
$got_lock = true;
}
q("UNLOCK TABLES");
if (($block) && (! $got_lock))
sleep($wait_sec);
} while (($block) && (! $got_lock) && ((time() - $start) < $timeout));
logger('lock_function: function ' . $fn_name . ' with blocking = ' . $block . ' got_lock = ' . $got_lock . ' time = ' . (time() - $start), LOGGER_DEBUG);
return $got_lock;
}}
if (! function_exists('block_on_function_lock')) {
function block_on_function_lock($fn_name, $wait_sec = 2, $timeout = 30) {
if ( $wait_sec == 0 )
$wait_sec = 2; // don't let the user pick a value that's likely to crash the system
$start = time();
do {
$r = q("SELECT locked FROM locks WHERE name = '%s' LIMIT 1",
dbesc($fn_name)
);
if (dbm::is_result($r) && $r[0]['locked']) {
sleep($wait_sec);
}
} while (dbm::is_result($r) && $r[0]['locked'] && ((time() - $start) < $timeout));
return;
}}
if (! function_exists('unlock_function')) {
function unlock_function($fn_name) {
$r = q("UPDATE `locks` SET `locked` = 0, `created` = '%s' WHERE `name` = '%s'",
dbesc(NULL_DATE),
dbesc($fn_name)
);
logger('unlock_function: released lock for function ' . $fn_name, LOGGER_DEBUG);
return;
}}

View file

@ -498,7 +498,7 @@ function notifier_run(&$argv, &$argc){
} }
logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG); logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
proc_run($priority, 'include/delivery.php', $cmd, $item_id, $contact['id']); proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']);
} }
} }
@ -563,7 +563,7 @@ function notifier_run(&$argv, &$argc){
if ((! $mail) && (! $fsuggest) && (! $followup)) { if ((! $mail) && (! $fsuggest) && (! $followup)) {
logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]); logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
proc_run($priority, 'include/delivery.php', $cmd, $item_id, $rr['id']); proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']);
} }
} }
} }
@ -603,7 +603,7 @@ function notifier_run(&$argv, &$argc){
} }
// Handling the pubsubhubbub requests // Handling the pubsubhubbub requests
proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php'); proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
} }
logger('notifier: calling hooks', LOGGER_DEBUG); logger('notifier: calling hooks', LOGGER_DEBUG);

View file

@ -2,13 +2,14 @@
use Friendica\App; use Friendica\App;
use Friendica\Core\Config; use Friendica\Core\Config;
use Friendica\Util\Lock;
if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) { if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
$directory = dirname($_SERVER["argv"][0]); $directory = dirname($_SERVER["argv"][0]);
if (substr($directory, 0, 1) != "/") if (substr($directory, 0, 1) != "/") {
$directory = $_SERVER["PWD"]."/".$directory; $directory = $_SERVER["PWD"]."/".$directory;
}
$directory = realpath($directory."/.."); $directory = realpath($directory."/..");
chdir($directory); chdir($directory);
@ -19,16 +20,12 @@ require_once("boot.php");
function poller_run($argv, $argc){ function poller_run($argv, $argc){
global $a, $db; global $a, $db;
if (is_null($a)) {
$a = new App(dirname(__DIR__)); $a = new App(dirname(__DIR__));
}
if (is_null($db)) {
@include(".htconfig.php"); @include(".htconfig.php");
require_once("include/dba.php"); require_once("include/dba.php");
$db = new dba($db_host, $db_user, $db_pass, $db_data); $db = new dba($db_host, $db_user, $db_pass, $db_data);
unset($db_host, $db_user, $db_pass, $db_data); unset($db_host, $db_user, $db_pass, $db_data);
};
Config::load(); Config::load();
@ -41,54 +38,74 @@ function poller_run($argv, $argc){
load_hooks(); load_hooks();
// At first check the maximum load. We shouldn't continue with a high load
if ($a->maxload_reached()) {
logger('Pre check: maximum load reached, quitting.', LOGGER_DEBUG);
return;
}
// We now start the process. This is done after the load check since this could increase the load.
$a->start_process(); $a->start_process();
// At first we check the number of workers and quit if there are too much of them
// This is done at the top to avoid that too much code is executed without a need to do so,
// since the poller mostly quits here.
if (poller_too_much_workers()) {
poller_kill_stale_workers();
logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Do we have too few memory?
if ($a->min_memory_reached()) { if ($a->min_memory_reached()) {
logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG);
return; return;
} }
// Possibly there are too much database connections
if (poller_max_connections_reached()) { if (poller_max_connections_reached()) {
logger('Pre check: maximum connections reached, quitting.', LOGGER_DEBUG);
return; return;
} }
if ($a->maxload_reached()) { // Possibly there are too much database processes that block the system
if ($a->max_processes_reached()) {
logger('Pre check: maximum processes reached, quitting.', LOGGER_DEBUG);
return; return;
} }
// Now we start additional cron processes if we should do so
if (($argc <= 1) OR ($argv[1] != "no_cron")) { if (($argc <= 1) OR ($argv[1] != "no_cron")) {
poller_run_cron(); poller_run_cron();
} }
if ($a->max_processes_reached()) {
return;
}
// Checking the number of workers
if (poller_too_much_workers()) {
poller_kill_stale_workers();
return;
}
$starttime = time(); $starttime = time();
// We fetch the next queue entry that is about to be executed
while ($r = poller_worker_process()) { while ($r = poller_worker_process()) {
// If we got that queue entry we claim it for us
if (!poller_claim_process($r[0])) { if (!poller_claim_process($r[0])) {
continue; continue;
} }
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
if (Lock::set('poller_worker', 0)) {
// Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Check free memory // Check free memory
if ($a->min_memory_reached()) { if ($a->min_memory_reached()) {
logger('Memory limit reached, quitting.', LOGGER_DEBUG); logger('Memory limit reached, quitting.', LOGGER_DEBUG);
return; return;
} }
Lock::remove('poller_worker');
// Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
} }
// finally the work will be done
if (!poller_execute($r[0])) { if (!poller_execute($r[0])) {
logger('Process execution failed, quitting.', LOGGER_DEBUG); logger('Process execution failed, quitting.', LOGGER_DEBUG);
return; return;
@ -96,7 +113,7 @@ function poller_run($argv, $argc){
// Quit the poller once every hour // Quit the poller once every hour
if (time() > ($starttime + 3600)) { if (time() > ($starttime + 3600)) {
logger('Process lifetime reachted, quitting.', LOGGER_DEBUG); logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
return; return;
} }
} }
@ -150,7 +167,6 @@ function poller_execute($queue) {
$funcname = str_replace(".php", "", basename($argv[0]))."_run"; $funcname = str_replace(".php", "", basename($argv[0]))."_run";
if (function_exists($funcname)) { if (function_exists($funcname)) {
poller_exec_function($queue, $funcname, $argv); poller_exec_function($queue, $funcname, $argv);
dba::delete('workerqueue', array('id' => $queue["id"])); dba::delete('workerqueue', array('id' => $queue["id"]));
} else { } else {
@ -226,26 +242,29 @@ function poller_exec_function($queue, $funcname, $argv) {
$o = "\nDatabase Read:\n"; $o = "\nDatabase Read:\n";
foreach ($a->callstack["database"] AS $func => $time) { foreach ($a->callstack["database"] AS $func => $time) {
$time = round($time, 3); $time = round($time, 3);
if ($time > 0) if ($time > 0) {
$o .= $func.": ".$time."\n"; $o .= $func.": ".$time."\n";
} }
} }
}
if (isset($a->callstack["database_write"])) { if (isset($a->callstack["database_write"])) {
$o .= "\nDatabase Write:\n"; $o .= "\nDatabase Write:\n";
foreach ($a->callstack["database_write"] AS $func => $time) { foreach ($a->callstack["database_write"] AS $func => $time) {
$time = round($time, 3); $time = round($time, 3);
if ($time > 0) if ($time > 0) {
$o .= $func.": ".$time."\n"; $o .= $func.": ".$time."\n";
} }
} }
}
if (isset($a->callstack["network"])) { if (isset($a->callstack["network"])) {
$o .= "\nNetwork:\n"; $o .= "\nNetwork:\n";
foreach ($a->callstack["network"] AS $func => $time) { foreach ($a->callstack["network"] AS $func => $time) {
$time = round($time, 3); $time = round($time, 3);
if ($time > 0) if ($time > 0) {
$o .= $func.": ".$time."\n"; $o .= $func.": ".$time."\n";
} }
} }
}
} else { } else {
$o = ''; $o = '';
} }
@ -284,27 +303,30 @@ function poller_max_connections_reached() {
if ($max == 0) { if ($max == 0) {
// the maximum number of possible user connections can be a system variable // the maximum number of possible user connections can be a system variable
$r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'"); $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
if ($r) if (dbm::is_result($r)) {
$max = $r[0]["Value"]; $max = $r[0]["Value"];
}
// Or it can be granted. This overrides the system variable // Or it can be granted. This overrides the system variable
$r = q("SHOW GRANTS"); $r = q("SHOW GRANTS");
if ($r) if (dbm::is_result($r)) {
foreach ($r AS $grants) { foreach ($r AS $grants) {
$grant = array_pop($grants); $grant = array_pop($grants);
if (stristr($grant, "GRANT USAGE ON")) if (stristr($grant, "GRANT USAGE ON")) {
if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) {
$max = $match[1]; $max = $match[1];
} }
} }
}
}
}
// If $max is set we will use the processlist to determine the current number of connections // If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user // The processlist only shows entries of the current user
if ($max != 0) { if ($max != 0) {
$r = q("SHOW PROCESSLIST"); $r = q("SHOW PROCESSLIST");
if (!dbm::is_result($r)) if (!dbm::is_result($r)) {
return false; return false;
}
$used = count($r); $used = count($r);
logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG); logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
@ -320,28 +342,28 @@ function poller_max_connections_reached() {
// We will now check for the system values. // We will now check for the system values.
// This limit could be reached although the user limits are fine. // This limit could be reached although the user limits are fine.
$r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'"); $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
if (!$r) if (!dbm::is_result($r)) {
return false; return false;
}
$max = intval($r[0]["Value"]); $max = intval($r[0]["Value"]);
if ($max == 0) if ($max == 0) {
return false; return false;
}
$r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'"); $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
if (!$r) if (!dbm::is_result($r)) {
return false; return false;
}
$used = intval($r[0]["Value"]); $used = intval($r[0]["Value"]);
if ($used == 0) if ($used == 0) {
return false; return false;
}
logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG); logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);
$level = $used / $max * 100; $level = $used / $max * 100;
if ($level < $maxlevel) if ($level < $maxlevel) {
return false; return false;
}
logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max); logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
return true; return true;
} }
@ -440,8 +462,7 @@ function poller_too_much_workers() {
dba::close($processes); dba::close($processes);
} }
dba::close($entries); dba::close($entries);
$processlist = ' ('.implode(', ', $listitem).')';
$processlist = implode(', ', $listitem);
$s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
$entries = $s[0]["total"]; $entries = $s[0]["total"];
@ -460,7 +481,7 @@ function poller_too_much_workers() {
} }
} }
logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries." (".$processlist.") - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG); logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
// Are there fewer workers running as possible? Then fork a new one. // Are there fewer workers running as possible? Then fork a new one.
if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) { if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
@ -471,7 +492,7 @@ function poller_too_much_workers() {
} }
} }
return($active >= $queues); return $active >= $queues;
} }
/** /**
@ -482,7 +503,7 @@ function poller_too_much_workers() {
function poller_active_workers() { function poller_active_workers() {
$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'"); $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
return($workers[0]["processes"]); return $workers[0]["processes"];
} }
/** /**
@ -503,36 +524,37 @@ function poller_passing_slow(&$highest_priority) {
INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`"); INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
// No active processes at all? Fine // No active processes at all? Fine
if (!dbm::is_result($r)) if (!dbm::is_result($r)) {
return(false); return false;
}
$priorities = array(); $priorities = array();
foreach ($r AS $line) foreach ($r AS $line) {
$priorities[] = $line["priority"]; $priorities[] = $line["priority"];
}
// Should not happen // Should not happen
if (count($priorities) == 0) if (count($priorities) == 0) {
return(false); return false;
}
$highest_priority = min($priorities); $highest_priority = min($priorities);
// The highest process is already the slowest one? // The highest process is already the slowest one?
// Then we quit // Then we quit
if ($highest_priority == PRIORITY_NEGLIGIBLE) if ($highest_priority == PRIORITY_NEGLIGIBLE) {
return(false); return false;
}
$high = 0; $high = 0;
foreach ($priorities AS $priority) foreach ($priorities AS $priority) {
if ($priority == $highest_priority) if ($priority == $highest_priority) {
++$high; ++$high;
}
}
logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG); logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG);
$passing_slow = (($high/count($priorities)) > (2/3)); $passing_slow = (($high/count($priorities)) > (2/3));
if ($passing_slow) if ($passing_slow) {
logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG); logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG);
}
return($passing_slow); return $passing_slow;
} }
/** /**
@ -546,7 +568,7 @@ function poller_worker_process() {
$highest_priority = 0; $highest_priority = 0;
if (poller_passing_slow($highest_priority)) { if (poller_passing_slow($highest_priority)) {
dba::e('LOCK TABLES `workerqueue` WRITE'); dba::lock('workerqueue');
// Are there waiting processes with a higher priority than the currently highest? // Are there waiting processes with a higher priority than the currently highest?
$r = q("SELECT * FROM `workerqueue` $r = q("SELECT * FROM `workerqueue`
@ -568,7 +590,7 @@ function poller_worker_process() {
return $r; return $r;
} }
} else { } else {
dba::e('LOCK TABLES `workerqueue` WRITE'); dba::lock('workerqueue');
} }
// If there is no result (or we shouldn't pass lower processes) we check without priority limit // If there is no result (or we shouldn't pass lower processes) we check without priority limit
@ -578,7 +600,7 @@ function poller_worker_process() {
// We only unlock the tables here, when we got no data // We only unlock the tables here, when we got no data
if (!dbm::is_result($r)) { if (!dbm::is_result($r)) {
dba::e('UNLOCK TABLES'); dba::unlock();
} }
return $r; return $r;
@ -598,7 +620,7 @@ function poller_claim_process($queue) {
$success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
array('id' => $queue["id"], 'pid' => 0)); array('id' => $queue["id"], 'pid' => 0));
dba::e('UNLOCK TABLES'); dba::unlock();
if (!$success) { if (!$success) {
logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
@ -729,5 +751,7 @@ if (array_search(__file__,get_included_files())===0){
get_app()->end_process(); get_app()->end_process();
Lock::remove('poller_worker');
killme(); killme();
} }

View file

@ -17,7 +17,7 @@ function pubsubpublish_run(&$argv, &$argc){
foreach ($r as $rr) { foreach ($r as $rr) {
logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php', $rr["id"]); proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]);
} }
} }

View file

@ -27,7 +27,7 @@ function queue_run(&$argv, &$argc){
logger('queue: start'); logger('queue: start');
// Handling the pubsubhubbub requests // Handling the pubsubhubbub requests
proc_run(PRIORITY_HIGH,'include/pubsubpublish.php'); proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
$r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue` $r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue`
INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id` INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id`
@ -51,7 +51,7 @@ function queue_run(&$argv, &$argc){
if (dbm::is_result($r)) { if (dbm::is_result($r)) {
foreach ($r as $q_item) { foreach ($r as $q_item) {
logger('Call queue for id '.$q_item['id']); logger('Call queue for id '.$q_item['id']);
proc_run(PRIORITY_LOW, "include/queue.php", $q_item['id']); proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", $q_item['id']);
} }
} }
return; return;

View file

@ -1995,10 +1995,11 @@ function get_gcontact_id($contact) {
if (in_array($contact["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS))) if (in_array($contact["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS)))
$contact["url"] = clean_contact_url($contact["url"]); $contact["url"] = clean_contact_url($contact["url"]);
$r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 2", dba::lock('gcontact');
$r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 1",
dbesc(normalise_link($contact["url"]))); dbesc(normalise_link($contact["url"])));
if ($r) { if (dbm::is_result($r)) {
$gcontact_id = $r[0]["id"]; $gcontact_id = $r[0]["id"];
// Update every 90 days // Update every 90 days
@ -2036,17 +2037,13 @@ function get_gcontact_id($contact) {
$doprobing = in_array($r[0]["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS, "")); $doprobing = in_array($r[0]["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS, ""));
} }
} }
dba::unlock();
if ($doprobing) { if ($doprobing) {
logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG); logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG);
proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"])); proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"]));
} }
if ((dbm::is_result($r)) AND (count($r) > 1) AND ($gcontact_id > 0) AND ($contact["url"] != ""))
q("DELETE FROM `gcontact` WHERE `nurl` = '%s' AND `id` != %d",
dbesc(normalise_link($contact["url"])),
intval($gcontact_id));
return $gcontact_id; return $gcontact_id;
} }

View file

@ -797,6 +797,8 @@ class App {
* @return bool Is the limit reached? * @return bool Is the limit reached?
*/ */
function max_processes_reached() { function max_processes_reached() {
// Deactivated, needs more investigating if this check really makes sense
return false;
if ($this->is_backend()) { if ($this->is_backend()) {
$process = 'backend'; $process = 'backend';

159
src/Util/Lock.php Normal file
View file

@ -0,0 +1,159 @@
<?php
namespace Friendica\Util;
/**
* @file src/Util/Lock.php
* @brief Functions for preventing parallel execution of functions
*
*/
use Friendica\Core\Config;
use Memcache;
use dba;
use dbm;
/**
* @brief This class contain Functions for preventing parallel execution of functions
*/
class Lock {
/**
* @brief Check for memcache and open a connection if configured
*
* @return object|boolean The memcache object - or "false" if not successful
*/
private static function connectMemcache() {
if (!function_exists('memcache_connect')) {
return false;
}
if (!Config::get('system', 'memcache')) {
return false;
}
$memcache_host = Config::get('system', 'memcache_host', '127.0.0.1');
$memcache_port = Config::get('system', 'memcache_port', 11211);
$memcache = new Memcache;
if (!$memcache->connect($memcache_host, $memcache_port)) {
return false;
}
return $memcache;
}
/**
* @brief Sets a lock for a given name
*
* @param string $fn_name Name of the lock
* @param integer $timeout Seconds until we give up
*
* @return boolean Was the lock successful?
*/
public static function set($fn_name, $timeout = 120) {
$got_lock = false;
$start = time();
$memcache = self::connectMemcache();
if (is_object($memcache)) {
$wait_sec = 0.2;
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
do {
$lock = $memcache->get($cachekey);
if (!is_bool($lock)) {
$pid = (int)$lock;
// When the process id isn't used anymore, we can safely claim the lock for us.
// Or we do want to lock something that was already locked by us.
if (!posix_kill($pid, 0) OR ($pid == getmypid())) {
$lock = false;
}
}
if (is_bool($lock)) {
$memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300);
$got_lock = true;
}
if (!$got_lock AND ($timeout > 0)) {
usleep($wait_sec * 1000000);
}
} while (!$got_lock AND ((time() - $start) < $timeout));
return $got_lock;
}
$wait_sec = 2;
do {
dba::lock('locks');
$lock = dba::select('locks', array('locked', 'pid'), array('name' => $fn_name), array('limit' => 1));
if (dbm::is_result($lock)) {
if ($lock['locked']) {
// When the process id isn't used anymore, we can safely claim the lock for us.
if (!posix_kill($lock['pid'], 0)) {
$lock['locked'] = false;
}
// We want to lock something that was already locked by us? So we got the lock.
if ($lock['pid'] == getmypid()) {
$got_lock = true;
}
}
if (!$lock['locked']) {
dba::update('locks', array('locked' => true, 'pid' => getmypid()), array('name' => $fn_name));
$got_lock = true;
}
} elseif (!dbm::is_result($lock)) {
dba::insert('locks', array('name' => $fn_name, 'locked' => true, 'pid' => getmypid()));
$got_lock = true;
}
dba::unlock();
if (!$got_lock AND ($timeout > 0)) {
sleep($wait_sec);
}
} while (!$got_lock AND ((time() - $start) < $timeout));
return $got_lock;
}
/**
* @brief Removes a lock if it was set by us
*
* @param string $fn_name Name of the lock
*/
public static function remove($fn_name) {
$memcache = self::connectMemcache();
if (is_object($memcache)) {
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
$lock = $memcache->get($cachekey);
if (!is_bool($lock)) {
if ((int)$lock == getmypid()) {
$memcache->delete($cachekey);
}
}
return;
}
dba::update('locks', array('locked' => false, 'pid' => 0), array('name' => $fn_name, 'pid' => getmypid()));
return;
}
/**
* @brief Removes all lock that were set by us
*/
public static function removeAll() {
$memcache = self::connectMemcache();
if (is_object($memcache)) {
// We cannot delete all cache entries, but this doesn't matter with memcache
return;
}
dba::update('locks', array('locked' => false, 'pid' => 0), array('pid' => getmypid()));
return;
}
}

View file

@ -1,6 +1,6 @@
<?php <?php
define('UPDATE_VERSION' , 1227); define('UPDATE_VERSION' , 1228);
/** /**
* *