Files
thetool/scripts/adb-rimo-import/adb-rimo-import-broker.php
2025-12-16 01:45:10 +01:00

368 lines
9.9 KiB
PHP
Executable File

#!/usr/bin/php
<?php
if (PHP_SAPI !== 'cli') {
die("This program can only be run on the command line.\n");
}
posix_setrlimit(POSIX_RLIMIT_FSIZE, 1024*1024*1024*5, 1024*1024*1024*5); // Limit max filesize to 5 GB
if(pidislocked()) {
_debug_log("ADB Rimo Import Broker läuft bereits (pidfile vorhanden)");
die("ADB Rimo Import Broker läuft bereits (pidfile vorhanden)\n");
}
if(!lockpid()) {
die("Error creating lock file!\n");
}
require("../../config/config.php");
$debug_log = false;
$valid_adb_sources = ["rimo-rest-api", "citycom-oan-api"];
/*
* Redirecting output so rlimit for filesize works
*/
$logfile = BASEDIR.'/var/log/adb-rimo-import-broker.log';
echo "Redirecting output to $logfile\n";
fclose(STDIN);
fclose(STDOUT);
fclose(STDERR);
$STDIN = fopen('/dev/null', 'r');
$STDOUT = fopen($logfile, 'a');
$STDERR = fopen($logfile, 'a');
define('mfUI',"cli");
define('FRONKDB_SQLDEBUG',false);
define("MFBASE_BYPASS_LOGIN", true);
error_reporting(E_ALL & ~(E_NOTICE | E_STRICT | E_DEPRECATED));
if(defined('MFLOCALE_TIME')) {
setlocale(LC_TIME, MFLOCALE_TIME);
}
if(defined('MFLOCALE_MONETARY')) {
setlocale(LC_MONETARY, MFLOCALE_MONETARY);
}
if(defined('MFLOCALE_NUMERIC')) {
setlocale(LC_NUMERIC, MFLOCALE_NUMERIC);
}
require_once(LIBDIR."/mvcfronk/mfRouter/mfRouter.php");
require_once(LIBDIR."/mvcfronk/mfBase/mfBaseModel.php");
require_once(LIBDIR."/mvcfronk/mfBase/mfBaseController.php");
$me = new User(1);
define("INTERNAL_USER_ID", $me->id);
define("INTERNAL_USER_USERNAME", $me->username);
$request = array();
// Put commandline arguments into $request
if(count($argv)) {
$args=$argv;
array_shift($args); // shift scriptname off of args array
foreach($args as $i => $arg) {
if(preg_match('/^--(.+)/',$arg,$m)) {
if(isset($args[$i+1]) && !preg_match('/^-/',$args[$i+1])) {
$request[$m[1]] = $args[$i+1];
} else {
$request[$m[1]] = true;
}
} elseif(preg_match('/^-([a-zA-Z])(.+)/', $arg, $m)) {
$request[$m[1]] = $m[2];
} elseif(preg_match('/^-([^-])/', $arg, $m)) {
if(isset($args[$i+1]) && !preg_match('/^-/',$args[$i+1])) {
$request[$m[1]] = $args[$i + 1];
} else {
$request[$m[1]] = true;
}
}
}
}
require_once(LIBDIR."/mvcfronk/mfRouter/mfRouter.php");
$log = mfLoghandler::singleton();
cli_set_process_title("thetool-adb-rimo-import-broker");
pcntl_async_signals(true);
pcntl_signal(SIGTERM, 'signalHandler');
$forkcount = 0;
$childpids = [];
$max_processes = 10;
$all_pids = [];
$jobs = [];
$clusters = loadClusters();
foreach($clusters as $source_id => $cluster) {
$proc = [
"cluster" => $cluster["cluster"],
"cluster_id" => $source_id,
"apiOwner" => $cluster["apiOwner"],
"apiUrl" => $cluster["apiUrl"],
"apiToken" => $cluster["apiKey"],
//"pid" => false,
"processtitle" => "ADB Import ".$cluster["cluster"]->name,
"start" => false,
];
$jobs[$source_id] = $proc;
}
$all_procs = $jobs;
$fork_delay = 0;
while(1) {
//echo "while\n";
//sleep(5);
$processes = [];
$idle_proc_count = 0;
foreach($jobs as $proc) {
if($proc["start"]) {
continue;
}
$idle_proc_count++;
$processes[] = [
"cluster_id" => $proc["cluster_id"],
"pid" => false,
"processtitle" => "ADB Import ".$proc["cluster_id"],
];
}
//echo "[parent] $idle_proc_count processes to be started\n";
// if no running processes remain -> exit
if(!$idle_proc_count && !count($all_pids)) {
echo "No more idle or running processes. Exiting.\n";
break;
}
//var_dump($processes);
foreach($processes as $key => $proc) {
if($proc["pid"]) {
// process is running already, nothing to do here
_debug_log("[parent] ".$proc["cluster_id"]." pid exists (".$proc["pid"].")");
continue;
}
$cluster_id = $proc["cluster_id"];
if(isset($childpids[$cluster_id])) {
_debug_log("[parent] job for ".$proc["cluster_id"]." exists");
//echo "cannot start new $taskname job, because another one is running already\n";
continue;
}
// delay for 10 minutes before forking new process
if($fork_delay) {
//echo "[parent] fork delay: $fork_delay\n";
$fork_delay--;
sleep(1);
break;
}
if(count($all_pids) >= $max_processes) {
_debug_log("[parent] max processes reached. Currently ".count($all_pids)." running processes.");
$fork_delay = 10;
sleep(1);
break;
}
_debug_log("[parent] forking for $cluster_id");
$pid = pcntl_fork();
$fork_delay = 10;
if($pid === -1) {
$log->debug("error forking");
exit;
} elseif($pid > 0) {
// in parent
$forkcount++;
$childpids[$cluster_id] = $pid;
$proc["pid"] = $pid;
$all_pids[$pid] = $proc;
$jobs[$key]["start"] = date("d.m.Y H:i:s");
} else {
// in child
$mypid = getmypid();
echo "[$mypid] Starting import for ".$proc["cluster_id"]."\n";
try {
cli_set_process_title($proc["processtitle"]);
$script_name = __DIR__ . "/rimo-import.php";
if(!file_exists($script_name)) {
echo "[$mypid] Runner $script_name not found\n";
}
echo "[$mypid] executing $script_name ".$proc["cluster_id"]."\n";
pcntl_exec($script_name, [$proc["cluster_id"]]);
//only reachable on error
echo "[$mypid] Error exec()'ing ".$proc["processtitle"]."!\n";
} catch(\Exception $e) {
// exit child process on error
echo "$mypid caught exception: " . $e->getMessage();
echo $e->getTraceAsString()."\n";
exit;
}
exit; // make sure child exits when done
}
//sleep(5);
}
if(count($all_pids)) {
$status = false;
$return_pid = pcntl_wait($status, WNOHANG);
if($return_pid) {
_debug_log("child $return_pid returned");
$pid_proc = $all_pids[$return_pid];
$pid_task = $pid_proc["cluster_id"];
$childpids[$pid_task] = null;
unset($all_pids[$return_pid]);
unset($jobs[$pid_task]);
}
}
/*echo "No more PIDs, exiting loop\n";
break;*/
//sleep(5);
}
unlockpid();
function loadClusters() {
global $valid_adb_sources;
$clusters = [];
$netowners = ["estmk", "rml", "sbidi"];
$apiEdition = "prod";
foreach ($netowners as $apiOwner) {
$apiData = TT_RIMO_API_CREDS[$apiOwner][$apiEdition];
$apiUrl = $apiData["url"];
$apiToken = $apiData["key"];
if (!$apiUrl || !$apiToken) {
echo "Api Daten für $apiOwner unvollständig\n";
}
$epGetClusters = $apiUrl . RIMO_API_JSON_EP_GET_CLUSTERS;
$baseParams = ['apiKey' => $apiToken];
$ctxOptsGet = [
'http' => [
'method' => 'GET',
'header' => 'accept: application/json'
]
];
/*
* Get RIMO Sales Clusters
*/
$params = $baseParams;
$qs = http_build_query($params);
$req_url = $epGetClusters . "?" . $qs;
$req_ctx = stream_context_create($ctxOptsGet);
//echo $req_url."\n";
$responseText = file_get_contents($req_url, false, $req_ctx);
if ($responseText === false) {
echo "($apiOwner) Error fetching clusters\n";
exit;
}
$clustersResponse = json_decode($responseText);
//var_dump($clustersResponse);
//exit;
if (!is_object($clustersResponse) || !property_exists($clustersResponse, "item") || !is_array($clustersResponse->item) || !count($clustersResponse->item)) {
die("($apiOwner) Invalid GetClusters Response\n");
}
foreach ($clustersResponse->item as $cluster) {
$cluster_data = ["apiOwner" => $apiOwner, "apiKey" => $apiToken, "apiUrl" => $apiUrl, "cluster" => $cluster];
$clusters[$cluster->id] = $cluster_data;
}
}
foreach(ADBNetzgebietModel::getAll() as $netzgebiet) {
if(!in_array($netzgebiet->source, $valid_adb_sources)) continue;
if(!array_key_exists($netzgebiet->source_id, $clusters)) {
//$netzgebiet->id = $netzgebiet->source_id;
$clusters[$netzgebiet->source_id] = ["apiOwner" => $apiOwner, "apiKey" => $apiToken, "apiUrl" => $apiUrl, "cluster" => $netzgebiet];
}
}
return $clusters;
}
function signalHandler($sig) {
//global $continue;
global $childpids;
global $invoice_job_lock;
//$continue = false;
//echo "in signal handler\n";
if(count($childpids)) {
foreach($childpids as $taskname => $pids) {
foreach($pids as $childpid) {
posix_kill($childpid, SIGTERM);
}
}
}
unlockpid();
exit;
}
function pidislocked() {
$pidfile = __DIR__."/.adb-rimo-import-broker.lock";
if(file_exists($pidfile)) {
return true;
}
return false;
}
function lockpid() {
$pid = getmypid();
$pidfile = __DIR__."/.adb-rimo-import-broker.lock";
file_put_contents($pidfile, $pid);
if(file_exists($pidfile)) {
return true;
}
return false;
}
function unlockpid() {
$pidfile = __DIR__."/.adb-rimo-import-broker.lock";
if(file_exists($pidfile)) {
unlink($pidfile);
return true;
}
return false;
}
function _debug_log($text) {
global $debug_log;
if($debug_log) {
echo "$text\n";
}
}