1
0
Fork 0

Merge pull request #11759 from annando/enqueue-posts

Fetch missing posts via a queue
This commit is contained in:
Hypolite Petovan 2022-07-24 15:38:44 -04:00 committed by GitHub
commit a5d679ea95
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 908 additions and 362 deletions

View file

@ -1,6 +1,6 @@
-- ------------------------------------------
-- Friendica 2022.09-dev (Giant Rhubarb)
-- DB_UPDATE_VERSION 1473
-- DB_UPDATE_VERSION 1474
-- ------------------------------------------
@ -724,6 +724,44 @@ CREATE TABLE IF NOT EXISTS `hook` (
UNIQUE INDEX `hook_file_function` (`hook`,`file`,`function`)
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='addon hook registry';
--
-- TABLE inbox-entry
--
CREATE TABLE IF NOT EXISTS `inbox-entry` (
`id` int unsigned NOT NULL auto_increment COMMENT 'sequential ID',
`activity-id` varbinary(255) COMMENT 'id of the incoming activity',
`object-id` varbinary(255) COMMENT '',
`in-reply-to-id` varbinary(255) COMMENT '',
`conversation` varbinary(255) COMMENT '',
`type` varchar(64) COMMENT 'Type of the activity',
`object-type` varchar(64) COMMENT 'Type of the object activity',
`object-object-type` varchar(64) COMMENT 'Type of the object\'s object activity',
`received` datetime COMMENT 'Receiving date',
`activity` mediumtext COMMENT 'The JSON activity',
`signer` varchar(255) COMMENT '',
`push` boolean COMMENT 'Is the entry pushed or have pulled it?',
`trust` boolean COMMENT 'Do we trust this entry?',
`wid` int unsigned COMMENT 'Workerqueue id',
PRIMARY KEY(`id`),
UNIQUE INDEX `activity-id` (`activity-id`),
INDEX `object-id` (`object-id`),
INDEX `received` (`received`),
INDEX `wid` (`wid`),
FOREIGN KEY (`wid`) REFERENCES `workerqueue` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Incoming activity';
--
-- TABLE inbox-entry-receiver
--
CREATE TABLE IF NOT EXISTS `inbox-entry-receiver` (
`queue-id` int unsigned NOT NULL COMMENT '',
`uid` mediumint unsigned NOT NULL COMMENT 'User id',
PRIMARY KEY(`queue-id`,`uid`),
INDEX `uid` (`uid`),
FOREIGN KEY (`queue-id`) REFERENCES `inbox-entry` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE,
FOREIGN KEY (`uid`) REFERENCES `user` (`uid`) ON UPDATE RESTRICT ON DELETE CASCADE
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Receiver for the incoming activity';
--
-- TABLE inbox-status
--
@ -1287,6 +1325,7 @@ CREATE TABLE IF NOT EXISTS `post-tag` (
--
CREATE TABLE IF NOT EXISTS `post-thread` (
`uri-id` int unsigned NOT NULL COMMENT 'Id of the item-uri table entry that contains the item uri',
`conversation-id` int unsigned COMMENT 'Id of the item-uri table entry that contains the conversation uri',
`owner-id` int unsigned NOT NULL DEFAULT 0 COMMENT 'Item owner',
`author-id` int unsigned NOT NULL DEFAULT 0 COMMENT 'Item author',
`causer-id` int unsigned COMMENT 'Link to the contact table with uid=0 of the contact that caused the item creation',
@ -1296,12 +1335,14 @@ CREATE TABLE IF NOT EXISTS `post-thread` (
`changed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Date that something in the conversation changed, indicating clients should fetch the conversation again',
`commented` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT '',
PRIMARY KEY(`uri-id`),
INDEX `conversation-id` (`conversation-id`),
INDEX `owner-id` (`owner-id`),
INDEX `author-id` (`author-id`),
INDEX `causer-id` (`causer-id`),
INDEX `received` (`received`),
INDEX `commented` (`commented`),
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE,
FOREIGN KEY (`conversation-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE,
FOREIGN KEY (`owner-id`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT,
FOREIGN KEY (`author-id`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT,
FOREIGN KEY (`causer-id`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT
@ -1380,6 +1421,7 @@ CREATE TABLE IF NOT EXISTS `post-user` (
--
CREATE TABLE IF NOT EXISTS `post-thread-user` (
`uri-id` int unsigned NOT NULL COMMENT 'Id of the item-uri table entry that contains the item uri',
`conversation-id` int unsigned COMMENT 'Id of the item-uri table entry that contains the conversation uri',
`owner-id` int unsigned NOT NULL DEFAULT 0 COMMENT 'Item owner',
`author-id` int unsigned NOT NULL DEFAULT 0 COMMENT 'Item author',
`causer-id` int unsigned COMMENT 'Link to the contact table with uid=0 of the contact that caused the item creation',
@ -1404,6 +1446,7 @@ CREATE TABLE IF NOT EXISTS `post-thread-user` (
`post-user-id` int unsigned COMMENT 'Id of the post-user table',
PRIMARY KEY(`uid`,`uri-id`),
INDEX `uri-id` (`uri-id`),
INDEX `conversation-id` (`conversation-id`),
INDEX `owner-id` (`owner-id`),
INDEX `author-id` (`author-id`),
INDEX `causer-id` (`causer-id`),
@ -1418,6 +1461,7 @@ CREATE TABLE IF NOT EXISTS `post-thread-user` (
INDEX `uid_starred` (`uid`,`starred`),
INDEX `uid_mention` (`uid`,`mention`),
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE,
FOREIGN KEY (`conversation-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE,
FOREIGN KEY (`owner-id`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT,
FOREIGN KEY (`author-id`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT,
FOREIGN KEY (`causer-id`) REFERENCES `contact` (`id`) ON UPDATE RESTRICT ON DELETE RESTRICT,
@ -1709,6 +1753,8 @@ CREATE VIEW `post-user-view` AS SELECT
`post-user`.`parent-uri-id` AS `parent-uri-id`,
`thr-parent-item-uri`.`uri` AS `thr-parent`,
`post-user`.`thr-parent-id` AS `thr-parent-id`,
`conversation-item-uri`.`uri` AS `conversation`,
`post-thread-user`.`conversation-id` AS `conversation-id`,
`item-uri`.`guid` AS `guid`,
`post-user`.`wall` AS `wall`,
`post-user`.`gravity` AS `gravity`,
@ -1857,6 +1903,7 @@ CREATE VIEW `post-user-view` AS SELECT
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post-user`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post-user`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post-user`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread-user`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post-user`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post-user`.`vid`
LEFT JOIN `event` ON `event`.`id` = `post-user`.`event-id`
@ -1883,6 +1930,8 @@ CREATE VIEW `post-thread-user-view` AS SELECT
`post-user`.`parent-uri-id` AS `parent-uri-id`,
`thr-parent-item-uri`.`uri` AS `thr-parent`,
`post-user`.`thr-parent-id` AS `thr-parent-id`,
`conversation-item-uri`.`uri` AS `conversation`,
`post-thread-user`.`conversation-id` AS `conversation-id`,
`item-uri`.`guid` AS `guid`,
`post-thread-user`.`wall` AS `wall`,
`post-user`.`gravity` AS `gravity`,
@ -2030,6 +2079,7 @@ CREATE VIEW `post-thread-user-view` AS SELECT
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post-thread-user`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post-user`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post-user`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread-user`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post-user`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post-user`.`vid`
LEFT JOIN `event` ON `event`.`id` = `post-user`.`event-id`
@ -2052,6 +2102,8 @@ CREATE VIEW `post-view` AS SELECT
`post`.`parent-uri-id` AS `parent-uri-id`,
`thr-parent-item-uri`.`uri` AS `thr-parent`,
`post`.`thr-parent-id` AS `thr-parent-id`,
`conversation-item-uri`.`uri` AS `conversation`,
`post-thread`.`conversation-id` AS `conversation-id`,
`item-uri`.`guid` AS `guid`,
`post`.`gravity` AS `gravity`,
`external-item-uri`.`uri` AS `extid`,
@ -2169,6 +2221,7 @@ CREATE VIEW `post-view` AS SELECT
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post`.`vid`
LEFT JOIN `diaspora-interaction` ON `diaspora-interaction`.`uri-id` = `post`.`uri-id`
@ -2188,6 +2241,8 @@ CREATE VIEW `post-thread-view` AS SELECT
`post`.`parent-uri-id` AS `parent-uri-id`,
`thr-parent-item-uri`.`uri` AS `thr-parent`,
`post`.`thr-parent-id` AS `thr-parent-id`,
`conversation-item-uri`.`uri` AS `conversation`,
`post-thread`.`conversation-id` AS `conversation-id`,
`item-uri`.`guid` AS `guid`,
`post`.`gravity` AS `gravity`,
`external-item-uri`.`uri` AS `extid`,
@ -2305,6 +2360,7 @@ CREATE VIEW `post-thread-view` AS SELECT
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post-thread`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post`.`vid`
LEFT JOIN `diaspora-interaction` ON `diaspora-interaction`.`uri-id` = `post-thread`.`uri-id`

View file

@ -31,6 +31,8 @@ Database Tables
| [gserver](help/database/db_gserver) | Global servers |
| [gserver-tag](help/database/db_gserver-tag) | Tags that the server has subscribed |
| [hook](help/database/db_hook) | addon hook registry |
| [inbox-entry](help/database/db_inbox-entry) | Incoming activity |
| [inbox-entry-receiver](help/database/db_inbox-entry-receiver) | Receiver for the incoming activity |
| [inbox-status](help/database/db_inbox-status) | Status of ActivityPub inboxes |
| [intro](help/database/db_intro) | |
| [item-uri](help/database/db_item-uri) | URI and GUID for items |

View file

@ -0,0 +1,30 @@
Table inbox-entry-receiver
===========
Receiver for the incoming activity
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| -------- | ----------- | ------------------ | ---- | --- | ------- | ----- |
| queue-id | | int unsigned | NO | PRI | NULL | |
| uid | User id | mediumint unsigned | NO | PRI | NULL | |
Indexes
------------
| Name | Fields |
| ------- | ------------- |
| PRIMARY | queue-id, uid |
| uid | uid |
Foreign Keys
------------
| Field | Target Table | Target Field |
|-------|--------------|--------------|
| queue-id | [inbox-entry](help/database/db_inbox-entry) | id |
| uid | [user](help/database/db_user) | uid |
Return to [database documentation](help/database)

View file

@ -0,0 +1,44 @@
Table inbox-entry
===========
Incoming activity
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| ------------------ | -------------------------------------- | -------------- | ---- | --- | ------- | -------------- |
| id | sequential ID | int unsigned | NO | PRI | NULL | auto_increment |
| activity-id | id of the incoming activity | varbinary(255) | YES | | NULL | |
| object-id | | varbinary(255) | YES | | NULL | |
| in-reply-to-id | | varbinary(255) | YES | | NULL | |
| conversation | | varbinary(255) | YES | | NULL | |
| type | Type of the activity | varchar(64) | YES | | NULL | |
| object-type | Type of the object activity | varchar(64) | YES | | NULL | |
| object-object-type | Type of the object's object activity | varchar(64) | YES | | NULL | |
| received | Receiving date | datetime | YES | | NULL | |
| activity | The JSON activity | mediumtext | YES | | NULL | |
| signer | | varchar(255) | YES | | NULL | |
| push | Is the entry pushed or have pulled it? | boolean | YES | | NULL | |
| trust | Do we trust this entry? | boolean | YES | | NULL | |
| wid | Workerqueue id | int unsigned | YES | | NULL | |
Indexes
------------
| Name | Fields |
| ----------- | ------------------- |
| PRIMARY | id |
| activity-id | UNIQUE, activity-id |
| object-id | object-id |
| received | received |
| wid | wid |
Foreign Keys
------------
| Field | Target Table | Target Field |
|-------|--------------|--------------|
| wid | [workerqueue](help/database/db_workerqueue) | id |
Return to [database documentation](help/database)

View file

@ -6,31 +6,32 @@ Thread related data per user
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| ------------ | ------------------------------------------------------------------------------------------------------- | ------------------ | ---- | --- | ------------------- | ----- |
| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | |
| owner-id | Item owner | int unsigned | NO | | 0 | |
| author-id | Item author | int unsigned | NO | | 0 | |
| causer-id | Link to the contact table with uid=0 of the contact that caused the item creation | int unsigned | YES | | NULL | |
| network | | char(4) | NO | | | |
| created | | datetime | NO | | 0001-01-01 00:00:00 | |
| received | | datetime | NO | | 0001-01-01 00:00:00 | |
| changed | Date that something in the conversation changed, indicating clients should fetch the conversation again | datetime | NO | | 0001-01-01 00:00:00 | |
| commented | | datetime | NO | | 0001-01-01 00:00:00 | |
| uid | Owner id which owns this copy of the item | mediumint unsigned | NO | PRI | 0 | |
| pinned | deprecated | boolean | NO | | 0 | |
| starred | | boolean | NO | | 0 | |
| ignored | Ignore updates for this thread | boolean | NO | | 0 | |
| wall | This item was posted to the wall of uid | boolean | NO | | 0 | |
| mention | | boolean | NO | | 0 | |
| pubmail | | boolean | NO | | 0 | |
| forum_mode | Deprecated | tinyint unsigned | NO | | 0 | |
| contact-id | contact.id | int unsigned | NO | | 0 | |
| unseen | post has not been seen | boolean | NO | | 1 | |
| hidden | Marker to hide the post from the user | boolean | NO | | 0 | |
| origin | item originated at this site | boolean | NO | | 0 | |
| psid | ID of the permission set of this post | int unsigned | YES | | NULL | |
| post-user-id | Id of the post-user table | int unsigned | YES | | NULL | |
| Field | Description | Type | Null | Key | Default | Extra |
| --------------- | ------------------------------------------------------------------------------------------------------- | ------------------ | ---- | --- | ------------------- | ----- |
| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | |
| conversation-id | Id of the item-uri table entry that contains the conversation uri | int unsigned | YES | | NULL | |
| owner-id | Item owner | int unsigned | NO | | 0 | |
| author-id | Item author | int unsigned | NO | | 0 | |
| causer-id | Link to the contact table with uid=0 of the contact that caused the item creation | int unsigned | YES | | NULL | |
| network | | char(4) | NO | | | |
| created | | datetime | NO | | 0001-01-01 00:00:00 | |
| received | | datetime | NO | | 0001-01-01 00:00:00 | |
| changed | Date that something in the conversation changed, indicating clients should fetch the conversation again | datetime | NO | | 0001-01-01 00:00:00 | |
| commented | | datetime | NO | | 0001-01-01 00:00:00 | |
| uid | Owner id which owns this copy of the item | mediumint unsigned | NO | PRI | 0 | |
| pinned | deprecated | boolean | NO | | 0 | |
| starred | | boolean | NO | | 0 | |
| ignored | Ignore updates for this thread | boolean | NO | | 0 | |
| wall | This item was posted to the wall of uid | boolean | NO | | 0 | |
| mention | | boolean | NO | | 0 | |
| pubmail | | boolean | NO | | 0 | |
| forum_mode | Deprecated | tinyint unsigned | NO | | 0 | |
| contact-id | contact.id | int unsigned | NO | | 0 | |
| unseen | post has not been seen | boolean | NO | | 1 | |
| hidden | Marker to hide the post from the user | boolean | NO | | 0 | |
| origin | item originated at this site | boolean | NO | | 0 | |
| psid | ID of the permission set of this post | int unsigned | YES | | NULL | |
| post-user-id | Id of the post-user table | int unsigned | YES | | NULL | |
Indexes
------------
@ -39,6 +40,7 @@ Indexes
| ----------------- | ------------------- |
| PRIMARY | uid, uri-id |
| uri-id | uri-id |
| conversation-id | conversation-id |
| owner-id | owner-id |
| author-id | author-id |
| causer-id | causer-id |
@ -59,6 +61,7 @@ Foreign Keys
| Field | Target Table | Target Field |
|-------|--------------|--------------|
| uri-id | [item-uri](help/database/db_item-uri) | id |
| conversation-id | [item-uri](help/database/db_item-uri) | id |
| owner-id | [contact](help/database/db_contact) | id |
| author-id | [contact](help/database/db_contact) | id |
| causer-id | [contact](help/database/db_contact) | id |

View file

@ -6,29 +6,31 @@ Thread related data
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| --------- | ------------------------------------------------------------------------------------------------------- | ------------ | ---- | --- | ------------------- | ----- |
| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | |
| owner-id | Item owner | int unsigned | NO | | 0 | |
| author-id | Item author | int unsigned | NO | | 0 | |
| causer-id | Link to the contact table with uid=0 of the contact that caused the item creation | int unsigned | YES | | NULL | |
| network | | char(4) | NO | | | |
| created | | datetime | NO | | 0001-01-01 00:00:00 | |
| received | | datetime | NO | | 0001-01-01 00:00:00 | |
| changed | Date that something in the conversation changed, indicating clients should fetch the conversation again | datetime | NO | | 0001-01-01 00:00:00 | |
| commented | | datetime | NO | | 0001-01-01 00:00:00 | |
| Field | Description | Type | Null | Key | Default | Extra |
| --------------- | ------------------------------------------------------------------------------------------------------- | ------------ | ---- | --- | ------------------- | ----- |
| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | |
| conversation-id | Id of the item-uri table entry that contains the conversation uri | int unsigned | YES | | NULL | |
| owner-id | Item owner | int unsigned | NO | | 0 | |
| author-id | Item author | int unsigned | NO | | 0 | |
| causer-id | Link to the contact table with uid=0 of the contact that caused the item creation | int unsigned | YES | | NULL | |
| network | | char(4) | NO | | | |
| created | | datetime | NO | | 0001-01-01 00:00:00 | |
| received | | datetime | NO | | 0001-01-01 00:00:00 | |
| changed | Date that something in the conversation changed, indicating clients should fetch the conversation again | datetime | NO | | 0001-01-01 00:00:00 | |
| commented | | datetime | NO | | 0001-01-01 00:00:00 | |
Indexes
------------
| Name | Fields |
| --------- | --------- |
| PRIMARY | uri-id |
| owner-id | owner-id |
| author-id | author-id |
| causer-id | causer-id |
| received | received |
| commented | commented |
| Name | Fields |
| --------------- | --------------- |
| PRIMARY | uri-id |
| conversation-id | conversation-id |
| owner-id | owner-id |
| author-id | author-id |
| causer-id | causer-id |
| received | received |
| commented | commented |
Foreign Keys
------------
@ -36,6 +38,7 @@ Foreign Keys
| Field | Target Table | Target Field |
|-------|--------------|--------------|
| uri-id | [item-uri](help/database/db_item-uri) | id |
| conversation-id | [item-uri](help/database/db_item-uri) | id |
| owner-id | [contact](help/database/db_contact) | id |
| author-id | [contact](help/database/db_contact) | id |
| causer-id | [contact](help/database/db_contact) | id |

View file

@ -26,7 +26,6 @@ use Friendica\Core\Cache\Enum\Duration;
use Friendica\Core\Logger;
use Friendica\Core\System;
use Friendica\Database\DBA;
use Friendica\Database\DBStructure;
use Friendica\DI;
use Friendica\Network\HTTPClient\Client\HttpClientAccept;
use Friendica\Network\HTTPException;
@ -539,4 +538,28 @@ class APContact
HTTPSignature::setInboxStatus($url, true, $shared);
}
/**
* Check if the apcontact is a relay account
*
* @param array $apcontact
*
* @return bool
*/
public static function isRelay(array $apcontact): bool
{
if ($apcontact['nick'] != 'relay') {
return false;
}
if ($apcontact['type'] == 'Application') {
return true;
}
if (in_array($apcontact['type'], ['Group', 'Service']) && is_null($apcontact['outbox'])) {
return true;
}
return false;
}
}

View file

@ -83,7 +83,7 @@ class Item
// Field list that is used to display the items
const DISPLAY_FIELDLIST = [
'uid', 'id', 'parent', 'guid', 'network', 'gravity',
'uri-id', 'uri', 'thr-parent-id', 'thr-parent', 'parent-uri-id', 'parent-uri',
'uri-id', 'uri', 'thr-parent-id', 'thr-parent', 'parent-uri-id', 'parent-uri', 'conversation',
'commented', 'created', 'edited', 'received', 'verb', 'object-type', 'postopts', 'plink',
'wall', 'private', 'starred', 'origin', 'parent-origin', 'title', 'body', 'language',
'content-warning', 'location', 'coord', 'app', 'rendered-hash', 'rendered-html', 'object',
@ -103,7 +103,7 @@ class Item
// Field list that is used to deliver items via the protocols
const DELIVER_FIELDLIST = ['uid', 'id', 'parent', 'uri-id', 'uri', 'thr-parent', 'parent-uri', 'guid',
'parent-guid', 'received', 'created', 'edited', 'verb', 'object-type', 'object', 'target',
'parent-guid', 'conversation', 'received', 'created', 'edited', 'verb', 'object-type', 'object', 'target',
'private', 'title', 'body', 'raw-body', 'location', 'coord', 'app',
'inform', 'deleted', 'extid', 'post-type', 'post-reason', 'gravity',
'allow_cid', 'allow_gid', 'deny_cid', 'deny_gid',
@ -116,7 +116,7 @@ class Item
// All fields in the item table
const ITEM_FIELDLIST = ['id', 'uid', 'parent', 'uri', 'parent-uri', 'thr-parent',
'guid', 'uri-id', 'parent-uri-id', 'thr-parent-id', 'vid',
'guid', 'uri-id', 'parent-uri-id', 'thr-parent-id', 'conversation', 'vid',
'contact-id', 'wall', 'gravity', 'extid', 'psid',
'created', 'edited', 'commented', 'received', 'changed', 'verb',
'postopts', 'plink', 'resource-id', 'event-id', 'inform',
@ -683,14 +683,19 @@ class Item
'uri-id', 'parent-uri-id',
'allow_cid', 'allow_gid', 'deny_cid', 'deny_gid',
'wall', 'private', 'origin', 'author-id'];
$condition = ['uri-id' => $item['thr-parent-id'], 'uid' => $item['uid']];
$condition = ['uri-id' => [$item['thr-parent-id'], $item['parent-uri-id']], 'uid' => $item['uid']];
$params = ['order' => ['id' => false]];
$parent = Post::selectFirst($fields, $condition, $params);
if (!DBA::isResult($parent) && $item['origin']) {
if (!DBA::isResult($parent) && Post::exists(['uri-id' => [$item['thr-parent-id'], $item['parent-uri-id']], 'uid' => 0])) {
$stored = Item::storeForUserByUriId($item['thr-parent-id'], $item['uid']);
Logger::info('Stored thread parent item for user', ['uri-id' => $item['thr-parent-id'], 'uid' => $item['uid'], 'stored' => $stored]);
$parent = Post::selectFirst($fields, $condition, $params);
if (!$stored && ($item['thr-parent-id'] != $item['parent-uri-id'])) {
$stored = Item::storeForUserByUriId($item['parent-uri-id'], $item['uid']);
}
if ($stored) {
Logger::info('Stored thread parent item for user', ['uri-id' => $item['thr-parent-id'], 'uid' => $item['uid'], 'stored' => $stored]);
$parent = Post::selectFirst($fields, $condition, $params);
}
}
if (!DBA::isResult($parent)) {
@ -787,10 +792,13 @@ class Item
// Backward compatibility: parent-uri used to be the direct parent uri.
// If it is provided without a thr-parent, it probably is the old behavior.
$item['thr-parent'] = trim($item['thr-parent'] ?? $item['parent-uri'] ?? $item['uri']);
$item['parent-uri'] = $item['thr-parent'];
if (empty($item['thr-parent']) || empty($item['parent-uri'])) {
$item['thr-parent'] = trim($item['thr-parent'] ?? $item['parent-uri'] ?? $item['uri']);
$item['parent-uri'] = $item['thr-parent'];
}
$item['thr-parent-id'] = $item['parent-uri-id'] = ItemURI::getIdByURI($item['thr-parent']);
$item['thr-parent-id'] = ItemURI::getIdByURI($item['thr-parent']);
$item['parent-uri-id'] = ItemURI::getIdByURI($item['parent-uri']);
// Store conversation data
$item = Conversation::insert($item);
@ -900,7 +908,7 @@ class Item
$item['contact-id'] = self::contactId($item);
if (!empty($item['direction']) && in_array($item['direction'], [Conversation::PUSH, Conversation::RELAY]) &&
empty($item['origin']) &&self::isTooOld($item)) {
empty($item['origin']) && self::isTooOld($item)) {
Logger::info('Item is too old', ['item' => $item]);
return 0;
}
@ -966,11 +974,19 @@ class Item
} else {
$parent_id = 0;
$parent_origin = $item['origin'];
if ($item['wall'] && empty($item['conversation'])) {
$item['conversation'] = $item['parent-uri'] . '#context';
}
}
$item['parent-uri-id'] = ItemURI::getIdByURI($item['parent-uri']);
$item['thr-parent-id'] = ItemURI::getIdByURI($item['thr-parent']);
if (!empty($item['conversation']) && empty($item['conversation-id'])) {
$item['conversation-id'] = ItemURI::getIdByURI($item['conversation']);
}
// Is this item available in the global items (with uid=0)?
if ($item['uid'] == 0) {
$item['global'] = true;
@ -3410,9 +3426,7 @@ class Item
return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
}
$fetchQueue = new ActivityPub\FetchQueue();
$fetched_uri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $uri);
$fetchQueue->process();
$fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri);
if ($fetched_uri) {
$item_id = self::searchByLink($fetched_uri, $uid);

View file

@ -24,7 +24,6 @@ namespace Friendica\Model\Post;
use Friendica\Database\DBA;
use \BadMethodCallException;
use Friendica\Database\Database;
use Friendica\Database\DBStructure;
use Friendica\DI;
class User
@ -44,10 +43,6 @@ class User
throw new BadMethodCallException('Empty URI_id');
}
if (DBA::exists('post-user', ['uri-id' => $uri_id, 'uid' => $uid])) {
return false;
}
$fields = DI::dbaDefinition()->truncateFieldsForTable('post-user', $data);
// Additionally assign the key fields
@ -59,6 +54,33 @@ class User
$fields['unseen'] = false;
}
// Does the entry already exist?
if (DBA::exists('post-user', ['uri-id' => $uri_id, 'uid' => $uid])) {
$postuser = DBA::selectFirst('post-user', [], ['uri-id' => $uri_id, 'uid' => $uid]);
// We quit here, when there are obvious differences
foreach (['created', 'owner-id', 'author-id', 'vid', 'network', 'private', 'wall', 'origin'] as $key) {
if ($fields[$key] != $postuser[$key]) {
return 0;
}
}
$update = [];
foreach (['gravity', 'parent-uri-id', 'thr-parent-id'] as $key) {
if ($fields[$key] != $postuser[$key]) {
$update[$key] = $fields[$key];
}
}
// When the parents changed, we apply these changes to the existing entry
if (!empty($update)) {
DBA::update('post-user', $update, ['id' => $postuser['id']]);
return $postuser['id'];
} else {
return 0;
}
}
if (!DBA::insert('post-user', $fields, Database::INSERT_IGNORE)) {
return 0;
}

View file

@ -123,7 +123,7 @@ class ActivityPubConversion extends BaseModule
'content' => visible_whitespace(var_export($object_data, true))
];
$item = ActivityPub\Processor::createItem(new ActivityPub\FetchQueue(), $object_data);
$item = ActivityPub\Processor::createItem($object_data);
$results[] = [
'title' => DI::l10n()->t('Result Item'),

View file

@ -25,7 +25,6 @@ use Friendica\Core\Logger;
use Friendica\Core\Protocol;
use Friendica\Model\APContact;
use Friendica\Model\User;
use Friendica\Protocol\ActivityPub\FetchQueue;
use Friendica\Util\HTTPSignature;
use Friendica\Util\JsonLD;
@ -224,14 +223,10 @@ class ActivityPub
$items = [];
}
$fetchQueue = new FetchQueue();
foreach ($items as $activity) {
$ldactivity = JsonLD::compact($activity);
ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, '', $uid, true);
ActivityPub\Receiver::processActivity($ldactivity, '', $uid, true);
}
$fetchQueue->process();
}
/**

View file

@ -1,57 +0,0 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Protocol\ActivityPub;
/**
* This class prevents maximum function nesting errors by flattening recursive calls to Processor::fetchMissingActivity
*/
class FetchQueue
{
/** @var FetchQueueItem[] */
protected $queue = [];
public function push(FetchQueueItem $item)
{
array_push($this->queue, $item);
}
/**
* Processes missing activities one by one. It is possible that a processing call will add additional missing
* activities, they will be processed in subsequent iterations of the loop.
*
* Since this process is self-contained, it isn't suitable to retrieve the URI of a single activity.
*
* The simplest way to get the URI of the first activity and ensures all the parents are fetched is this way:
*
* $fetchQueue = new ActivityPub\FetchQueue();
* $fetchedUri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $activityUri);
* $fetchQueue->process();
*/
public function process()
{
while (count($this->queue)) {
$fetchQueueItem = array_pop($this->queue);
call_user_func_array([Processor::class, 'fetchMissingActivity'], array_merge([$this], $fetchQueueItem->toParameters()));
}
}
}

View file

@ -1,62 +0,0 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Protocol\ActivityPub;
class FetchQueueItem
{
/** @var string */
private $url;
/** @var array */
private $child;
/** @var string */
private $relay_actor;
/** @var int */
private $completion;
/**
* This constructor matches the signature of Processor::fetchMissingActivity except for the default $completion value
*
* @param string $url
* @param array $child
* @param string $relay_actor
* @param int $completion
*/
public function __construct(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_AUTO)
{
$this->url = $url;
$this->child = $child;
$this->relay_actor = $relay_actor;
$this->completion = $completion;
}
/**
* Array meant to be used in call_user_function_array([Processor::class, 'fetchMissingActivity']). Caller needs to
* provide an instance of a FetchQueue that isn't included in these parameters.
*
* @see FetchQueue::process()
* @return array
*/
public function toParameters(): array
{
return [$this->url, $this->child, $this->relay_actor, $this->completion];
}
}

View file

@ -45,7 +45,9 @@ use Friendica\Protocol\Activity;
use Friendica\Protocol\ActivityPub;
use Friendica\Protocol\Relay;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\HTTPSignature;
use Friendica\Util\JsonLD;
use Friendica\Util\Network;
use Friendica\Util\Strings;
use Friendica\Worker\Delivery;
@ -189,17 +191,16 @@ class Processor
/**
* Updates a message
*
* @param FetchQueue $fetchQueue
* @param array $activity Activity array
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
public static function updateItem(FetchQueue $fetchQueue, array $activity)
public static function updateItem(array $activity)
{
$item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]);
if (!DBA::isResult($item)) {
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
$item = self::createItem($fetchQueue, $activity);
$item = self::createItem($activity);
if (empty($item)) {
return;
}
@ -223,6 +224,8 @@ class Processor
Post\History::add($item['uri-id'], $item);
Item::update($item, ['uri' => $activity['id']]);
Queue::remove($activity);
if ($activity['object_type'] == 'as:Event') {
$posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]);
while ($post = DBA::fetch($posts)) {
@ -260,13 +263,12 @@ class Processor
/**
* Prepares data for a message
*
* @param FetchQueue $fetchQueue
* @param array $activity Activity array
* @return array Internal item
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
public static function createItem(FetchQueue $fetchQueue, array $activity): array
public static function createItem(array $activity): array
{
$item = [];
$item['verb'] = Activity::POST;
@ -280,20 +282,64 @@ class Processor
$item['object-type'] = Activity\ObjectType::COMMENT;
}
if (!empty($activity['context'])) {
$item['conversation'] = $activity['context'];
} elseif (!empty($activity['conversation'])) {
$item['conversation'] = $activity['conversation'];
}
if (!empty($item['conversation'])) {
$conversation = Post::selectFirstThread(['uri'], ['conversation' => $item['conversation']]);
if (!empty($conversation)) {
Logger::debug('Got conversation', ['conversation' => $item['conversation'], 'parent' => $conversation]);
$item['parent-uri'] = $conversation['uri'];
}
} else {
$conversation = [];
}
if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id']]);
/**
* Instead of calling recursively self::fetchMissingActivity which can hit PHP's default function nesting
* limit of 256 recursive calls, we push the parent activity fetch parameters in this queue. The initial
* caller is responsible for processing the remaining queue once the original activity has been processed.
*/
$fetchQueue->push(new FetchQueueItem($activity['reply-to-id'], $activity));
$recursion_depth = $activity['recursion-depth'] ?? 0;
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
if ($recursion_depth < 10) {
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
// Recursively delete this and all depending entries
Queue::deleteById($activity['entry-id']);
return [];
}
$fetch_by_worker = empty($result);
} else {
Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
$fetch_by_worker = true;
}
if ($fetch_by_worker && Queue::hasWorker($activity)) {
Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
$fetch_by_worker = false;
if (!empty($conversation)) {
return [];
}
}
if ($fetch_by_worker) {
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
$activity['recursion-depth'] = 0;
$wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
Queue::setWorkerId($activity, $wid);
if (!empty($conversation)) {
return [];
}
} elseif (!empty($result)) {
if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
$item['thr-parent'] = $result;
}
}
}
$item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? '';
/// @todo What to do with $activity['context']?
if (empty($activity['directmessage']) && ($item['gravity'] != GRAVITY_PARENT) && !Post::exists(['uri' => $item['thr-parent']])) {
if (empty($conversation) && empty($activity['directmessage']) && ($item['gravity'] != GRAVITY_PARENT) && !Post::exists(['uri' => $item['thr-parent']])) {
Logger::info('Parent not found, message will be discarded.', ['thr-parent' => $item['thr-parent']]);
return [];
}
@ -413,6 +459,24 @@ class Processor
return $item;
}
/**
* Check if a given activity is no longer available
*
* @param string $url
*
* @return boolean
*/
private static function isActivityGone(string $url): bool
{
$curlResult = HTTPSignature::fetchRaw($url, 0);
if (Network::isUrlBlocked($url)) {
return true;
}
// @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON
return in_array($curlResult->getReturnCode(), [404]);
}
/**
* Delete items
*
@ -426,6 +490,7 @@ class Processor
Logger::info('Deleting item', ['object' => $activity['object_id'], 'owner' => $owner]);
Item::markForDeletion(['uri' => $activity['object_id'], 'owner-id' => $owner]);
Queue::remove($activity);
}
/**
@ -461,15 +526,15 @@ class Processor
/**
* Prepare the item array for an activity
*
* @param FetchQueue $fetchQueue
* @param array $activity Activity array
* @param string $verb Activity verb
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
public static function createActivity(FetchQueue $fetchQueue, array $activity, string $verb)
public static function createActivity(array $activity, string $verb)
{
$item = self::createItem($fetchQueue, $activity);
$activity['reply-to-id'] = $activity['object_id'];
$item = self::createItem($activity);
if (empty($item)) {
return;
}
@ -511,16 +576,13 @@ class Processor
}
}
if ($activity['target_id'] != $actor['featured']) {
return null;
$parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]);
if (empty($parent['uri-id'])) {
if (self::fetchMissingActivity($activity['object_id'], $activity, '', Receiver::COMPLETION_AUTO)) {
$parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]);
}
}
$id = Contact::getIdForURL($activity['actor']);
if (empty($id)) {
return null;
}
$parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id'], 'author-id' => $id]);
if (!empty($parent['uri-id'])) {
return $parent['uri-id'];
}
@ -543,6 +605,7 @@ class Processor
Logger::debug('Add post to featured collection', ['uri-id' => $uriid]);
Post\Collection::add($uriid, Post\Collection::FEATURED);
Queue::remove($activity);
}
/**
@ -560,6 +623,7 @@ class Processor
Logger::debug('Remove post from featured collection', ['uri-id' => $uriid]);
Post\Collection::remove($uriid, Post\Collection::FEATURED);
Queue::remove($activity);
}
/**
@ -646,14 +710,14 @@ class Processor
$item['raw-body'] = $content;
$item['body'] = Item::improveSharedDataInBody($item);
} else {
if (empty($activity['directmessage']) && ($item['thr-parent'] != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) {
$item_private = !in_array(0, $activity['item_receiver']);
$parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $item['thr-parent']]);
$parent_uri = $item['parent-uri'] ?? $item['thr-parent'];
if (empty($activity['directmessage']) && ($parent_uri != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) {
$parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $parent_uri]);
if (!DBA::isResult($parent)) {
Logger::warning('Unknown parent item.', ['uri' => $item['thr-parent']]);
Logger::warning('Unknown parent item.', ['uri' => $parent_uri]);
return false;
}
if ($item_private && ($parent['private'] != Item::PRIVATE)) {
if (($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
return false;
}
@ -778,6 +842,7 @@ class Processor
}
$stored = false;
$success = false;
ksort($activity['receiver']);
if (!self::isSolicitedMessage($activity, $item)) {
@ -890,8 +955,12 @@ class Processor
$item_id = Item::insert($item);
if ($item_id) {
Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]);
$success = true;
} else {
Logger::notice('Item insertion aborted', ['user' => $item['uid']]);
Logger::notice('Item insertion aborted', ['uri' => $item['uri'], 'uid' => $item['uid']]);
if (Item::isTooOld($item) || !Item::isValid($item)) {
Queue::remove($activity);
}
}
if ($item['uid'] == 0) {
@ -899,6 +968,11 @@ class Processor
}
}
if ($success) {
Queue::remove($activity);
Queue::processReplyByUri($item['uri']);
}
// Store send a follow request for every reshare - but only when the item had been stored
if ($stored && ($item['private'] != Item::PRIVATE) && ($item['gravity'] == GRAVITY_PARENT) && !empty($item['author-link']) && ($item['author-link'] != $item['owner-link'])) {
$author = APContact::getByURL($item['owner-link'], false);
@ -1115,7 +1189,6 @@ class Processor
/**
* Fetches missing posts
*
* @param FetchQueue $fetchQueue
* @param string $url message URL
* @param array $child activity array with the child of this message
* @param string $relay_actor Relay actor
@ -1124,7 +1197,7 @@ class Processor
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
public static function fetchMissingActivity(FetchQueue $fetchQueue, string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
{
if (!empty($child['receiver'])) {
$uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
@ -1134,7 +1207,7 @@ class Processor
$object = ActivityPub::fetchContent($url, $uid);
if (empty($object)) {
Logger::notice('Activity was not fetchable, aborting.', ['url' => $url]);
Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]);
return '';
}
@ -1143,20 +1216,27 @@ class Processor
return '';
}
$signer = [];
if (!empty($object['attributedTo'])) {
$attributed_to = $object['attributedTo'];
if (is_array($attributed_to)) {
$compacted = JsonLD::compact($object);
$attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
}
$signer[] = $attributed_to;
}
if (!empty($object['actor'])) {
$object_actor = $object['actor'];
} elseif (!empty($object['attributedTo'])) {
$object_actor = $object['attributedTo'];
if (is_array($object_actor)) {
$compacted = JsonLD::compact($object);
$object_actor = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
}
} elseif (!empty($attributed_to)) {
$object_actor = $attributed_to;
} else {
// Shouldn't happen
$object_actor = '';
}
$signer = [$object_actor];
$signer[] = $object_actor;
if (!empty($child['author'])) {
$actor = $child['author'];
@ -1186,6 +1266,8 @@ class Processor
$ldactivity = JsonLD::compact($activity);
$ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1;
if (!empty($relay_actor)) {
$ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);
$ldactivity['completion-mode'] = Receiver::COMPLETION_RELAY;
@ -1205,7 +1287,7 @@ class Processor
return '';
}
ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, json_encode($activity), $uid, true, false, $signer);
ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer);
Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]);
@ -1303,8 +1385,8 @@ class Processor
if (empty($contact)) {
Contact::update(['hub-verify' => $activity['id'], 'protocol' => Protocol::ACTIVITYPUB], ['id' => $cid]);
}
Logger::notice('Follow user ' . $uid . ' from contact ' . $cid . ' with id ' . $activity['id']);
Queue::remove($activity);
}
/**
@ -1348,6 +1430,7 @@ class Processor
Logger::info('Updating profile', ['object' => $activity['object_id']]);
Contact::updateFromProbeByURL($activity['object_id']);
Queue::remove($activity);
}
/**
@ -1376,6 +1459,7 @@ class Processor
DBA::close($contacts);
Logger::info('Deleted contact', ['object' => $activity['object_id']]);
Queue::remove($activity);
}
/**
@ -1400,6 +1484,7 @@ class Processor
Contact\User::setIsBlocked($cid, $uid, true);
Logger::info('Contact blocked user', ['contact' => $cid, 'user' => $uid]);
Queue::remove($activity);
}
/**
@ -1424,6 +1509,7 @@ class Processor
Contact\User::setIsBlocked($cid, $uid, false);
Logger::info('Contact unblocked user', ['contact' => $cid, 'user' => $uid]);
Queue::remove($activity);
}
/**
@ -1458,6 +1544,7 @@ class Processor
$condition = ['id' => $cid];
Contact::update($fields, $condition);
Logger::info('Accept contact request', ['contact' => $cid, 'user' => $uid]);
Queue::remove($activity);
}
/**
@ -1491,6 +1578,7 @@ class Processor
} else {
Logger::info('Rejected contact request', ['contact' => $cid, 'user' => $uid]);
}
Queue::remove($activity);
}
/**
@ -1516,6 +1604,7 @@ class Processor
}
Item::markForDeletion(['uri' => $activity['object_id'], 'author-id' => $author_id, 'gravity' => GRAVITY_ACTIVITY]);
Queue::remove($activity);
}
/**
@ -1552,6 +1641,7 @@ class Processor
Contact::removeFollower($contact);
Logger::info('Undo following request', ['contact' => $cid, 'user' => $uid]);
Queue::remove($activity);
}
/**

View file

@ -0,0 +1,251 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Protocol\ActivityPub;
use Friendica\Core\Logger;
use Friendica\Database\Database;
use Friendica\Database\DBA;
use Friendica\DI;
use Friendica\Util\DateTimeFormat;
/**
* This class handles the processing of incoming posts
*/
class Queue
{
/**
* Add activity to the queue
*
* @param array $activity
* @param string $type
* @param integer $uid
* @param string $http_signer
* @param boolean $push
* @return array
*/
public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push, bool $trust_source): array
{
$fields = [
'activity-id' => $activity['id'],
'object-id' => $activity['object_id'],
'type' => $type,
'object-type' => $activity['object_type'],
'activity' => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
'received' => DateTimeFormat::utcNow(),
'push' => $push,
'trust' => $trust_source,
];
if (!empty($activity['reply-to-id'])) {
$fields['in-reply-to-id'] = $activity['reply-to-id'];
}
if (!empty($activity['context'])) {
$fields['conversation'] = $activity['context'];
} elseif (!empty($activity['conversation'])) {
$fields['conversation'] = $activity['conversation'];
}
if (!empty($activity['object_object_type'])) {
$fields['object-object-type'] = $activity['object_object_type'];
}
if (!empty($http_signer)) {
$fields['signer'] = $http_signer;
}
DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
$queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
if (!empty($queue['id'])) {
$activity['entry-id'] = $queue['id'];
DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
}
return $activity;
}
/**
* Remove activity from the queue
*
* @param array $activity
* @return void
*/
public static function remove(array $activity = [])
{
if (empty($activity['entry-id'])) {
return;
}
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
}
/**
* Delete all entries that depend on the given worker id
*
* @param integer $wid
* @return void
*/
public static function deleteByWorkerId(int $wid)
{
$entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]);
while ($entry = DBA::fetch($entries)) {
self::deleteById($entry['id']);
}
DBA::close($entries);
}
/**
* Delete recursively an entry and all their children
*
* @param integer $id
* @return void
*/
public static function deleteById(int $id)
{
$entry = DBA::selectFirst('inbox-entry', ['id', 'object-id'], ['id' => $id]);
if (empty($entry)) {
return;
}
$children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
while ($child = DBA::fetch($children)) {
self::deleteById($child['id']);
}
DBA::close($children);
DBA::delete('inbox-entry', ['id' => $entry['id']]);
}
/**
* Set the worker id for the queue entry
*
* @param array $activity
* @param int $wid
* @return void
*/
public static function setWorkerId(array $activity, int $wid)
{
if (empty($activity['entry-id']) || empty($wid)) {
return;
}
DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]);
}
/**
* Check if there is an assigned worker task
*
* @param array $activity
* @return bool
*/
public static function hasWorker(array $activity = []): bool
{
if (empty($activity['worker-id'])) {
return false;
}
return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]);
}
/**
* Process the activity with the given id
*
* @param integer $id
* @return void
*/
public static function process(int $id)
{
$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
if (empty($entry)) {
return;
}
Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
$activity = json_decode($entry['activity'], true);
$type = $entry['type'];
$push = $entry['push'];
$activity['entry-id'] = $entry['id'];
$activity['worker-id'] = $entry['wid'];
$activity['recursion-depth'] = 0;
$receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]);
while ($receiver = DBA::fetch($receivers)) {
if (!in_array($receiver['uid'], $activity['receiver'])) {
$activity['receiver'][] = $receiver['uid'];
}
}
DBA::close($receivers);
if (!Receiver::routeActivities($activity, $type, $push)) {
self::remove($activity);
}
}
/**
* Process all activities
*
* @return void
*/
public static function processAll()
{
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`trust` AND `wid` IS NULL"], ['order' => ['id' => true]]);
while ($entry = DBA::fetch($entries)) {
// We don't need to process entries that depend on already existing entries.
if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ["`id` != ? AND `object-id` = ?", $entry['id'], $entry['in-reply-to-id']])) {
continue;
}
Logger::debug('Process leftover entry', $entry);
self::process($entry['id']);
}
}
/**
* Clear old activities
*
* @return void
*/
public static function clear()
{
// We delete all entries that aren't associated with a worker entry after seven days.
// The other entries are deleted when the worker deferred for too long.
DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {
Logger::info('Optimize start');
DBA::e("OPTIMIZE TABLE `inbox-entry`");
Logger::info('Optimize end');
}
}
/**
* Process all activities that are children of a given post url
*
* @param string $uri
* @return void
*/
public static function processReplyByUri(string $uri)
{
$entries = DBA::select('inbox-entry', ['id'], ["`in-reply-to-id` = ? AND `object-id` != ?", $uri, $uri]);
while ($entry = DBA::fetch($entries)) {
self::process($entry['id']);
}
}
}

View file

@ -97,12 +97,13 @@ class Receiver
$ldactivity = JsonLD::compact($activity);
$actor = JsonLD::fetchElement($ldactivity, 'as:actor', '@id') ?? '';
$apcontact = APContact::getByURL($actor);
if (empty($apcontact)) {
Logger::notice('Unable to retrieve AP contact for actor - message is discarded', ['actor' => $actor]);
return;
} elseif ($apcontact['type'] == 'Application' && $apcontact['nick'] == 'relay') {
} elseif (APContact::isRelay($apcontact)) {
self::processRelayPost($ldactivity, $actor);
return;
} else {
@ -152,9 +153,7 @@ class Receiver
$trust_source = false;
}
$fetchQueue = new FetchQueue();
self::processActivity($fetchQueue, $ldactivity, $body, $uid, $trust_source, true, $signer);
$fetchQueue->process();
self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
}
/**
@ -168,18 +167,18 @@ class Receiver
{
$type = JsonLD::fetchElement($activity, '@type');
if (!$type) {
Logger::info('Empty type', ['activity' => $activity]);
Logger::info('Empty type', ['activity' => $activity, 'actor' => $actor]);
return;
}
if ($type != 'as:Announce') {
Logger::info('Not an announcement', ['activity' => $activity]);
Logger::info('Not an announcement', ['activity' => $activity, 'actor' => $actor]);
return;
}
$object_id = JsonLD::fetchElement($activity, 'as:object', '@id');
if (empty($object_id)) {
Logger::info('No object id found', ['activity' => $activity]);
Logger::info('No object id found', ['activity' => $activity, 'actor' => $actor]);
return;
}
@ -194,29 +193,25 @@ class Receiver
return;
}
Logger::info('Got relayed message id', ['id' => $object_id]);
Logger::info('Got relayed message id', ['id' => $object_id, 'actor' => $actor]);
$item_id = Item::searchByLink($object_id);
if ($item_id) {
Logger::info('Relayed message already exists', ['id' => $object_id, 'item' => $item_id]);
Logger::info('Relayed message already exists', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]);
return;
}
$fetchQueue = new FetchQueue();
$id = Processor::fetchMissingActivity($fetchQueue, $object_id, [], $actor, self::COMPLETION_RELAY);
$id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
if (empty($id)) {
Logger::notice('Relayed message had not been fetched', ['id' => $object_id]);
Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]);
return;
}
$fetchQueue->process();
$item_id = Item::searchByLink($object_id);
if ($item_id) {
Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id]);
Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]);
} else {
Logger::notice('Relayed message had not been stored', ['id' => $object_id]);
Logger::notice('Relayed message had not been stored', ['id' => $object_id, 'actor' => $actor]);
}
}
@ -278,6 +273,8 @@ class Receiver
public static function prepareObjectData(array $activity, int $uid, bool $push, bool &$trust_source): array
{
$id = JsonLD::fetchElement($activity, '@id');
$object_id = JsonLD::fetchElement($activity, 'as:object', '@id');
if (!empty($id) && !$trust_source) {
$fetch_uid = $uid ?: self::getBestUserForActivity($activity);
@ -288,7 +285,13 @@ class Receiver
if ($fetched_id == $id) {
Logger::info('Activity had been fetched successfully', ['id' => $id]);
$trust_source = true;
$activity = $object;
if ($id != $object_id) {
$activity = $object;
} else {
Logger::info('Fetched data is the object instead of the activity', ['id' => $id]);
unset($object['@context']);
$activity['as:object'] = $object;
}
} else {
Logger::info('Activity id is not equal', ['id' => $id, 'fetched' => $fetched_id]);
}
@ -442,17 +445,18 @@ class Receiver
$object_data['receiver'] = array_replace($object_data['receiver'] ?? [], $receivers);
$object_data['reception_type'] = array_replace($object_data['reception_type'] ?? [], $reception_types);
$author = $object_data['author'] ?? $actor;
if (!empty($author) && !empty($object_data['id'])) {
$author_host = parse_url($author, PHP_URL_HOST);
$id_host = parse_url($object_data['id'], PHP_URL_HOST);
if ($author_host == $id_host) {
Logger::info('Valid hosts', ['type' => $type, 'host' => $id_host]);
} else {
Logger::notice('Differing hosts on author and id', ['type' => $type, 'author' => $author_host, 'id' => $id_host]);
$trust_source = false;
}
}
// This check here interferes with Hubzilla posts where the author host differs from the host the post was created
// $author = $object_data['author'] ?? $actor;
// if (!empty($author) && !empty($object_data['id'])) {
// $author_host = parse_url($author, PHP_URL_HOST);
// $id_host = parse_url($object_data['id'], PHP_URL_HOST);
// if ($author_host == $id_host) {
// Logger::info('Valid hosts', ['type' => $type, 'host' => $id_host]);
// } else {
// Logger::notice('Differing hosts on author and id', ['type' => $type, 'author' => $author_host, 'id' => $id_host]);
// $trust_source = false;
// }
// }
Logger::info('Processing ' . $object_data['type'] . ' ' . $object_data['object_type'] . ' ' . $object_data['id']);
@ -478,7 +482,6 @@ class Receiver
/**
* Processes the activity object
*
* @param FetchQueue $fetchQueue
* @param array $activity Array with activity data
* @param string $body The unprocessed body
* @param int|null $uid User ID
@ -488,7 +491,7 @@ class Receiver
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
public static function processActivity(FetchQueue $fetchQueue, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [])
public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '')
{
$type = JsonLD::fetchElement($activity, '@type');
if (!$type) {
@ -535,11 +538,6 @@ class Receiver
$type = $object_data['type'];
}
if (!$trust_source) {
Logger::info('Activity trust could not be achieved.', ['id' => $object_data['object_id'], 'type' => $type, 'signer' => $signer, 'actor' => $actor, 'attributedTo' => $attributed_to]);
return;
}
if (!empty($body) && empty($object_data['raw'])) {
$object_data['raw'] = $body;
}
@ -562,28 +560,65 @@ class Receiver
$object_data['from-relay'] = $activity['from-relay'];
}
if ($type == 'as:Announce') {
$object_data['object_activity'] = $activity;
}
if ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted')) {
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
}
if (!$trust_source) {
Logger::info('Activity trust could not be achieved.', ['id' => $object_data['object_id'], 'type' => $type, 'signer' => $signer, 'actor' => $actor, 'attributedTo' => $attributed_to]);
return;
}
if (!empty($activity['recursion-depth'])) {
$object_data['recursion-depth'] = $activity['recursion-depth'];
}
if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) {
self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
}
if (!self::routeActivities($object_data, $type, $push)) {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
Queue::remove($object_data);
}
}
/**
* Route activities
*
* @param array $object_data
* @param string $type
* @param boolean $push
*
* @return boolean Could the activity be routed?
*/
public static function routeActivities(array $object_data, string $type, bool $push): bool
{
$activity = $object_data['object_activity'] ?? [];
switch ($type) {
case 'as:Create':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
$item = ActivityPub\Processor::createItem($object_data);
ActivityPub\Processor::postItem($object_data, $item);
} elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:Invite':
if (in_array($object_data['object_type'], ['as:Event'])) {
$item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
$item = ActivityPub\Processor::createItem($object_data);
ActivityPub\Processor::postItem($object_data, $item);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -594,78 +629,84 @@ class Receiver
ActivityPub\Processor::addToFeaturedCollection($object_data);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:Announce':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$actor = JsonLD::fetchElement($activity, 'as:actor', '@id');
$object_data['thread-completion'] = Contact::getIdForURL($actor);
$object_data['completion-mode'] = self::COMPLETION_ANNOUCE;
$item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
$item = ActivityPub\Processor::createItem($object_data);
if (empty($item)) {
return;
return false;
}
$item['post-reason'] = Item::PR_ANNOUNCEMENT;
ActivityPub\Processor::postItem($object_data, $item);
$announce_object_data = self::processObject($activity);
$announce_object_data['name'] = $type;
$announce_object_data['author'] = JsonLD::fetchElement($activity, 'as:actor', '@id');
$announce_object_data['object_id'] = $object_data['object_id'];
$announce_object_data['object_type'] = $object_data['object_type'];
$announce_object_data['push'] = $push;
if (!empty($activity)) {
$announce_object_data = self::processObject($activity);
$announce_object_data['name'] = $type;
$announce_object_data['author'] = $actor;
$announce_object_data['object_id'] = $object_data['object_id'];
$announce_object_data['object_type'] = $object_data['object_type'];
$announce_object_data['push'] = $push;
if (!empty($body)) {
$announce_object_data['raw'] = $body;
if (!empty($object_data['raw'])) {
$announce_object_data['raw'] = $object_data['raw'];
}
ActivityPub\Processor::createActivity($announce_object_data, Activity::ANNOUNCE);
}
ActivityPub\Processor::createActivity($fetchQueue, $announce_object_data, Activity::ANNOUNCE);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:Like':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::LIKE);
ActivityPub\Processor::createActivity($object_data, Activity::LIKE);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:Dislike':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::DISLIKE);
ActivityPub\Processor::createActivity($object_data, Activity::DISLIKE);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:TentativeAccept':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDMAYBE);
ActivityPub\Processor::createActivity($object_data, Activity::ATTENDMAYBE);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:Update':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::updateItem($fetchQueue, $object_data);
ActivityPub\Processor::updateItem($object_data);
} elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
ActivityPub\Processor::updatePerson($object_data);
} elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -676,8 +717,9 @@ class Receiver
ActivityPub\Processor::deletePerson($object_data);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -685,17 +727,18 @@ class Receiver
if (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
ActivityPub\Processor::blockAccount($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:Remove':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::removeFromFeaturedCollection($object_data);
ActivityPub\Processor::removeFromFeaturedCollection($object_data);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -704,9 +747,9 @@ class Receiver
ActivityPub\Processor::followUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$object_data['reply-to-id'] = $object_data['object_id'];
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::FOLLOW);
ActivityPub\Processor::createActivity($object_data, Activity::FOLLOW);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -714,9 +757,9 @@ class Receiver
if ($object_data['object_type'] == 'as:Follow') {
ActivityPub\Processor::acceptFollowUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTEND);
ActivityPub\Processor::createActivity($object_data, Activity::ATTEND);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -724,9 +767,9 @@ class Receiver
if ($object_data['object_type'] == 'as:Follow') {
ActivityPub\Processor::rejectFollowUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDNO);
ActivityPub\Processor::createActivity($object_data, Activity::ATTENDNO);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
@ -749,39 +792,43 @@ class Receiver
} elseif (in_array($object_data['object_type'], array_merge(self::ACTIVITY_TYPES, ['as:Announce', 'as:Create', ''])) &&
empty($object_data['object_object_type'])) {
// We cannot detect the target object. So we can ignore it.
Queue::remove($object_data);
} elseif (in_array($object_data['object_type'], ['as:Create']) &&
in_array($object_data['object_object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'as:View':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::VIEW);
ActivityPub\Processor::createActivity($object_data, Activity::VIEW);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
case 'litepub:EmojiReact':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::EMOJIREACT);
ActivityPub\Processor::createActivity($object_data, Activity::EMOJIREACT);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
return false;
}
break;
default:
Logger::info('Unknown activity: ' . $type . ' ' . $object_data['object_type']);
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
break;
return false;
}
return true;
}
/**
@ -805,7 +852,7 @@ class Receiver
}
$file = ($unknown ? 'unknown-' : 'unhandled-') . str_replace(':', '-', $type) . '-';
if (!empty($object_data['object_type'])) {
$file .= str_replace(':', '-', $object_data['object_type']) . '-';
}
@ -1793,29 +1840,6 @@ class Receiver
unset($object_data['receiver'][-1]);
unset($object_data['reception_type'][-1]);
// Common object data:
// Unhandled
// @context, type, actor, signature, mediaType, duration, replies, icon
// Also missing: (Defined in the standard, but currently unused)
// audience, preview, endTime, startTime, image
// Data in Notes:
// Unhandled
// contentMap, announcement_count, announcements, context_id, likes, like_count
// inReplyToStatusId, shares, quoteUrl, statusnetConversationId
// Data in video:
// To-Do?
// category, licence, language, commentsEnabled
// Unhandled
// views, waitTranscoding, state, support, subtitleLanguage
// likes, dislikes, shares, comments
return $object_data;
}
}

View file

@ -1515,26 +1515,6 @@ class Transmitter
return $body;
}
/**
* Fetches the "context" value for a givem item array from the "conversation" table
*
* @param array $item Item array
* @return string with context url
* @throws \Exception
*/
private static function fetchContextURLForItem(array $item): string
{
$conversation = DBA::selectFirst('conversation', ['conversation-href', 'conversation-uri'], ['item-uri' => $item['parent-uri']]);
if (DBA::isResult($conversation) && !empty($conversation['conversation-href'])) {
$context_uri = $conversation['conversation-href'];
} elseif (DBA::isResult($conversation) && !empty($conversation['conversation-uri'])) {
$context_uri = $conversation['conversation-uri'];
} else {
$context_uri = $item['parent-uri'] . '#context';
}
return $context_uri;
}
/**
* Returns if the post contains sensitive content ("nsfw")
*
@ -1646,7 +1626,7 @@ class Transmitter
$data['url'] = $link ?? $item['plink'];
$data['attributedTo'] = $item['author-link'];
$data['sensitive'] = self::isSensitive($item['uri-id']);
$data['context'] = self::fetchContextURLForItem($item);
$data['conversation'] = $data['context'] = $item['conversation'];
if (!empty($item['title'])) {
$data['name'] = BBCode::toPlaintext($item['title'], false);

View file

@ -177,6 +177,13 @@ class JsonLD
}
}
// Bookwyrm transmits "id" fields with "null", which isn't allowed.
array_walk_recursive($json, function (&$value, $key) {
if ($key == 'id' && is_null($value)) {
$value = '';
}
});
$jsonobj = json_decode(json_encode($json, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE));
try {

View file

@ -27,6 +27,7 @@ use Friendica\Core\Worker;
use Friendica\Database\DBA;
use Friendica\DI;
use Friendica\Model\Tag;
use Friendica\Protocol\ActivityPub\Queue;
use Friendica\Protocol\Relay;
class Cron
@ -88,6 +89,12 @@ class Cron
Tag::setLocalTrendingHashtags(24, 20);
Tag::setGlobalTrendingHashtags(24, 20);
// Remove old pending posts from the queue
Queue::clear();
// Process all unprocessed entries
Queue::processAll();
// Search for new contacts in the directory
if (DI::config()->get('system', 'synchronize_directory')) {
Worker::add(PRIORITY_LOW, 'PullDirectory');
@ -124,11 +131,11 @@ class Cron
if (DI::config()->get('system', 'optimize_tables')) {
Worker::add(PRIORITY_LOW, 'OptimizeTables');
}
DI::config()->set('system', 'last_cron_daily', time());
// Resubscribe to relay servers
Relay::reSubscribe();
DI::config()->set('system', 'last_cron_daily', time());
}
Logger::notice('end');

View file

@ -183,6 +183,7 @@ class ExpirePosts
AND NOT EXISTS(SELECT `parent-uri-id` FROM `post-user` WHERE `parent-uri-id` = `item-uri`.`id`)
AND NOT EXISTS(SELECT `thr-parent-id` FROM `post-user` WHERE `thr-parent-id` = `item-uri`.`id`)
AND NOT EXISTS(SELECT `external-id` FROM `post-user` WHERE `external-id` = `item-uri`.`id`)
AND NOT EXISTS(SELECT `conversation-id` FROM `post-thread` WHERE `conversation-id` = `item-uri`.`id`)
AND NOT EXISTS(SELECT `uri-id` FROM `mail` WHERE `uri-id` = `item-uri`.`id`)
AND NOT EXISTS(SELECT `uri-id` FROM `event` WHERE `uri-id` = `item-uri`.`id`)
AND NOT EXISTS(SELECT `uri-id` FROM `user-contact` WHERE `uri-id` = `item-uri`.`id`)

View file

@ -0,0 +1,57 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Worker;
use Friendica\Core\Logger;
use Friendica\Core\Worker;
use Friendica\DI;
use Friendica\Protocol\ActivityPub;
use Friendica\Protocol\ActivityPub\Queue;
use Friendica\Protocol\ActivityPub\Receiver;
class FetchMissingActivity
{
/**
* Fetch missing activities
* @param string $url Contact URL
*
* @return void
*/
public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL)
{
Logger::info('Start fetching missing activity', ['url' => $url]);
$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
if ($result) {
Logger::info('Successfully fetched missing activity', ['url' => $url]);
} elseif (!Worker::defer()) {
Logger::info('Activity could not be fetched', ['url' => $url]);
// recursively delete all entries that belong to this worker task
$queue = DI::app()->getQueue();
if (!empty($queue['id'])) {
Queue::deleteByWorkerId($queue['id']);
}
} else {
Logger::info('Fetching deferred', ['url' => $url]);
}
}
}

View file

@ -55,7 +55,7 @@
use Friendica\Database\DBA;
if (!defined('DB_UPDATE_VERSION')) {
define('DB_UPDATE_VERSION', 1473);
define('DB_UPDATE_VERSION', 1474);
}
return [
@ -784,6 +784,42 @@ return [
"hook_file_function" => ["UNIQUE", "hook", "file", "function"],
]
],
"inbox-entry" => [
"comment" => "Incoming activity",
"fields" => [
"id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"],
"activity-id" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"],
"object-id" => ["type" => "varbinary(255)", "comment" => ""],
"in-reply-to-id" => ["type" => "varbinary(255)", "comment" => ""],
"conversation" => ["type" => "varbinary(255)", "comment" => ""],
"type" => ["type" => "varchar(64)", "comment" => "Type of the activity"],
"object-type" => ["type" => "varchar(64)", "comment" => "Type of the object activity"],
"object-object-type" => ["type" => "varchar(64)", "comment" => "Type of the object's object activity"],
"received" => ["type" => "datetime", "comment" => "Receiving date"],
"activity" => ["type" => "mediumtext", "comment" => "The JSON activity"],
"signer" => ["type" => "varchar(255)", "comment" => ""],
"push" => ["type" => "boolean", "comment" => "Is the entry pushed or have pulled it?"],
"trust" => ["type" => "boolean", "comment" => "Do we trust this entry?"],
"wid" => ["type" => "int unsigned", "foreign" => ["workerqueue" => "id"], "comment" => "Workerqueue id"], ],
"indexes" => [
"PRIMARY" => ["id"],
"activity-id" => ["UNIQUE", "activity-id"],
"object-id" => ["object-id"],
"received" => ["received"],
"wid" => ["wid"],
]
],
"inbox-entry-receiver" => [
"comment" => "Receiver for the incoming activity",
"fields" => [
"queue-id" => ["type" => "int unsigned", "not null" => "1", "primary" => "1", "foreign" => ["inbox-entry" => "id"], "comment" => ""],
"uid" => ["type" => "mediumint unsigned", "not null" => "1", "primary" => "1", "foreign" => ["user" => "uid"], "comment" => "User id"],
],
"indexes" => [
"PRIMARY" => ["queue-id", "uid"],
"uid" => ["uid"],
]
],
"inbox-status" => [
"comment" => "Status of ActivityPub inboxes",
"fields" => [
@ -1321,6 +1357,7 @@ return [
"comment" => "Thread related data",
"fields" => [
"uri-id" => ["type" => "int unsigned", "not null" => "1", "primary" => "1", "foreign" => ["item-uri" => "id"], "comment" => "Id of the item-uri table entry that contains the item uri"],
"conversation-id" => ["type" => "int unsigned", "foreign" => ["item-uri" => "id"], "comment" => "Id of the item-uri table entry that contains the conversation uri"],
"owner-id" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "foreign" => ["contact" => "id", "on delete" => "restrict"], "comment" => "Item owner"],
"author-id" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "foreign" => ["contact" => "id", "on delete" => "restrict"], "comment" => "Item author"],
"causer-id" => ["type" => "int unsigned", "foreign" => ["contact" => "id", "on delete" => "restrict"], "comment" => "Link to the contact table with uid=0 of the contact that caused the item creation"],
@ -1332,6 +1369,7 @@ return [
],
"indexes" => [
"PRIMARY" => ["uri-id"],
"conversation-id" => ["conversation-id"],
"owner-id" => ["owner-id"],
"author-id" => ["author-id"],
"causer-id" => ["causer-id"],
@ -1400,6 +1438,7 @@ return [
"comment" => "Thread related data per user",
"fields" => [
"uri-id" => ["type" => "int unsigned", "not null" => "1", "primary" => "1", "foreign" => ["item-uri" => "id"], "comment" => "Id of the item-uri table entry that contains the item uri"],
"conversation-id" => ["type" => "int unsigned", "foreign" => ["item-uri" => "id"], "comment" => "Id of the item-uri table entry that contains the conversation uri"],
"owner-id" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "foreign" => ["contact" => "id", "on delete" => "restrict"], "comment" => "Item owner"],
"author-id" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "foreign" => ["contact" => "id", "on delete" => "restrict"], "comment" => "Item author"],
"causer-id" => ["type" => "int unsigned", "foreign" => ["contact" => "id", "on delete" => "restrict"], "comment" => "Link to the contact table with uid=0 of the contact that caused the item creation"],
@ -1426,6 +1465,7 @@ return [
"indexes" => [
"PRIMARY" => ["uid", "uri-id"],
"uri-id" => ["uri-id"],
"conversation-id" => ["conversation-id"],
"owner-id" => ["owner-id"],
"author-id" => ["author-id"],
"causer-id" => ["causer-id"],

View file

@ -70,6 +70,8 @@
"parent-uri-id" => ["post-user", "parent-uri-id"],
"thr-parent" => ["thr-parent-item-uri", "uri"],
"thr-parent-id" => ["post-user", "thr-parent-id"],
"conversation" => ["conversation-item-uri", "uri"],
"conversation-id" => ["post-thread-user", "conversation-id"],
"guid" => ["item-uri", "guid"],
"wall" => ["post-user", "wall"],
"gravity" => ["post-user", "gravity"],
@ -220,6 +222,7 @@
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post-user`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post-user`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post-user`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread-user`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post-user`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post-user`.`vid`
LEFT JOIN `event` ON `event`.`id` = `post-user`.`event-id`
@ -243,6 +246,8 @@
"parent-uri-id" => ["post-user", "parent-uri-id"],
"thr-parent" => ["thr-parent-item-uri", "uri"],
"thr-parent-id" => ["post-user", "thr-parent-id"],
"conversation" => ["conversation-item-uri", "uri"],
"conversation-id" => ["post-thread-user", "conversation-id"],
"guid" => ["item-uri", "guid"],
"wall" => ["post-thread-user", "wall"],
"gravity" => ["post-user", "gravity"],
@ -392,6 +397,7 @@
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post-thread-user`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post-user`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post-user`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread-user`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post-user`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post-user`.`vid`
LEFT JOIN `event` ON `event`.`id` = `post-user`.`event-id`
@ -411,6 +417,8 @@
"parent-uri-id" => ["post", "parent-uri-id"],
"thr-parent" => ["thr-parent-item-uri", "uri"],
"thr-parent-id" => ["post", "thr-parent-id"],
"conversation" => ["conversation-item-uri", "uri"],
"conversation-id" => ["post-thread", "conversation-id"],
"guid" => ["item-uri", "guid"],
"gravity" => ["post", "gravity"],
"extid" => ["external-item-uri", "uri"],
@ -530,6 +538,7 @@
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post`.`vid`
LEFT JOIN `diaspora-interaction` ON `diaspora-interaction`.`uri-id` = `post`.`uri-id`
@ -546,6 +555,8 @@
"parent-uri-id" => ["post", "parent-uri-id"],
"thr-parent" => ["thr-parent-item-uri", "uri"],
"thr-parent-id" => ["post", "thr-parent-id"],
"conversation" => ["conversation-item-uri", "uri"],
"conversation-id" => ["post-thread", "conversation-id"],
"guid" => ["item-uri", "guid"],
"gravity" => ["post", "gravity"],
"extid" => ["external-item-uri", "uri"],
@ -665,6 +676,7 @@
LEFT JOIN `item-uri` ON `item-uri`.`id` = `post-thread`.`uri-id`
LEFT JOIN `item-uri` AS `thr-parent-item-uri` ON `thr-parent-item-uri`.`id` = `post`.`thr-parent-id`
LEFT JOIN `item-uri` AS `parent-item-uri` ON `parent-item-uri`.`id` = `post`.`parent-uri-id`
LEFT JOIN `item-uri` AS `conversation-item-uri` ON `conversation-item-uri`.`id` = `post-thread`.`conversation-id`
LEFT JOIN `item-uri` AS `external-item-uri` ON `external-item-uri`.`id` = `post`.`external-id`
LEFT JOIN `verb` ON `verb`.`id` = `post`.`vid`
LEFT JOIN `diaspora-interaction` ON `diaspora-interaction`.`uri-id` = `post-thread`.`uri-id`

View file

@ -666,6 +666,10 @@ return [
// Logs every call to /inbox as a JSON file in Friendica's temporary directory
'ap_inbox_log' => false,
// ap_inbox_store_untrusted (Boolean)
// Store untrusted content in the inbox entries
'ap_inbox_store_untrusted' => false,
// total_ap_delivery (Boolean)
// Deliver via AP to every possible receiver and we suppress the delivery to these contacts with other protocols
'total_ap_delivery' => false,