Replace direct database calls and anonymous arrays by repository calls and entity objects
This commit is contained in:
parent
49654c0d99
commit
a744b8b56f
|
@ -204,32 +204,31 @@ class Cron
|
||||||
*/
|
*/
|
||||||
private static function deliverPosts()
|
private static function deliverPosts()
|
||||||
{
|
{
|
||||||
$deliveries = DBA::p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM `delivery-queue` GROUP BY `gsid` ORDER BY RAND()");
|
foreach(DI::deliveryQueueItemRepo()->selectAggregateByServerId() as $delivery) {
|
||||||
while ($delivery = DBA::fetch($deliveries)) {
|
if ($delivery->failed > 0) {
|
||||||
if ($delivery['failed'] > 0) {
|
Logger::info('Removing failed deliveries', ['gsid' => $delivery->targetServerId, 'failed' => $delivery->failed]);
|
||||||
Logger::info('Removing failed deliveries', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed']]);
|
DI::deliveryQueueItemRepo()->removeFailedByServerId($delivery->targetServerId, DI::config()->get('system', 'worker_defer_limit'));
|
||||||
Delivery::removeFailedQueue($delivery['gsid']);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (($delivery['failed'] < 3) || GServer::isReachableById($delivery['gsid'])) {
|
if (($delivery->failed < 3) || GServer::isReachableById($delivery->targetServerId)) {
|
||||||
$priority = Worker::PRIORITY_HIGH;
|
$priority = Worker::PRIORITY_HIGH;
|
||||||
} elseif ($delivery['failed'] < 6) {
|
} elseif ($delivery->failed < 6) {
|
||||||
$priority = Worker::PRIORITY_MEDIUM;
|
$priority = Worker::PRIORITY_MEDIUM;
|
||||||
} elseif ($delivery['failed'] < 8) {
|
} elseif ($delivery->failed < 8) {
|
||||||
$priority = Worker::PRIORITY_LOW;
|
$priority = Worker::PRIORITY_LOW;
|
||||||
} else {
|
} else {
|
||||||
$priority = Worker::PRIORITY_NEGLIGIBLE;
|
$priority = Worker::PRIORITY_NEGLIGIBLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery['gsid'])) {
|
if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery->targetServerId)) {
|
||||||
Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed'], 'priority' => $priority]);
|
Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery->targetServerId, 'failed' => $delivery->failed, 'priority' => $priority]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optimizing this table only last seconds
|
// Optimizing this table only last seconds
|
||||||
if (DI::config()->get('system', 'optimize_tables')) {
|
if (DI::config()->get('system', 'optimize_tables')) {
|
||||||
Logger::info('Optimize start');
|
Logger::info('Optimize start');
|
||||||
DBA::e("OPTIMIZE TABLE `delivery-queue`");
|
DI::deliveryQueueItemRepo()->optimizeStorage();
|
||||||
Logger::info('Optimize end');
|
Logger::info('Optimize end');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -517,7 +517,7 @@ class DBA
|
||||||
* Build the table query substring from one or more tables, with or without a schema.
|
* Build the table query substring from one or more tables, with or without a schema.
|
||||||
*
|
*
|
||||||
* Expected formats:
|
* Expected formats:
|
||||||
* - table
|
* - [table]
|
||||||
* - [table1, table2, ...]
|
* - [table1, table2, ...]
|
||||||
* - [schema1 => table1, schema2 => table2, table3, ...]
|
* - [schema1 => table1, schema2 => table2, table3, ...]
|
||||||
*
|
*
|
||||||
|
|
|
@ -23,6 +23,7 @@ namespace Friendica\Worker;
|
||||||
|
|
||||||
use Friendica\Core\Logger;
|
use Friendica\Core\Logger;
|
||||||
use Friendica\Core\Worker;
|
use Friendica\Core\Worker;
|
||||||
|
use Friendica\DI;
|
||||||
use Friendica\Model\GServer;
|
use Friendica\Model\GServer;
|
||||||
use Friendica\Protocol\Delivery as ProtocolDelivery;
|
use Friendica\Protocol\Delivery as ProtocolDelivery;
|
||||||
|
|
||||||
|
@ -33,19 +34,19 @@ class BulkDelivery
|
||||||
$server_failure = false;
|
$server_failure = false;
|
||||||
$delivery_failure = false;
|
$delivery_failure = false;
|
||||||
|
|
||||||
$posts = ProtocolDelivery::selectQueueForServer($gsid);
|
$deliveryQueueItems = DI::deliveryQueueItemRepo()->selectByServerId($gsid, DI::config()->get('system', 'worker_defer_limit'));
|
||||||
foreach ($posts as $post) {
|
foreach ($deliveryQueueItems as $deliveryQueueItem) {
|
||||||
if (!$server_failure && ProtocolDelivery::deliver($post['command'], $post['uri-id'], $post['cid'], $post['uid'])) {
|
if (!$server_failure && ProtocolDelivery::deliver($deliveryQueueItem->command, $deliveryQueueItem->postUriId, $deliveryQueueItem->targetContactId, $deliveryQueueItem->senderUserId)) {
|
||||||
ProtocolDelivery::removeQueue($post['uri-id'], $post['gsid']);
|
DI::deliveryQueueItemRepo()->remove($deliveryQueueItem);
|
||||||
Logger::debug('Delivery successful', $post);
|
Logger::debug('Delivery successful', $deliveryQueueItem->toArray());
|
||||||
} else {
|
} else {
|
||||||
ProtocolDelivery::incrementFailedQueue($post['uri-id'], $post['gsid']);
|
DI::deliveryQueueItemRepo()->incrementFailed($deliveryQueueItem);
|
||||||
$delivery_failure = true;
|
$delivery_failure = true;
|
||||||
|
|
||||||
if (!$server_failure) {
|
if (!$server_failure) {
|
||||||
$server_failure = !GServer::isReachableById($gsid);
|
$server_failure = !GServer::isReachableById($gsid);
|
||||||
}
|
}
|
||||||
Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $post]);
|
Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $deliveryQueueItem]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +55,7 @@ class BulkDelivery
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($delivery_failure) {
|
if ($delivery_failure) {
|
||||||
ProtocolDelivery::removeFailedQueue($gsid);
|
DI::deliveryQueueItemRepo()->removeFailedByServerId($gsid, DI::config()->get('system', 'worker_defer_limit'));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -592,7 +592,8 @@ class Notifier
|
||||||
|
|
||||||
if (!empty($contact['gsid']) && DI::config()->get('system', 'bulk_delivery')) {
|
if (!empty($contact['gsid']) && DI::config()->get('system', 'bulk_delivery')) {
|
||||||
$delivery_queue_count++;
|
$delivery_queue_count++;
|
||||||
Delivery::addQueue($cmd, $post_uriid, $target_item['created'], $contact['id'], $contact['gsid'], $sender_uid);
|
$deliveryQueueItem = DI::deliveryQueueItemFactory()->createFromDelivery($cmd, $post_uriid, new \DateTimeImmutable($target_item['created']), $contact['id'], $contact['gsid'], $sender_uid);
|
||||||
|
DI::deliveryQueueItemRepo()->save($deliveryQueueItem);
|
||||||
Worker::add(['priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true], 'BulkDelivery', $contact['gsid']);
|
Worker::add(['priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true], 'BulkDelivery', $contact['gsid']);
|
||||||
} else {
|
} else {
|
||||||
if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) {
|
if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) {
|
||||||
|
|
Loading…
Reference in a new issue