Added syncing (push and pull) and refactored a few functions.

This commit is contained in:
Beanow 2014-08-09 00:46:53 +02:00
commit 1fe9bb9b5b
11 changed files with 732 additions and 155 deletions

93
include/cron_sync.php Normal file
View file

@ -0,0 +1,93 @@
<?php
/*
#TODO:
* First do the pulls then the pushes.
If pull prevents the push, the push queue just creates a backlog until it gets a chance to push.
* When doing a first-pull, there's a safety mechanism for the timeout and detecting duplicate attempts.
1. Perform all JSON pulls on the source servers.
2. Combine the results into one giant pool of URLs.
3. Write this pool to a file (TODO-file).
4. Shuffle the pool in RAM.
5. Start threads for crawling.
6. Every finished crawl attempt (successful or not) should write to a 2nd file (DONE-file).
IF the first-pull times out, don't do anything else.
Otherwise, mark the dates we last performed a pull from each server.
* When resuming a first-pull.
1. Check for the TODO-file and the DONE-file.
2. Remove the entries in the DONE-file from the pool in the TODO-file.
3. Replace the TODO-file with the updated pool.
4. Perform steps 4, 5 and 6 (shuffle, create threads and crawl) from before.
This way you can resume without repeating attempts.
* Write documentation about syncing.
* Create "official" directory policy for my directory.
* Decide if a retry mechanism is desirable for pulling (for the failed attempts).
After all, you did imply trust when you indicated to pull from that source...
This could be done easily by doing a /sync/pull/all again from those sources.
* Decide if cron_sync.php should be split into push pull and pull-all commands.
*/
// Debug stuff.
ini_set('display_errors', 1);
ini_set('log_errors','0');
error_reporting(E_ALL^E_NOTICE);
$start_syncing = time();
//Startup.
require_once('boot.php');
$a = new App;
//Create a simple log function for CLI use.
global $verbose;
$verbose = $argv[1] === 'verbose';
function msg($message, $fatal=false){
global $verbose;
if($verbose || $fatal) echo($message.PHP_EOL);
logger($message);
if($fatal) exit(1);
};
//Config.
require_once(".htconfig.php");
//Connect the DB.
require_once("dba.php");
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
//Import syncing functions.
require_once('sync.php');
//Get work for pulling.
$pull_batch = get_pulling_job($a);
//Get work for pushing.
list($push_targets, $push_batch) = get_pushing_job($a);
//Close the connection for now. Process forking and DB connections are not the best of friends.
$db->getdb()->close();
if(count($pull_batch))
run_pulling_job($a, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install);
//Do our multi-fork job, if we have a batch and targets.
if(count($push_targets) && count($push_batch))
run_pushing_job($push_targets, $push_batch, $db_host, $db_user, $db_pass, $db_data, $install);
//Log the time it took.
$time = time() - $start_syncing;
msg("Syncing completed. Took $time seconds.");

438
include/sync.php Normal file
View file

@ -0,0 +1,438 @@
<?php
/**
* Pull this URL to our pulling queue.
* @param string $url
* @return void
*/
function sync_pull($url)
{
global $a;
//If we support it that is.
if($a->config['syncing']['enable_pulling']){
q("INSERT INTO `sync-pull-queue` (`url`) VALUES ('%s')", dbesc($url));
}
}
/**
* Push this URL to our pushing queue as well as mark it as modified using sync_mark.
* @param string $url
* @return void
*/
function sync_push($url)
{
global $a;
//If we support it that is.
if($a->config['syncing']['enable_pushing']){
q("INSERT INTO `sync-push-queue` (`url`) VALUES ('%s')", dbesc($url));
}
sync_mark($url);
}
/**
* Mark a URL as modified in some way or form.
* This will cause anyone that pulls our changes to see this profile listed.
* @param string $url
* @return void
*/
function sync_mark($url)
{
global $a;
//If we support it that is.
if(!$a->config['syncing']['enable_pulling']){
return;
}
$exists = count(q("SELECT * FROM `sync-timestamps` WHERE `url`='%s'", dbesc($url)));
if(!$exists)
q("INSERT INTO `sync-timestamps` (`url`, `modified`) VALUES ('%s', NOW())", dbesc($url));
else
q("UPDATE `sync-timestamps` SET `modified`=NOW() WHERE `url`='%s'", dbesc($url));
}
/**
* For a single fork during the push jobs.
* Takes a lower priority and pushes a batch of items.
* @param string $target A sync-target database row.
* @param array $batch The batch of items to submit.
* @return void
*/
function push_worker($target, $batch)
{
//Lets be nice, we're only doing a background job here...
pcntl_setpriority(5);
//Find our target's submit URL.
$submit = $target['base_url'].'/submit';
foreach($batch as $item){
set_time_limit(30); //This should work for 1 submit.
msg("Submitting {$item['url']} to $submit");
fetch_url($submit.'?url='.bin2hex($item['url']));
}
}
/**
* Gets an array of push targets.
* @return array Push targets.
*/
function get_push_targets(){
return q("SELECT * FROM `sync-targets` WHERE `push`=b'1'");
}
/**
* Gets a batch of URL's to push.
* @param object $a The App instance.
* @return array Batch of URL's.
*/
function get_push_batch($a){
return q("SELECT * FROM `sync-push-queue` LIMIT %u", intval($a->config['syncing']['max_push_items']));
}
/**
* Gets the push targets as well as a batch of URL's for a pushing job.
* @param object $a The App instance.
* @return list($targets, $batch) A list of both the targets array and batch array.
*/
function get_pushing_job($a)
{
//When pushing is requested...
if(!!$a->config['syncing']['enable_pushing']){
//Find our targets.
$targets = get_push_targets();
//No targets?
if(!count($targets)){
msg('Pushing enabled, but no push targets.');
$batch = array();
}
//If we have targets, get our batch.
else{
$batch = get_push_batch($a);
if(!count($batch)) msg('Empty pushing queue.'); //No batch, means no work.
}
}
//No pushing if it's disabled.
else{
$targets = array();
$batch = array();
}
return array($targets, $batch);
}
/**
* Runs a pushing job, creating a thread for each target.
* @param array $targets Pushing targets.
* @param array $batch Batch of URL's to push.
* @param string $db_host DB host to connect to.
* @param string $db_user DB user to connect with.
* @param string $db_pass DB pass to connect with.
* @param mixed $db_data Nobody knows.
* @param mixed $install Maybe a boolean.
* @return void
*/
function run_pushing_job($targets, $batch, $db_host, $db_user, $db_pass, $db_data, $install)
{
//Create a thread for each target we want to serve push messages to.
//Not good creating more, because it would stress their server too much.
$threadc = count($targets);
$threads = array();
//Do we only have 1 target? No need for threads.
if($threadc === 1){
msg('No threads needed. Only one pushing target.');
push_worker($targets[0], $batch);
}
//When we need threads.
elseif($threadc > 1){
//POSIX threads only.
if(!function_exists('pcntl_fork')){
msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
}
//Debug...
$items = count($batch);
msg("Creating $threadc push threads for $items items.");
//Loop while we need more threads.
for($i = 0; $i < $threadc; $i++){
$pid = pcntl_fork();
if($pid === -1) msg('Error: something went wrong with the fork. '.pcntl_strerror(), true);
//You're a child, go do some labor!
if($pid === 0){push_worker($targets[$i], $batch); exit;}
//Store the list of PID's.
if($pid > 0) $threads[] = $pid;
}
}
//Wait for all child processes.
$theading_problems = false;
foreach($threads as $pid){
pcntl_waitpid($pid, $status);
if($status !== 0){
$theading_problems = true;
msg("Bad process return value $pid:$status");
}
}
//If we did not have any "threading" problems.
if(!$theading_problems){
//Reconnect
global $db;
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
//Create a query for deleting this queue.
$where = array();
foreach($batch as $item) $where[] = dbesc($item['url']);
$where = "WHERE `url` IN ('".implode("', '", $where)."')";
//Remove the items from queue.
q("DELETE FROM `sync-push-queue` $where LIMIT %u", count($batch));
msg('Removed items from push queue.');
}
}
/**
* Gets a batch of URL's to push.
* @param object $a The App instance.
* @return array Batch of URL's.
*/
function get_queued_pull_batch($a){
//Randomize this, to prevent scraping the same servers too much or dead URL's.
$batch = q("SELECT * FROM `sync-pull-queue` ORDER BY RAND() LIMIT %u", intval($a->config['syncing']['max_pull_items']));
msg(sprintf('Pulling %u items from queue.', count($batch)));
return $batch;
}
/**
* Gets an array of pull targets.
* @return array Pull targets.
*/
function get_pull_targets(){
return q("SELECT * FROM `sync-targets` WHERE `pull`=b'1'");
}
/**
* Gets a batch of URL's to push.
* @param object $a The App instance.
* @return array Batch of URL's.
*/
function get_remote_pull_batch($a)
{
//Find our targets.
$targets = get_pull_targets();
msg(sprintf('Pulling from %u remote targets.', count($targets)));
//No targets, means no batch.
if(!count($targets))
return array();
//Pull a list of URL's from each target.
$urls = array();
foreach($targets as $target){
//First pull, or an update?
if(!$target['dt_last_pull'])
$url = $target['base_url'].'/sync/pull/all';
else
$url = $target['base_url'].'/sync/pull/since/'.intval($target['dt_last_pull']);
//Go for it :D
$target['pull_data'] = json_decode(fetch_url($url), true);
//If we didn't get any JSON.
if($target['pull_data'] === null){
msg(sprintf('Failed to pull from "%s".', $url));
continue;
}
//Add all entries as keys, to remove duplicates.
foreach($target['pull_data']['results'] as $url)
$urls[$url]=true;
}
//Now that we have our URL's. Store them in the queue.
foreach($urls as $url=>$bool){
if($url) sync_pull($url);
}
//Since this all worked out, mark each source with the timestamp of pulling.
foreach($targets as $target){
if($targets['pull_data'] && $targets['pull_data']['now'])
q("UPDATE `sync-targets` SET `dt_last_pull`=%u WHERE `base_url`='%s'", $targets['pull_data']['now'], dbesc($targets['base_url']));
}
//Finally, return a batch of this.
return get_queued_pull_batch($a);
}
/**
* Gathers an array of URL's to scrape from the pulling targets.
* @param object $a The App instance.
* @return array URL's to scrape.
*/
function get_pulling_job($a)
{
//No pulling today...
if(!$a->config['syncing']['enable_pulling'])
return array();
//Firstly, finish the items from our queue.
$batch = get_queued_pull_batch($a);
if(count($batch)) return $batch;
//If that is empty, fill the queue with remote items and return a batch of that.
$batch = get_remote_pull_batch($a);
if(count($batch)) return $batch;
}
/**
* For a single fork during the pull jobs.
* Takes a lower priority and pulls a batch of items.
* @param int $i The index number of this worker (for round-robin).
* @param int $threadc The amount of workers (for round-robin).
* @param array $pull_batch A batch of URL's to pull.
* @param string $db_host DB host to connect to.
* @param string $db_user DB user to connect with.
* @param string $db_pass DB pass to connect with.
* @param mixed $db_data Nobody knows.
* @param mixed $install Maybe a boolean.
* @return void
*/
function pull_worker($i, $threadc, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install)
{
//Lets be nice, we're only doing maintenance here...
pcntl_setpriority(5);
//Get personal DBA's.
global $db;
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
//Get our (round-robin) workload from the batch.
$workload = array();
while(isset($pull_batch[$i])){
$entry = $pull_batch[$i];
$workload[] = $entry;
$i+=$threadc;
}
//While we've got work to do.
while(count($workload)){
$entry = array_pop($workload);
set_time_limit(20); //This should work for 1 submit.
msg("Submitting ".$entry['url']);
run_submit($entry['url']);
}
}
/**
* Runs a pulling job, creating several threads to do so.
* @param object $a The App instance.
* @param array $pull_batch A batch of URL's to pull.
* @param string $db_host DB host to connect to.
* @param string $db_user DB user to connect with.
* @param string $db_pass DB pass to connect with.
* @param mixed $db_data Nobody knows.
* @param mixed $install Maybe a boolean.
* @return void
*/
function run_pulling_job($a, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install)
{
//We need the scraper.
require_once('include/submit.php');
//POSIX threads only.
if(!function_exists('pcntl_fork')){
msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
}
//Create the threads we need.
$items = count($pull_batch);
$threadc = min($a->config['syncing']['pulling_threads'], $items); //Don't need more threads than items.
$threads = array();
msg("Creating $threadc pulling threads for $items profiles.");
//Build the threads.
for($i = 0; $i < $threadc; $i++){
$pid = pcntl_fork();
if($pid === -1) msg('Error: something went wrong with the fork. '.pcntl_strerror(), true);
//You're a child, go do some labor!
if($pid === 0){pull_worker($i, $threadc, $pull_batch, $db_host, $db_user, $db_pass, $db_data, $install); exit;}
//Store the list of PID's.
if($pid > 0) $threads[] = $pid;
}
//Wait for all child processes.
$theading_problems = false;
foreach($threads as $pid){
pcntl_waitpid($pid, $status);
if($status !== 0){
$theading_problems = true;
msg("Bad process return value $pid:$status");
}
}
//If we did not have any "threading" problems.
if(!$theading_problems){
//Reconnect
global $db;
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
//Create a query for deleting this queue.
$where = array();
foreach($pull_batch as $item) $where[] = dbesc($item['url']);
$where = "WHERE `url` IN ('".implode("', '", $where)."')";
//Remove the items from queue.
q("DELETE FROM `sync-pull-queue` $where LIMIT %u", count($pull_batch));
msg('Removed items from pull queue.');
}
}

View file

@ -1,141 +0,0 @@
<?php
// Debug stuff.
ini_set('display_errors', 1);
ini_set('log_errors','0');
error_reporting(E_ALL^E_NOTICE);
$start_syncing = time();
//Startup.
require_once('boot.php');
$a = new App;
//Create a simple log function for CLI use.
$verbose = $argv[1] === 'verbose';
$msg = function($message, $fatal=false)use($verbose){
if($verbose || $fatal) echo($message.PHP_EOL);
logger($message);
if($fatal) exit(1);
};
//Config.
require_once(".htconfig.php");
//No pushing? Leave... because we haven't implemented pulling yet.
if(!$a->config['syncing']['enable_pushing']){
$msg('No push support enabled in your settings.', true);
}
//Connect the DB.
require_once("dba.php");
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
//Find our targets.
$targets = q("SELECT * FROM `sync-targets` WHERE `push`=b'1'");
if(!count($targets)) $msg('No targets.', true); //No targets, means no work.
//Get our batch of URL's.
$batch = q("SELECT * FROM `sync-queue` LIMIT %u", intval($a->config['syncing']['max_push_items']));
if(!count($batch)) $msg('Empty queue.', true); //No batch, means no work.
//Close the connection for now. Process forking and DB connections are not the best of friends.
$db->getdb()->close();
//Create a thread for each target we want to serve push messages to.
//No good creating more, because it would stress their server too much.
$threadc = count($targets);
$threads = array();
//Do we only have 1 target? No need for threads.
if($threadc === 1){
//Pretend to be worker #1.
$pid = 0;
$i = 0;
$main = true;
$msg('No threads needed. Only one pushing target.');
}
//When we need threads.
else{
//POSIX threads only.
if(!function_exists('pcntl_fork')){
$msg('Error: no pcntl_fork support. Are you running a different OS? Report an issue please.', true);
}
//Debug...
$items = count($batch);
$msg("Creating $threadc push threads for $items items.");
//Loop while we need more threads.
for($i = 0; $i < $threadc; $i++){
$pid = pcntl_fork();
if($pid === -1) $msg('Error: something went wrong with the fork. '.pcntl_strerror(), true);
//You're a child, go do some labor!
if($pid === 0) break;
//Store the list of PID's.
if($pid > 0) $threads[] = $pid;
}
//Are we the main thread?
$main = $pid !== 0;
}
//The work for child processes.
if($pid === 0){
//Lets be nice, we're only doing a background job here...
pcntl_setpriority(5);
//Find our target's submit URL.
$submit = $targets[$i]['base_url'].'/submit';
foreach($batch as $item){
set_time_limit(30); //This should work for 1 submit.
$msg("Submitting {$item['url']} to $submit");
fetch_url($submit.'?url='.bin2hex($item['url']));
}
}
//The main process.
if($main){
//Wait for all child processes.
$all_good = true;
foreach($threads as $pid){
pcntl_waitpid($pid, $status);
if($status !== 0){
$all_good = false;
$msg("Bad process return value $pid:$status");
}
}
//If we did not have any "threading" problems.
if($all_good){
//Reconnect
$db = new dba($db_host, $db_user, $db_pass, $db_data, $install);
//Create a query for deleting this queue.
$where = array();
foreach($batch as $item) $where[] = dbesc($item['url']);
$where = "WHERE `url` IN ('".implode("', '", $where)."')";
//Remove the items from queue.
q("DELETE FROM `sync-queue` $where LIMIT %u", intval($a->config['syncing']['max_push_items']));
$msg('Removed items from queue.');
}
//Log the time it took.
$time = time() - $start_syncing;
$msg("Syncing completed. Took $time seconds.");
}