diff --git a/src/Core/Worker/Cron.php b/src/Core/Worker/Cron.php index 21f67c030..de6a808ac 100644 --- a/src/Core/Worker/Cron.php +++ b/src/Core/Worker/Cron.php @@ -204,32 +204,31 @@ class Cron */ private static function deliverPosts() { - $deliveries = DBA::p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM `delivery-queue` GROUP BY `gsid` ORDER BY RAND()"); - while ($delivery = DBA::fetch($deliveries)) { - if ($delivery['failed'] > 0) { - Logger::info('Removing failed deliveries', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed']]); - Delivery::removeFailedQueue($delivery['gsid']); + foreach(DI::deliveryQueueItemRepo()->selectAggregateByServerId() as $delivery) { + if ($delivery->failed > 0) { + Logger::info('Removing failed deliveries', ['gsid' => $delivery->targetServerId, 'failed' => $delivery->failed]); + DI::deliveryQueueItemRepo()->removeFailedByServerId($delivery->targetServerId, DI::config()->get('system', 'worker_defer_limit')); } - if (($delivery['failed'] < 3) || GServer::isReachableById($delivery['gsid'])) { + if (($delivery->failed < 3) || GServer::isReachableById($delivery->targetServerId)) { $priority = Worker::PRIORITY_HIGH; - } elseif ($delivery['failed'] < 6) { + } elseif ($delivery->failed < 6) { $priority = Worker::PRIORITY_MEDIUM; - } elseif ($delivery['failed'] < 8) { + } elseif ($delivery->failed < 8) { $priority = Worker::PRIORITY_LOW; } else { $priority = Worker::PRIORITY_NEGLIGIBLE; } - if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery['gsid'])) { - Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed'], 'priority' => $priority]); + if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery->targetServerId)) { + Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery->targetServerId, 'failed' => $delivery->failed, 'priority' => $priority]); } } // Optimizing this table only last seconds if (DI::config()->get('system', 'optimize_tables')) { Logger::info('Optimize start'); - DBA::e("OPTIMIZE TABLE `delivery-queue`"); + DI::deliveryQueueItemRepo()->optimizeStorage(); Logger::info('Optimize end'); } } diff --git a/src/Database/DBA.php b/src/Database/DBA.php index 2e4fb3767..ffeecfaa7 100644 --- a/src/Database/DBA.php +++ b/src/Database/DBA.php @@ -517,7 +517,7 @@ class DBA * Build the table query substring from one or more tables, with or without a schema. * * Expected formats: - * - table + * - [table] * - [table1, table2, ...] * - [schema1 => table1, schema2 => table2, table3, ...] * diff --git a/src/Worker/BulkDelivery.php b/src/Worker/BulkDelivery.php index 5e335f720..4ac77b213 100644 --- a/src/Worker/BulkDelivery.php +++ b/src/Worker/BulkDelivery.php @@ -23,6 +23,7 @@ namespace Friendica\Worker; use Friendica\Core\Logger; use Friendica\Core\Worker; +use Friendica\DI; use Friendica\Model\GServer; use Friendica\Protocol\Delivery as ProtocolDelivery; @@ -33,19 +34,19 @@ class BulkDelivery $server_failure = false; $delivery_failure = false; - $posts = ProtocolDelivery::selectQueueForServer($gsid); - foreach ($posts as $post) { - if (!$server_failure && ProtocolDelivery::deliver($post['command'], $post['uri-id'], $post['cid'], $post['uid'])) { - ProtocolDelivery::removeQueue($post['uri-id'], $post['gsid']); - Logger::debug('Delivery successful', $post); + $deliveryQueueItems = DI::deliveryQueueItemRepo()->selectByServerId($gsid, DI::config()->get('system', 'worker_defer_limit')); + foreach ($deliveryQueueItems as $deliveryQueueItem) { + if (!$server_failure && ProtocolDelivery::deliver($deliveryQueueItem->command, $deliveryQueueItem->postUriId, $deliveryQueueItem->targetContactId, $deliveryQueueItem->senderUserId)) { + DI::deliveryQueueItemRepo()->remove($deliveryQueueItem); + Logger::debug('Delivery successful', $deliveryQueueItem->toArray()); } else { - ProtocolDelivery::incrementFailedQueue($post['uri-id'], $post['gsid']); + DI::deliveryQueueItemRepo()->incrementFailed($deliveryQueueItem); $delivery_failure = true; if (!$server_failure) { $server_failure = !GServer::isReachableById($gsid); } - Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $post]); + Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $deliveryQueueItem]); } } @@ -54,7 +55,7 @@ class BulkDelivery } if ($delivery_failure) { - ProtocolDelivery::removeFailedQueue($gsid); + DI::deliveryQueueItemRepo()->removeFailedByServerId($gsid, DI::config()->get('system', 'worker_defer_limit')); } } } diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 0b7460c53..2d21a0937 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -592,7 +592,8 @@ class Notifier if (!empty($contact['gsid']) && DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; - Delivery::addQueue($cmd, $post_uriid, $target_item['created'], $contact['id'], $contact['gsid'], $sender_uid); + $deliveryQueueItem = DI::deliveryQueueItemFactory()->createFromDelivery($cmd, $post_uriid, new \DateTimeImmutable($target_item['created']), $contact['id'], $contact['gsid'], $sender_uid); + DI::deliveryQueueItemRepo()->save($deliveryQueueItem); Worker::add(['priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true], 'BulkDelivery', $contact['gsid']); } else { if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) {