368 lines
9.9 KiB
PHP
Executable File
368 lines
9.9 KiB
PHP
Executable File
#!/usr/bin/php
|
|
<?php
|
|
|
|
// This broker forks and execs child processes, so it refuses to run under any
// SAPI other than the command line.
if (PHP_SAPI !== 'cli') {
    die("This program can only be run on the command line.\n");
}

// Limit max filesize to 5 GB (soft and hard limit are set to the same value).
$max_file_bytes = 1024*1024*1024*5;
posix_setrlimit(POSIX_RLIMIT_FSIZE, $max_file_bytes, $max_file_bytes);

// Single-instance guard: bail out when a lock file is present, otherwise
// create one for this run.
if (pidislocked()) {
    $already_running = "ADB Rimo Import Broker läuft bereits (pidfile vorhanden)";
    _debug_log($already_running);
    die($already_running."\n");
}

$lock_created = lockpid();
if (!$lock_created) {
    die("Error creating lock file!\n");
}
|
|
|
|
// Project configuration, resolved relative to the current working directory.
// NOTE(review): BASEDIR and LIBDIR used below are presumably defined here - confirm.
require "../../config/config.php";

// Switch for _debug_log(); off by default.
$debug_log = false;
// ADB "source" values that loadClusters() accepts for Netzgebiet records.
$valid_adb_sources = ["rimo-rest-api", "citycom-oan-api"];

/*
 * Redirecting output so rlimit for filesize works.
 *
 * The three standard streams are closed and then reopened in the same order
 * (stdin from /dev/null, stdout and stderr appending to the log file).
 * NOTE(review): this presumably relies on the freed descriptors 0-2 being
 * reused by the following fopen() calls - keep the order exactly as it is.
 */
$logfile = BASEDIR.'/var/log/adb-rimo-import-broker.log';
echo "Redirecting output to ".$logfile."\n";

foreach ([STDIN, STDOUT, STDERR] as $std_stream) {
    fclose($std_stream);
}
$STDIN = fopen('/dev/null', 'r');
$STDOUT = fopen($logfile, 'a');
$STDERR = fopen($logfile, 'a');

// Framework switches; they are defined before the framework files are loaded.
define('mfUI', "cli");
define('FRONKDB_SQLDEBUG', false);
define("MFBASE_BYPASS_LOGIN", true);
error_reporting(E_ALL & ~(E_NOTICE | E_STRICT | E_DEPRECATED));

// Apply each locale category only when the matching MFLOCALE_* constant exists.
$locale_constants = [
    LC_TIME => 'MFLOCALE_TIME',
    LC_MONETARY => 'MFLOCALE_MONETARY',
    LC_NUMERIC => 'MFLOCALE_NUMERIC',
];
foreach ($locale_constants as $locale_category => $locale_constant) {
    if (defined($locale_constant)) {
        setlocale($locale_category, constant($locale_constant));
    }
}

require_once(LIBDIR."/mvcfronk/mfRouter/mfRouter.php");
require_once(LIBDIR."/mvcfronk/mfBase/mfBaseModel.php");
require_once(LIBDIR."/mvcfronk/mfBase/mfBaseController.php");

// The broker acts as the user with id 1; its id and username are exported as
// constants.
$me = new User(1);

define("INTERNAL_USER_ID", $me->id);
define("INTERNAL_USER_USERNAME", $me->username);
|
|
|
|
/**
 * Turn a raw $argv list into an option map.
 *
 * Recognised forms (the first element, the script name, is ignored):
 *   --name value   => ["name" => "value"]
 *   --name=value   => ["name" => "value"]
 *   --name         => ["name" => true]   (no value, or next token starts with "-")
 *   -nVALUE        => ["n" => "VALUE"]   (single letter with attached value)
 *   -n value       => ["n" => "value"]
 *   -n             => ["n" => true]
 * Tokens that do not start with "-" are only ever consumed as option values.
 *
 * @param array $argv argument vector including the script name
 * @return array option name => string value or true
 */
function adbBrokerParseArgs(array $argv) {
    $options = [];

    $args = array_values($argv);
    array_shift($args); // shift scriptname off of args array

    foreach ($args as $i => $arg) {
        // A following token counts as this option's value unless it is itself an option.
        $has_value = isset($args[$i + 1]) && !preg_match('/^-/', $args[$i + 1]);

        if (preg_match('/^--([^=]+)=(.*)$/s', $arg, $m)) {
            $options[$m[1]] = $m[2];
        } elseif (preg_match('/^--(.+)/', $arg, $m)) {
            $options[$m[1]] = $has_value ? $args[$i + 1] : true;
        } elseif (preg_match('/^-([a-zA-Z])(.+)/', $arg, $m)) {
            $options[$m[1]] = $m[2];
        } elseif (preg_match('/^-([^-])/', $arg, $m)) {
            $options[$m[1]] = $has_value ? $args[$i + 1] : true;
        }
    }

    return $options;
}

// Put commandline arguments into $request
$request = adbBrokerParseArgs(isset($argv) && is_array($argv) ? $argv : []);
|
|
|
|
require_once(LIBDIR."/mvcfronk/mfRouter/mfRouter.php");
$log = mfLoghandler::singleton();

// Name the parent process and have SIGTERM dispatched to signalHandler()
// asynchronously.
cli_set_process_title("thetool-adb-rimo-import-broker");
pcntl_async_signals(true);
pcntl_signal(SIGTERM, 'signalHandler');

// Scheduler state shared with the main loop and the signal handler.
$forkcount = 0;      // number of successful forks
$childpids = [];     // cluster id => pid of its child
$all_pids = [];      // pid => process descriptor of a forked child
$max_processes = 10; // upper bound for concurrently tracked children
$fork_delay = 0;     // remaining one-second ticks before the next fork is allowed

$clusters = loadClusters();

// One job per cluster, keyed by cluster id; "start" stays false until the job
// has been forked.
$jobs = [];
foreach ($clusters as $source_id => $cluster) {
    $jobs[$source_id] = [
        "cluster" => $cluster["cluster"],
        "cluster_id" => $source_id,
        "apiOwner" => $cluster["apiOwner"],
        "apiUrl" => $cluster["apiUrl"],
        "apiToken" => $cluster["apiKey"],
        "processtitle" => "ADB Import ".$cluster["cluster"]->name,
        "start" => false,
    ];
}

$all_procs = $jobs;
|
|
|
|
/*
 * Scheduler loop.
 *
 * Each pass (1) collects the jobs that have not been forked yet, (2) forks at
 * most one of them, honouring $max_processes and the $fork_delay countdown
 * (10 one-second ticks after every fork), and (3) reaps at most one finished
 * child without blocking. The loop ends when no job is waiting and no child is
 * running; the pid lock is released afterwards.
 */
$exit_code = 0;

while (true) {
    // Jobs still waiting to be forked, in $jobs order.
    $idle_ids = [];
    foreach ($jobs as $job_id => $job) {
        if (empty($job["start"])) {
            $idle_ids[] = $job_id;
        }
    }

    // if no idle or running processes remain -> exit
    if (!count($idle_ids) && !count($all_pids)) {
        echo "No more idle or running processes. Exiting.\n";
        break;
    }

    $waited = false; // true once this pass has already slept

    foreach ($idle_ids as $cluster_id) {
        if (isset($childpids[$cluster_id])) {
            _debug_log("[parent] job for ".$cluster_id." exists");
            continue;
        }

        // Countdown between forks: one tick (one second) per pass.
        if ($fork_delay) {
            $fork_delay--;
            sleep(1);
            $waited = true;
            break;
        }

        if (count($all_pids) >= $max_processes) {
            _debug_log("[parent] max processes reached. Currently ".count($all_pids)." running processes.");
            $fork_delay = 10;
            sleep(1);
            $waited = true;
            break;
        }

        $proc = [
            "cluster_id" => $cluster_id,
            "pid" => false,
            "processtitle" => "ADB Import ".$cluster_id,
        ];

        _debug_log("[parent] forking for $cluster_id");
        $pid = pcntl_fork();

        $fork_delay = 10;

        if ($pid === -1) {
            $log->debug("error forking");
            echo "[parent] fork failed for $cluster_id: ".pcntl_strerror(pcntl_get_last_error())."\n";

            // Give up on everything that has not been started, but keep
            // reaping the children that are already running so the lock is
            // only released once they are done.
            foreach ($jobs as $pending_id => $pending_job) {
                if (empty($pending_job["start"])) {
                    unset($jobs[$pending_id]);
                }
            }
            $exit_code = 1;
            break;
        } elseif ($pid > 0) {
            // in parent
            $forkcount++;
            $childpids[$cluster_id] = $pid;
            $proc["pid"] = $pid;
            $all_pids[$pid] = $proc;

            // Mark the job itself (keyed by cluster id) as started.
            $jobs[$cluster_id]["start"] = date("d.m.Y H:i:s");
        } else {
            // in child
            $mypid = getmypid();

            echo "[$mypid] Starting import for ".$proc["cluster_id"]."\n";

            try {
                cli_set_process_title($proc["processtitle"]);

                $script_name = __DIR__ . "/rimo-import.php";
                if (!file_exists($script_name)) {
                    echo "[$mypid] Runner $script_name not found\n";
                    exit(1);
                }

                echo "[$mypid] executing $script_name ".$proc["cluster_id"]."\n";

                pcntl_exec($script_name, [$proc["cluster_id"]]);

                //only reachable on error
                echo "[$mypid] Error exec()'ing ".$proc["processtitle"]."!\n";
            } catch (\Throwable $e) {
                echo "$mypid caught exception: " . $e->getMessage()."\n";
                echo $e->getTraceAsString()."\n";
            }
            exit(1); // the child must never fall back into the scheduler loop
        }
    }

    $reaped = false;
    if (count($all_pids)) {
        $status = 0;
        $return_pid = pcntl_wait($status, WNOHANG);

        if ($return_pid > 0) {
            _debug_log("child $return_pid returned");
            if (isset($all_pids[$return_pid])) {
                $pid_task = $all_pids[$return_pid]["cluster_id"];
                $childpids[$pid_task] = null;
                unset($all_pids[$return_pid]);
                unset($jobs[$pid_task]);
            }
            $reaped = true;
        } elseif ($return_pid === -1) {
            $wait_errno = pcntl_get_last_error();
            echo "[parent] pcntl_wait failed: ".pcntl_strerror($wait_errno)."\n";

            if ($wait_errno === PCNTL_ECHILD) {
                // No child is left to wait for: drop the stale bookkeeping,
                // otherwise the loop would never terminate.
                foreach ($all_pids as $gone_proc) {
                    $childpids[$gone_proc["cluster_id"]] = null;
                    unset($jobs[$gone_proc["cluster_id"]]);
                }
                $all_pids = [];
            }
        }
    }

    // Nothing happened in this pass: wait a second instead of spinning on the
    // non-blocking pcntl_wait().
    if (!$waited && !$reaped) {
        sleep(1);
    }
}

unlockpid();

if ($exit_code !== 0) {
    exit($exit_code);
}
|
|
|
|
/**
 * Build the list of clusters the broker has to import.
 *
 * For each hard-coded network owner the "prod" credentials are read from
 * TT_RIMO_API_CREDS and the owner's clusters are fetched with an HTTP GET from
 * the RIMO_API_JSON_EP_GET_CLUSTERS endpoint. Owners with incomplete
 * credentials are reported and skipped. Afterwards every ADBNetzgebietModel
 * whose source is listed in $valid_adb_sources and whose source_id was not
 * returned by any API is appended as well.
 *
 * A failed request or a malformed/empty response is fatal: the message is
 * printed, the pid lock is released and the process exits with status 1.
 *
 * @return array cluster id => ["apiOwner" => ..., "apiKey" => ..., "apiUrl" => ..., "cluster" => object]
 */
function loadClusters() {
    global $valid_adb_sources;

    $clusters = [];

    $netowners = ["estmk", "rml", "sbidi"];
    $apiEdition = "prod";

    // Same request options for every owner, so build the context once.
    $req_ctx = stream_context_create([
        'http' => [
            'method' => 'GET',
            'header' => 'accept: application/json'
        ]
    ]);

    // Fatal error path: report, release the single-instance lock, stop.
    $abort = function ($message) {
        echo $message;
        unlockpid();
        exit(1);
    };

    $apiOwner = null;
    $apiUrl = null;
    $apiToken = null;

    foreach ($netowners as $apiOwner) {
        $apiData = TT_RIMO_API_CREDS[$apiOwner][$apiEdition] ?? [];

        $apiUrl = $apiData["url"] ?? null;
        $apiToken = $apiData["key"] ?? null;

        if (!$apiUrl || !$apiToken) {
            echo "Api Daten für $apiOwner unvollständig\n";
            // Without URL or key the request below cannot succeed.
            continue;
        }

        /*
         * Get RIMO Sales Clusters
         */
        $req_url = $apiUrl . RIMO_API_JSON_EP_GET_CLUSTERS . "?" . http_build_query(['apiKey' => $apiToken]);

        $responseText = file_get_contents($req_url, false, $req_ctx);
        if ($responseText === false) {
            $abort("($apiOwner) Error fetching clusters\n");
        }

        $clustersResponse = json_decode($responseText);
        $hasItems = is_object($clustersResponse)
            && property_exists($clustersResponse, "item")
            && is_array($clustersResponse->item)
            && count($clustersResponse->item);
        if (!$hasItems) {
            $abort("($apiOwner) Invalid GetClusters Response\n");
        }

        foreach ($clustersResponse->item as $cluster) {
            $clusters[$cluster->id] = ["apiOwner" => $apiOwner, "apiKey" => $apiToken, "apiUrl" => $apiUrl, "cluster" => $cluster];
        }
    }

    // NOTE(review): Netzgebiete that no API returned inherit the credentials
    // of the LAST owner in $netowners (unchanged from the previous behavior);
    // confirm whether the record carries its real owner.
    foreach (ADBNetzgebietModel::getAll() as $netzgebiet) {
        if (!in_array($netzgebiet->source, $valid_adb_sources, true)) {
            continue;
        }

        if (!array_key_exists($netzgebiet->source_id, $clusters)) {
            $clusters[$netzgebiet->source_id] = ["apiOwner" => $apiOwner, "apiKey" => $apiToken, "apiUrl" => $apiUrl, "cluster" => $netzgebiet];
        }
    }

    return $clusters;
}
|
|
|
|
|
|
/**
 * Signal handler (registered for SIGTERM): forward SIGTERM to every child that
 * is still tracked, release the pid lock and terminate the broker.
 *
 * $childpids maps a cluster id to the pid of its child; the entry is set to
 * null once the child has been reaped. A list of pids per entry is tolerated
 * as well.
 *
 * @param int $sig number of the received signal (not inspected)
 */
function signalHandler($sig) {
    global $childpids;

    // The handler can fire before the scheduler state has been initialised.
    if (is_array($childpids)) {
        foreach ($childpids as $taskname => $pids) {
            // (array) turns a single pid into a one-element list and null into [].
            foreach ((array) $pids as $childpid) {
                if (is_int($childpid) && $childpid > 0) {
                    posix_kill($childpid, SIGTERM);
                }
            }
        }
    }

    unlockpid();
    exit;
}
|
|
|
|
/**
 * Tell whether another broker instance currently holds the pid lock.
 *
 * The lock file contains the pid of the process that created it (see
 * lockpid()). When that pid can be read and the process no longer exists, the
 * lock is a leftover of a crashed run and is reported as not locked. Whenever
 * the owner cannot be verified (empty or non-numeric content, unreadable file,
 * posix extension missing, or any probe error other than "no such process")
 * the lock is treated as held.
 *
 * @param string|null $pidfile lock file to inspect; defaults to the broker lock next to this script
 * @return bool true when the lock must be respected
 */
function pidislocked($pidfile = null) {
    if ($pidfile === null) {
        $pidfile = __DIR__."/.adb-rimo-import-broker.lock";
    }

    if (!file_exists($pidfile)) {
        return false;
    }

    $recorded = is_readable($pidfile) ? trim((string) file_get_contents($pidfile)) : '';
    if (!preg_match('/^[1-9][0-9]{0,9}$/', $recorded) || !function_exists('posix_kill')) {
        return true;
    }

    // Signal 0 delivers nothing; it only checks whether the pid can be signalled.
    if (posix_kill((int) $recorded, 0)) {
        return true;
    }

    // Only ESRCH (3) proves the owner is gone; e.g. EPERM means it exists.
    $esrch = defined('PCNTL_ESRCH') ? PCNTL_ESRCH : 3;
    return posix_get_last_error() !== $esrch;
}
|
|
/**
 * Create the pid lock by writing the current process id into the lock file.
 *
 * Success is derived from the write itself, so a file that merely exists but
 * could not be written is no longer reported as a fresh lock.
 *
 * @param string|null $pidfile lock file to write; defaults to the broker lock next to this script
 * @return bool true when the pid was written, false otherwise
 */
function lockpid($pidfile = null) {
    if ($pidfile === null) {
        $pidfile = __DIR__."/.adb-rimo-import-broker.lock";
    }

    $written = file_put_contents($pidfile, (string) getmypid(), LOCK_EX);

    return $written !== false;
}
|
|
|
|
/**
 * Release the pid lock by deleting the lock file.
 *
 * @param string|null $pidfile lock file to remove; defaults to the broker lock next to this script
 * @return bool true when the file existed and was removed, false when there
 *              was nothing to remove or the removal failed
 */
function unlockpid($pidfile = null) {
    if ($pidfile === null) {
        $pidfile = __DIR__."/.adb-rimo-import-broker.lock";
    }

    if (!file_exists($pidfile)) {
        return false;
    }

    // Report the real outcome instead of assuming the unlink worked.
    return unlink($pidfile);
}
|
|
|
|
/**
 * Print a debug line followed by a newline, but only while the global
 * $debug_log switch is truthy.
 *
 * @param mixed $text message to print
 */
function _debug_log($text) {
    global $debug_log;

    if (!$debug_log) {
        return;
    }

    echo $text, "\n";
}