Friendica Communications Platform (please note that this is a clone of the repository at GitHub; issues are handled there) https://friendi.ca
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

293 lines
8.1 KiB

11 years ago
10 years ago
11 years ago
11 years ago
  1. <?php
  // If boot.php is not found in the current working directory and command line
  // arguments are available, change into the parent directory of the directory
  // that contains the invoked script.
  if (!file_exists("boot.php") && (count($_SERVER["argv"]) != 0)) {
      $directory = dirname($_SERVER["argv"][0]);

      // A relative script path is resolved against the shell's working directory
      if (strpos($directory, "/") !== 0) {
          $directory = $_SERVER["PWD"]."/".$directory;
      }

      $directory = realpath($directory."/..");
      chdir($directory);
  }
  9. use \Friendica\Core\Config;
  10. use \Friendica\Core\PConfig;
  11. require_once("boot.php");
  /**
   * @brief Runs the background worker and processes the entries of the `workerqueue` table.
   *
   * Creates the global App and database objects when they are missing and returns
   * early when the process, connection, load or worker limits are reached. Unless
   * the first argument is "no_cron" the cron and cronhooks jobs are spawned first.
   * Afterwards the queue is worked through entry by entry until it is empty, one of
   * the limits is hit or the poller has been running for more than an hour.
   *
   * @param array $argv Command line arguments; overwritten with the parameters of each executed queue entry
   * @param int   $argc Number of arguments; overwritten together with $argv
   */
  function poller_run(&$argv, &$argc){
      global $a, $db;

      if (is_null($a)) {
          $a = new App;
      }

      if (is_null($db)) {
          // NOTE(review): .htconfig.php presumably defines the $db_* variables used below - confirm
          @include(".htconfig.php");
          require_once("include/dba.php");
          $db = new dba($db_host, $db_user, $db_pass, $db_data);
          unset($db_host, $db_user, $db_pass, $db_data);
      }

      if ($a->max_processes_reached()) {
          return;
      }

      if (poller_max_connections_reached()) {
          return;
      }

      if (App::maxload_reached()) {
          return;
      }

      // Checking the number of workers
      if (poller_too_much_workers()) {
          poller_kill_stale_workers();
          return;
      }

      if (($argc <= 1) || ($argv[1] != "no_cron")) {
          // Run the cron job that calls all other jobs
          proc_run("php","include/cron.php");

          // Run the cronhooks job separately from cron for being able to use a different timing
          proc_run("php","include/cronhooks.php");

          // Cleaning dead processes
          poller_kill_stale_workers();
      } else {
          // Sleep four seconds before checking for running processes again to avoid having too many workers
          sleep(4);
      }

      // Checking number of workers
      if (poller_too_much_workers()) {
          return;
      }

      $cooldown = Config::get("system", "worker_cooldown", 0);

      $starttime = time();

      while ($r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority`, `created` LIMIT 1")) {

          // Constantly check the number of parallel database processes
          if ($a->max_processes_reached()) {
              return;
          }

          // Constantly check the number of available database connections to let the frontend be accessible at any time
          if (poller_max_connections_reached()) {
              return;
          }

          // Count active workers and compare them with a maximum value that depends on the load
          if (poller_too_much_workers()) {
              return;
          }

          // Claim the entry for this process. The condition on `executed` prevents
          // overwriting the claim of another worker that was faster.
          q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'",
              dbesc(datetime_convert()),
              intval(getmypid()),
              intval($r[0]["id"]));

          // Assure that there are no tasks executed twice
          $id = q("SELECT `id` FROM `workerqueue` WHERE `id` = %d AND `pid` = %d",
              intval($r[0]["id"]),
              intval(getmypid()));
          if (!$id) {
              logger("Queue item ".$r[0]["id"]." was executed multiple times - skip this execution", LOGGER_DEBUG);
              continue;
          }

          // json_decode() returns null for undecodable input and may return a
          // non-array value. Such an entry can never be executed, so it is removed
          // instead of crashing the poller on every run.
          $parameters = json_decode($r[0]["parameter"]);
          if (!is_array($parameters) || !isset($parameters[0])) {
              logger("Queue item ".$r[0]["id"]." has invalid parameters and is removed: ".$r[0]["parameter"]);
              q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
              continue;
          }

          $argv = $parameters;
          $argc = count($argv);

          // Check for existence and validity of the include file
          $include = $argv[0];

          if (!validate_include($include)) {
              logger("Include file ".$argv[0]." is not valid!");
              q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
              continue;
          }

          require_once($include);

          // The function to call is named after the include file: "foo.php" => "foo_run"
          $funcname = str_replace(".php", "", basename($argv[0]))."_run";

          if (function_exists($funcname)) {
              logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]);
              $funcname($argv, $argc);

              if ($cooldown > 0) {
                  logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
                  sleep($cooldown);
              }

              logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done");

              q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
          } else {
              logger("Function ".$funcname." does not exist");
          }

          // Quit the poller once every hour
          if (time() > ($starttime + 3600)) {
              return;
          }
      }
  }
  /**
   * @brief Checks if the number of database connections has reached a critical limit.
   *
   * First the connections of the current database user are compared with the
   * user limit (taken from the configuration, the `max_user_connections` system
   * variable or the account's grants). Afterwards the number of connected
   * threads is compared with the server wide `max_connections` value.
   *
   * The threshold is the config value "system", "max_connections_level" in
   * percent; 75 is used when it is not set.
   *
   * @return bool Has the connection usage reached the configured level?
   */
  function poller_max_connections_reached() {

      // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
      $max = intval(get_config("system", "max_connections"));

      // Fetch the percentage level where the poller will get active
      $maxlevel = get_config("system", "max_connections_level");
      if ($maxlevel == 0) {
          $maxlevel = 75;
      }

      if ($max == 0) {
          // the maximum number of possible user connections can be a system variable
          $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
          if ($r) {
              $max = intval($r[0]["Value"]);
          }

          // Or it can be granted. This overrides the system variable
          $r = q("SHOW GRANTS");
          if ($r) {
              foreach ($r as $grants) {
                  $grant = array_pop($grants);
                  if (!stristr($grant, "GRANT USAGE ON")) {
                      continue;
                  }

                  // At least one digit is required so that an empty match can never become the limit
                  if (preg_match('/WITH MAX_USER_CONNECTIONS (\d+)/', $grant, $match)) {
                      $granted = intval($match[1]);

                      // A granted value of zero means "use the global setting", so it must not override it
                      if ($granted > 0) {
                          $max = $granted;
                      }
                  }
              }
          }
      }

      // If $max is set we will use the processlist to determine the current number of connections
      // The processlist only shows entries of the current user
      if ($max > 0) {
          $r = q("SHOW PROCESSLIST");
          if (!$r) {
              return false;
          }

          $used = count($r);

          logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);

          $level = ($used / $max) * 100;

          if ($level >= $maxlevel) {
              logger("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
              return true;
          }
      }

      // We will now check for the system values.
      // This limit could be reached although the user limits are fine.
      $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
      if (!$r) {
          return false;
      }

      $max = intval($r[0]["Value"]);
      if ($max == 0) {
          return false;
      }

      $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
      if (!$r) {
          return false;
      }

      $used = intval($r[0]["Value"]);
      if ($used == 0) {
          return false;
      }

      logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);

      $level = $used / $max * 100;

      if ($level < $maxlevel) {
          return false;
      }

      // Report the configured threshold, like the message for the user connections does
      logger("Maximum level (".$maxlevel."%) of system connections reached: ".$used."/".$max);
      return true;
  }
  /**
   * @brief fix the queue entry if the worker process died
   *
   * Every queue entry that is marked as being executed is checked: When its
   * process does not exist anymore, the entry is reset so that it gets executed
   * again. When the process has been running for more than three hours it is
   * terminated and the entry is reset as well.
   */
  function poller_kill_stale_workers() {
      $r = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");

      if (!dbm::is_result($r)) {
          // No processing here needed
          return;
      }

      // The SIGTERM constant is defined by the pcntl extension, which may be
      // missing although posix is available. 15 is the POSIX value of SIGTERM.
      $sigterm = defined('SIGTERM') ? SIGTERM : 15;

      foreach ($r as $entry) {
          $pid = intval($entry["pid"]);

          // A pid of zero or below addresses process groups instead of a single
          // process, so signalling it could hit the poller itself. Such entries
          // are handled like entries of dead processes.
          if (($pid <= 0) || !posix_kill($pid, 0)) {
              q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `pid` = 0 WHERE `pid` = %d",
                  $pid);
              continue;
          }

          // Kill long running processes
          $duration = (time() - strtotime($entry["executed"])) / 60;

          if ($duration > 180) {
              logger("Worker process ".$pid." took more than 3 hours. It will be killed now.");
              posix_kill($pid, $sigterm);

              // Question: If a process is stale: Should we remove it or should we reschedule it?
              // For now we reschedule it. It's maybe not the wisest decision?
              q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `pid` = 0 WHERE `pid` = %d",
                  $pid);
          } else {
              logger("Worker process ".$pid." now runs for ".round($duration)." minutes. That's okay.", LOGGER_DEBUG);
          }
      }
  }
  /**
   * @brief Checks if the number of active workers has reached the allowed maximum.
   *
   * The maximum is the config value "system", "worker_queues" (4 when not set).
   * When a system load value is available, the maximum is reduced with rising load.
   *
   * @return bool Are at least as many workers active as currently allowed?
   */
  function poller_too_much_workers() {
      $configured = get_config("system", "worker_queues");
      if ($configured == 0) {
          $configured = 4;
      }

      // Without a load value the configured number is the limit
      $limit = $configured;

      $running = poller_active_workers();

      // Decrease the number of workers at higher load
      $currentLoad = current_load();
      if ($currentLoad) {
          $loadCeiling = intval(get_config('system','maxloadavg'));
          if ($loadCeiling < 1) {
              $loadCeiling = 50;
          }

          // Some magical mathematics to reduce the workers:
          // the limit shrinks cubically while the load approaches the ceiling
          $power = 3;
          $factor = $configured / pow($loadCeiling, $power);
          $limit = ceil($factor * pow(max(0, $loadCeiling - $currentLoad), $power));

          // The number of waiting entries is only needed for the log message
          $pending = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00'");
          $waiting = $pending[0]["total"];

          logger("Current load: ".$currentLoad." - maximum: ".$loadCeiling." - current queues: ".$running."/".$waiting." - maximum: ".$limit."/".$configured, LOGGER_DEBUG);
      }

      return $running >= $limit;
  }
  /**
   * @brief Counts the queue entries that are marked as being executed.
   *
   * @return mixed The `workers` column of the COUNT query on the `workerqueue` table
   */
  function poller_active_workers() {
      $rows = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");
      $first = $rows[0];

      return $first["workers"];
  }
  // Only run the poller when this file is the first entry of the included files,
  // i.e. when it is the script that was started and not merely included.
  if (0 === array_search(__FILE__, get_included_files())) {
      poller_run($_SERVER["argv"], $_SERVER["argc"]);
      killme();
  }
  219. ?>