Split expire.php in several processes / small worker changes

This commit is contained in:
Michael 2017-07-06 05:48:02 +00:00
parent fd36e4e8b0
commit 30b0a035f9
8 changed files with 79 additions and 38 deletions

View file

@ -253,7 +253,7 @@ function cron_poll_contacts($argc, $argv) {
} else { } else {
$priority = PRIORITY_LOW; $priority = PRIORITY_LOW;
} }
proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', intval($contact['id'])); proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', (int)$contact['id']);
} }
} }
} }

View file

@ -47,13 +47,12 @@ function discover_poco_run(&$argv, &$argc) {
logger('start '.$search); logger('start '.$search);
if ($mode == 8) { if ($mode == 8) {
$profile_url = base64_decode($argv[2]); if ($argv[2] != "") {
if ($profile_url != "") { poco_last_updated($argv[2], true);
poco_last_updated($profile_url, true);
} }
} elseif ($mode == 7) { } elseif ($mode == 7) {
if ($argc == 6) { if ($argc == 6) {
$url = base64_decode($argv[5]); $url = $argv[5];
} else { } else {
$url = ''; $url = '';
} }
@ -63,7 +62,7 @@ function discover_poco_run(&$argv, &$argc) {
} elseif ($mode == 5) { } elseif ($mode == 5) {
update_server(); update_server();
} elseif ($mode == 4) { } elseif ($mode == 4) {
$server_url = base64_decode($argv[2]); $server_url = $argv[2];
if ($server_url == "") { if ($server_url == "") {
return; return;
} }
@ -119,7 +118,7 @@ function update_server() {
} }
logger('Update server status for server '.$server["url"], LOGGER_DEBUG); logger('Update server status for server '.$server["url"], LOGGER_DEBUG);
proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode($server["url"])); proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", $server["url"]);
if (++$updated > 250) { if (++$updated > 250) {
return; return;
@ -178,7 +177,7 @@ function discover_users() {
if ((($server_url == "") && ($user["network"] == NETWORK_FEED)) || $force_update || poco_check_server($server_url, $user["network"])) { if ((($server_url == "") && ($user["network"] == NETWORK_FEED)) || $force_update || poco_check_server($server_url, $user["network"])) {
logger('Check profile '.$user["url"]); logger('Check profile '.$user["url"]);
proc_run(PRIORITY_LOW, "include/discover_poco.php", "check_profile", base64_encode($user["url"])); proc_run(PRIORITY_LOW, "include/discover_poco.php", "check_profile", $user["url"]);
if (++$checked > 100) { if (++$checked > 100) {
return; return;

View file

@ -9,14 +9,32 @@ function expire_run(&$argv, &$argc){
require_once('include/items.php'); require_once('include/items.php');
require_once('include/Contact.php'); require_once('include/Contact.php');
load_hooks();
if (($argc == 2) && (intval($argv[1]) > 0)) {
$user = dba::select('user', array('uid', 'username', 'expire'), array('uid' => $argv[1]), array('limit' => 1));
if (dbm::is_result($user)) {
logger('Expire items for user '.$user['uid'].' ('.$user['username'].') - interval: '.$user['expire'], LOGGER_DEBUG);
item_expire($user['uid'], $user['expire']);
logger('Expire items for user '.$user['uid'].' ('.$user['username'].') - done ', LOGGER_DEBUG);
}
return;
} elseif (($argc == 3) && ($argv[1] == 'hook') && is_array($a->hooks) && array_key_exists("expire", $a->hooks)) {
foreach ($a->hooks["expire"] as $hook) {
if ($hook[1] == $argv[2]) {
logger("Calling expire hook '" . $hook[1] . "'", LOGGER_DEBUG);
call_single_hook($a, $name, $hook, $data);
}
}
return;
}
// physically remove anything that has been deleted for more than two months // physically remove anything that has been deleted for more than two months
$r = dba::p("SELECT `id` FROM `item` WHERE `deleted` AND `changed` < UTC_TIMESTAMP() - INTERVAL 60 DAY"); $r = dba::p("SELECT `id` FROM `item` WHERE `deleted` AND `changed` < UTC_TIMESTAMP() - INTERVAL 60 DAY");
if (dbm::is_result($r)) { while ($row = dba::fetch($r)) {
while ($row = dba::fetch($r)) { dba::delete('item', array('id' => $row['id']));
dba::delete('item', array('id' => $row['id']));
}
dba::close($r);
} }
dba::close($r);
// make this optional as it could have a performance impact on large sites // make this optional as it could have a performance impact on large sites
if (intval(get_config('system', 'optimize_items'))) { if (intval(get_config('system', 'optimize_items'))) {
@ -25,17 +43,25 @@ function expire_run(&$argv, &$argc){
logger('expire: start'); logger('expire: start');
$r = q("SELECT `uid`, `username`, `expire` FROM `user` WHERE `expire` != 0"); $r = dba::p("SELECT `uid`, `username` FROM `user` WHERE `expire` != 0");
if (dbm::is_result($r)) { while ($row = dba::fetch($r)) {
foreach ($r as $rr) { logger('Calling expiry for user '.$row['uid'].' ('.$row['username'].')', LOGGER_DEBUG);
logger('Expire: ' . $rr['username'] . ' interval: ' . $rr['expire'], LOGGER_DEBUG); proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
item_expire($rr['uid'], $rr['expire']); 'include/expire.php', (int)$row['uid']);
}
dba::close($r);
logger('expire: calling hooks');
if (is_array($a->hooks) && array_key_exists('expire', $a->hooks)) {
foreach ($a->hooks['expire'] as $hook) {
logger("Calling expire hook for '" . $hook[1] . "'", LOGGER_DEBUG);
proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
'include/expire.php', 'hook', $hook[1]);
} }
} }
load_hooks(); logger('expire: end');
call_hooks('expire');
return; return;
} }

View file

@ -10,7 +10,7 @@ function gprobe_run(&$argv, &$argc){
if ($argc != 2) { if ($argc != 2) {
return; return;
} }
$url = hex2bin($argv[1]); $url = $argv[1];
$r = q("SELECT `id`, `url`, `network` FROM `gcontact` WHERE `nurl` = '%s' ORDER BY `id` LIMIT 1", $r = q("SELECT `id`, `url`, `network` FROM `gcontact` WHERE `nurl` = '%s' ORDER BY `id` LIMIT 1",
dbesc(normalise_link($url)) dbesc(normalise_link($url))

View file

@ -888,7 +888,7 @@ function zrl_init(App $a) {
return; return;
} }
proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($tmp_str)); proc_run(PRIORITY_LOW, 'include/gprobe.php', $tmp_str);
$arr = array('zrl' => $tmp_str, 'url' => $a->cmd); $arr = array('zrl' => $tmp_str, 'url' => $a->cmd);
call_hooks('zrl_init', $arr); call_hooks('zrl_init', $arr);
} }

View file

@ -88,9 +88,11 @@ function poller_run($argv, $argc){
$starttime = time(); $starttime = time();
// We fetch the next queue entry that is about to be executed // We fetch the next queue entry that is about to be executed
while ($r = poller_worker_process()) { while ($r = poller_worker_process($passing_slow)) {
$refetched = false; // When we are processing jobs with a lower priority, we don't refetch new jobs
// Otherwise fast jobs could wait behind slow ones and could be blocked.
$refetched = $passing_slow;
foreach ($r AS $entry) { foreach ($r AS $entry) {
// Assure that the priority is an integer value // Assure that the priority is an integer value
@ -105,7 +107,7 @@ function poller_run($argv, $argc){
// If possible we will fetch new jobs for this worker // If possible we will fetch new jobs for this worker
if (!$refetched && Lock::set('poller_worker_process', 0)) { if (!$refetched && Lock::set('poller_worker_process', 0)) {
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$refetched = find_worker_processes(); $refetched = find_worker_processes($passing_slow);
$poller_db_duration += (microtime(true) - $stamp); $poller_db_duration += (microtime(true) - $stamp);
Lock::remove('poller_worker_process'); Lock::remove('poller_worker_process');
} }
@ -666,19 +668,31 @@ function poller_passing_slow(&$highest_priority) {
/** /**
* @brief Find and claim the next worker process for us * @brief Find and claim the next worker process for us
* *
* @param boolean $passing_slow Returns if we had passed low priority processes
* @return boolean Have we found something? * @return boolean Have we found something?
*/ */
function find_worker_processes() { function find_worker_processes(&$passing_slow) {
$mypid = getmypid(); $mypid = getmypid();
// Check if we should pass some low priority process // Check if we should pass some low priority process
$highest_priority = 0; $highest_priority = 0;
$found = false; $found = false;
$passing_slow = false;
// The higher the number of parallel workers, the more we prefetch to prevent concurring access // The higher the number of parallel workers, the more we prefetch to prevent concurring access
$limit = Config::get("system", "worker_queues", 4); // We decrease the limit with the number of entries left in the queue
$limit = Config::get('system', 'worker_fetch_limit', $limit); $worker_queues = Config::get("system", "worker_queues", 4);
$queue_length = Config::get('system', 'worker_fetch_limit', $worker_queues);
$lower_job_limit = $worker_queues * $queue_length * 2;
$jobs = poller_total_entries();
// Now do some magic
$exponent = 2;
$slope = $queue_length / pow($lower_job_limit, $exponent);
$limit = min($queue_length, ceil($slope * pow($jobs, $exponent)));
logger('Total: '.$jobs.' - Maximum: '.$queue_length.' - jobs per queue: '.$limit, LOGGER_DEBUG);
if (poller_passing_slow($highest_priority)) { if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest? // Are there waiting processes with a higher priority than the currently highest?
@ -707,6 +721,7 @@ function find_worker_processes() {
dba::close($result); dba::close($result);
$found = (count($ids) > 0); $found = (count($ids) > 0);
$passing_slow = $found;
} }
} }
@ -734,9 +749,10 @@ function find_worker_processes() {
/** /**
* @brief Returns the next worker process * @brief Returns the next worker process
* *
* @param boolean $passing_slow Returns if we had passed low priority processes
* @return string SQL statement * @return string SQL statement
*/ */
function poller_worker_process() { function poller_worker_process(&$passing_slow) {
global $poller_db_duration, $poller_lock_duration; global $poller_db_duration, $poller_lock_duration;
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
@ -755,7 +771,7 @@ function poller_worker_process() {
$poller_lock_duration = (microtime(true) - $stamp); $poller_lock_duration = (microtime(true) - $stamp);
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
$found = find_worker_processes(); $found = find_worker_processes($passing_slow);
$poller_db_duration += (microtime(true) - $stamp); $poller_db_duration += (microtime(true) - $stamp);
Lock::remove('poller_worker_process'); Lock::remove('poller_worker_process');

View file

@ -52,7 +52,7 @@ function queue_run(&$argv, &$argc) {
if (dbm::is_result($r)) { if (dbm::is_result($r)) {
foreach ($r as $q_item) { foreach ($r as $q_item) {
logger('Call queue for id '.$q_item['id']); logger('Call queue for id '.$q_item['id']);
proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", $q_item['id']); proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", (int)$q_item['id']);
} }
} }
return; return;

View file

@ -38,7 +38,7 @@ require_once 'include/Photo.php';
*/ */
function poco_load($cid, $uid = 0, $zcid = 0, $url = null) { function poco_load($cid, $uid = 0, $zcid = 0, $url = null) {
// Call the function "poco_load_worker" via the worker // Call the function "poco_load_worker" via the worker
proc_run(PRIORITY_LOW, "include/discover_poco.php", "poco_load", intval($cid), intval($uid), intval($zcid), base64_encode($url)); proc_run(PRIORITY_LOW, "include/discover_poco.php", "poco_load", (int)$cid, (int)$uid, (int)$zcid, $url);
} }
/** /**
@ -1668,7 +1668,7 @@ function poco_fetch_serverlist($poco) {
$r = q("SELECT `nurl` FROM `gserver` WHERE `nurl` = '%s'", dbesc(normalise_link($server_url))); $r = q("SELECT `nurl` FROM `gserver` WHERE `nurl` = '%s'", dbesc(normalise_link($server_url)));
if (!dbm::is_result($r)) { if (!dbm::is_result($r)) {
logger("Call server check for server ".$server_url, LOGGER_DEBUG); logger("Call server check for server ".$server_url, LOGGER_DEBUG);
proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode($server_url)); proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", $server_url);
} }
} }
} }
@ -1690,7 +1690,7 @@ function poco_discover_federation() {
$servers = json_decode($serverdata); $servers = json_decode($serverdata);
foreach ($servers->pods as $server) { foreach ($servers->pods as $server) {
proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode("https://".$server->host)); proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", "https://".$server->host);
} }
} }
@ -1703,7 +1703,7 @@ function poco_discover_federation() {
foreach ($servers as $server) { foreach ($servers as $server) {
$url = (is_null($server->https_score) ? 'http' : 'https').'://'.$server->name; $url = (is_null($server->https_score) ? 'http' : 'https').'://'.$server->name;
proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode($url)); proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", $url);
} }
} }
} }
@ -1813,7 +1813,7 @@ function poco_discover($complete = false) {
} }
logger('Update directory from server '.$server['url'].' with ID '.$server['id'], LOGGER_DEBUG); logger('Update directory from server '.$server['url'].' with ID '.$server['id'], LOGGER_DEBUG);
proc_run(PRIORITY_LOW, "include/discover_poco.php", "update_server_directory", intval($server['id'])); proc_run(PRIORITY_LOW, "include/discover_poco.php", "update_server_directory", (int)$server['id']);
if (!$complete && (--$no_of_queries == 0)) { if (!$complete && (--$no_of_queries == 0)) {
break; break;
@ -2091,7 +2091,7 @@ function get_gcontact_id($contact) {
if ($doprobing) { if ($doprobing) {
logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG); logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG);
proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"])); proc_run(PRIORITY_LOW, 'include/gprobe.php', $contact["url"]);
} }
return $gcontact_id; return $gcontact_id;