New process table for a better detection of running workers
This commit is contained in:
		
					parent
					
						
							
								5ebeb7e85c
							
						
					
				
			
			
				commit
				
					
						22f32d9721
					
				
			
		
					 23 changed files with 107 additions and 13 deletions
				
			
		
							
								
								
									
										41
									
								
								boot.php
									
										
									
									
									
								
							
							
						
						
									
										41
									
								
								boot.php
									
										
									
									
									
								
							|  | @ -38,7 +38,7 @@ define ( 'FRIENDICA_PLATFORM',     'Friendica'); | |||
| define ( 'FRIENDICA_CODENAME',     'Asparagus'); | ||||
| define ( 'FRIENDICA_VERSION',      '3.5-dev' ); | ||||
| define ( 'DFRN_PROTOCOL_VERSION',  '2.23'    ); | ||||
| define ( 'DB_UPDATE_VERSION',      1201      ); | ||||
| define ( 'DB_UPDATE_VERSION',      1202      ); | ||||
| 
 | ||||
| /** | ||||
|  * @brief Constant with a HTML line break. | ||||
|  | @ -1099,6 +1099,42 @@ class App { | |||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * @brief Log active processes into the "process" table | ||||
| 	 */ | ||||
| 	function start_process() { | ||||
| 		$trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1); | ||||
| 
 | ||||
| 		$command = basename($trace[0]["file"]); | ||||
| 
 | ||||
| 		$this->remove_inactive_processes(); | ||||
| 
 | ||||
| 		$r = q("SELECT `pid` FROM `process` WHERE `pid` = %d", intval(getmypid())); | ||||
| 		if(!dbm::is_result($r)) | ||||
| 			q("INSERT INTO `process` (`pid`,`command`,`created`) VALUES (%d, '%s', '%s')", | ||||
| 				intval(getmypid()), | ||||
| 				dbesc($command), | ||||
| 				dbesc(datetime_convert())); | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * @brief Remove inactive processes | ||||
| 	 */ | ||||
| 	function remove_inactive_processes() { | ||||
| 		$r = q("SELECT `pid` FROM `process`"); | ||||
| 		if(dbm::is_result($r)) | ||||
| 			foreach ($r AS $process) | ||||
| 				if (!posix_kill($process["pid"], 0)) | ||||
| 					q("DELETE FROM `process` WHERE `pid` = %d", intval($process["pid"])); | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * @brief Remove the active process from the "process" table | ||||
| 	 */ | ||||
| 	function end_process() { | ||||
| 		q("DELETE FROM `process` WHERE `pid` = %d", intval(getmypid())); | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * @brief Returns a string with a callstack. Can be used for logging. | ||||
| 	 * | ||||
|  | @ -1699,6 +1735,9 @@ function login($register = false, $hiddens=false) { | |||
|  * @brief Used to end the current process, after saving session state. | ||||
|  */ | ||||
| function killme() { | ||||
| 
 | ||||
| 	get_app()->end_process(); | ||||
| 
 | ||||
| 	if (!get_app()->is_backend()) | ||||
| 		session_write_close(); | ||||
| 
 | ||||
|  |  | |||
|  | @ -58,6 +58,8 @@ if(is_null($db)) { | |||
| 	unset($db_host, $db_user, $db_pass, $db_data); | ||||
| }; | ||||
| 
 | ||||
| $a->start_process(); | ||||
| 
 | ||||
| // the logfile to which to write, should be writeable by the user which is running the server
 | ||||
| $sLogFile = get_config('jabber','logfile'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -19,6 +19,8 @@ function cli_startup() { | |||
|     	unset($db_host, $db_user, $db_pass, $db_data); | ||||
|   	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 
 | ||||
| 	load_config('config'); | ||||
|  |  | |||
|  | @ -27,6 +27,7 @@ function cron_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
|  |  | |||
|  | @ -17,6 +17,8 @@ function cronhooks_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -27,6 +27,7 @@ function cronjobs_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
|  |  | |||
|  | @ -1114,6 +1114,17 @@ function db_definition() { | |||
| 					"choice" => array("choice"), | ||||
| 					) | ||||
| 			); | ||||
| 	$database["process"] = array( | ||||
| 			"fields" => array( | ||||
| 					"pid" => array("type" => "int(10) unsigned", "not null" => "1", "primary" => "1"), | ||||
| 					"command" => array("type" => "varchar(32)", "not null" => "1", "default" => ""), | ||||
| 					"created" => array("type" => "datetime", "not null" => "1", "default" => "0000-00-00 00:00:00"), | ||||
| 					), | ||||
| 			"indexes" => array( | ||||
| 					"PRIMARY" => array("pid"), | ||||
| 					"command" => array("command"), | ||||
| 					) | ||||
| 			); | ||||
| 	$database["profile"] = array( | ||||
| 			"fields" => array( | ||||
| 					"id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"), | ||||
|  | @ -1456,6 +1467,8 @@ function dbstructure_run(&$argv, &$argc) { | |||
| 			unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	} | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	if ($argc==2) { | ||||
| 		switch ($argv[1]) { | ||||
| 			case "update": | ||||
|  |  | |||
|  | @ -16,6 +16,8 @@ function dbupdate_run(&$argv, &$argc) { | |||
| 		        unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	} | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	load_config('config'); | ||||
| 	load_config('system'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -21,6 +21,8 @@ function delivery_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	} | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once("include/session.php"); | ||||
| 	require_once("include/datetime.php"); | ||||
| 	require_once('include/items.php'); | ||||
|  |  | |||
|  | @ -15,6 +15,8 @@ function directory_run(&$argv, &$argc){ | |||
| 				unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	load_config('config'); | ||||
| 	load_config('system'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,6 +18,8 @@ function discover_poco_run(&$argv, &$argc){ | |||
| 	unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -16,6 +16,8 @@ function expire_run(&$argv, &$argc){ | |||
|     	unset($db_host, $db_user, $db_pass, $db_data); | ||||
|   	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
| 	require_once('include/items.php'); | ||||
|  |  | |||
|  | @ -18,6 +18,8 @@ function gprobe_run(&$argv, &$argc){ | |||
|     	unset($db_host, $db_user, $db_pass, $db_data); | ||||
|   	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -54,6 +54,8 @@ function notifier_run(&$argv, &$argc){ | |||
| 			unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	} | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once("include/session.php"); | ||||
| 	require_once("include/datetime.php"); | ||||
| 	require_once('include/items.php'); | ||||
|  |  | |||
|  | @ -24,6 +24,7 @@ function onepoll_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/session.php'); | ||||
| 	require_once('include/datetime.php'); | ||||
|  |  | |||
|  | @ -29,6 +29,10 @@ function poller_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	$mypid = getmypid(); | ||||
| 
 | ||||
| 	if ($a->max_processes_reached()) | ||||
| 		return; | ||||
| 
 | ||||
|  | @ -81,15 +85,19 @@ function poller_run(&$argv, &$argc){ | |||
| 
 | ||||
| 		q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'", | ||||
| 			dbesc(datetime_convert()), | ||||
| 			intval(getmypid()), | ||||
| 			intval($mypid), | ||||
| 			intval($r[0]["id"])); | ||||
| 
 | ||||
| 		// Assure that there are no tasks executed twice
 | ||||
| 		$id = q("SELECT `id` FROM `workerqueue` WHERE `id` = %d AND `pid` = %d", | ||||
| 			intval($r[0]["id"]), | ||||
| 			intval(getmypid())); | ||||
| 		$id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"])); | ||||
| 		if (!$id) { | ||||
| 			logger("Queue item ".$r[0]["id"]." was executed multiple times - skip this execution", LOGGER_DEBUG); | ||||
| 			logger("Queue item ".$r[0]["id"]." vanished - skip this execution", LOGGER_DEBUG); | ||||
| 			continue; | ||||
| 		} elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) { | ||||
| 			logger("Entry for queue item ".$r[0]["id"]." wasn't stored - we better stop here", LOGGER_DEBUG); | ||||
| 			return; | ||||
| 		} elseif ($id[0]["pid"] != $mypid) { | ||||
| 			logger("Queue item ".$r[0]["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); | ||||
| 			continue; | ||||
| 		} | ||||
| 
 | ||||
|  | @ -111,15 +119,15 @@ function poller_run(&$argv, &$argc){ | |||
| 		$funcname = str_replace(".php", "", basename($argv[0]))."_run"; | ||||
| 
 | ||||
| 		if (function_exists($funcname)) { | ||||
| 			logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]); | ||||
| 			logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]); | ||||
| 			$funcname($argv, $argc); | ||||
| 
 | ||||
| 			if ($cooldown > 0) { | ||||
| 				logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds"); | ||||
| 				logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds"); | ||||
| 				sleep($cooldown); | ||||
| 			} | ||||
| 
 | ||||
| 			logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done"); | ||||
| 			logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done"); | ||||
| 
 | ||||
| 			q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"])); | ||||
| 		} else | ||||
|  | @ -319,9 +327,9 @@ function poller_too_much_workers() { | |||
| } | ||||
| 
 | ||||
| function poller_active_workers() { | ||||
| 	$workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'"); | ||||
| 	$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'"); | ||||
| 
 | ||||
| 	return($workers[0]["workers"]); | ||||
| 	return($workers[0]["processes"]); | ||||
| } | ||||
| 
 | ||||
| if (array_search(__file__,get_included_files())===0){ | ||||
|  |  | |||
|  | @ -70,6 +70,8 @@ function pubsubpublish_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/items.php'); | ||||
| 
 | ||||
| 	load_config('config'); | ||||
|  |  | |||
|  | @ -17,6 +17,7 @@ function queue_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once("include/session.php"); | ||||
| 	require_once("include/datetime.php"); | ||||
|  |  | |||
|  | @ -14,6 +14,8 @@ if(is_null($db)) { | |||
| 	unset($db_host, $db_user, $db_pass, $db_data); | ||||
| } | ||||
| 
 | ||||
| $a->start_process(); | ||||
| 
 | ||||
| load_config('config'); | ||||
| load_config('system'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -14,6 +14,8 @@ if(is_null($db)) { | |||
| 	unset($db_host, $db_user, $db_pass, $db_data); | ||||
| } | ||||
| 
 | ||||
| $a->start_process(); | ||||
| 
 | ||||
| load_config('config'); | ||||
| load_config('system'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -14,6 +14,8 @@ if(is_null($db)) { | |||
| 	unset($db_host, $db_user, $db_pass, $db_data); | ||||
| } | ||||
| 
 | ||||
| $a->start_process(); | ||||
| 
 | ||||
| load_config('config'); | ||||
| load_config('system'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -16,6 +16,8 @@ function update_gcontact_run(&$argv, &$argc){ | |||
| 		unset($db_host, $db_user, $db_pass, $db_data); | ||||
| 	}; | ||||
| 
 | ||||
| 	$a->start_process(); | ||||
| 
 | ||||
| 	require_once('include/Scrape.php'); | ||||
| 	require_once("include/socgraph.php"); | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,6 +1,6 @@ | |||
| <?php | ||||
| 
 | ||||
| define('UPDATE_VERSION' , 1201); | ||||
| define('UPDATE_VERSION' , 1202); | ||||
| 
 | ||||
| /** | ||||
|  * | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue