Jetstream: Only complete threads when the drift allows it

This commit is contained in:
Michael 2025-06-12 02:42:18 +00:00
commit 13ade6a4e7
2 changed files with 37 additions and 14 deletions

View file

@ -38,6 +38,23 @@ use stdClass;
*/
class Jetstream
{
/**
* Maximum drift value in seconds for thread completion.
* If the drift is higher than this value, only a few posts of a thread will be fetched instead of the complete thread.
*/
const MAX_DRIFT_THREAD_COMPLETION = 30;
/**
* Maximum drift value in seconds for the DID cap.
* If the drift is higher than this value, the number of DIDs will be capped.
*/
const MAX_DRIFT_DID_CAP = 60;
/**
* Maximum drift value in seconds for creating posts.
* If the drift reaches or exceeds this value, posts and reshares will not be created.
* The other collections will still be processed.
*/
const MAX_DRIFT_CREATE_POSTS = 1200;
private $uids = [];
private $self = [];
private $capped = false;
@ -362,7 +379,7 @@ class Jetstream
$drift = max(0, round(time() - $data->time_us / 1000000));
$this->keyValue->set('jetstream_drift', $drift);
if ($drift > 60 && !$this->capped) {
if ($drift > self::MAX_DRIFT_DID_CAP && !$this->capped) {
$this->capped = true;
$this->setOptions();
$this->logger->notice('Drift is too high, dids will be capped');
@ -389,7 +406,9 @@ class Jetstream
break;
case 'create':
$this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > 30));
if ($drift < self::MAX_DRIFT_CREATE_POSTS) {
$this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION));
}
break;
default:
@ -413,7 +432,9 @@ class Jetstream
break;
case 'create':
$this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > 30));
if ($drift < self::MAX_DRIFT_CREATE_POSTS) {
$this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION));
}
break;
default:

View file

@ -151,7 +151,7 @@ class Processor
if (!empty($root)) {
$item['parent-uri'] = $root;
$item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, false, Conversation::PARCEL_JETSTREAM);
$item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, !$dont_fetch, Conversation::PARCEL_JETSTREAM);
$item['gravity'] = Item::GRAVITY_COMMENT;
} else {
$item['gravity'] = Item::GRAVITY_PARENT;
@ -201,7 +201,7 @@ class Processor
$item['gravity'] = Item::GRAVITY_ACTIVITY;
$item['body'] = $item['verb'] = Activity::ANNOUNCE;
$item['thr-parent'] = $this->getUri($data->commit->record->subject);
$item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], false, Conversation::PARCEL_JETSTREAM);
$item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], !$dont_fetch, Conversation::PARCEL_JETSTREAM);
$id = Item::insert($item);
@ -694,14 +694,14 @@ class Processor
return $restrict ? Item::CANT_REPLY : null;
}
public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $always_fetch = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $complete_thread = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
{
$timestamp = microtime(true);
$stamp = Strings::getRandomHex(30);
$this->logger->debug('Fetch missing post', ['uri' => $uri, 'stamp' => $stamp]);
$fetched_uri = $this->getPostUri($uri, $uid);
if (!$always_fetch && !empty($fetched_uri)) {
if (!$complete_thread && !empty($fetched_uri)) {
return $fetched_uri;
}
@ -735,13 +735,13 @@ class Processor
$causer = Contact::getPublicContactId($causer, $uid);
}
if (!empty($data->thread->parent)) {
if (!empty($data->thread->parent) && $complete_thread) {
$parents = $this->fetchParents($data->thread->parent, $uid);
if (!empty($parents)) {
if ($data->thread->post->record->reply->root->uri != $parents[0]->uri) {
$parent_uri = $this->getUri($data->thread->post->record->reply->root);
$this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, false, $Protocol);
$this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, $complete_thread, $Protocol);
}
}
@ -751,7 +751,7 @@ class Processor
}
}
$uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol);
$uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol, $complete_thread);
if (microtime(true) - $timestamp > 2) {
$this->logger->debug('Fetched and processed post', ['duration' => round(microtime(true) - $timestamp, 3), 'uri' => $uri, 'stamp' => $stamp]);
}
@ -771,7 +771,7 @@ class Processor
return $parents;
}
private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol): string
private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol, bool $complete_thread): string
{
if (empty($thread->post)) {
$this->logger->info('Invalid post', ['post' => $thread]);
@ -794,9 +794,11 @@ class Processor
$uri = $fetched_uri;
}
foreach ($thread->replies ?? [] as $reply) {
$reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol);
$this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]);
if ($complete_thread) {
foreach ($thread->replies ?? [] as $reply) {
$reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol, $complete_thread);
$this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]);
}
}
return $uri;