diff --git a/VERSION b/VERSION index 3fec5bc90..b35d44161 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.5.3dev +3.5.3-dev diff --git a/boot.php b/boot.php index 5f83cb3e4..50b8f03f6 100644 --- a/boot.php +++ b/boot.php @@ -35,12 +35,13 @@ require_once 'include/features.php'; require_once 'include/identity.php'; require_once 'update.php'; require_once 'include/dbstructure.php'; +require_once 'include/poller.php'; define ( 'FRIENDICA_PLATFORM', 'Friendica'); define ( 'FRIENDICA_CODENAME', 'Asparagus'); -define ( 'FRIENDICA_VERSION', '3.5.3dev' ); +define ( 'FRIENDICA_VERSION', '3.5.3-dev' ); define ( 'DFRN_PROTOCOL_VERSION', '2.23' ); -define ( 'DB_UPDATE_VERSION', 1227 ); +define ( 'DB_UPDATE_VERSION', 1228 ); /** * @brief Constant with a HTML line break. @@ -1095,18 +1096,8 @@ function proc_run($cmd) { return; } - // Checking number of workers - $workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE)); - - // Get number of allowed number of worker threads - $queues = intval(get_config("system", "worker_queues")); - - if ($queues == 0) { - $queues = 4; - } - // If there are already enough workers running, don't fork another one - if ($workers[0]["workers"] >= $queues) { + if (poller_too_much_workers()) { return; } diff --git a/database.sql b/database.sql index 4a5946ef3..7f7e975e7 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ --- Friendica 3.5.2-rc (Asparagus) --- DB_UPDATE_VERSION 1227 +-- Friendica 3.5.3dev (Asparagus) +-- DB_UPDATE_VERSION 1228 -- ------------------------------------------ @@ -580,7 +580,7 @@ CREATE TABLE IF NOT EXISTS `locks` ( `id` int(11) NOT NULL auto_increment, `name` varchar(128) NOT NULL DEFAULT '', `locked` tinyint(1) NOT NULL DEFAULT 0, - `created` datetime DEFAULT '0001-01-01 00:00:00', + `pid` int(10) unsigned NOT NULL DEFAULT 0, PRIMARY KEY(`id`) ) DEFAULT COLLATE utf8mb4_general_ci; @@ -1116,6 +1116,7 @@ CREATE TABLE IF NOT EXISTS `workerqueue` ( `executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', PRIMARY KEY(`id`), INDEX `pid` (`pid`), + INDEX `parameter` (`parameter`(192)), INDEX `priority_created` (`priority`,`created`) ) DEFAULT COLLATE utf8mb4_general_ci; diff --git a/doc/database/db_locks.md b/doc/database/db_locks.md index 00556dd95..4de6fbf96 100644 --- a/doc/database/db_locks.md +++ b/doc/database/db_locks.md @@ -1,11 +1,11 @@ Table locks =========== -| Field | Description | Type | Null | Key | Default | Extra | -|---------|------------------|--------------|------|-----|---------------------|----------------| -| id | sequential ID | int(11) | NO | PRI | NULL | auto_increment | -| name | | varchar(128) | NO | | | | -| locked | | tinyint(1) | NO | | 0 | | -| created | | datetime | YES | | 0001-01-01 00:00:00 | | +| Field | Description | Type | Null | Key | Default | Extra | +|---------|------------------|------------------|------|-----|---------------------|----------------| +| id | sequential ID | int(11) | NO | PRI | NULL | auto_increment | +| name | | varchar(128) | NO | | | | +| locked | | tinyint(1) | NO | | 0 | | +| pid | Process ID | int(10) unsigned | NO | | 0 | | Return to [database documentation](help/database) diff --git a/doc/htconfig.md b/doc/htconfig.md index 542b42b29..7887184d8 100644 --- a/doc/htconfig.md +++ b/doc/htconfig.md @@ -35,6 +35,7 @@ Example: To set the directory value please add this line to your .htconfig.php: * **db_loglimit_index_high** - Number of index rows to be logged anyway (for any index) * **db_log_index_blacklist** - Blacklist of indexes that shouldn't be watched * **dbclean** (Boolean) - Enable the automatic database cleanup process +* **dbclean-expire-days** (Integer) - Days after which remote items will be deleted. Own items, and marked or filed items are kept. * **default_service_class** - * **delivery_batch_count** - Number of deliveries per process. Default value is 1. (Disabled when using the worker) * **diaspora_test** (Boolean) - For development only. Disables the message transfer. diff --git a/include/cron.php b/include/cron.php index 3702bf8b3..70b87e969 100644 --- a/include/cron.php +++ b/include/cron.php @@ -246,10 +246,11 @@ function cron_poll_contacts($argc, $argv) { logger("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact["nick"] . " " . $contact["name"]); if (($contact['network'] == NETWORK_FEED) AND ($contact['priority'] <= 3)) { - proc_run(PRIORITY_MEDIUM, 'include/onepoll.php', intval($contact['id'])); + $priority = PRIORITY_MEDIUM; } else { - proc_run(PRIORITY_LOW, 'include/onepoll.php', intval($contact['id'])); + $priority = PRIORITY_LOW; } + proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', intval($contact['id'])); } } } diff --git a/include/dba.php b/include/dba.php index b9e6c32d5..1f428bf46 100644 --- a/include/dba.php +++ b/include/dba.php @@ -806,6 +806,41 @@ class dba { return self::e($sql, $param); } + /** + * @brief Locks a table for exclusive write access + * + * This function can be extended in the future to accept a table array as well. + * + * @param string $table Table name + * + * @return boolean was the lock successful? + */ + static public function lock($table) { + // See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html + self::e("SET autocommit=0"); + $success = self::e("LOCK TABLES `".self::$dbo->escape($table)."` WRITE"); + if (!$success) { + self::e("SET autocommit=1"); + } else { + self::$in_transaction = true; + } + return $success; + } + + /** + * @brief Unlocks all locked tables + * + * @return boolean was the unlock successful? + */ + static public function unlock() { + // See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html + self::e("COMMIT"); + $success = self::e("UNLOCK TABLES"); + self::e("SET autocommit=1"); + self::$in_transaction = false; + return $success; + } + /** * @brief Starts a transaction * diff --git a/include/dbclean.php b/include/dbclean.php index ff4993c4a..8da78d64b 100644 --- a/include/dbclean.php +++ b/include/dbclean.php @@ -17,9 +17,14 @@ function dbclean_run(&$argv, &$argc) { $stage = 0; } + // Get the expire days for step 8 and 9 + $days = Config::get('system', 'dbclean-expire-days', 0); + if ($stage == 0) { - for ($i = 1; $i <= 7; $i++) { - if (!Config::get('system', 'finished-dbclean-'.$i, false)) { + for ($i = 1; $i <= 9; $i++) { + // Execute the background script for a step when it isn't finished. + // Execute step 8 and 9 only when $days is defined. + if (!Config::get('system', 'finished-dbclean-'.$i, false) AND (($i < 8) OR ($days > 0))) { proc_run(PRIORITY_LOW, 'include/dbclean.php', $i); } } @@ -30,6 +35,19 @@ function dbclean_run(&$argv, &$argc) { /** * @brief Remove orphaned database entries + * @param integer $stage What should be deleted? + * + * Values for $stage: + * ------------------ + * 1: Old global item entries from item table without user copy. + * 2: Items without parents. + * 3: Orphaned data from thread table. + * 4: Orphaned data from notify table. + * 5: Orphaned data from notify-threads table. + * 6: Orphaned data from sign table. + * 7: Orphaned data from term table. + * 8: Expired threads. + * 9: Old global item entries from expired threads */ function remove_orphans($stage = 0) { global $db; @@ -39,6 +57,9 @@ function remove_orphans($stage = 0) { // We split the deletion in many small tasks $limit = 1000; + // Get the expire days for step 8 and 9 + $days = Config::get('system', 'dbclean-expire-days', 0); + if ($stage == 1) { $last_id = Config::get('system', 'dbclean-last-id-1', 0); @@ -61,11 +82,6 @@ function remove_orphans($stage = 0) { logger("Done deleting ".$count." old global item entries from item table without user copy. Last ID: ".$last_id); Config::set('system', 'dbclean-last-id-1', $last_id); - - // We will eventually set this value when we found a good way to delete these items in another way. - // if ($count < $limit) { - // Config::set('system', 'finished-dbclean-1', true); - // } } elseif ($stage == 2) { $last_id = Config::get('system', 'dbclean-last-id-2', 0); @@ -216,11 +232,71 @@ function remove_orphans($stage = 0) { if ($count < $limit) { Config::set('system', 'finished-dbclean-7', true); } + } elseif ($stage == 8) { + if ($days <= 0) { + return; + } + + $last_id = Config::get('system', 'dbclean-last-id-8', 0); + + logger("Deleting expired threads. Last ID: ".$last_id); + $r = dba::p("SELECT `thread`.`iid` FROM `thread` + INNER JOIN `contact` ON `thread`.`contact-id` = `contact`.`id` AND NOT `notify_new_posts` + WHERE `thread`.`received` < UTC_TIMESTAMP() - INTERVAL ? DAY + AND NOT `thread`.`mention` AND NOT `thread`.`starred` + AND NOT `thread`.`wall` AND NOT `thread`.`origin` + AND `thread`.`uid` != 0 AND `thread`.`iid` >= ? + AND NOT `thread`.`iid` IN (SELECT `parent` FROM `item` + WHERE (`item`.`starred` OR (`item`.`resource-id` != '') + OR (`item`.`file` != '') OR (`item`.`event-id` != '') + OR (`item`.`attach` != '') OR `item`.`wall` OR `item`.`origin`) + AND `item`.`parent` = `thread`.`iid`) + ORDER BY `thread`.`iid` LIMIT 1000", $days, $last_id); + $count = dba::num_rows($r); + if ($count > 0) { + logger("found expired threads: ".$count); + while ($thread = dba::fetch($r)) { + $last_id = $thread["iid"]; + dba::delete('thread', array('iid' => $thread["iid"])); + } + } else { + logger("No expired threads found"); + } + dba::close($r); + logger("Done deleting ".$count." expired threads. Last ID: ".$last_id); + + Config::set('system', 'dbclean-last-id-8', $last_id); + } elseif ($stage == 9) { + if ($days <= 0) { + return; + } + + $last_id = Config::get('system', 'dbclean-last-id-9', 0); + $till_id = Config::get('system', 'dbclean-last-id-8', 0); + + logger("Deleting old global item entries from expired threads from ID ".$last_id." to ID ".$till_id); + $r = dba::p("SELECT `id` FROM `item` WHERE `uid` = 0 AND + NOT EXISTS (SELECT `guid` FROM `item` AS `i` WHERE `item`.`guid` = `i`.`guid` AND `i`.`uid` != 0) AND + `received` < UTC_TIMESTAMP() - INTERVAL 90 DAY AND `id` >= ? AND `id` <= ? + ORDER BY `id` LIMIT ".intval($limit), $last_id, $till_id); + $count = dba::num_rows($r); + if ($count > 0) { + logger("found global item entries from expired threads: ".$count); + while ($orphan = dba::fetch($r)) { + $last_id = $orphan["id"]; + dba::delete('item', array('id' => $orphan["id"])); + } + } else { + logger("No global item entries from expired threads"); + } + dba::close($r); + logger("Done deleting ".$count." old global item entries from expired threads. Last ID: ".$last_id); + + Config::set('system', 'dbclean-last-id-9', $last_id); } // Call it again if not all entries were purged if (($stage != 0) AND ($count > 0)) { proc_run(PRIORITY_MEDIUM, 'include/dbclean.php'); } - } diff --git a/include/dbstructure.php b/include/dbstructure.php index 156453d93..161df46f6 100644 --- a/include/dbstructure.php +++ b/include/dbstructure.php @@ -1205,7 +1205,7 @@ function db_definition() { "id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"), "name" => array("type" => "varchar(128)", "not null" => "1", "default" => ""), "locked" => array("type" => "tinyint(1)", "not null" => "1", "default" => "0"), - "created" => array("type" => "datetime", "default" => NULL_DATE), + "pid" => array("type" => "int(10) unsigned", "not null" => "1", "default" => "0"), ), "indexes" => array( "PRIMARY" => array("id"), @@ -1743,6 +1743,7 @@ function db_definition() { "indexes" => array( "PRIMARY" => array("id"), "pid" => array("pid"), + "parameter" => array("parameter(192)"), "priority_created" => array("priority", "created"), ) ); diff --git a/include/lock.php b/include/lock.php deleted file mode 100644 index 64f6319ef..000000000 --- a/include/lock.php +++ /dev/null @@ -1,80 +0,0 @@ - $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']); } } @@ -563,7 +563,7 @@ function notifier_run(&$argv, &$argc){ if ((! $mail) && (! $fsuggest) && (! $followup)) { logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]); - proc_run($priority, 'include/delivery.php', $cmd, $item_id, $rr['id']); + proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']); } } } @@ -603,7 +603,7 @@ function notifier_run(&$argv, &$argc){ } // Handling the pubsubhubbub requests - proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php'); + proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php'); } logger('notifier: calling hooks', LOGGER_DEBUG); diff --git a/include/poller.php b/include/poller.php index 0011559c6..fcabe5d8e 100644 --- a/include/poller.php +++ b/include/poller.php @@ -2,13 +2,14 @@ use Friendica\App; use Friendica\Core\Config; +use Friendica\Util\Lock; if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) { $directory = dirname($_SERVER["argv"][0]); - if (substr($directory, 0, 1) != "/") + if (substr($directory, 0, 1) != "/") { $directory = $_SERVER["PWD"]."/".$directory; - + } $directory = realpath($directory."/.."); chdir($directory); @@ -19,16 +20,12 @@ require_once("boot.php"); function poller_run($argv, $argc){ global $a, $db; - if (is_null($a)) { - $a = new App(dirname(__DIR__)); - } + $a = new App(dirname(__DIR__)); - if (is_null($db)) { - @include(".htconfig.php"); - require_once("include/dba.php"); - $db = new dba($db_host, $db_user, $db_pass, $db_data); - unset($db_host, $db_user, $db_pass, $db_data); - }; + @include(".htconfig.php"); + require_once("include/dba.php"); + $db = new dba($db_host, $db_user, $db_pass, $db_data); + unset($db_host, $db_user, $db_pass, $db_data); Config::load(); @@ -41,54 +38,74 @@ function poller_run($argv, $argc){ load_hooks(); + // At first check the maximum load. We shouldn't continue with a high load + if ($a->maxload_reached()) { + logger('Pre check: maximum load reached, quitting.', LOGGER_DEBUG); + return; + } + + // We now start the process. This is done after the load check since this could increase the load. $a->start_process(); + // At first we check the number of workers and quit if there are too much of them + // This is done at the top to avoid that too much code is executed without a need to do so, + // since the poller mostly quits here. + if (poller_too_much_workers()) { + poller_kill_stale_workers(); + logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } + + // Do we have too few memory? if ($a->min_memory_reached()) { + logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG); return; } + // Possibly there are too much database connections if (poller_max_connections_reached()) { + logger('Pre check: maximum connections reached, quitting.', LOGGER_DEBUG); return; } - if ($a->maxload_reached()) { + // Possibly there are too much database processes that block the system + if ($a->max_processes_reached()) { + logger('Pre check: maximum processes reached, quitting.', LOGGER_DEBUG); return; } + // Now we start additional cron processes if we should do so if (($argc <= 1) OR ($argv[1] != "no_cron")) { poller_run_cron(); } - if ($a->max_processes_reached()) { - return; - } - - // Checking the number of workers - if (poller_too_much_workers()) { - poller_kill_stale_workers(); - return; - } - $starttime = time(); + // We fetch the next queue entry that is about to be executed while ($r = poller_worker_process()) { + // If we got that queue entry we claim it for us if (!poller_claim_process($r[0])) { continue; } - // Check free memory - if ($a->min_memory_reached()) { - logger('Memory limit reached, quitting.', LOGGER_DEBUG); - return; - } - - // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers()) { - logger('Active worker limit reached, quitting.', LOGGER_DEBUG); - return; + // To avoid the quitting of multiple pollers only one poller at a time will execute the check + if (Lock::set('poller_worker', 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (poller_too_much_workers()) { + logger('Active worker limit reached, quitting.', LOGGER_DEBUG); + return; + } + + // Check free memory + if ($a->min_memory_reached()) { + logger('Memory limit reached, quitting.', LOGGER_DEBUG); + return; + } + Lock::remove('poller_worker'); } + // finally the work will be done if (!poller_execute($r[0])) { logger('Process execution failed, quitting.', LOGGER_DEBUG); return; @@ -96,7 +113,7 @@ function poller_run($argv, $argc){ // Quit the poller once every hour if (time() > ($starttime + 3600)) { - logger('Process lifetime reachted, quitting.', LOGGER_DEBUG); + logger('Process lifetime reached, quitting.', LOGGER_DEBUG); return; } } @@ -150,7 +167,6 @@ function poller_execute($queue) { $funcname = str_replace(".php", "", basename($argv[0]))."_run"; if (function_exists($funcname)) { - poller_exec_function($queue, $funcname, $argv); dba::delete('workerqueue', array('id' => $queue["id"])); } else { @@ -226,24 +242,27 @@ function poller_exec_function($queue, $funcname, $argv) { $o = "\nDatabase Read:\n"; foreach ($a->callstack["database"] AS $func => $time) { $time = round($time, 3); - if ($time > 0) + if ($time > 0) { $o .= $func.": ".$time."\n"; + } } } if (isset($a->callstack["database_write"])) { $o .= "\nDatabase Write:\n"; foreach ($a->callstack["database_write"] AS $func => $time) { $time = round($time, 3); - if ($time > 0) + if ($time > 0) { $o .= $func.": ".$time."\n"; + } } } if (isset($a->callstack["network"])) { $o .= "\nNetwork:\n"; foreach ($a->callstack["network"] AS $func => $time) { $time = round($time, 3); - if ($time > 0) + if ($time > 0) { $o .= $func.": ".$time."\n"; + } } } } else { @@ -284,27 +303,30 @@ function poller_max_connections_reached() { if ($max == 0) { // the maximum number of possible user connections can be a system variable $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'"); - if ($r) + if (dbm::is_result($r)) { $max = $r[0]["Value"]; - + } // Or it can be granted. This overrides the system variable $r = q("SHOW GRANTS"); - if ($r) + if (dbm::is_result($r)) { foreach ($r AS $grants) { $grant = array_pop($grants); - if (stristr($grant, "GRANT USAGE ON")) - if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) + if (stristr($grant, "GRANT USAGE ON")) { + if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) { $max = $match[1]; + } + } } + } } // If $max is set we will use the processlist to determine the current number of connections // The processlist only shows entries of the current user if ($max != 0) { $r = q("SHOW PROCESSLIST"); - if (!dbm::is_result($r)) + if (!dbm::is_result($r)) { return false; - + } $used = count($r); logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG); @@ -320,28 +342,28 @@ function poller_max_connections_reached() { // We will now check for the system values. // This limit could be reached although the user limits are fine. $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'"); - if (!$r) + if (!dbm::is_result($r)) { return false; - + } $max = intval($r[0]["Value"]); - if ($max == 0) + if ($max == 0) { return false; - + } $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'"); - if (!$r) + if (!dbm::is_result($r)) { return false; - + } $used = intval($r[0]["Value"]); - if ($used == 0) + if ($used == 0) { return false; - + } logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG); $level = $used / $max * 100; - if ($level < $maxlevel) + if ($level < $maxlevel) { return false; - + } logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max); return true; } @@ -440,8 +462,7 @@ function poller_too_much_workers() { dba::close($processes); } dba::close($entries); - - $processlist = implode(', ', $listitem); + $processlist = ' ('.implode(', ', $listitem).')'; $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE)); $entries = $s[0]["total"]; @@ -460,7 +481,7 @@ function poller_too_much_workers() { } } - logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries." (".$processlist.") - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG); + logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG); // Are there fewer workers running as possible? Then fork a new one. if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) { @@ -471,7 +492,7 @@ function poller_too_much_workers() { } } - return($active >= $queues); + return $active >= $queues; } /** @@ -482,7 +503,7 @@ function poller_too_much_workers() { function poller_active_workers() { $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'"); - return($workers[0]["processes"]); + return $workers[0]["processes"]; } /** @@ -503,36 +524,37 @@ function poller_passing_slow(&$highest_priority) { INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`"); // No active processes at all? Fine - if (!dbm::is_result($r)) - return(false); - + if (!dbm::is_result($r)) { + return false; + } $priorities = array(); - foreach ($r AS $line) + foreach ($r AS $line) { $priorities[] = $line["priority"]; - + } // Should not happen - if (count($priorities) == 0) - return(false); - + if (count($priorities) == 0) { + return false; + } $highest_priority = min($priorities); // The highest process is already the slowest one? // Then we quit - if ($highest_priority == PRIORITY_NEGLIGIBLE) - return(false); - + if ($highest_priority == PRIORITY_NEGLIGIBLE) { + return false; + } $high = 0; - foreach ($priorities AS $priority) - if ($priority == $highest_priority) + foreach ($priorities AS $priority) { + if ($priority == $highest_priority) { ++$high; - + } + } logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG); $passing_slow = (($high/count($priorities)) > (2/3)); - if ($passing_slow) + if ($passing_slow) { logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG); - - return($passing_slow); + } + return $passing_slow; } /** @@ -546,7 +568,7 @@ function poller_worker_process() { $highest_priority = 0; if (poller_passing_slow($highest_priority)) { - dba::e('LOCK TABLES `workerqueue` WRITE'); + dba::lock('workerqueue'); // Are there waiting processes with a higher priority than the currently highest? $r = q("SELECT * FROM `workerqueue` @@ -568,7 +590,7 @@ function poller_worker_process() { return $r; } } else { - dba::e('LOCK TABLES `workerqueue` WRITE'); + dba::lock('workerqueue'); } // If there is no result (or we shouldn't pass lower processes) we check without priority limit @@ -578,7 +600,7 @@ function poller_worker_process() { // We only unlock the tables here, when we got no data if (!dbm::is_result($r)) { - dba::e('UNLOCK TABLES'); + dba::unlock(); } return $r; @@ -598,7 +620,7 @@ function poller_claim_process($queue) { $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), array('id' => $queue["id"], 'pid' => 0)); - dba::e('UNLOCK TABLES'); + dba::unlock(); if (!$success) { logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); @@ -729,5 +751,7 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); + Lock::remove('poller_worker'); + killme(); } diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index cde256a40..1112969f2 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -17,7 +17,7 @@ function pubsubpublish_run(&$argv, &$argc){ foreach ($r as $rr) { logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); - proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php', $rr["id"]); + proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]); } } diff --git a/include/queue.php b/include/queue.php index dbed46047..cf5ed9a81 100644 --- a/include/queue.php +++ b/include/queue.php @@ -27,7 +27,7 @@ function queue_run(&$argv, &$argc){ logger('queue: start'); // Handling the pubsubhubbub requests - proc_run(PRIORITY_HIGH,'include/pubsubpublish.php'); + proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php'); $r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue` INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id` @@ -51,7 +51,7 @@ function queue_run(&$argv, &$argc){ if (dbm::is_result($r)) { foreach ($r as $q_item) { logger('Call queue for id '.$q_item['id']); - proc_run(PRIORITY_LOW, "include/queue.php", $q_item['id']); + proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", $q_item['id']); } } return; diff --git a/include/socgraph.php b/include/socgraph.php index fbac08cc9..7a39e388b 100644 --- a/include/socgraph.php +++ b/include/socgraph.php @@ -1995,10 +1995,11 @@ function get_gcontact_id($contact) { if (in_array($contact["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS))) $contact["url"] = clean_contact_url($contact["url"]); - $r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 2", + dba::lock('gcontact'); + $r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 1", dbesc(normalise_link($contact["url"]))); - if ($r) { + if (dbm::is_result($r)) { $gcontact_id = $r[0]["id"]; // Update every 90 days @@ -2036,17 +2037,13 @@ function get_gcontact_id($contact) { $doprobing = in_array($r[0]["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS, "")); } } + dba::unlock(); if ($doprobing) { logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG); proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"])); } - if ((dbm::is_result($r)) AND (count($r) > 1) AND ($gcontact_id > 0) AND ($contact["url"] != "")) - q("DELETE FROM `gcontact` WHERE `nurl` = '%s' AND `id` != %d", - dbesc(normalise_link($contact["url"])), - intval($gcontact_id)); - return $gcontact_id; } diff --git a/src/App.php b/src/App.php index d671c5f1a..26cfcaadb 100644 --- a/src/App.php +++ b/src/App.php @@ -797,6 +797,8 @@ class App { * @return bool Is the limit reached? */ function max_processes_reached() { + // Deactivated, needs more investigating if this check really makes sense + return false; if ($this->is_backend()) { $process = 'backend'; diff --git a/src/Util/Lock.php b/src/Util/Lock.php new file mode 100644 index 000000000..26ccdc9dd --- /dev/null +++ b/src/Util/Lock.php @@ -0,0 +1,159 @@ +connect($memcache_host, $memcache_port)) { + return false; + } + + return $memcache; + } + + /** + * @brief Sets a lock for a given name + * + * @param string $fn_name Name of the lock + * @param integer $timeout Seconds until we give up + * + * @return boolean Was the lock successful? + */ + public static function set($fn_name, $timeout = 120) { + $got_lock = false; + $start = time(); + + $memcache = self::connectMemcache(); + if (is_object($memcache)) { + $wait_sec = 0.2; + $cachekey = get_app()->get_hostname().";lock:".$fn_name; + + do { + $lock = $memcache->get($cachekey); + + if (!is_bool($lock)) { + $pid = (int)$lock; + + // When the process id isn't used anymore, we can safely claim the lock for us. + // Or we do want to lock something that was already locked by us. + if (!posix_kill($pid, 0) OR ($pid == getmypid())) { + $lock = false; + } + } + if (is_bool($lock)) { + $memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300); + $got_lock = true; + } + if (!$got_lock AND ($timeout > 0)) { + usleep($wait_sec * 1000000); + } + } while (!$got_lock AND ((time() - $start) < $timeout)); + + return $got_lock; + } + + $wait_sec = 2; + + do { + dba::lock('locks'); + $lock = dba::select('locks', array('locked', 'pid'), array('name' => $fn_name), array('limit' => 1)); + + if (dbm::is_result($lock)) { + if ($lock['locked']) { + // When the process id isn't used anymore, we can safely claim the lock for us. + if (!posix_kill($lock['pid'], 0)) { + $lock['locked'] = false; + } + // We want to lock something that was already locked by us? So we got the lock. + if ($lock['pid'] == getmypid()) { + $got_lock = true; + } + } + if (!$lock['locked']) { + dba::update('locks', array('locked' => true, 'pid' => getmypid()), array('name' => $fn_name)); + $got_lock = true; + } + } elseif (!dbm::is_result($lock)) { + dba::insert('locks', array('name' => $fn_name, 'locked' => true, 'pid' => getmypid())); + $got_lock = true; + } + + dba::unlock(); + + if (!$got_lock AND ($timeout > 0)) { + sleep($wait_sec); + } + } while (!$got_lock AND ((time() - $start) < $timeout)); + + return $got_lock; + } + + /** + * @brief Removes a lock if it was set by us + * + * @param string $fn_name Name of the lock + */ + public static function remove($fn_name) { + $memcache = self::connectMemcache(); + if (is_object($memcache)) { + $cachekey = get_app()->get_hostname().";lock:".$fn_name; + $lock = $memcache->get($cachekey); + + if (!is_bool($lock)) { + if ((int)$lock == getmypid()) { + $memcache->delete($cachekey); + } + } + return; + } + + dba::update('locks', array('locked' => false, 'pid' => 0), array('name' => $fn_name, 'pid' => getmypid())); + return; + } + + /** + * @brief Removes all lock that were set by us + */ + public static function removeAll() { + $memcache = self::connectMemcache(); + if (is_object($memcache)) { + // We cannot delete all cache entries, but this doesn't matter with memcache + return; + } + + dba::update('locks', array('locked' => false, 'pid' => 0), array('pid' => getmypid())); + return; + } +} diff --git a/update.php b/update.php index 7561a9af1..76620a48a 100644 --- a/update.php +++ b/update.php @@ -1,6 +1,6 @@