Friendica Communications Platform (please note that this is a clone of the repository at github, issues are handled there) https://friendi.ca
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
10KB

  1. <?php
  2. if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
  3. $directory = dirname($_SERVER["argv"][0]);
  4. if (substr($directory, 0, 1) != "/")
  5. $directory = $_SERVER["PWD"]."/".$directory;
  6. $directory = realpath($directory."/..");
  7. chdir($directory);
  8. }
  9. use \Friendica\Core\Config;
  10. use \Friendica\Core\PConfig;
  11. require_once("boot.php");
  12. function poller_run(&$argv, &$argc){
  13. global $a, $db;
  14. if(is_null($a)) {
  15. $a = new App;
  16. }
  17. if(is_null($db)) {
  18. @include(".htconfig.php");
  19. require_once("include/dba.php");
  20. $db = new dba($db_host, $db_user, $db_pass, $db_data);
  21. unset($db_host, $db_user, $db_pass, $db_data);
  22. };
  23. $a->start_process();
  24. $mypid = getmypid();
  25. if ($a->max_processes_reached())
  26. return;
  27. if (poller_max_connections_reached())
  28. return;
  29. if (App::maxload_reached())
  30. return;
  31. // Checking the number of workers
  32. if (poller_too_much_workers()) {
  33. poller_kill_stale_workers();
  34. return;
  35. }
  36. if(($argc <= 1) OR ($argv[1] != "no_cron")) {
  37. // Run the cron job that calls all other jobs
  38. proc_run(PRIORITY_MEDIUM, "include/cron.php");
  39. // Run the cronhooks job separately from cron for being able to use a different timing
  40. proc_run(PRIORITY_MEDIUM, "include/cronhooks.php");
  41. // Cleaning dead processes
  42. poller_kill_stale_workers();
  43. } else
  44. // Sleep four seconds before checking for running processes again to avoid having too many workers
  45. sleep(4);
  46. // Checking number of workers
  47. if (poller_too_much_workers())
  48. return;
  49. $cooldown = Config::get("system", "worker_cooldown", 0);
  50. $starttime = time();
  51. while ($r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority`, `created` LIMIT 1")) {
  52. // Constantly check the number of parallel database processes
  53. if ($a->max_processes_reached())
  54. return;
  55. // Constantly check the number of available database connections to let the frontend be accessible at any time
  56. if (poller_max_connections_reached())
  57. return;
  58. // Count active workers and compare them with a maximum value that depends on the load
  59. if (poller_too_much_workers())
  60. return;
  61. q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'",
  62. dbesc(datetime_convert()),
  63. intval($mypid),
  64. intval($r[0]["id"]));
  65. // Assure that there are no tasks executed twice
  66. $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
  67. if (!$id) {
  68. logger("Queue item ".$r[0]["id"]." vanished - skip this execution", LOGGER_DEBUG);
  69. continue;
  70. } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
  71. logger("Entry for queue item ".$r[0]["id"]." wasn't stored - we better stop here", LOGGER_DEBUG);
  72. return;
  73. } elseif ($id[0]["pid"] != $mypid) {
  74. logger("Queue item ".$r[0]["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
  75. continue;
  76. }
  77. $argv = json_decode($r[0]["parameter"]);
  78. $argc = count($argv);
  79. // Check for existance and validity of the include file
  80. $include = $argv[0];
  81. if (!validate_include($include)) {
  82. logger("Include file ".$argv[0]." is not valid!");
  83. q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
  84. continue;
  85. }
  86. require_once($include);
  87. $funcname = str_replace(".php", "", basename($argv[0]))."_run";
  88. if (function_exists($funcname)) {
  89. logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]);
  90. $funcname($argv, $argc);
  91. if ($cooldown > 0) {
  92. logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
  93. sleep($cooldown);
  94. }
  95. logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done");
  96. q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
  97. } else
  98. logger("Function ".$funcname." does not exist");
  99. // Quit the poller once every hour
  100. if (time() > ($starttime + 3600))
  101. return;
  102. }
  103. }
  104. /**
  105. * @brief Checks if the number of database connections has reached a critical limit.
  106. *
  107. * @return bool Are more than 3/4 of the maximum connections used?
  108. */
  109. function poller_max_connections_reached() {
  110. // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
  111. $max = get_config("system", "max_connections");
  112. // Fetch the percentage level where the poller will get active
  113. $maxlevel = get_config("system", "max_connections_level");
  114. if ($maxlevel == 0)
  115. $maxlevel = 75;
  116. if ($max == 0) {
  117. // the maximum number of possible user connections can be a system variable
  118. $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
  119. if ($r)
  120. $max = $r[0]["Value"];
  121. // Or it can be granted. This overrides the system variable
  122. $r = q("SHOW GRANTS");
  123. if ($r)
  124. foreach ($r AS $grants) {
  125. $grant = array_pop($grants);
  126. if (stristr($grant, "GRANT USAGE ON"))
  127. if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match))
  128. $max = $match[1];
  129. }
  130. }
  131. // If $max is set we will use the processlist to determine the current number of connections
  132. // The processlist only shows entries of the current user
  133. if ($max != 0) {
  134. $r = q("SHOW PROCESSLIST");
  135. if (!$r)
  136. return false;
  137. $used = count($r);
  138. logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
  139. $level = ($used / $max) * 100;
  140. if ($level >= $maxlevel) {
  141. logger("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
  142. return true;
  143. }
  144. }
  145. // We will now check for the system values.
  146. // This limit could be reached although the user limits are fine.
  147. $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
  148. if (!$r)
  149. return false;
  150. $max = intval($r[0]["Value"]);
  151. if ($max == 0)
  152. return false;
  153. $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
  154. if (!$r)
  155. return false;
  156. $used = intval($r[0]["Value"]);
  157. if ($used == 0)
  158. return false;
  159. logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);
  160. $level = $used / $max * 100;
  161. if ($level < $maxlevel)
  162. return false;
  163. logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
  164. return true;
  165. }
  166. /**
  167. * @brief fix the queue entry if the worker process died
  168. *
  169. */
  170. function poller_kill_stale_workers() {
  171. $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");
  172. if (!dbm::is_result($r)) {
  173. // No processing here needed
  174. return;
  175. }
  176. foreach($r AS $pid)
  177. if (!posix_kill($pid["pid"], 0))
  178. q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `pid` = 0 WHERE `pid` = %d",
  179. intval($pid["pid"]));
  180. else {
  181. // Kill long running processes
  182. // Check if the priority is in a valid range
  183. if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE)))
  184. $pid["priority"] = PRIORITY_MEDIUM;
  185. // Define the maximum durations
  186. $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
  187. $max_duration = $max_duration_defaults[$pid["priority"]];
  188. $argv = json_decode($pid["parameter"]);
  189. $argv[0] = basename($argv[0]);
  190. // How long is the process already running?
  191. $duration = (time() - strtotime($pid["executed"])) / 60;
  192. if ($duration > $max_duration) {
  193. logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
  194. posix_kill($pid["pid"], SIGTERM);
  195. // We killed the stale process.
  196. // To avoid a blocking situation we reschedule the process at the beginning of the queue.
  197. // Additionally we are lowering the priority.
  198. q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `created` = '%s',
  199. `priority` = %d, `pid` = 0 WHERE `pid` = %d",
  200. dbesc(datetime_convert()),
  201. intval(PRIORITY_NEGLIGIBLE),
  202. intval($pid["pid"]));
  203. } else
  204. logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
  205. }
  206. }
  207. function poller_too_much_workers() {
  208. $queues = get_config("system", "worker_queues");
  209. if ($queues == 0)
  210. $queues = 4;
  211. $maxqueues = $queues;
  212. $active = poller_active_workers();
  213. // Decrease the number of workers at higher load
  214. $load = current_load();
  215. if($load) {
  216. $maxsysload = intval(get_config('system','maxloadavg'));
  217. if($maxsysload < 1)
  218. $maxsysload = 50;
  219. $maxworkers = $queues;
  220. // Some magical mathemathics to reduce the workers
  221. $exponent = 3;
  222. $slope = $maxworkers / pow($maxsysload, $exponent);
  223. $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
  224. $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00'");
  225. $entries = $s[0]["total"];
  226. if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) {
  227. $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority` LIMIT 1");
  228. $top_priority = $s[0]["priority"];
  229. $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` != '0000-00-00 00:00:00' LIMIT 1",
  230. intval($top_priority));
  231. $high_running = dbm::is_result($s);
  232. if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) {
  233. logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
  234. $queues = $active + 1;
  235. }
  236. }
  237. logger("Current load: ".$load." - maximum: ".$maxsysload." - current queues: ".$active."/".$entries." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
  238. // Are there fewer workers running as possible? Then fork a new one.
  239. if (!get_config("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
  240. logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
  241. $args = array("php", "include/poller.php", "no_cron");
  242. $a = get_app();
  243. $a->proc_run($args);
  244. }
  245. }
  246. return($active >= $queues);
  247. }
  248. function poller_active_workers() {
  249. $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
  250. return($workers[0]["processes"]);
  251. }
  252. if (array_search(__file__,get_included_files())===0){
  253. poller_run($_SERVER["argv"],$_SERVER["argc"]);
  254. get_app()->end_process();
  255. killme();
  256. }
  257. ?>