Merge pull request #8981 from annando/probe-lock
Remove probe caching, locking cleanup
This commit is contained in:
commit
e23f7d5a35
11 changed files with 55 additions and 73 deletions
|
@ -39,6 +39,8 @@ class Worker
|
|||
|
||||
const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
|
||||
|
||||
const LOCK_PROCESS = 'worker_process';
|
||||
const LOCK_WORKER = 'worker';
|
||||
|
||||
private static $up_start;
|
||||
private static $db_duration = 0;
|
||||
|
@ -125,9 +127,9 @@ class Worker
|
|||
}
|
||||
|
||||
// Trying to fetch new processes - but only once when successful
|
||||
if (!$refetched && DI::lock()->acquire('worker_process', 0)) {
|
||||
if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
|
||||
self::findWorkerProcesses();
|
||||
DI::lock()->release('worker_process');
|
||||
DI::lock()->release(self::LOCK_PROCESS);
|
||||
self::$state = self::STATE_REFETCH;
|
||||
$refetched = true;
|
||||
} else {
|
||||
|
@ -139,21 +141,21 @@ class Worker
|
|||
if (!self::getWaitingJobForPID()) {
|
||||
self::$state = self::STATE_LONG_LOOP;
|
||||
|
||||
if (DI::lock()->acquire('worker', 0)) {
|
||||
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
|
||||
// Count active workers and compare them with a maximum value that depends on the load
|
||||
if (self::tooMuchWorkers()) {
|
||||
Logger::info('Active worker limit reached, quitting.');
|
||||
DI::lock()->release('worker');
|
||||
DI::lock()->release(self::LOCK_WORKER);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check free memory
|
||||
if (DI::process()->isMinMemoryReached()) {
|
||||
Logger::info('Memory limit reached, quitting.');
|
||||
DI::lock()->release('worker');
|
||||
DI::lock()->release(self::LOCK_WORKER);
|
||||
return;
|
||||
}
|
||||
DI::lock()->release('worker');
|
||||
DI::lock()->release(self::LOCK_WORKER);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -949,14 +951,14 @@ class Worker
|
|||
}
|
||||
|
||||
$stamp = (float)microtime(true);
|
||||
if (!DI::lock()->acquire('worker_process')) {
|
||||
if (!DI::lock()->acquire(self::LOCK_PROCESS)) {
|
||||
return false;
|
||||
}
|
||||
self::$lock_duration += (microtime(true) - $stamp);
|
||||
|
||||
$found = self::findWorkerProcesses();
|
||||
|
||||
DI::lock()->release('worker_process');
|
||||
DI::lock()->release(self::LOCK_PROCESS);
|
||||
|
||||
if ($found) {
|
||||
$stamp = (float)microtime(true);
|
||||
|
@ -1194,13 +1196,13 @@ class Worker
|
|||
}
|
||||
|
||||
// If there is a lock then we don't have to check for too much worker
|
||||
if (!DI::lock()->acquire('worker', 0)) {
|
||||
if (!DI::lock()->acquire(self::LOCK_WORKER, 0)) {
|
||||
return $added;
|
||||
}
|
||||
|
||||
// If there are already enough workers running, don't fork another one
|
||||
$quit = self::tooMuchWorkers();
|
||||
DI::lock()->release('worker');
|
||||
DI::lock()->release(self::LOCK_WORKER);
|
||||
|
||||
if ($quit) {
|
||||
return $added;
|
||||
|
|
|
@ -251,7 +251,7 @@ class Contact
|
|||
$updated = max($contact['success_update'], $contact['updated'], $contact['last-update'], $contact['failure_update']);
|
||||
if ((($updated < DateTimeFormat::utc('now -7 days')) || empty($contact['avatar'])) &&
|
||||
in_array($contact['network'], Protocol::FEDERATED)) {
|
||||
Worker::add(PRIORITY_LOW, "UpdateContact", $contact['id'], ($uid == 0 ? 'force' : ''));
|
||||
Worker::add(PRIORITY_LOW, "UpdateContact", $contact['id']);
|
||||
}
|
||||
|
||||
// Remove the internal fields
|
||||
|
@ -409,7 +409,7 @@ class Contact
|
|||
}
|
||||
|
||||
// Update the existing contact
|
||||
self::updateFromProbe($contact['id'], '', true);
|
||||
self::updateFromProbe($contact['id']);
|
||||
|
||||
// And fetch the result
|
||||
$contact = DBA::selectFirst('contact', ['baseurl'], ['id' => $contact['id']]);
|
||||
|
@ -1249,9 +1249,9 @@ class Contact
|
|||
|
||||
if ($background_update && !$probed && in_array($data["network"], array_merge(Protocol::NATIVE_SUPPORT, [Protocol::PUMPIO]))) {
|
||||
// Update in the background when we fetched the data solely from the database
|
||||
Worker::add(PRIORITY_MEDIUM, "UpdateContact", $contact_id, ($uid == 0 ? 'force' : ''));
|
||||
Worker::add(PRIORITY_MEDIUM, "UpdateContact", $contact_id);
|
||||
} elseif (!empty($data['network'])) {
|
||||
self::updateFromProbeArray($contact_id, $data, false);
|
||||
self::updateFromProbeArray($contact_id, $data);
|
||||
} else {
|
||||
Logger::info('Invalid data', ['url' => $url, 'data' => $data]);
|
||||
}
|
||||
|
@ -1814,31 +1814,29 @@ class Contact
|
|||
/**
|
||||
* @param integer $id contact id
|
||||
* @param string $network Optional network we are probing for
|
||||
* @param boolean $force Optional forcing of network probing (otherwise we use the cached data)
|
||||
* @return boolean
|
||||
* @throws HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function updateFromProbe(int $id, string $network = '', bool $force = false)
|
||||
public static function updateFromProbe(int $id, string $network = '')
|
||||
{
|
||||
$contact = DBA::selectFirst('contact', ['uid', 'url'], ['id' => $id]);
|
||||
if (!DBA::isResult($contact)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$ret = Probe::uri($contact['url'], $network, $contact['uid'], !$force);
|
||||
return self::updateFromProbeArray($id, $ret, $force);
|
||||
$ret = Probe::uri($contact['url'], $network, $contact['uid']);
|
||||
return self::updateFromProbeArray($id, $ret);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param integer $id contact id
|
||||
* @param array $ret Probed data
|
||||
* @param boolean $force Optional forcing of network probing (otherwise we use the cached data)
|
||||
* @return boolean
|
||||
* @throws HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
private static function updateFromProbeArray(int $id, array $ret, bool $force = false)
|
||||
private static function updateFromProbeArray(int $id, array $ret)
|
||||
{
|
||||
/*
|
||||
Warning: Never ever fetch the public key via Probe::uri and write it into the contacts.
|
||||
|
@ -1878,7 +1876,7 @@ class Contact
|
|||
|
||||
// If Probe::uri fails the network code will be different ("feed" or "unkn")
|
||||
if (in_array($ret['network'], [Protocol::FEED, Protocol::PHANTOM]) && ($ret['network'] != $contact['network'])) {
|
||||
if ($force && ($uid == 0)) {
|
||||
if ($uid == 0) {
|
||||
self::updateContact($id, $uid, $ret['url'], ['failed' => true, 'last-update' => $updated, 'failure_update' => $updated]);
|
||||
}
|
||||
return false;
|
||||
|
@ -1933,7 +1931,7 @@ class Contact
|
|||
}
|
||||
|
||||
if (!empty($ret['photo']) && ($ret['network'] != Protocol::FEED)) {
|
||||
self::updateAvatar($id, $ret['photo'], $update || $force);
|
||||
self::updateAvatar($id, $ret['photo'], $update);
|
||||
}
|
||||
|
||||
if (!$update) {
|
||||
|
@ -1941,7 +1939,10 @@ class Contact
|
|||
|
||||
// Update the public contact
|
||||
if ($uid != 0) {
|
||||
self::updateFromProbeByURL($ret['url']);
|
||||
$contact = self::getByURL($ret['url'], false, ['id']);
|
||||
if (!empty($contact['id'])) {
|
||||
self::updateFromProbeArray($contact['id'], $ret);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -1963,7 +1964,7 @@ class Contact
|
|||
$ret['name-date'] = $updated;
|
||||
}
|
||||
|
||||
if ($force && ($uid == 0)) {
|
||||
if ($uid == 0) {
|
||||
$ret['last-update'] = $updated;
|
||||
$ret['success_update'] = $updated;
|
||||
$ret['failed'] = false;
|
||||
|
@ -1976,7 +1977,13 @@ class Contact
|
|||
return true;
|
||||
}
|
||||
|
||||
public static function updateFromProbeByURL($url, $force = false)
|
||||
/**
|
||||
* @param integer $url contact url
|
||||
* @return integer Contact id
|
||||
* @throws HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function updateFromProbeByURL($url)
|
||||
{
|
||||
$id = self::getIdForURL($url);
|
||||
|
||||
|
@ -1984,7 +1991,7 @@ class Contact
|
|||
return $id;
|
||||
}
|
||||
|
||||
self::updateFromProbe($id, '', $force);
|
||||
self::updateFromProbe($id);
|
||||
|
||||
return $id;
|
||||
}
|
||||
|
@ -2080,7 +2087,7 @@ class Contact
|
|||
if (!empty($arr['contact']['name'])) {
|
||||
$ret = $arr['contact'];
|
||||
} else {
|
||||
$ret = Probe::uri($url, $network, $user['uid'], false);
|
||||
$ret = Probe::uri($url, $network, $user['uid']);
|
||||
}
|
||||
|
||||
if (($network != '') && ($ret['network'] != $network)) {
|
||||
|
@ -2371,7 +2378,7 @@ class Contact
|
|||
}
|
||||
|
||||
// Ensure to always have the correct network type, independent from the connection request method
|
||||
self::updateFromProbe($contact['id'], '', true);
|
||||
self::updateFromProbe($contact['id']);
|
||||
|
||||
return true;
|
||||
} else {
|
||||
|
@ -2400,7 +2407,7 @@ class Contact
|
|||
$contact_id = DBA::lastInsertId();
|
||||
|
||||
// Ensure to always have the correct network type, independent from the connection request method
|
||||
self::updateFromProbe($contact_id, '', true);
|
||||
self::updateFromProbe($contact_id);
|
||||
|
||||
self::updateAvatar($contact_id, $photo, true);
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ class Probe extends BaseModule
|
|||
$res = '';
|
||||
|
||||
if (!empty($addr)) {
|
||||
$res = NetworkProbe::uri($addr, '', 0, false);
|
||||
$res = NetworkProbe::uri($addr, '', 0);
|
||||
$res = print_r($res, true);
|
||||
}
|
||||
|
||||
|
|
|
@ -328,16 +328,8 @@ class Probe
|
|||
* @throws HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function uri($uri, $network = '', $uid = -1, $cache = true)
|
||||
public static function uri($uri, $network = '', $uid = -1)
|
||||
{
|
||||
$cachekey = 'Probe::uri:' . $network . ':' . $uri;
|
||||
if ($cache) {
|
||||
$result = DI::cache()->get($cachekey);
|
||||
if (!is_null($result)) {
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
if ($uid == -1) {
|
||||
$uid = local_user();
|
||||
}
|
||||
|
@ -408,14 +400,7 @@ class Probe
|
|||
$data['hide'] = self::getHideStatus($data['url']);
|
||||
}
|
||||
|
||||
$data = self::rearrangeData($data);
|
||||
|
||||
// Only store into the cache if the value seems to be valid
|
||||
if (!in_array($data['network'], [Protocol::PHANTOM, Protocol::MAIL])) {
|
||||
DI::cache()->set($cachekey, $data, Duration::DAY);
|
||||
}
|
||||
|
||||
return $data;
|
||||
return self::rearrangeData($data);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -822,7 +822,7 @@ class Processor
|
|||
}
|
||||
|
||||
Logger::info('Updating profile', ['object' => $activity['object_id']]);
|
||||
Contact::updateFromProbeByURL($activity['object_id'], true);
|
||||
Contact::updateFromProbeByURL($activity['object_id']);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -679,7 +679,7 @@ class Receiver
|
|||
return;
|
||||
}
|
||||
|
||||
if (Contact::updateFromProbe($cid, '', true)) {
|
||||
if (Contact::updateFromProbe($cid)) {
|
||||
Logger::info('Update was successful', ['id' => $cid, 'uid' => $uid, 'url' => $url]);
|
||||
}
|
||||
|
||||
|
|
|
@ -2878,14 +2878,13 @@ class DFRN
|
|||
* Checks if the given contact url does support DFRN
|
||||
*
|
||||
* @param string $url profile url
|
||||
* @param boolean $update Update the profile
|
||||
* @return boolean
|
||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function isSupportedByContactUrl($url, $update = false)
|
||||
public static function isSupportedByContactUrl($url)
|
||||
{
|
||||
$probe = Probe::uri($url, Protocol::DFRN, 0, !$update);
|
||||
$probe = Probe::uri($url, Protocol::DFRN);
|
||||
return $probe['network'] == Protocol::DFRN;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -544,15 +544,8 @@ class OStatus
|
|||
} elseif ($item['contact-id'] < 0) {
|
||||
Logger::log("Item with uri ".$item["uri"]." is from a blocked contact.", Logger::DEBUG);
|
||||
} else {
|
||||
// We are having duplicated entries. Hopefully this solves it.
|
||||
if (DI::lock()->acquire('ostatus_process_item_insert')) {
|
||||
$ret = Item::insert($item);
|
||||
DI::lock()->release('ostatus_process_item_insert');
|
||||
Logger::log("Item with uri ".$item["uri"]." for user ".$importer["uid"].' stored. Return value: '.$ret);
|
||||
} else {
|
||||
$ret = Item::insert($item);
|
||||
Logger::log("We couldn't lock - but tried to store the item anyway. Return value is ".$ret);
|
||||
}
|
||||
$ret = Item::insert($item);
|
||||
Logger::log("Item with uri ".$item["uri"]." for user ".$importer["uid"].' stored. Return value: '.$ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2218,14 +2211,13 @@ class OStatus
|
|||
* Checks if the given contact url does support OStatus
|
||||
*
|
||||
* @param string $url profile url
|
||||
* @param boolean $update Update the profile
|
||||
* @return boolean
|
||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function isSupportedByContactUrl($url, $update = false)
|
||||
public static function isSupportedByContactUrl($url)
|
||||
{
|
||||
$probe = Probe::uri($url, Protocol::OSTATUS, 0, !$update);
|
||||
$probe = Probe::uri($url, Protocol::OSTATUS);
|
||||
return $probe['network'] == Protocol::OSTATUS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -168,7 +168,7 @@ class Cron
|
|||
$oldest_id = $contact['id'];
|
||||
$oldest_date = $contact['last-update'];
|
||||
}
|
||||
Worker::add(PRIORITY_LOW, "UpdateContact", $contact['id'], 'force');
|
||||
Worker::add(PRIORITY_LOW, "UpdateContact", $contact['id']);
|
||||
++$count;
|
||||
}
|
||||
Logger::info('Initiated update for public contacts', ['interval' => $count, 'id' => $oldest_id, 'oldest' => $oldest_date]);
|
||||
|
|
|
@ -61,12 +61,12 @@ class OnePoll
|
|||
}
|
||||
|
||||
if (($contact['network'] != Protocol::MAIL) || $force) {
|
||||
Contact::updateFromProbe($contact_id, '', $force);
|
||||
Contact::updateFromProbe($contact_id);
|
||||
}
|
||||
|
||||
// Special treatment for wrongly detected local contacts
|
||||
if (!$force && ($contact['network'] != Protocol::DFRN) && Contact::isLocalById($contact_id)) {
|
||||
Contact::updateFromProbe($contact_id, Protocol::DFRN, true);
|
||||
Contact::updateFromProbe($contact_id, Protocol::DFRN);
|
||||
$contact = DBA::selectFirst('contact', [], ['id' => $contact_id]);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,14 +29,11 @@ class UpdateContact
|
|||
/**
|
||||
* Update contact data via probe
|
||||
* @param int $contact_id Contact ID
|
||||
* @param string $command
|
||||
*/
|
||||
public static function execute($contact_id, $command = '')
|
||||
public static function execute($contact_id)
|
||||
{
|
||||
$force = ($command == "force");
|
||||
$success = Contact::updateFromProbe($contact_id);
|
||||
|
||||
$success = Contact::updateFromProbe($contact_id, '', $force);
|
||||
|
||||
Logger::info('Updated from probe', ['id' => $contact_id, 'force' => $force, 'success' => $success]);
|
||||
Logger::info('Updated from probe', ['id' => $contact_id, 'success' => $success]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue