Merge remote-tracking branch 'upstream/develop' into issue-2864

This commit is contained in:
Michael 2017-06-07 18:17:37 +00:00
commit 5aee2fde1b
18 changed files with 416 additions and 208 deletions

View file

@ -1 +1 @@
3.5.3dev
3.5.3-dev

View file

@ -35,12 +35,13 @@ require_once 'include/features.php';
require_once 'include/identity.php';
require_once 'update.php';
require_once 'include/dbstructure.php';
require_once 'include/poller.php';
define ( 'FRIENDICA_PLATFORM', 'Friendica');
define ( 'FRIENDICA_CODENAME', 'Asparagus');
define ( 'FRIENDICA_VERSION', '3.5.3dev' );
define ( 'FRIENDICA_VERSION', '3.5.3-dev' );
define ( 'DFRN_PROTOCOL_VERSION', '2.23' );
define ( 'DB_UPDATE_VERSION', 1227 );
define ( 'DB_UPDATE_VERSION', 1228 );
/**
* @brief Constant with a HTML line break.
@ -1095,18 +1096,8 @@ function proc_run($cmd) {
return;
}
// Checking number of workers
$workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
// Get number of allowed number of worker threads
$queues = intval(get_config("system", "worker_queues"));
if ($queues == 0) {
$queues = 4;
}
// If there are already enough workers running, don't fork another one
if ($workers[0]["workers"] >= $queues) {
if (poller_too_much_workers()) {
return;
}

View file

@ -1,6 +1,6 @@
-- ------------------------------------------
-- Friendica 3.5.2-rc (Asparagus)
-- DB_UPDATE_VERSION 1227
-- Friendica 3.5.3dev (Asparagus)
-- DB_UPDATE_VERSION 1228
-- ------------------------------------------
@ -580,7 +580,7 @@ CREATE TABLE IF NOT EXISTS `locks` (
`id` int(11) NOT NULL auto_increment,
`name` varchar(128) NOT NULL DEFAULT '',
`locked` tinyint(1) NOT NULL DEFAULT 0,
`created` datetime DEFAULT '0001-01-01 00:00:00',
`pid` int(10) unsigned NOT NULL DEFAULT 0,
PRIMARY KEY(`id`)
) DEFAULT COLLATE utf8mb4_general_ci;
@ -1116,6 +1116,7 @@ CREATE TABLE IF NOT EXISTS `workerqueue` (
`executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
PRIMARY KEY(`id`),
INDEX `pid` (`pid`),
INDEX `parameter` (`parameter`(192)),
INDEX `priority_created` (`priority`,`created`)
) DEFAULT COLLATE utf8mb4_general_ci;

View file

@ -1,11 +1,11 @@
Table locks
===========
| Field | Description | Type | Null | Key | Default | Extra |
|---------|------------------|--------------|------|-----|---------------------|----------------|
| id | sequential ID | int(11) | NO | PRI | NULL | auto_increment |
| name | | varchar(128) | NO | | | |
| locked | | tinyint(1) | NO | | 0 | |
| created | | datetime | YES | | 0001-01-01 00:00:00 | |
| Field | Description | Type | Null | Key | Default | Extra |
|---------|------------------|------------------|------|-----|---------------------|----------------|
| id | sequential ID | int(11) | NO | PRI | NULL | auto_increment |
| name | | varchar(128) | NO | | | |
| locked | | tinyint(1) | NO | | 0 | |
| pid | Process ID | int(10) unsigned | NO | | 0 | |
Return to [database documentation](help/database)

View file

@ -35,6 +35,7 @@ Example: To set the directory value please add this line to your .htconfig.php:
* **db_loglimit_index_high** - Number of index rows to be logged anyway (for any index)
* **db_log_index_blacklist** - Blacklist of indexes that shouldn't be watched
* **dbclean** (Boolean) - Enable the automatic database cleanup process
* **dbclean-expire-days** (Integer) - Days after which remote items will be deleted. Own items, and marked or filed items are kept.
* **default_service_class** -
* **delivery_batch_count** - Number of deliveries per process. Default value is 1. (Disabled when using the worker)
* **diaspora_test** (Boolean) - For development only. Disables the message transfer.

View file

@ -246,10 +246,11 @@ function cron_poll_contacts($argc, $argv) {
logger("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact["nick"] . " " . $contact["name"]);
if (($contact['network'] == NETWORK_FEED) AND ($contact['priority'] <= 3)) {
proc_run(PRIORITY_MEDIUM, 'include/onepoll.php', intval($contact['id']));
$priority = PRIORITY_MEDIUM;
} else {
proc_run(PRIORITY_LOW, 'include/onepoll.php', intval($contact['id']));
$priority = PRIORITY_LOW;
}
proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', intval($contact['id']));
}
}
}

View file

@ -806,6 +806,41 @@ class dba {
return self::e($sql, $param);
}
/**
* @brief Locks a table for exclusive write access
*
* This function can be extended in the future to accept a table array as well.
*
* @param string $table Table name
*
* @return boolean was the lock successful?
*/
static public function lock($table) {
// See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html
self::e("SET autocommit=0");
$success = self::e("LOCK TABLES `".self::$dbo->escape($table)."` WRITE");
if (!$success) {
self::e("SET autocommit=1");
} else {
self::$in_transaction = true;
}
return $success;
}
/**
* @brief Unlocks all locked tables
*
* @return boolean was the unlock successful?
*/
static public function unlock() {
// See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html
self::e("COMMIT");
$success = self::e("UNLOCK TABLES");
self::e("SET autocommit=1");
self::$in_transaction = false;
return $success;
}
/**
* @brief Starts a transaction
*

View file

@ -17,9 +17,14 @@ function dbclean_run(&$argv, &$argc) {
$stage = 0;
}
// Get the expire days for step 8 and 9
$days = Config::get('system', 'dbclean-expire-days', 0);
if ($stage == 0) {
for ($i = 1; $i <= 7; $i++) {
if (!Config::get('system', 'finished-dbclean-'.$i, false)) {
for ($i = 1; $i <= 9; $i++) {
// Execute the background script for a step when it isn't finished.
// Execute step 8 and 9 only when $days is defined.
if (!Config::get('system', 'finished-dbclean-'.$i, false) AND (($i < 8) OR ($days > 0))) {
proc_run(PRIORITY_LOW, 'include/dbclean.php', $i);
}
}
@ -30,6 +35,19 @@ function dbclean_run(&$argv, &$argc) {
/**
* @brief Remove orphaned database entries
* @param integer $stage What should be deleted?
*
* Values for $stage:
* ------------------
* 1: Old global item entries from item table without user copy.
* 2: Items without parents.
* 3: Orphaned data from thread table.
* 4: Orphaned data from notify table.
* 5: Orphaned data from notify-threads table.
* 6: Orphaned data from sign table.
* 7: Orphaned data from term table.
* 8: Expired threads.
* 9: Old global item entries from expired threads
*/
function remove_orphans($stage = 0) {
global $db;
@ -39,6 +57,9 @@ function remove_orphans($stage = 0) {
// We split the deletion in many small tasks
$limit = 1000;
// Get the expire days for step 8 and 9
$days = Config::get('system', 'dbclean-expire-days', 0);
if ($stage == 1) {
$last_id = Config::get('system', 'dbclean-last-id-1', 0);
@ -61,11 +82,6 @@ function remove_orphans($stage = 0) {
logger("Done deleting ".$count." old global item entries from item table without user copy. Last ID: ".$last_id);
Config::set('system', 'dbclean-last-id-1', $last_id);
// We will eventually set this value when we found a good way to delete these items in another way.
// if ($count < $limit) {
// Config::set('system', 'finished-dbclean-1', true);
// }
} elseif ($stage == 2) {
$last_id = Config::get('system', 'dbclean-last-id-2', 0);
@ -216,11 +232,71 @@ function remove_orphans($stage = 0) {
if ($count < $limit) {
Config::set('system', 'finished-dbclean-7', true);
}
} elseif ($stage == 8) {
if ($days <= 0) {
return;
}
$last_id = Config::get('system', 'dbclean-last-id-8', 0);
logger("Deleting expired threads. Last ID: ".$last_id);
$r = dba::p("SELECT `thread`.`iid` FROM `thread`
INNER JOIN `contact` ON `thread`.`contact-id` = `contact`.`id` AND NOT `notify_new_posts`
WHERE `thread`.`received` < UTC_TIMESTAMP() - INTERVAL ? DAY
AND NOT `thread`.`mention` AND NOT `thread`.`starred`
AND NOT `thread`.`wall` AND NOT `thread`.`origin`
AND `thread`.`uid` != 0 AND `thread`.`iid` >= ?
AND NOT `thread`.`iid` IN (SELECT `parent` FROM `item`
WHERE (`item`.`starred` OR (`item`.`resource-id` != '')
OR (`item`.`file` != '') OR (`item`.`event-id` != '')
OR (`item`.`attach` != '') OR `item`.`wall` OR `item`.`origin`)
AND `item`.`parent` = `thread`.`iid`)
ORDER BY `thread`.`iid` LIMIT 1000", $days, $last_id);
$count = dba::num_rows($r);
if ($count > 0) {
logger("found expired threads: ".$count);
while ($thread = dba::fetch($r)) {
$last_id = $thread["iid"];
dba::delete('thread', array('iid' => $thread["iid"]));
}
} else {
logger("No expired threads found");
}
dba::close($r);
logger("Done deleting ".$count." expired threads. Last ID: ".$last_id);
Config::set('system', 'dbclean-last-id-8', $last_id);
} elseif ($stage == 9) {
if ($days <= 0) {
return;
}
$last_id = Config::get('system', 'dbclean-last-id-9', 0);
$till_id = Config::get('system', 'dbclean-last-id-8', 0);
logger("Deleting old global item entries from expired threads from ID ".$last_id." to ID ".$till_id);
$r = dba::p("SELECT `id` FROM `item` WHERE `uid` = 0 AND
NOT EXISTS (SELECT `guid` FROM `item` AS `i` WHERE `item`.`guid` = `i`.`guid` AND `i`.`uid` != 0) AND
`received` < UTC_TIMESTAMP() - INTERVAL 90 DAY AND `id` >= ? AND `id` <= ?
ORDER BY `id` LIMIT ".intval($limit), $last_id, $till_id);
$count = dba::num_rows($r);
if ($count > 0) {
logger("found global item entries from expired threads: ".$count);
while ($orphan = dba::fetch($r)) {
$last_id = $orphan["id"];
dba::delete('item', array('id' => $orphan["id"]));
}
} else {
logger("No global item entries from expired threads");
}
dba::close($r);
logger("Done deleting ".$count." old global item entries from expired threads. Last ID: ".$last_id);
Config::set('system', 'dbclean-last-id-9', $last_id);
}
// Call it again if not all entries were purged
if (($stage != 0) AND ($count > 0)) {
proc_run(PRIORITY_MEDIUM, 'include/dbclean.php');
}
}

View file

@ -1205,7 +1205,7 @@ function db_definition() {
"id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"),
"name" => array("type" => "varchar(128)", "not null" => "1", "default" => ""),
"locked" => array("type" => "tinyint(1)", "not null" => "1", "default" => "0"),
"created" => array("type" => "datetime", "default" => NULL_DATE),
"pid" => array("type" => "int(10) unsigned", "not null" => "1", "default" => "0"),
),
"indexes" => array(
"PRIMARY" => array("id"),
@ -1743,6 +1743,7 @@ function db_definition() {
"indexes" => array(
"PRIMARY" => array("id"),
"pid" => array("pid"),
"parameter" => array("parameter(192)"),
"priority_created" => array("priority", "created"),
)
);

View file

@ -1,80 +0,0 @@
<?php
// Provide some ability to lock a PHP function so that multiple processes
// can't run the function concurrently
if (! function_exists('lock_function')) {
function lock_function($fn_name, $block = true, $wait_sec = 2, $timeout = 30) {
if ( $wait_sec == 0 )
$wait_sec = 2; // don't let the user pick a value that's likely to crash the system
$got_lock = false;
$start = time();
do {
q("LOCK TABLE `locks` WRITE");
$r = q("SELECT `locked`, `created` FROM `locks` WHERE `name` = '%s' LIMIT 1",
dbesc($fn_name)
);
if ((dbm::is_result($r)) AND (!$r[0]['locked'] OR (strtotime($r[0]['created']) < time() - 3600))) {
q("UPDATE `locks` SET `locked` = 1, `created` = '%s' WHERE `name` = '%s'",
dbesc(datetime_convert()),
dbesc($fn_name)
);
$got_lock = true;
}
elseif (! dbm::is_result($r)) {
/// @TODO the Boolean value for count($r) should be equivalent to the Boolean value of $r
q("INSERT INTO `locks` (`name`, `created`, `locked`) VALUES ('%s', '%s', 1)",
dbesc($fn_name),
dbesc(datetime_convert())
);
$got_lock = true;
}
q("UNLOCK TABLES");
if (($block) && (! $got_lock))
sleep($wait_sec);
} while (($block) && (! $got_lock) && ((time() - $start) < $timeout));
logger('lock_function: function ' . $fn_name . ' with blocking = ' . $block . ' got_lock = ' . $got_lock . ' time = ' . (time() - $start), LOGGER_DEBUG);
return $got_lock;
}}
if (! function_exists('block_on_function_lock')) {
function block_on_function_lock($fn_name, $wait_sec = 2, $timeout = 30) {
if ( $wait_sec == 0 )
$wait_sec = 2; // don't let the user pick a value that's likely to crash the system
$start = time();
do {
$r = q("SELECT locked FROM locks WHERE name = '%s' LIMIT 1",
dbesc($fn_name)
);
if (dbm::is_result($r) && $r[0]['locked']) {
sleep($wait_sec);
}
} while (dbm::is_result($r) && $r[0]['locked'] && ((time() - $start) < $timeout));
return;
}}
if (! function_exists('unlock_function')) {
function unlock_function($fn_name) {
$r = q("UPDATE `locks` SET `locked` = 0, `created` = '%s' WHERE `name` = '%s'",
dbesc(NULL_DATE),
dbesc($fn_name)
);
logger('unlock_function: released lock for function ' . $fn_name, LOGGER_DEBUG);
return;
}}

View file

@ -498,7 +498,7 @@ function notifier_run(&$argv, &$argc){
}
logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
proc_run($priority, 'include/delivery.php', $cmd, $item_id, $contact['id']);
proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']);
}
}
@ -563,7 +563,7 @@ function notifier_run(&$argv, &$argc){
if ((! $mail) && (! $fsuggest) && (! $followup)) {
logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
proc_run($priority, 'include/delivery.php', $cmd, $item_id, $rr['id']);
proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']);
}
}
}
@ -603,7 +603,7 @@ function notifier_run(&$argv, &$argc){
}
// Handling the pubsubhubbub requests
proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php');
proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
}
logger('notifier: calling hooks', LOGGER_DEBUG);

View file

@ -2,13 +2,14 @@
use Friendica\App;
use Friendica\Core\Config;
use Friendica\Util\Lock;
if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
$directory = dirname($_SERVER["argv"][0]);
if (substr($directory, 0, 1) != "/")
if (substr($directory, 0, 1) != "/") {
$directory = $_SERVER["PWD"]."/".$directory;
}
$directory = realpath($directory."/..");
chdir($directory);
@ -19,16 +20,12 @@ require_once("boot.php");
function poller_run($argv, $argc){
global $a, $db;
if (is_null($a)) {
$a = new App(dirname(__DIR__));
}
$a = new App(dirname(__DIR__));
if (is_null($db)) {
@include(".htconfig.php");
require_once("include/dba.php");
$db = new dba($db_host, $db_user, $db_pass, $db_data);
unset($db_host, $db_user, $db_pass, $db_data);
};
@include(".htconfig.php");
require_once("include/dba.php");
$db = new dba($db_host, $db_user, $db_pass, $db_data);
unset($db_host, $db_user, $db_pass, $db_data);
Config::load();
@ -41,54 +38,74 @@ function poller_run($argv, $argc){
load_hooks();
// At first check the maximum load. We shouldn't continue with a high load
if ($a->maxload_reached()) {
logger('Pre check: maximum load reached, quitting.', LOGGER_DEBUG);
return;
}
// We now start the process. This is done after the load check since this could increase the load.
$a->start_process();
// At first we check the number of workers and quit if there are too much of them
// This is done at the top to avoid that too much code is executed without a need to do so,
// since the poller mostly quits here.
if (poller_too_much_workers()) {
poller_kill_stale_workers();
logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Do we have too few memory?
if ($a->min_memory_reached()) {
logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Possibly there are too much database connections
if (poller_max_connections_reached()) {
logger('Pre check: maximum connections reached, quitting.', LOGGER_DEBUG);
return;
}
if ($a->maxload_reached()) {
// Possibly there are too much database processes that block the system
if ($a->max_processes_reached()) {
logger('Pre check: maximum processes reached, quitting.', LOGGER_DEBUG);
return;
}
// Now we start additional cron processes if we should do so
if (($argc <= 1) OR ($argv[1] != "no_cron")) {
poller_run_cron();
}
if ($a->max_processes_reached()) {
return;
}
// Checking the number of workers
if (poller_too_much_workers()) {
poller_kill_stale_workers();
return;
}
$starttime = time();
// We fetch the next queue entry that is about to be executed
while ($r = poller_worker_process()) {
// If we got that queue entry we claim it for us
if (!poller_claim_process($r[0])) {
continue;
}
// Check free memory
if ($a->min_memory_reached()) {
logger('Memory limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
if (Lock::set('poller_worker', 0)) {
// Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Check free memory
if ($a->min_memory_reached()) {
logger('Memory limit reached, quitting.', LOGGER_DEBUG);
return;
}
Lock::remove('poller_worker');
}
// finally the work will be done
if (!poller_execute($r[0])) {
logger('Process execution failed, quitting.', LOGGER_DEBUG);
return;
@ -96,7 +113,7 @@ function poller_run($argv, $argc){
// Quit the poller once every hour
if (time() > ($starttime + 3600)) {
logger('Process lifetime reachted, quitting.', LOGGER_DEBUG);
logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
return;
}
}
@ -150,7 +167,6 @@ function poller_execute($queue) {
$funcname = str_replace(".php", "", basename($argv[0]))."_run";
if (function_exists($funcname)) {
poller_exec_function($queue, $funcname, $argv);
dba::delete('workerqueue', array('id' => $queue["id"]));
} else {
@ -226,24 +242,27 @@ function poller_exec_function($queue, $funcname, $argv) {
$o = "\nDatabase Read:\n";
foreach ($a->callstack["database"] AS $func => $time) {
$time = round($time, 3);
if ($time > 0)
if ($time > 0) {
$o .= $func.": ".$time."\n";
}
}
}
if (isset($a->callstack["database_write"])) {
$o .= "\nDatabase Write:\n";
foreach ($a->callstack["database_write"] AS $func => $time) {
$time = round($time, 3);
if ($time > 0)
if ($time > 0) {
$o .= $func.": ".$time."\n";
}
}
}
if (isset($a->callstack["network"])) {
$o .= "\nNetwork:\n";
foreach ($a->callstack["network"] AS $func => $time) {
$time = round($time, 3);
if ($time > 0)
if ($time > 0) {
$o .= $func.": ".$time."\n";
}
}
}
} else {
@ -284,27 +303,30 @@ function poller_max_connections_reached() {
if ($max == 0) {
// the maximum number of possible user connections can be a system variable
$r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
if ($r)
if (dbm::is_result($r)) {
$max = $r[0]["Value"];
}
// Or it can be granted. This overrides the system variable
$r = q("SHOW GRANTS");
if ($r)
if (dbm::is_result($r)) {
foreach ($r AS $grants) {
$grant = array_pop($grants);
if (stristr($grant, "GRANT USAGE ON"))
if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match))
if (stristr($grant, "GRANT USAGE ON")) {
if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) {
$max = $match[1];
}
}
}
}
}
// If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user
if ($max != 0) {
$r = q("SHOW PROCESSLIST");
if (!dbm::is_result($r))
if (!dbm::is_result($r)) {
return false;
}
$used = count($r);
logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
@ -320,28 +342,28 @@ function poller_max_connections_reached() {
// We will now check for the system values.
// This limit could be reached although the user limits are fine.
$r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
if (!$r)
if (!dbm::is_result($r)) {
return false;
}
$max = intval($r[0]["Value"]);
if ($max == 0)
if ($max == 0) {
return false;
}
$r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
if (!$r)
if (!dbm::is_result($r)) {
return false;
}
$used = intval($r[0]["Value"]);
if ($used == 0)
if ($used == 0) {
return false;
}
logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);
$level = $used / $max * 100;
if ($level < $maxlevel)
if ($level < $maxlevel) {
return false;
}
logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
return true;
}
@ -440,8 +462,7 @@ function poller_too_much_workers() {
dba::close($processes);
}
dba::close($entries);
$processlist = implode(', ', $listitem);
$processlist = ' ('.implode(', ', $listitem).')';
$s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
$entries = $s[0]["total"];
@ -460,7 +481,7 @@ function poller_too_much_workers() {
}
}
logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries." (".$processlist.") - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
// Are there fewer workers running as possible? Then fork a new one.
if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
@ -471,7 +492,7 @@ function poller_too_much_workers() {
}
}
return($active >= $queues);
return $active >= $queues;
}
/**
@ -482,7 +503,7 @@ function poller_too_much_workers() {
function poller_active_workers() {
$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
return($workers[0]["processes"]);
return $workers[0]["processes"];
}
/**
@ -503,36 +524,37 @@ function poller_passing_slow(&$highest_priority) {
INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
// No active processes at all? Fine
if (!dbm::is_result($r))
return(false);
if (!dbm::is_result($r)) {
return false;
}
$priorities = array();
foreach ($r AS $line)
foreach ($r AS $line) {
$priorities[] = $line["priority"];
}
// Should not happen
if (count($priorities) == 0)
return(false);
if (count($priorities) == 0) {
return false;
}
$highest_priority = min($priorities);
// The highest process is already the slowest one?
// Then we quit
if ($highest_priority == PRIORITY_NEGLIGIBLE)
return(false);
if ($highest_priority == PRIORITY_NEGLIGIBLE) {
return false;
}
$high = 0;
foreach ($priorities AS $priority)
if ($priority == $highest_priority)
foreach ($priorities AS $priority) {
if ($priority == $highest_priority) {
++$high;
}
}
logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG);
$passing_slow = (($high/count($priorities)) > (2/3));
if ($passing_slow)
if ($passing_slow) {
logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG);
return($passing_slow);
}
return $passing_slow;
}
/**
@ -546,7 +568,7 @@ function poller_worker_process() {
$highest_priority = 0;
if (poller_passing_slow($highest_priority)) {
dba::e('LOCK TABLES `workerqueue` WRITE');
dba::lock('workerqueue');
// Are there waiting processes with a higher priority than the currently highest?
$r = q("SELECT * FROM `workerqueue`
@ -568,7 +590,7 @@ function poller_worker_process() {
return $r;
}
} else {
dba::e('LOCK TABLES `workerqueue` WRITE');
dba::lock('workerqueue');
}
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
@ -578,7 +600,7 @@ function poller_worker_process() {
// We only unlock the tables here, when we got no data
if (!dbm::is_result($r)) {
dba::e('UNLOCK TABLES');
dba::unlock();
}
return $r;
@ -598,7 +620,7 @@ function poller_claim_process($queue) {
$success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
array('id' => $queue["id"], 'pid' => 0));
dba::e('UNLOCK TABLES');
dba::unlock();
if (!$success) {
logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
@ -729,5 +751,7 @@ if (array_search(__file__,get_included_files())===0){
get_app()->end_process();
Lock::remove('poller_worker');
killme();
}

View file

@ -17,7 +17,7 @@ function pubsubpublish_run(&$argv, &$argc){
foreach ($r as $rr) {
logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php', $rr["id"]);
proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]);
}
}

View file

@ -27,7 +27,7 @@ function queue_run(&$argv, &$argc){
logger('queue: start');
// Handling the pubsubhubbub requests
proc_run(PRIORITY_HIGH,'include/pubsubpublish.php');
proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
$r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue`
INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id`
@ -51,7 +51,7 @@ function queue_run(&$argv, &$argc){
if (dbm::is_result($r)) {
foreach ($r as $q_item) {
logger('Call queue for id '.$q_item['id']);
proc_run(PRIORITY_LOW, "include/queue.php", $q_item['id']);
proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", $q_item['id']);
}
}
return;

View file

@ -1995,10 +1995,11 @@ function get_gcontact_id($contact) {
if (in_array($contact["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS)))
$contact["url"] = clean_contact_url($contact["url"]);
$r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 2",
dba::lock('gcontact');
$r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 1",
dbesc(normalise_link($contact["url"])));
if ($r) {
if (dbm::is_result($r)) {
$gcontact_id = $r[0]["id"];
// Update every 90 days
@ -2036,17 +2037,13 @@ function get_gcontact_id($contact) {
$doprobing = in_array($r[0]["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS, ""));
}
}
dba::unlock();
if ($doprobing) {
logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG);
proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"]));
}
if ((dbm::is_result($r)) AND (count($r) > 1) AND ($gcontact_id > 0) AND ($contact["url"] != ""))
q("DELETE FROM `gcontact` WHERE `nurl` = '%s' AND `id` != %d",
dbesc(normalise_link($contact["url"])),
intval($gcontact_id));
return $gcontact_id;
}

View file

@ -797,6 +797,8 @@ class App {
* @return bool Is the limit reached?
*/
function max_processes_reached() {
// Deactivated, needs more investigating if this check really makes sense
return false;
if ($this->is_backend()) {
$process = 'backend';

159
src/Util/Lock.php Normal file
View file

@ -0,0 +1,159 @@
<?php
namespace Friendica\Util;
/**
* @file src/Util/Lock.php
* @brief Functions for preventing parallel execution of functions
*
*/
use Friendica\Core\Config;
use Memcache;
use dba;
use dbm;
/**
* @brief This class contain Functions for preventing parallel execution of functions
*/
class Lock {
/**
* @brief Check for memcache and open a connection if configured
*
* @return object|boolean The memcache object - or "false" if not successful
*/
private static function connectMemcache() {
if (!function_exists('memcache_connect')) {
return false;
}
if (!Config::get('system', 'memcache')) {
return false;
}
$memcache_host = Config::get('system', 'memcache_host', '127.0.0.1');
$memcache_port = Config::get('system', 'memcache_port', 11211);
$memcache = new Memcache;
if (!$memcache->connect($memcache_host, $memcache_port)) {
return false;
}
return $memcache;
}
/**
* @brief Sets a lock for a given name
*
* @param string $fn_name Name of the lock
* @param integer $timeout Seconds until we give up
*
* @return boolean Was the lock successful?
*/
public static function set($fn_name, $timeout = 120) {
$got_lock = false;
$start = time();
$memcache = self::connectMemcache();
if (is_object($memcache)) {
$wait_sec = 0.2;
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
do {
$lock = $memcache->get($cachekey);
if (!is_bool($lock)) {
$pid = (int)$lock;
// When the process id isn't used anymore, we can safely claim the lock for us.
// Or we do want to lock something that was already locked by us.
if (!posix_kill($pid, 0) OR ($pid == getmypid())) {
$lock = false;
}
}
if (is_bool($lock)) {
$memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300);
$got_lock = true;
}
if (!$got_lock AND ($timeout > 0)) {
usleep($wait_sec * 1000000);
}
} while (!$got_lock AND ((time() - $start) < $timeout));
return $got_lock;
}
$wait_sec = 2;
do {
dba::lock('locks');
$lock = dba::select('locks', array('locked', 'pid'), array('name' => $fn_name), array('limit' => 1));
if (dbm::is_result($lock)) {
if ($lock['locked']) {
// When the process id isn't used anymore, we can safely claim the lock for us.
if (!posix_kill($lock['pid'], 0)) {
$lock['locked'] = false;
}
// We want to lock something that was already locked by us? So we got the lock.
if ($lock['pid'] == getmypid()) {
$got_lock = true;
}
}
if (!$lock['locked']) {
dba::update('locks', array('locked' => true, 'pid' => getmypid()), array('name' => $fn_name));
$got_lock = true;
}
} elseif (!dbm::is_result($lock)) {
dba::insert('locks', array('name' => $fn_name, 'locked' => true, 'pid' => getmypid()));
$got_lock = true;
}
dba::unlock();
if (!$got_lock AND ($timeout > 0)) {
sleep($wait_sec);
}
} while (!$got_lock AND ((time() - $start) < $timeout));
return $got_lock;
}
/**
* @brief Removes a lock if it was set by us
*
* @param string $fn_name Name of the lock
*/
public static function remove($fn_name) {
$memcache = self::connectMemcache();
if (is_object($memcache)) {
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
$lock = $memcache->get($cachekey);
if (!is_bool($lock)) {
if ((int)$lock == getmypid()) {
$memcache->delete($cachekey);
}
}
return;
}
dba::update('locks', array('locked' => false, 'pid' => 0), array('name' => $fn_name, 'pid' => getmypid()));
return;
}
/**
* @brief Removes all lock that were set by us
*/
public static function removeAll() {
$memcache = self::connectMemcache();
if (is_object($memcache)) {
// We cannot delete all cache entries, but this doesn't matter with memcache
return;
}
dba::update('locks', array('locked' => false, 'pid' => 0), array('pid' => getmypid()));
return;
}
}

View file

@ -1,6 +1,6 @@
<?php
define('UPDATE_VERSION' , 1227);
define('UPDATE_VERSION' , 1228);
/**
*