Conversation entries will now be stored asynchronous if possible

This commit is contained in:
Michael 2022-08-10 09:28:18 +00:00
parent 548b7f43a5
commit d9aee0b3ea
8 changed files with 120 additions and 56 deletions

View file

@ -1,6 +1,6 @@
-- ------------------------------------------ -- ------------------------------------------
-- Friendica 2022.09-dev (Giant Rhubarb) -- Friendica 2022.09-dev (Giant Rhubarb)
-- DB_UPDATE_VERSION 1478 -- DB_UPDATE_VERSION 1479
-- ------------------------------------------ -- ------------------------------------------
@ -1726,13 +1726,13 @@ CREATE TABLE IF NOT EXISTS `arrived-activity` (
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities'; ) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities';
-- --
-- TABLE processed-activity -- TABLE fetched-activity
-- --
CREATE TABLE IF NOT EXISTS `processed-activity` ( CREATE TABLE IF NOT EXISTS `fetched-activity` (
`object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity', `object-id` varbinary(255) NOT NULL COMMENT 'object id of fetched activity',
`received` datetime COMMENT 'Receiving date', `received` datetime COMMENT 'Receiving date',
PRIMARY KEY(`object-id`) PRIMARY KEY(`object-id`)
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities'; ) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of fetched activities';
-- --
-- TABLE worker-ipc -- TABLE worker-ipc

View file

@ -26,6 +26,7 @@ Database Tables
| [event](help/database/db_event) | Events | | [event](help/database/db_event) | Events |
| [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation | | [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation |
| [fetch-entry](help/database/db_fetch-entry) | | | [fetch-entry](help/database/db_fetch-entry) | |
| [fetched-activity](help/database/db_fetched-activity) | Id of fetched activities |
| [fsuggest](help/database/db_fsuggest) | friend suggestion stuff | | [fsuggest](help/database/db_fsuggest) | friend suggestion stuff |
| [group](help/database/db_group) | privacy groups, group info | | [group](help/database/db_group) | privacy groups, group info |
| [group_member](help/database/db_group_member) | privacy groups, member info | | [group_member](help/database/db_group_member) | privacy groups, member info |
@ -68,7 +69,6 @@ Database Tables
| [post-user](help/database/db_post-user) | User specific post data | | [post-user](help/database/db_post-user) | User specific post data |
| [post-user-notification](help/database/db_post-user-notification) | User post notifications | | [post-user-notification](help/database/db_post-user-notification) | User post notifications |
| [process](help/database/db_process) | Currently running system processes | | [process](help/database/db_process) | Currently running system processes |
| [processed-activity](help/database/db_processed-activity) | Id of processed activities |
| [profile](help/database/db_profile) | user profiles data | | [profile](help/database/db_profile) | user profiles data |
| [profile_field](help/database/db_profile_field) | Custom profile fields | | [profile_field](help/database/db_profile_field) | Custom profile fields |
| [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers | | [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers |

View file

@ -0,0 +1,22 @@
Table fetched-activity
===========
Id of fetched activities
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| --------- | ----------------------------- | -------------- | ---- | --- | ------- | ----- |
| object-id | object id of fetched activity | varbinary(255) | NO | PRI | NULL | |
| received | Receiving date | datetime | YES | | NULL | |
Indexes
------------
| Name | Fields |
| ------- | --------- |
| PRIMARY | object-id |
Return to [database documentation](help/database)

View file

@ -1,22 +0,0 @@
Table processed-activity
===========
Id of processed activities
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
| received | Receiving date | datetime | YES | | NULL | |
Indexes
------------
| Name | Fields |
| ------- | --------- |
| PRIMARY | object-id |
Return to [database documentation](help/database)

View file

@ -854,7 +854,7 @@ class Conversation
$row['direction'] = ['direction' => 7, 'title' => $this->l10n->t('You had been addressed (%s).', 'bcc')]; $row['direction'] = ['direction' => 7, 'title' => $this->l10n->t('You had been addressed (%s).', 'bcc')];
break; break;
case ItemModel::PR_FOLLOWER: case ItemModel::PR_FOLLOWER:
$row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['author-name'])]; $row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['causer-name'] ?: $row['author-name'])];
break; break;
case ItemModel::PR_TAG: case ItemModel::PR_TAG:
$row['direction'] = ['direction' => 4, 'title' => $this->l10n->t('You subscribed to one or more tags in this post.')]; $row['direction'] = ['direction' => 4, 'title' => $this->l10n->t('You subscribed to one or more tags in this post.')];

View file

@ -1513,7 +1513,7 @@ class Item
$is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE); $is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE);
if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) && if (($uid != 0) && (($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE && DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE &&
!in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) { !in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) {
Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]); Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]);

View file

@ -69,20 +69,20 @@ class Processor
*/ */
private static function addActivityId(string $id) private static function addActivityId(string $id)
{ {
DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); DBA::delete('fetched-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); DBA::insert('fetched-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
} }
/** /**
* Checks if the given object id has just been processed * Checks if the given object id has just been fetched
* *
* @param string $id * @param string $id
* *
* @return boolean * @return boolean
*/ */
private static function isProcessed(string $id): bool private static function isFetched(string $id): bool
{ {
return DBA::exists('processed-activity', ['object-id' => $id]); return DBA::exists('fetched-activity', ['object-id' => $id]);
} }
/** /**
@ -303,13 +303,6 @@ class Processor
*/ */
public static function createItem(array $activity, bool $fetch_parents): array public static function createItem(array $activity, bool $fetch_parents): array
{ {
if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) {
Logger::info('Id is already processed', ['id' => $activity['id']]);
return [];
}
self::addActivityId($activity['id']);
$item = []; $item = [];
$item['verb'] = Activity::POST; $item['verb'] = Activity::POST;
$item['thr-parent'] = $activity['reply-to-id']; $item['thr-parent'] = $activity['reply-to-id'];
@ -333,6 +326,7 @@ class Processor
if (!empty($conversation)) { if (!empty($conversation)) {
Logger::debug('Got conversation', ['conversation' => $item['conversation'], 'parent' => $conversation]); Logger::debug('Got conversation', ['conversation' => $item['conversation'], 'parent' => $conversation]);
$item['parent-uri'] = $conversation['uri']; $item['parent-uri'] = $conversation['uri'];
$item['parent-uri-id'] = ItemURI::getIdByURI($item['parent-uri']);
} }
} else { } else {
$conversation = []; $conversation = [];
@ -350,7 +344,7 @@ class Processor
} }
if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
$result = self::fetchParent($activity); $result = self::fetchParent($activity, !empty($conversation));
if (!empty($result)) { if (!empty($result)) {
if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
$item['thr-parent'] = $result; $item['thr-parent'] = $result;
@ -457,6 +451,8 @@ class Processor
return []; return [];
} }
$item['thr-parent-id'] = ItemURI::getIdByURI($item['thr-parent']);
$item = self::processContent($activity, $item); $item = self::processContent($activity, $item);
if (empty($item)) { if (empty($item)) {
Logger::info('Message was not processed'); Logger::info('Message was not processed');
@ -489,14 +485,26 @@ class Processor
* Fetch and process parent posts for the given activity * Fetch and process parent posts for the given activity
* *
* @param array $activity * @param array $activity
* @param bool $in_background
* *
* @return string * @return string
*/ */
private static function fetchParent(array $activity): string private static function fetchParent(array $activity, bool $in_background = false): string
{ {
if (self::isFetched($activity['reply-to-id'])) {
Logger::info('Id is already fetched', ['id' => $activity['reply-to-id']]);
return '';
}
self::addActivityId($activity['reply-to-id']);
if (!DI::config()->get('system', 'fetch_by_worker')) {
$in_background = false;
}
$recursion_depth = $activity['recursion-depth'] ?? 0; $recursion_depth = $activity['recursion-depth'] ?? 0;
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) {
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
@ -525,17 +533,19 @@ class Processor
} }
} elseif (self::isActivityGone($activity['reply-to-id'])) { } elseif (self::isActivityGone($activity['reply-to-id'])) {
Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]); Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
if (!empty($activity['entry-id'])) { if ($in_background) {
// fetching in background is done for all activities where we have got the conversation
// There we only delete the single activity and not thr whole thread since we can store the
// other posts in the thread even with missing posts.
Queue::remove($activity);
} elseif (!empty($activity['entry-id'])) {
Queue::deleteById($activity['entry-id']); Queue::deleteById($activity['entry-id']);
} }
return ''; return '';
} else { } elseif (!$in_background) {
Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
} } elseif ($in_background) {
Logger::notice('Fetching is done in the background.', ['parent' => $activity['reply-to-id']]);
if (Queue::hasWorker($activity['worker-id'] ?? 0)) {
Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
return '';
} }
if (!Fetch::hasWorker($activity['reply-to-id'])) { if (!Fetch::hasWorker($activity['reply-to-id'])) {
@ -1056,6 +1066,10 @@ class Processor
Logger::info('Accepting post', ['uid' => $receiver, 'url' => $item['uri']]); Logger::info('Accepting post', ['uid' => $receiver, 'url' => $item['uri']]);
} }
if (!self::hasParents($item, $receiver)) {
continue;
}
if (($item['gravity'] != GRAVITY_ACTIVITY) && ($activity['object_type'] == 'as:Event')) { if (($item['gravity'] != GRAVITY_ACTIVITY) && ($activity['object_type'] == 'as:Event')) {
$event_id = self::createEvent($activity, $item); $event_id = self::createEvent($activity, $item);
@ -1096,6 +1110,56 @@ class Processor
} }
} }
/**
* Checks if there are parent posts for the given receiver.
* If not, then the system will try to add them.
*
* @param array $item
* @param integer $receiver
* @return boolean
*/
private static function hasParents(array $item, int $receiver)
{
if (($receiver == 0) || ($item['gravity'] == GRAVITY_PARENT)) {
return true;
}
$fields = ['causer-id' => $item['causer-id'] ?? $item['author-id'], 'post-reason' => Item::PR_FETCHED];
$has_parents = false;
if (!empty($item['parent-uri-id'])) {
if (Post::exists(['uri-id' => $item['parent-uri-id'], 'uid' => $receiver])) {
$has_parents = true;
} elseif (Post::exists(['uri-id' => $item['parent-uri'], 'uid' => 0])) {
$stored = Item::storeForUserByUriId($item['parent-uri-id'], $receiver, $fields);
$has_parents = (bool)$stored;
if ($stored) {
Logger::notice('Inserted missing parent post', ['stored' => $stored, 'uid' => $receiver, 'parent' => $item['parent-uri']]);
} else {
Logger::notice('Parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'parent' => $item['parent-uri']]);
return false;
}
}
}
if (empty($item['parent-uri-id']) || ($item['thr-parent-id'] != $item['parent-uri-id'])) {
if (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => $receiver])) {
$has_parents = true;
} elseif (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => 0])) {
$stored = Item::storeForUserByUriId($item['thr-parent-id'], $receiver, $fields);
$has_parents = $has_parents || (bool)$stored;
if ($stored) {
Logger::notice('Inserted missing thread parent post', ['stored' => $stored, 'uid' => $receiver, 'thread-parent' => $item['thr-parent']]);
} else {
Logger::notice('Thread parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'thread-parent' => $item['thr-parent']]);
}
}
}
return $has_parents;
}
/** /**
* Store tags and mentions into the tag table * Store tags and mentions into the tag table
* *

View file

@ -55,7 +55,7 @@
use Friendica\Database\DBA; use Friendica\Database\DBA;
if (!defined('DB_UPDATE_VERSION')) { if (!defined('DB_UPDATE_VERSION')) {
define('DB_UPDATE_VERSION', 1478); define('DB_UPDATE_VERSION', 1479);
} }
return [ return [
@ -1734,10 +1734,10 @@ return [
], ],
"engine" => "MEMORY", "engine" => "MEMORY",
], ],
"processed-activity" => [ "fetched-activity" => [
"comment" => "Id of processed activities", "comment" => "Id of fetched activities",
"fields" => [ "fields" => [
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"], "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of fetched activity"],
"received" => ["type" => "datetime", "comment" => "Receiving date"], "received" => ["type" => "datetime", "comment" => "Receiving date"],
], ],
"indexes" => [ "indexes" => [