Enforcing standards ahead of moving App to src
This commit is contained in:
parent
0747ce0e6e
commit
68c1939d12
13 changed files with 1292 additions and 1298 deletions
600
include/sync.php
600
include/sync.php
|
|
@ -7,14 +7,12 @@
|
|||
*/
|
||||
function sync_pull($url)
|
||||
{
|
||||
|
||||
global $a;
|
||||
|
||||
//If we support it that is.
|
||||
if($a->config['syncing']['enable_pulling']){
|
||||
q("INSERT INTO `sync-pull-queue` (`url`) VALUES ('%s')", dbesc($url));
|
||||
}
|
||||
|
||||
global $a;
|
||||
|
||||
//If we support it that is.
|
||||
if ($a->config['syncing']['enable_pulling']) {
|
||||
q("INSERT INTO `sync-pull-queue` (`url`) VALUES ('%s')", dbesc($url));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -24,16 +22,14 @@ function sync_pull($url)
|
|||
*/
|
||||
function sync_push($url)
|
||||
{
|
||||
|
||||
global $a;
|
||||
|
||||
//If we support it that is.
|
||||
if($a->config['syncing']['enable_pushing']){
|
||||
q("INSERT INTO `sync-push-queue` (`url`) VALUES ('%s')", dbesc($url));
|
||||
}
|
||||
|
||||
sync_mark($url);
|
||||
|
||||
global $a;
|
||||
|
||||
//If we support it that is.
|
||||
if ($a->config['syncing']['enable_pushing']) {
|
||||
q("INSERT INTO `sync-push-queue` (`url`) VALUES ('%s')", dbesc($url));
|
||||
}
|
||||
|
||||
sync_mark($url);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -44,21 +40,20 @@ function sync_push($url)
|
|||
*/
|
||||
function sync_mark($url)
|
||||
{
|
||||
|
||||
global $a;
|
||||
|
||||
//If we support it that is.
|
||||
if(!$a->config['syncing']['enable_pulling']){
|
||||
return;
|
||||
}
|
||||
|
||||
$exists = count(q("SELECT * FROM `sync-timestamps` WHERE `url`='%s'", dbesc($url)));
|
||||
|
||||
if(!$exists)
|
||||
q("INSERT INTO `sync-timestamps` (`url`, `modified`) VALUES ('%s', NOW())", dbesc($url));
|
||||
else
|
||||
q("UPDATE `sync-timestamps` SET `modified`=NOW() WHERE `url`='%s'", dbesc($url));
|
||||
|
||||
global $a;
|
||||
|
||||
//If we support it that is.
|
||||
if (!$a->config['syncing']['enable_pulling']) {
|
||||
return;
|
||||
}
|
||||
|
||||
$exists = count(q("SELECT * FROM `sync-timestamps` WHERE `url`='%s'", dbesc($url)));
|
||||
|
||||
if (!$exists) {
|
||||
q("INSERT INTO `sync-timestamps` (`url`, `modified`) VALUES ('%s', NOW())", dbesc($url));
|
||||
} else {
|
||||
q("UPDATE `sync-timestamps` SET `modified`=NOW() WHERE `url`='%s'", dbesc($url));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -70,27 +65,26 @@ function sync_mark($url)
|
|||
*/
|
||||
function push_worker($target, $batch)
|
||||
{
|
||||
|
||||
//Lets be nice, we're only doing a background job here...
|
||||
pcntl_setpriority(5);
|
||||
|
||||
//Find our target's submit URL.
|
||||
$submit = $target['base_url'].'/submit';
|
||||
|
||||
foreach($batch as $item){
|
||||
set_time_limit(30); //This should work for 1 submit.
|
||||
msg("Submitting {$item['url']} to $submit");
|
||||
fetch_url($submit.'?url='.bin2hex($item['url']));
|
||||
}
|
||||
|
||||
//Lets be nice, we're only doing a background job here...
|
||||
pcntl_setpriority(5);
|
||||
|
||||
//Find our target's submit URL.
|
||||
$submit = $target['base_url'] . '/submit';
|
||||
|
||||
foreach ($batch as $item) {
|
||||
set_time_limit(30); //This should work for 1 submit.
|
||||
msg("Submitting {$item['url']} to $submit");
|
||||
fetch_url($submit . '?url=' . bin2hex($item['url']));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an array of push targets.
|
||||
* @return array Push targets.
|
||||
*/
|
||||
function get_push_targets(){
|
||||
return q("SELECT * FROM `sync-targets` WHERE `push`=b'1'");
|
||||
function get_push_targets()
|
||||
{
|
||||
return q("SELECT * FROM `sync-targets` WHERE `push`=b'1'");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,8 +92,9 @@ function get_push_targets(){
|
|||
* @param object $a The App instance.
|
||||
* @return array Batch of URL's.
|
||||
*/
|
||||
function get_push_batch($a){
|
||||
return q("SELECT * FROM `sync-push-queue` LIMIT %u", intval($a->config['syncing']['max_push_items']));
|
||||
function get_push_batch($a)
|
||||
{
|
||||
return q("SELECT * FROM `sync-push-queue` LIMIT %u", intval($a->config['syncing']['max_push_items']));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -109,35 +104,31 @@ function get_push_batch($a){
|
|||
*/
|
||||
function get_pushing_job($a)
|
||||
{
|
||||
|
||||
//When pushing is requested...
|
||||
if(!!$a->config['syncing']['enable_pushing']){
|
||||
|
||||
//Find our targets.
|
||||
$targets = get_push_targets();
|
||||
|
||||
//No targets?
|
||||
if(!count($targets)){
|
||||
msg('Pushing enabled, but no push targets.');
|
||||
$batch = array();
|
||||
}
|
||||
|
||||
//If we have targets, get our batch.
|
||||
else{
|
||||
$batch = get_push_batch($a);
|
||||
if(!count($batch)) msg('Empty pushing queue.'); //No batch, means no work.
|
||||
}
|
||||
|
||||
}
|
||||
//When pushing is requested...
|
||||
if (!!$a->config['syncing']['enable_pushing']) {
|
||||
|
||||
//No pushing if it's disabled.
|
||||
else{
|
||||
$targets = array();
|
||||
$batch = array();
|
||||
}
|
||||
|
||||
return array($targets, $batch);
|
||||
|
||||
//Find our targets.
|
||||
$targets = get_push_targets();
|
||||
|
||||
//No targets?
|
||||
if (!count($targets)) {
|
||||
msg('Pushing enabled, but no push targets.');
|
||||
$batch = array();
|
||||
}
|
||||
|
||||
//If we have targets, get our batch.
|
||||
else {
|
||||
$batch = get_push_batch($a);
|
||||
if (!count($batch))
|
||||
msg('Empty pushing queue.'); //No batch, means no work.
|
||||
}
|
||||
} else {
|
||||
//No pushing if it's disabled.
|
||||
$targets = array();
|
||||
$batch = array();
|
||||
}
|
||||
|
||||
return array($targets, $batch);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -153,74 +144,76 @@ function get_pushing_job($a)
|
|||
*/
|
||||
function run_pushing_job($targets, $batch, $db_host, $db_user, $db_pass, $db_data, $install)
|
||||
{
|
||||
|
||||
//Create a thread for each target we want to serve push messages to.
|
||||
//Not good creating more, because it would stress their server too much.
|
||||
$threadc = count($targets);
|
||||
$threads = array();
|
||||
|
||||
//Do we only have 1 target? No need for threads.
|
||||
if($threadc === 1){
|
||||
msg('No threads needed. Only one pushing target.');
|
||||
push_worker($targets[0], $batch);
|
||||
}
|
||||
|
||||
//When we need threads.
|
||||
elseif($threadc > 1){
|
||||
|
||||
//POSIX threads only.
|
||||
if(!function_exists('pcntl_fork')){
|
||||
msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
|
||||
}
|
||||
|
||||
//Debug...
|
||||
$items = count($batch);
|
||||
msg("Creating $threadc push threads for $items items.");
|
||||
|
||||
//Loop while we need more threads.
|
||||
for($i = 0; $i < $threadc; $i++){
|
||||
|
||||
$pid = pcntl_fork();
|
||||
if($pid === -1) msg('Error: something went wrong with the fork. '.pcntl_strerror(), true);
|
||||
|
||||
//You're a child, go do some labor!
|
||||
if($pid === 0){push_worker($targets[$i], $batch); exit;}
|
||||
|
||||
//Store the list of PID's.
|
||||
if($pid > 0) $threads[] = $pid;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//Wait for all child processes.
|
||||
$theading_problems = false;
|
||||
foreach($threads as $pid){
|
||||
pcntl_waitpid($pid, $status);
|
||||
if($status !== 0){
|
||||
$theading_problems = true;
|
||||
msg("Bad process return value $pid:$status");
|
||||
}
|
||||
}
|
||||
|
||||
//If we did not have any "threading" problems.
|
||||
if(!$theading_problems){
|
||||
|
||||
//Reconnect
|
||||
global $db;
|
||||
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
|
||||
|
||||
//Create a query for deleting this queue.
|
||||
$where = array();
|
||||
foreach($batch as $item) $where[] = dbesc($item['url']);
|
||||
$where = "WHERE `url` IN ('".implode("', '", $where)."')";
|
||||
|
||||
//Remove the items from queue.
|
||||
q("DELETE FROM `sync-push-queue` $where LIMIT %u", count($batch));
|
||||
msg('Removed items from push queue.');
|
||||
|
||||
}
|
||||
|
||||
//Create a thread for each target we want to serve push messages to.
|
||||
//Not good creating more, because it would stress their server too much.
|
||||
$threadc = count($targets);
|
||||
$threads = array();
|
||||
|
||||
//Do we only have 1 target? No need for threads.
|
||||
if ($threadc === 1) {
|
||||
msg('No threads needed. Only one pushing target.');
|
||||
push_worker($targets[0], $batch);
|
||||
} elseif ($threadc > 1) {
|
||||
//When we need threads.
|
||||
|
||||
//POSIX threads only.
|
||||
if (!function_exists('pcntl_fork')) {
|
||||
msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
|
||||
}
|
||||
|
||||
//Debug...
|
||||
$items = count($batch);
|
||||
msg("Creating $threadc push threads for $items items.");
|
||||
|
||||
//Loop while we need more threads.
|
||||
for ($i = 0; $i < $threadc; $i++) {
|
||||
$pid = pcntl_fork();
|
||||
if ($pid === -1)
|
||||
msg('Error: something went wrong with the fork. ' . pcntl_strerror(), true);
|
||||
|
||||
//You're a child, go do some labor!
|
||||
if ($pid === 0) {
|
||||
push_worker($targets[$i], $batch);
|
||||
exit;
|
||||
}
|
||||
|
||||
//Store the list of PID's.
|
||||
if ($pid > 0) {
|
||||
$threads[] = $pid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Wait for all child processes.
|
||||
$theading_problems = false;
|
||||
|
||||
foreach ($threads as $pid) {
|
||||
pcntl_waitpid($pid, $status);
|
||||
|
||||
if ($status !== 0) {
|
||||
$theading_problems = true;
|
||||
msg("Bad process return value $pid:$status");
|
||||
}
|
||||
}
|
||||
|
||||
//If we did not have any "threading" problems.
|
||||
if (!$theading_problems) {
|
||||
//Reconnect
|
||||
global $db;
|
||||
|
||||
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
|
||||
|
||||
//Create a query for deleting this queue.
|
||||
$where = array();
|
||||
foreach ($batch as $item) {
|
||||
$where[] = dbesc($item['url']);
|
||||
}
|
||||
$where = "WHERE `url` IN ('" . implode("', '", $where) . "')";
|
||||
|
||||
//Remove the items from queue.
|
||||
q("DELETE FROM `sync-push-queue` $where LIMIT %u", count($batch));
|
||||
msg('Removed items from push queue.');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -228,19 +221,21 @@ function run_pushing_job($targets, $batch, $db_host, $db_user, $db_pass, $db_dat
|
|||
* @param object $a The App instance.
|
||||
* @return array Batch of URL's.
|
||||
*/
|
||||
function get_queued_pull_batch($a){
|
||||
//Randomize this, to prevent scraping the same servers too much or dead URL's.
|
||||
$batch = q("SELECT * FROM `sync-pull-queue` ORDER BY RAND() LIMIT %u", intval($a->config['syncing']['max_pull_items']));
|
||||
msg(sprintf('Pulling %u items from queue.', count($batch)));
|
||||
return $batch;
|
||||
function get_queued_pull_batch($a)
|
||||
{
|
||||
//Randomize this, to prevent scraping the same servers too much or dead URL's.
|
||||
$batch = q("SELECT * FROM `sync-pull-queue` ORDER BY RAND() LIMIT %u", intval($a->config['syncing']['max_pull_items']));
|
||||
msg(sprintf('Pulling %u items from queue.', count($batch)));
|
||||
return $batch;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an array of pull targets.
|
||||
* @return array Pull targets.
|
||||
*/
|
||||
function get_pull_targets(){
|
||||
return q("SELECT * FROM `sync-targets` WHERE `pull`=b'1'");
|
||||
function get_pull_targets()
|
||||
{
|
||||
return q("SELECT * FROM `sync-targets` WHERE `pull`=b'1'");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -250,57 +245,59 @@ function get_pull_targets(){
|
|||
*/
|
||||
function get_remote_pull_batch($a)
|
||||
{
|
||||
|
||||
//Find our targets.
|
||||
$targets = get_pull_targets();
|
||||
|
||||
msg(sprintf('Pulling from %u remote targets.', count($targets)));
|
||||
|
||||
//No targets, means no batch.
|
||||
if(!count($targets))
|
||||
return array();
|
||||
|
||||
//Pull a list of URL's from each target.
|
||||
$urls = array();
|
||||
foreach($targets as $i => $target){
|
||||
|
||||
//First pull, or an update?
|
||||
if(!$target['dt_last_pull'])
|
||||
$url = $target['base_url'].'/sync/pull/all';
|
||||
else
|
||||
$url = $target['base_url'].'/sync/pull/since/'.intval($target['dt_last_pull']);
|
||||
|
||||
//Go for it :D
|
||||
$targets[$i]['pull_data'] = json_decode(fetch_url($url), true);
|
||||
|
||||
//If we didn't get any JSON.
|
||||
if(!$targets[$i]['pull_data']){
|
||||
msg(sprintf('Failed to pull from "%s".', $url));
|
||||
continue;
|
||||
}
|
||||
|
||||
//Add all entries as keys, to remove duplicates.
|
||||
foreach($targets[$i]['pull_data']['results'] as $url)
|
||||
$urls[$url]=true;
|
||||
|
||||
}
|
||||
|
||||
//Now that we have our URL's. Store them in the queue.
|
||||
foreach($urls as $url=>$bool){
|
||||
if($url) sync_pull($url);
|
||||
}
|
||||
|
||||
//Since this all worked out, mark each source with the timestamp of pulling.
|
||||
foreach($targets as $target){
|
||||
if($target['pull_data'] && $target['pull_data']['now']){
|
||||
msg('New pull timestamp '.$target['pull_data']['now'].' for '.$target['base_url']);
|
||||
q("UPDATE `sync-targets` SET `dt_last_pull`=%u WHERE `base_url`='%s'", $target['pull_data']['now'], dbesc($target['base_url']));
|
||||
}
|
||||
}
|
||||
|
||||
//Finally, return a batch of this.
|
||||
return get_queued_pull_batch($a);
|
||||
|
||||
//Find our targets.
|
||||
$targets = get_pull_targets();
|
||||
|
||||
msg(sprintf('Pulling from %u remote targets.', count($targets)));
|
||||
|
||||
//No targets, means no batch.
|
||||
if (!count($targets)) {
|
||||
return array();
|
||||
}
|
||||
|
||||
//Pull a list of URL's from each target.
|
||||
$urls = array();
|
||||
|
||||
foreach ($targets as $i => $target) {
|
||||
//First pull, or an update?
|
||||
if (!$target['dt_last_pull']) {
|
||||
$url = $target['base_url'] . '/sync/pull/all';
|
||||
} else {
|
||||
$url = $target['base_url'] . '/sync/pull/since/' . intval($target['dt_last_pull']);
|
||||
}
|
||||
|
||||
//Go for it :D
|
||||
$targets[$i]['pull_data'] = json_decode(fetch_url($url), true);
|
||||
|
||||
//If we didn't get any JSON.
|
||||
if (!$targets[$i]['pull_data']) {
|
||||
msg(sprintf('Failed to pull from "%s".', $url));
|
||||
continue;
|
||||
}
|
||||
|
||||
//Add all entries as keys, to remove duplicates.
|
||||
foreach ($targets[$i]['pull_data']['results'] as $url) {
|
||||
$urls[$url] = true;
|
||||
}
|
||||
}
|
||||
|
||||
//Now that we have our URL's. Store them in the queue.
|
||||
foreach ($urls as $url => $bool) {
|
||||
if ($url) {
|
||||
sync_pull($url);
|
||||
}
|
||||
}
|
||||
|
||||
//Since this all worked out, mark each source with the timestamp of pulling.
|
||||
foreach ($targets as $target) {
|
||||
if ($target['pull_data'] && $target['pull_data']['now']) {
|
||||
msg('New pull timestamp ' . $target['pull_data']['now'] . ' for ' . $target['base_url']);
|
||||
q("UPDATE `sync-targets` SET `dt_last_pull`=%u WHERE `base_url`='%s'", $target['pull_data']['now'], dbesc($target['base_url']));
|
||||
}
|
||||
}
|
||||
|
||||
//Finally, return a batch of this.
|
||||
return get_queued_pull_batch($a);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -310,19 +307,22 @@ function get_remote_pull_batch($a)
|
|||
*/
|
||||
function get_pulling_job($a)
|
||||
{
|
||||
|
||||
//No pulling today...
|
||||
if(!$a->config['syncing']['enable_pulling'])
|
||||
return array();
|
||||
|
||||
//Firstly, finish the items from our queue.
|
||||
$batch = get_queued_pull_batch($a);
|
||||
if(count($batch)) return $batch;
|
||||
|
||||
//If that is empty, fill the queue with remote items and return a batch of that.
|
||||
$batch = get_remote_pull_batch($a);
|
||||
if(count($batch)) return $batch;
|
||||
|
||||
//No pulling today...
|
||||
if (!$a->config['syncing']['enable_pulling']) {
|
||||
return array();
|
||||
}
|
||||
|
||||
//Firstly, finish the items from our queue.
|
||||
$batch = get_queued_pull_batch($a);
|
||||
if (count($batch)) {
|
||||
return $batch;
|
||||
}
|
||||
|
||||
//If that is empty, fill the queue with remote items and return a batch of that.
|
||||
$batch = get_remote_pull_batch($a);
|
||||
if (count($batch)) {
|
||||
return $batch;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -340,30 +340,28 @@ function get_pulling_job($a)
|
|||
*/
|
||||
function pull_worker($i, $threadc, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install)
|
||||
{
|
||||
|
||||
//Lets be nice, we're only doing maintenance here...
|
||||
pcntl_setpriority(5);
|
||||
|
||||
//Get personal DBA's.
|
||||
global $db;
|
||||
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
|
||||
|
||||
//Get our (round-robin) workload from the batch.
|
||||
$workload = array();
|
||||
while(isset($pull_batch[$i])){
|
||||
$entry = $pull_batch[$i];
|
||||
$workload[] = $entry;
|
||||
$i+=$threadc;
|
||||
}
|
||||
|
||||
//While we've got work to do.
|
||||
while(count($workload)){
|
||||
$entry = array_pop($workload);
|
||||
set_time_limit(20); //This should work for 1 submit.
|
||||
msg("Submitting ".$entry['url']);
|
||||
run_submit($entry['url']);
|
||||
}
|
||||
|
||||
//Lets be nice, we're only doing maintenance here...
|
||||
pcntl_setpriority(5);
|
||||
|
||||
//Get personal DBA's.
|
||||
global $db;
|
||||
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
|
||||
|
||||
//Get our (round-robin) workload from the batch.
|
||||
$workload = array();
|
||||
while (isset($pull_batch[$i])) {
|
||||
$entry = $pull_batch[$i];
|
||||
$workload[] = $entry;
|
||||
$i += $threadc;
|
||||
}
|
||||
|
||||
//While we've got work to do.
|
||||
while (count($workload)) {
|
||||
$entry = array_pop($workload);
|
||||
set_time_limit(20); //This should work for 1 submit.
|
||||
msg("Submitting " . $entry['url']);
|
||||
run_submit($entry['url']);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -379,62 +377,66 @@ function pull_worker($i, $threadc, $pull_batch, $db_host, $db_user, $db_pass, $d
|
|||
*/
|
||||
function run_pulling_job($a, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install)
|
||||
{
|
||||
|
||||
//We need the scraper.
|
||||
require_once('include/submit.php');
|
||||
|
||||
//POSIX threads only.
|
||||
if(!function_exists('pcntl_fork')){
|
||||
msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
|
||||
}
|
||||
|
||||
//Create the threads we need.
|
||||
$items = count($pull_batch);
|
||||
$threadc = min($a->config['syncing']['pulling_threads'], $items); //Don't need more threads than items.
|
||||
$threads = array();
|
||||
|
||||
msg("Creating $threadc pulling threads for $items profiles.");
|
||||
|
||||
//Build the threads.
|
||||
for($i = 0; $i < $threadc; $i++){
|
||||
|
||||
$pid = pcntl_fork();
|
||||
if($pid === -1) msg('Error: something went wrong with the fork. '.pcntl_strerror(), true);
|
||||
|
||||
//You're a child, go do some labor!
|
||||
if($pid === 0){pull_worker($i, $threadc, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install); exit;}
|
||||
|
||||
//Store the list of PID's.
|
||||
if($pid > 0) $threads[] = $pid;
|
||||
|
||||
}
|
||||
|
||||
//Wait for all child processes.
|
||||
$theading_problems = false;
|
||||
foreach($threads as $pid){
|
||||
pcntl_waitpid($pid, $status);
|
||||
if($status !== 0){
|
||||
$theading_problems = true;
|
||||
msg("Bad process return value $pid:$status");
|
||||
}
|
||||
}
|
||||
|
||||
//If we did not have any "threading" problems.
|
||||
if(!$theading_problems){
|
||||
|
||||
//Reconnect
|
||||
global $db;
|
||||
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
|
||||
|
||||
//Create a query for deleting this queue.
|
||||
$where = array();
|
||||
foreach($pull_batch as $item) $where[] = dbesc($item['url']);
|
||||
$where = "WHERE `url` IN ('".implode("', '", $where)."')";
|
||||
|
||||
//Remove the items from queue.
|
||||
q("DELETE FROM `sync-pull-queue` $where LIMIT %u", count($pull_batch));
|
||||
msg('Removed items from pull queue.');
|
||||
|
||||
}
|
||||
|
||||
//We need the scraper.
|
||||
require_once 'include/submit.php';
|
||||
|
||||
//POSIX threads only.
|
||||
if (!function_exists('pcntl_fork')) {
|
||||
msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
|
||||
}
|
||||
|
||||
//Create the threads we need.
|
||||
$items = count($pull_batch);
|
||||
$threadc = min($a->config['syncing']['pulling_threads'], $items); //Don't need more threads than items.
|
||||
$threads = array();
|
||||
|
||||
msg("Creating $threadc pulling threads for $items profiles.");
|
||||
|
||||
//Build the threads.
|
||||
for ($i = 0; $i < $threadc; $i++) {
|
||||
$pid = pcntl_fork();
|
||||
if ($pid === -1) {
|
||||
msg('Error: something went wrong with the fork. ' . pcntl_strerror(), true);
|
||||
}
|
||||
|
||||
//You're a child, go do some labor!
|
||||
if ($pid === 0) {
|
||||
pull_worker($i, $threadc, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install);
|
||||
exit;
|
||||
}
|
||||
|
||||
//Store the list of PID's.
|
||||
if ($pid > 0) {
|
||||
$threads[] = $pid;
|
||||
}
|
||||
}
|
||||
|
||||
//Wait for all child processes.
|
||||
$theading_problems = false;
|
||||
foreach ($threads as $pid) {
|
||||
pcntl_waitpid($pid, $status);
|
||||
if ($status !== 0) {
|
||||
$theading_problems = true;
|
||||
msg("Bad process return value $pid:$status");
|
||||
}
|
||||
}
|
||||
|
||||
//If we did not have any "threading" problems.
|
||||
if (!$theading_problems) {
|
||||
//Reconnect
|
||||
global $db;
|
||||
|
||||
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
|
||||
|
||||
//Create a query for deleting this queue.
|
||||
$where = array();
|
||||
foreach ($pull_batch as $item) {
|
||||
$where[] = dbesc($item['url']);
|
||||
}
|
||||
$where = "WHERE `url` IN ('" . implode("', '", $where) . "')";
|
||||
|
||||
//Remove the items from queue.
|
||||
q("DELETE FROM `sync-pull-queue` $where LIMIT %u", count($pull_batch));
|
||||
msg('Removed items from pull queue.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue