The worker can now run from the frontend as well
This commit is contained in:
		
					parent
					
						
							
								5e466aade0
							
						
					
				
			
			
				commit
				
					
						e856ab9a09
					
				
			
		
					 5 changed files with 196 additions and 94 deletions
				
			
		
							
								
								
									
										4
									
								
								boot.php
									
										
									
									
									
								
							
							
						
						
									
										4
									
								
								boot.php
									
										
									
									
									
								
							|  | @ -1385,6 +1385,10 @@ class App { | |||
| 
 | ||||
| 	function proc_run($args) { | ||||
| 
 | ||||
| 		if (!function_exists("proc_open")) { | ||||
| 			return; | ||||
| 		} | ||||
| 
 | ||||
| 		// Add the php path if it is a php call
 | ||||
| 		if (count($args) && ($args[0] === 'php' OR !is_string($args[0]))) { | ||||
| 
 | ||||
|  |  | |||
|  | @ -29,6 +29,7 @@ Example: To set the directory value please add this line to your .htconfig.php: | |||
| * disable_email_validation (Boolean) - Disables the check if a mail address is in a valid format and can be resolved via DNS. | ||||
| * disable_url_validation (Boolean) - Disables the DNS lookup of an URL. | ||||
| * event_input_format - Default value is "ymd". | ||||
| * frontend_worker (Boolean) - Activates the frontend worker which acts as a replacement for running the poller via the command line. | ||||
| * ignore_cache (Boolean) - For development only. Disables the item cache. | ||||
| * like_no_comment (Boolean) - Don't update the "commented" value of an item when it is liked. | ||||
| * local_block (Boolean) - Used in conjunction with "block_public". | ||||
|  |  | |||
|  | @ -15,7 +15,7 @@ use \Friendica\Core\PConfig; | |||
| 
 | ||||
| require_once("boot.php"); | ||||
| 
 | ||||
| function poller_run(&$argv, &$argc){ | ||||
| function poller_run($argv, $argc){ | ||||
| 	global $a, $db; | ||||
| 
 | ||||
| 	if(is_null($a)) { | ||||
|  | @ -35,8 +35,6 @@ function poller_run(&$argv, &$argc){ | |||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	$mypid = getmypid(); | ||||
| 
 | ||||
| 	if ($a->max_processes_reached()) | ||||
| 		return; | ||||
| 
 | ||||
|  | @ -53,14 +51,7 @@ function poller_run(&$argv, &$argc){ | |||
| 	} | ||||
| 
 | ||||
| 	if(($argc <= 1) OR ($argv[1] != "no_cron")) { | ||||
| 		// Run the cron job that calls all other jobs
 | ||||
| 		proc_run(PRIORITY_MEDIUM, "include/cron.php"); | ||||
| 
 | ||||
| 		// Run the cronhooks job separately from cron for being able to use a different timing
 | ||||
| 		proc_run(PRIORITY_MEDIUM, "include/cronhooks.php"); | ||||
| 
 | ||||
| 		// Cleaning dead processes
 | ||||
| 		poller_kill_stale_workers(); | ||||
| 		poller_run_cron(); | ||||
| 	} else | ||||
| 		// Sleep four seconds before checking for running processes again to avoid having too many workers
 | ||||
| 		sleep(4); | ||||
|  | @ -69,95 +60,18 @@ function poller_run(&$argv, &$argc){ | |||
| 	if (poller_too_much_workers()) | ||||
| 		return; | ||||
| 
 | ||||
| 	$cooldown = Config::get("system", "worker_cooldown", 0); | ||||
| 
 | ||||
| 	$starttime = time(); | ||||
| 
 | ||||
| 	while ($r = poller_worker_process()) { | ||||
| 
 | ||||
| 		// Quit when in maintenance
 | ||||
| 		if (get_config('system', 'maintenance', true)) | ||||
| 			return; | ||||
| 
 | ||||
| 		// Constantly check the number of parallel database processes
 | ||||
| 		if ($a->max_processes_reached()) | ||||
| 			return; | ||||
| 
 | ||||
| 		// Constantly check the number of available database connections to let the frontend be accessible at any time
 | ||||
| 		if (poller_max_connections_reached()) | ||||
| 			return; | ||||
| 
 | ||||
| 		// Count active workers and compare them with a maximum value that depends on the load
 | ||||
| 		if (poller_too_much_workers()) | ||||
| 		if (poller_too_much_workers()) { | ||||
| 			return; | ||||
| 
 | ||||
| 		$upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0", | ||||
| 			dbesc(datetime_convert()), | ||||
| 			intval($mypid), | ||||
| 			intval($r[0]["id"])); | ||||
| 
 | ||||
| 		if (!$upd) { | ||||
| 			logger("Couldn't update queue entry ".$r[0]["id"]." - skip this execution", LOGGER_DEBUG); | ||||
| 			q("COMMIT"); | ||||
| 			continue; | ||||
| 		} | ||||
| 
 | ||||
| 		// Assure that there are no tasks executed twice
 | ||||
| 		$id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"])); | ||||
| 		if (!$id) { | ||||
| 			logger("Queue item ".$r[0]["id"]." vanished - skip this execution", LOGGER_DEBUG); | ||||
| 			q("COMMIT"); | ||||
| 			continue; | ||||
| 		} elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) { | ||||
| 			logger("Entry for queue item ".$r[0]["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); | ||||
| 			q("COMMIT"); | ||||
| 			continue; | ||||
| 		} elseif ($id[0]["pid"] != $mypid) { | ||||
| 			logger("Queue item ".$r[0]["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); | ||||
| 			q("COMMIT"); | ||||
| 			continue; | ||||
| 		if (!poller_execute($r[0])) { | ||||
| 			return; | ||||
| 		} | ||||
| 		q("COMMIT"); | ||||
| 
 | ||||
| 		$argv = json_decode($r[0]["parameter"]); | ||||
| 
 | ||||
| 		$argc = count($argv); | ||||
| 
 | ||||
| 		// Check for existance and validity of the include file
 | ||||
| 		$include = $argv[0]; | ||||
| 
 | ||||
| 		if (!validate_include($include)) { | ||||
| 			logger("Include file ".$argv[0]." is not valid!"); | ||||
| 			q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"])); | ||||
| 			continue; | ||||
| 		} | ||||
| 
 | ||||
| 		require_once($include); | ||||
| 
 | ||||
| 		$funcname = str_replace(".php", "", basename($argv[0]))."_run"; | ||||
| 
 | ||||
| 		if (function_exists($funcname)) { | ||||
| 			logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]); | ||||
| 
 | ||||
| 			// For better logging create a new process id for every worker call
 | ||||
| 			// But preserve the old one for the worker
 | ||||
| 			$old_process_id = $a->process_id; | ||||
| 			$a->process_id = uniqid("wrk", true); | ||||
| 
 | ||||
| 			$funcname($argv, $argc); | ||||
| 
 | ||||
| 			$a->process_id = $old_process_id; | ||||
| 
 | ||||
| 			if ($cooldown > 0) { | ||||
| 				logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds"); | ||||
| 				sleep($cooldown); | ||||
| 			} | ||||
| 
 | ||||
| 			logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done"); | ||||
| 
 | ||||
| 			q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"])); | ||||
| 		} else | ||||
| 			logger("Function ".$funcname." does not exist"); | ||||
| 
 | ||||
| 		// Quit the poller once every hour
 | ||||
| 		if (time() > ($starttime + 3600)) | ||||
|  | @ -166,6 +80,108 @@ function poller_run(&$argv, &$argc){ | |||
| 
 | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * @brief Execute a worker entry | ||||
|  * | ||||
|  * @param array $queue Workerqueue entry | ||||
|  * | ||||
|  * @return boolean "true" if further processing should be stopped | ||||
|  */ | ||||
| function poller_execute($queue) { | ||||
| 
 | ||||
| 	$a = get_app(); | ||||
| 
 | ||||
| 	$mypid = getmypid(); | ||||
| 
 | ||||
| 	$cooldown = Config::get("system", "worker_cooldown", 0); | ||||
| 
 | ||||
| 	// Quit when in maintenance
 | ||||
| 	if (get_config('system', 'maintenance', true)) { | ||||
| 		return false; | ||||
| 	} | ||||
| 
 | ||||
| 	// Constantly check the number of parallel database processes
 | ||||
| 	if ($a->max_processes_reached()) { | ||||
| 		return false; | ||||
| 	} | ||||
| 
 | ||||
| 	// Constantly check the number of available database connections to let the frontend be accessible at any time
 | ||||
| 	if (poller_max_connections_reached()) { | ||||
| 		return false; | ||||
| 	} | ||||
| 
 | ||||
| 	$upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0", | ||||
| 		dbesc(datetime_convert()), | ||||
| 		intval($mypid), | ||||
| 		intval($queue["id"])); | ||||
| 
 | ||||
| 	if (!$upd) { | ||||
| 		logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); | ||||
| 		q("COMMIT"); | ||||
| 		return true; | ||||
| 	} | ||||
| 
 | ||||
| 	// Assure that there are no tasks executed twice
 | ||||
| 	$id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); | ||||
| 	if (!$id) { | ||||
| 		logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG); | ||||
| 		q("COMMIT"); | ||||
| 		return true; | ||||
| 	} elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) { | ||||
| 		logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); | ||||
| 		q("COMMIT"); | ||||
| 		return true; | ||||
| 	} elseif ($id[0]["pid"] != $mypid) { | ||||
| 		logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); | ||||
| 		q("COMMIT"); | ||||
| 		return true; | ||||
| 	} | ||||
| 	q("COMMIT"); | ||||
| 
 | ||||
| 	$argv = json_decode($queue["parameter"]); | ||||
| 
 | ||||
| 	$argc = count($argv); | ||||
| 
 | ||||
| 	// Check for existance and validity of the include file
 | ||||
| 	$include = $argv[0]; | ||||
| 
 | ||||
| 	if (!validate_include($include)) { | ||||
| 		logger("Include file ".$argv[0]." is not valid!"); | ||||
| 		q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); | ||||
| 		return true; | ||||
| 	} | ||||
| 
 | ||||
| 	require_once($include); | ||||
| 
 | ||||
| 	$funcname = str_replace(".php", "", basename($argv[0]))."_run"; | ||||
| 
 | ||||
| 	if (function_exists($funcname)) { | ||||
| 		logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]); | ||||
| 
 | ||||
| 		// For better logging create a new process id for every worker call
 | ||||
| 		// But preserve the old one for the worker
 | ||||
| 		$old_process_id = $a->process_id; | ||||
| 		$a->process_id = uniqid("wrk", true); | ||||
| 
 | ||||
| 		$funcname($argv, $argc); | ||||
| 
 | ||||
| 		$a->process_id = $old_process_id; | ||||
| 
 | ||||
| 		if ($cooldown > 0) { | ||||
| 			logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds"); | ||||
| 			sleep($cooldown); | ||||
| 		} | ||||
| 
 | ||||
| 		logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done"); | ||||
| 
 | ||||
| 		q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); | ||||
| 	} else { | ||||
| 		logger("Function ".$funcname." does not exist"); | ||||
| 	} | ||||
| 
 | ||||
| 	return true; | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * @brief Checks if the number of database connections has reached a critical limit. | ||||
|  * | ||||
|  | @ -394,8 +410,7 @@ function poller_passing_slow(&$highest_priority) { | |||
| 
 | ||||
| 	$r = q("SELECT `priority`
 | ||||
| 		FROM `process` | ||||
| 		INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` | ||||
| 		WHERE `process`.`command` = 'poller.php'");
 | ||||
| 		INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
 | ||||
| 
 | ||||
| 	// No active processes at all? Fine
 | ||||
| 	if (!dbm::is_result($r)) | ||||
|  | @ -435,7 +450,6 @@ function poller_passing_slow(&$highest_priority) { | |||
|  * | ||||
|  * @return string SQL statement | ||||
|  */ | ||||
| 
 | ||||
| function poller_worker_process() { | ||||
| 
 | ||||
| 	q("START TRANSACTION;"); | ||||
|  | @ -464,6 +478,47 @@ function poller_worker_process() { | |||
| 	return $r; | ||||
| } | ||||
| 
 | ||||
| function call_worker() { | ||||
| 	if (!get_config("system", "frontend_worker")) { | ||||
| 		return; | ||||
| 	} | ||||
| 
 | ||||
| 	$url = get_app()->get_baseurl()."/worker"; | ||||
| 	fetch_url($url, false, $redirects, 1); | ||||
| } | ||||
| 
 | ||||
| function call_worker_if_idle() { | ||||
| 	if (!get_config("system", "frontend_worker")) { | ||||
| 		return; | ||||
| 	} | ||||
| 
 | ||||
| 	poller_run_cron(); | ||||
| 
 | ||||
| 	clear_worker_processes(); | ||||
| 
 | ||||
| 	$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php'"); | ||||
| 
 | ||||
| 	if ($workers[0]["processes"] == 0) { | ||||
| 		call_worker(); | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| function clear_worker_processes() { | ||||
| 	q("DELETE FROM `process` WHERE `created` < '%s' AND `command` = 'worker.php'", | ||||
| 		dbesc(datetime_convert('UTC','UTC',"now - 10 minutes"))); | ||||
| } | ||||
| 
 | ||||
| function poller_run_cron() { | ||||
| 	// Run the cron job that calls all other jobs
 | ||||
| 	proc_run(PRIORITY_MEDIUM, "include/cron.php"); | ||||
| 
 | ||||
| 	// Run the cronhooks job separately from cron for being able to use a different timing
 | ||||
| 	proc_run(PRIORITY_MEDIUM, "include/cronhooks.php"); | ||||
| 
 | ||||
| 	// Cleaning dead processes
 | ||||
| 	poller_kill_stale_workers(); | ||||
| } | ||||
| 
 | ||||
| if (array_search(__file__,get_included_files())===0){ | ||||
| 	poller_run($_SERVER["argv"],$_SERVER["argc"]); | ||||
| 
 | ||||
|  |  | |||
|  | @ -99,6 +99,10 @@ if (!$a->is_backend()) { | |||
| 	$stamp1 = microtime(true); | ||||
| 	session_start(); | ||||
| 	$a->save_timestamp($stamp1, "parser"); | ||||
| } else { | ||||
| 	require_once "include/poller.php"; | ||||
| 
 | ||||
| 	call_worker_if_idle(); | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  |  | |||
							
								
								
									
										38
									
								
								mod/worker.php
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								mod/worker.php
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,38 @@ | |||
| <?php | ||||
| require_once("include/poller.php"); | ||||
| 
 | ||||
| use \Friendica\Core\Config; | ||||
| use \Friendica\Core\PConfig; | ||||
| 
 | ||||
| function worker_init($a){ | ||||
| 
 | ||||
| 	if (!get_config("system", "frontend_worker")) { | ||||
| 		return; | ||||
| 	} | ||||
| 
 | ||||
| 	clear_worker_processes(); | ||||
| 
 | ||||
| 	$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php'"); | ||||
| 
 | ||||
| 	if ($workers[0]["processes"] > Config::get("system", "worker_queues", 4)) { | ||||
| 		return; | ||||
| 	} | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	logger("Front end worker started: ".getmypid()); | ||||
| 
 | ||||
| 	call_worker(); | ||||
| 
 | ||||
| 	if ($r = poller_worker_process()) { | ||||
| 		poller_execute($r[0]); | ||||
| 	} | ||||
| 
 | ||||
| 	call_worker(); | ||||
| 
 | ||||
| 	$a->end_process(); | ||||
| 
 | ||||
| 	logger("Front end worker ended: ".getmypid()); | ||||
| 
 | ||||
| 	killme(); | ||||
| } | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue