Just found the handbrake ...
This commit is contained in:
		
					parent
					
						
							
								b9bcc921a6
							
						
					
				
			
			
				commit
				
					
						99b86c9fd9
					
				
			
		
					 2 changed files with 50 additions and 24 deletions
				
			
		| 
						 | 
				
			
			@ -627,6 +627,12 @@ class dba {
 | 
			
		|||
					self::$dbo->errorno = mysql_errno(self::$dbo->db);
 | 
			
		||||
				} else {
 | 
			
		||||
					self::$dbo->affected_rows = mysql_affected_rows($retval);
 | 
			
		||||
 | 
			
		||||
					// Due to missing mysql_* support this here wasn't tested at all
 | 
			
		||||
					// See here: http://php.net/manual/en/function.mysql-num-rows.php
 | 
			
		||||
					if (self::$dbo->affected_rows <= 0) {
 | 
			
		||||
						self::$dbo->affected_rows = mysql_num_rows($retval);
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				break;
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -1038,7 +1044,7 @@ class dba {
 | 
			
		|||
					$sql = "DELETE FROM `".$command['table']."` WHERE `".
 | 
			
		||||
						implode("` = ? AND `", array_keys($command['param']))."` = ?";
 | 
			
		||||
 | 
			
		||||
					logger(dba::replace_parameters($sql, $command['param']), LOGGER_DATA);
 | 
			
		||||
					logger(self::replace_parameters($sql, $command['param']), LOGGER_DATA);
 | 
			
		||||
 | 
			
		||||
					if (!self::e($sql, $command['param'])) {
 | 
			
		||||
						if ($do_transaction) {
 | 
			
		||||
| 
						 | 
				
			
			@ -1068,7 +1074,7 @@ class dba {
 | 
			
		|||
						$sql = "DELETE FROM `".$table."` WHERE `".$field."` IN (".
 | 
			
		||||
							substr(str_repeat("?, ", count($field_values)), 0, -2).");";
 | 
			
		||||
 | 
			
		||||
						logger(dba::replace_parameters($sql, $field_values), LOGGER_DATA);
 | 
			
		||||
						logger(self::replace_parameters($sql, $field_values), LOGGER_DATA);
 | 
			
		||||
 | 
			
		||||
						if (!self::e($sql, $field_values)) {
 | 
			
		||||
							if ($do_transaction) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,7 +18,7 @@ if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
 | 
			
		|||
require_once("boot.php");
 | 
			
		||||
 | 
			
		||||
function poller_run($argv, $argc){
 | 
			
		||||
	global $a, $db, $poller_up_start;
 | 
			
		||||
	global $a, $db, $poller_up_start, $poller_db_duration;
 | 
			
		||||
 | 
			
		||||
	$poller_up_start = microtime(true);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -104,8 +104,9 @@ function poller_run($argv, $argc){
 | 
			
		|||
 | 
			
		||||
			// If possible we will fetch new jobs for this worker
 | 
			
		||||
			if (!$refetched && Lock::set('poller_worker_process', 0)) {
 | 
			
		||||
				logger('Blubb: a');
 | 
			
		||||
				$stamp = (float)microtime(true);
 | 
			
		||||
				$refetched = find_worker_processes();
 | 
			
		||||
				$poller_db_duration += (microtime(true) - $stamp);
 | 
			
		||||
				Lock::remove('poller_worker_process');
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -545,15 +546,16 @@ function poller_too_much_workers() {
 | 
			
		|||
			}
 | 
			
		||||
			dba::close($entries);
 | 
			
		||||
 | 
			
		||||
			$jobs_per_minute = 0;
 | 
			
		||||
 | 
			
		||||
			$jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE");
 | 
			
		||||
			if ($job = dba::fetch($jobs)) {
 | 
			
		||||
				$jobs_per_minute = number_format($job['jobs'] / 10, 0);
 | 
			
		||||
			$intervals = array(1, 10, 60);
 | 
			
		||||
			$jobs_per_minute = array();
 | 
			
		||||
			foreach ($intervals AS $interval) {
 | 
			
		||||
				$jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ".intval($interval)." MINUTE");
 | 
			
		||||
				if ($job = dba::fetch($jobs)) {
 | 
			
		||||
					$jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0);
 | 
			
		||||
				}
 | 
			
		||||
				dba::close($jobs);
 | 
			
		||||
			}
 | 
			
		||||
			dba::close($jobs);
 | 
			
		||||
 | 
			
		||||
			$processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
 | 
			
		||||
			$processlist = ' - jpm: '.implode('/', $jobs_per_minute).' ('.implode(', ', $listitem).')';
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		$entries = poller_total_entries();
 | 
			
		||||
| 
						 | 
				
			
			@ -664,34 +666,52 @@ function find_worker_processes($mypid = 0) {
 | 
			
		|||
 | 
			
		||||
	if (poller_passing_slow($highest_priority)) {
 | 
			
		||||
		// Are there waiting processes with a higher priority than the currently highest?
 | 
			
		||||
		$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
 | 
			
		||||
		$result = dba::p("SELECT `id` FROM `workerqueue`
 | 
			
		||||
					WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
 | 
			
		||||
					ORDER BY `priority`, `created` LIMIT ".intval($limit),
 | 
			
		||||
				datetime_convert(), $mypid, NULL_DATE, $highest_priority);
 | 
			
		||||
		if ($result) {
 | 
			
		||||
			$found = (dba::affected_rows() > 0);
 | 
			
		||||
				NULL_DATE, $highest_priority);
 | 
			
		||||
 | 
			
		||||
		while ($id = dba::fetch($result)) {
 | 
			
		||||
			$ids[] = $id["id"];
 | 
			
		||||
		}
 | 
			
		||||
		dba::close($result);
 | 
			
		||||
 | 
			
		||||
		$found = (count($ids) > 0);
 | 
			
		||||
 | 
			
		||||
		if (!$found) {
 | 
			
		||||
			// Give slower processes some processing time
 | 
			
		||||
			$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
 | 
			
		||||
			$result = dba::p("SELECT `id` FROM `workerqueue`
 | 
			
		||||
						WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
 | 
			
		||||
						ORDER BY `priority`, `created` LIMIT ".intval($limit),
 | 
			
		||||
					datetime_convert(), $mypid, NULL_DATE, $highest_priority);
 | 
			
		||||
			if ($result) {
 | 
			
		||||
				$found = (dba::affected_rows() > 0);
 | 
			
		||||
					NULL_DATE, $highest_priority);
 | 
			
		||||
 | 
			
		||||
			while ($id = dba::fetch($result)) {
 | 
			
		||||
				$ids[] = $id["id"];
 | 
			
		||||
			}
 | 
			
		||||
			dba::close($result);
 | 
			
		||||
 | 
			
		||||
			$found = (count($ids) > 0);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 | 
			
		||||
	if (!$found) {
 | 
			
		||||
		$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
 | 
			
		||||
				datetime_convert(), $mypid, NULL_DATE);
 | 
			
		||||
		if ($result) {
 | 
			
		||||
			$found = (dba::affected_rows() > 0);
 | 
			
		||||
		$result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE);
 | 
			
		||||
 | 
			
		||||
		while ($id = dba::fetch($result)) {
 | 
			
		||||
			$ids[] = $id["id"];
 | 
			
		||||
		}
 | 
			
		||||
		dba::close($result);
 | 
			
		||||
 | 
			
		||||
		$found = (count($ids) > 0);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if ($found) {
 | 
			
		||||
		$sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;";
 | 
			
		||||
		array_unshift($ids, datetime_convert(), $mypid);
 | 
			
		||||
		dba::e($sql, $ids);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return $found;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue