Recursively delete failed worker tasks
This commit is contained in:
		
					parent
					
						
							
								c2b2e8ae9f
							
						
					
				
			
			
				commit
				
					
						06280aa5a3
					
				
			
		
					 5 changed files with 82 additions and 30 deletions
				
			
		|  | @ -551,16 +551,13 @@ class Processor | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if ($activity['target_id'] != $actor['featured']) { | 		$parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]); | ||||||
| 			return null; | 		if (empty($parent['uri-id'])) { | ||||||
|  | 			if (self::fetchMissingActivity($activity['object_id'], $activity, '', Receiver::COMPLETION_AUTO)) { | ||||||
|  | 				$parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		$id = Contact::getIdForURL($activity['actor']); |  | ||||||
| 		if (empty($id)) { |  | ||||||
| 			return null; |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		$parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id'], 'author-id' => $id]); |  | ||||||
| 		if (!empty($parent['uri-id'])) { | 		if (!empty($parent['uri-id'])) { | ||||||
| 			return $parent['uri-id']; | 			return $parent['uri-id']; | ||||||
| 		} | 		} | ||||||
|  | @ -1191,20 +1188,27 @@ class Processor | ||||||
| 			return ''; | 			return ''; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		$signer = []; | ||||||
|  | 
 | ||||||
|  | 		if (!empty($object['attributedTo'])) { | ||||||
|  | 			$attributed_to = $object['attributedTo']; | ||||||
|  | 			if (is_array($attributed_to)) { | ||||||
|  | 				$compacted = JsonLD::compact($object); | ||||||
|  | 				$attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id'); | ||||||
|  | 			} | ||||||
|  | 			$signer[] = $attributed_to;	 | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		if (!empty($object['actor'])) { | 		if (!empty($object['actor'])) { | ||||||
| 			$object_actor = $object['actor']; | 			$object_actor = $object['actor']; | ||||||
| 		} elseif (!empty($object['attributedTo'])) { | 		} elseif (!empty($attributed_to)) { | ||||||
| 			$object_actor = $object['attributedTo']; | 			$object_actor = $attributed_to; | ||||||
| 			if (is_array($object_actor)) { |  | ||||||
| 				$compacted = JsonLD::compact($object); |  | ||||||
| 				$object_actor = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id'); |  | ||||||
| 			} |  | ||||||
| 		} else { | 		} else { | ||||||
| 			// Shouldn't happen
 | 			// Shouldn't happen
 | ||||||
| 			$object_actor = ''; | 			$object_actor = ''; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		$signer = [$object_actor]; | 		$signer[] = $object_actor; | ||||||
| 
 | 
 | ||||||
| 		if (!empty($child['author'])) { | 		if (!empty($child['author'])) { | ||||||
| 			$actor = $child['author']; | 			$actor = $child['author']; | ||||||
|  |  | ||||||
|  | @ -95,6 +95,42 @@ class Queue | ||||||
| 		DBA::delete('inbox-entry', ['id' => $activity['entry-id']]); | 		DBA::delete('inbox-entry', ['id' => $activity['entry-id']]); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * Delete all entries that depend on the given worker id | ||||||
|  | 	 * | ||||||
|  | 	 * @param integer $wid | ||||||
|  | 	 * @return void | ||||||
|  | 	 */ | ||||||
|  | 	public static function deleteByWorkerId(int $wid) | ||||||
|  | 	{ | ||||||
|  | 		$entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]); | ||||||
|  | 		while ($entry = DBA::fetch($entries)) { | ||||||
|  | 			self::deleteById($entry['id']); | ||||||
|  | 		} | ||||||
|  | 		DBA::close($entries); | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/** | ||||||
|  | 	 * Delete recursively an entry and all their children | ||||||
|  | 	 * | ||||||
|  | 	 * @param integer $id | ||||||
|  | 	 * @return void | ||||||
|  | 	 */ | ||||||
|  | 	private static function deleteById(int $id) | ||||||
|  | 	{ | ||||||
|  | 		$entry = DBA::selectFirst('inbox-entry', ['id', 'object-id'], ['id' => $id]); | ||||||
|  | 		if (empty($entry)) { | ||||||
|  | 			return; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		$children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]); | ||||||
|  | 		while ($child = DBA::fetch($children)) { | ||||||
|  | 			self::deleteById($child['id']); | ||||||
|  | 		} | ||||||
|  | 		DBA::close($children); | ||||||
|  | 		DBA::delete('inbox-entry', ['id' => $entry['id']]); | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	/** | 	/** | ||||||
| 	 * Set the worker id for the queue entry | 	 * Set the worker id for the queue entry | ||||||
| 	 * | 	 * | ||||||
|  | @ -145,6 +181,7 @@ class Queue | ||||||
| 
 | 
 | ||||||
| 		$activity['entry-id']        = $entry['id']; | 		$activity['entry-id']        = $entry['id']; | ||||||
| 		$activity['worker-id']       = $entry['wid']; | 		$activity['worker-id']       = $entry['wid']; | ||||||
|  | 		$activity['recursion-depth'] = 0; | ||||||
| 
 | 
 | ||||||
| 		$receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]); | 		$receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]); | ||||||
| 		while ($receiver = DBA::fetch($receivers)) { | 		while ($receiver = DBA::fetch($receivers)) { | ||||||
|  | @ -166,8 +203,13 @@ class Queue | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function processAll() | 	public static function processAll() | ||||||
| 	{ | 	{ | ||||||
| 		$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]); | 		$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`wid` IS NULL"], ['order' => ['id' => true]]); | ||||||
| 		while ($entry = DBA::fetch($entries)) { | 		while ($entry = DBA::fetch($entries)) { | ||||||
|  | 			// We don't need to process entries that depend on already existing entries.
 | ||||||
|  | 			if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) { | ||||||
|  | 				continue; | ||||||
|  | 			} | ||||||
|  | 			Logger::debug('Process leftover entry', $entry); | ||||||
| 			self::process($entry['id']); | 			self::process($entry['id']); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -517,10 +517,6 @@ class Receiver | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if (($type == 'as:Add') && is_array($activity['as:object']) && (count($activity['as:object']) == 1)) { |  | ||||||
| 			$trust_source = false; |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// $trust_source is called by reference and is set to true if the content was retrieved successfully
 | 		// $trust_source is called by reference and is set to true if the content was retrieved successfully
 | ||||||
| 		$object_data = self::prepareObjectData($activity, $uid, $push, $trust_source); | 		$object_data = self::prepareObjectData($activity, $uid, $push, $trust_source); | ||||||
| 		if (empty($object_data)) { | 		if (empty($object_data)) { | ||||||
|  | @ -556,10 +552,6 @@ class Receiver | ||||||
| 			$object_data['thread-children-type'] = $activity['thread-children-type']; | 			$object_data['thread-children-type'] = $activity['thread-children-type']; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if (!empty($activity['recursion-depth'])) { |  | ||||||
| 			$object_data['recursion-depth'] = $activity['recursion-depth']; |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// Internal flag for posts that arrived via relay
 | 		// Internal flag for posts that arrived via relay
 | ||||||
| 		if (!empty($activity['from-relay'])) { | 		if (!empty($activity['from-relay'])) { | ||||||
| 			$object_data['from-relay'] = $activity['from-relay']; | 			$object_data['from-relay'] = $activity['from-relay']; | ||||||
|  | @ -571,6 +563,10 @@ class Receiver | ||||||
| 
 | 
 | ||||||
| 		$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push); | 		$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push); | ||||||
| 
 | 
 | ||||||
|  | 		if (!empty($activity['recursion-depth'])) { | ||||||
|  | 			$object_data['recursion-depth'] = $activity['recursion-depth']; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) { | 		if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) { | ||||||
| 			self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); | 			self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -129,10 +129,13 @@ class Cron | ||||||
| 				Worker::add(PRIORITY_LOW, 'OptimizeTables'); | 				Worker::add(PRIORITY_LOW, 'OptimizeTables'); | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			DI::config()->set('system', 'last_cron_daily', time()); | 			// Process all unprocessed entries
 | ||||||
|  | 			Queue::processAll(); | ||||||
| 
 | 
 | ||||||
| 			// Resubscribe to relay servers
 | 			// Resubscribe to relay servers
 | ||||||
| 			Relay::reSubscribe(); | 			Relay::reSubscribe(); | ||||||
|  | 
 | ||||||
|  | 			DI::config()->set('system', 'last_cron_daily', time()); | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		Logger::notice('end'); | 		Logger::notice('end'); | ||||||
|  |  | ||||||
|  | @ -23,6 +23,7 @@ namespace Friendica\Worker; | ||||||
| 
 | 
 | ||||||
| use Friendica\Core\Logger; | use Friendica\Core\Logger; | ||||||
| use Friendica\Core\Worker; | use Friendica\Core\Worker; | ||||||
|  | use Friendica\DI; | ||||||
| use Friendica\Protocol\ActivityPub; | use Friendica\Protocol\ActivityPub; | ||||||
| use Friendica\Protocol\ActivityPub\Queue; | use Friendica\Protocol\ActivityPub\Queue; | ||||||
| use Friendica\Protocol\ActivityPub\Receiver; | use Friendica\Protocol\ActivityPub\Receiver; | ||||||
|  | @ -32,6 +33,8 @@ class FetchMissingActivity | ||||||
| 	/** | 	/** | ||||||
| 	 * Fetch missing activities | 	 * Fetch missing activities | ||||||
| 	 * @param string $url Contact URL | 	 * @param string $url Contact URL | ||||||
|  | 	 *  | ||||||
|  | 	 * @return void | ||||||
| 	 */ | 	 */ | ||||||
| 	public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL) | 	public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL) | ||||||
| 	{ | 	{ | ||||||
|  | @ -39,10 +42,14 @@ class FetchMissingActivity | ||||||
| 		$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); | 		$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); | ||||||
| 		if ($result) { | 		if ($result) { | ||||||
| 			Logger::info('Successfully fetched missing activity', ['url' => $url]); | 			Logger::info('Successfully fetched missing activity', ['url' => $url]); | ||||||
| 			Queue::processReplyByUri($url); |  | ||||||
| 		} elseif (!Worker::defer()) { | 		} elseif (!Worker::defer()) { | ||||||
| 			// @todo perform recursive deletion of all entries
 |  | ||||||
| 			Logger::info('Activity could not be fetched', ['url' => $url]); | 			Logger::info('Activity could not be fetched', ['url' => $url]); | ||||||
|  | 
 | ||||||
|  | 			// recursively delete all entries that belong to this worker task
 | ||||||
|  | 			$queue = DI::app()->getQueue(); | ||||||
|  | 			if (!empty($queue['id'])) { | ||||||
|  | 				Queue::deleteByWorkerId($queue['id']); | ||||||
|  | 			} | ||||||
| 		} else { | 		} else { | ||||||
| 			Logger::info('Fetching deferred', ['url' => $url]); | 			Logger::info('Fetching deferred', ['url' => $url]); | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue