#!/usr/bin/php id);
// NOTE(review): the line above is truncated in the reviewed text. The opening PHP tag and the
// bootstrap that precedes "id);" (whatever defines $me, LIBDIR, $valid_adb_sources, $debug_log
// and possibly the pid-lock check) are not visible here; the fragment is kept verbatim and must
// be restored from the original file.
define("INTERNAL_USER_USERNAME", $me->username);

$request = array();

// Put commandline arguments into $request.
// Supported forms: "--name value", "--flag", "-xVALUE", "-x value", "-x".
if(count($argv)) {
    $args = $argv;
    array_shift($args); // shift scriptname off of args array
    foreach($args as $i => $arg) {
        if(preg_match('/^--(.+)/',$arg,$m)) {
            // long option: takes the next argument as value unless that one is an option itself
            if(isset($args[$i+1]) && !preg_match('/^-/',$args[$i+1])) {
                $request[$m[1]] = $args[$i+1];
            } else {
                $request[$m[1]] = true;
            }
        } elseif(preg_match('/^-([a-zA-Z])(.+)/', $arg, $m)) {
            // short option with the value glued on ("-xVALUE")
            $request[$m[1]] = $m[2];
        } elseif(preg_match('/^-([^-])/', $arg, $m)) {
            // bare short option: same value rule as the long form
            if(isset($args[$i+1]) && !preg_match('/^-/',$args[$i+1])) {
                $request[$m[1]] = $args[$i + 1];
            } else {
                $request[$m[1]] = true;
            }
        }
    }
}

require_once(LIBDIR."/mvcfronk/mfRouter/mfRouter.php");

$log = mfLoghandler::singleton();
cli_set_process_title("thetool-adb-rimo-import-broker");

// Deliver SIGTERM asynchronously so the broker can forward it to its children.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, 'signalHandler');

$forkcount = 0;
$childpids = [];      // cluster_id => pid of the running child for that cluster
$max_processes = 10;  // upper bound of concurrently running children
$all_pids = [];       // pid => process description of every running child
$jobs = [];           // cluster_id => job description; removed once the child has returned

$clusters = loadClusters();
foreach($clusters as $source_id => $cluster) {
    $proc = [
        "cluster" => $cluster["cluster"],
        "cluster_id" => $source_id,
        "apiOwner" => $cluster["apiOwner"],
        "apiUrl" => $cluster["apiUrl"],
        "apiToken" => $cluster["apiKey"],
        "processtitle" => "ADB Import ".$cluster["cluster"]->name,
        "start" => false, // set to a timestamp string once the job has been forked
    ];
    $jobs[$source_id] = $proc;
}
$all_procs = $jobs;
$fork_delay = 0; // remaining one-second ticks to wait before the next fork

while(1) {
    // Set as soon as this pass slept, forked or reaped a child; a pass without any progress
    // pauses briefly at the end instead of spinning on pcntl_wait().
    $progress = false;

    // Collect all jobs that have not been started yet.
    $processes = [];
    $idle_proc_count = 0;
    foreach($jobs as $proc) {
        if($proc["start"]) {
            continue;
        }
        $idle_proc_count++;
        $processes[] = [
            "cluster_id" => $proc["cluster_id"],
            "pid" => false,
            "processtitle" => "ADB Import ".$proc["cluster_id"],
        ];
    }

    // nothing left to start and nothing running -> exit
    if(!$idle_proc_count && !count($all_pids)) {
        echo "No more idle or running processes. Exiting.\n";
        break;
    }

    foreach($processes as $proc) {
        $cluster_id = $proc["cluster_id"];
        if(isset($childpids[$cluster_id])) {
            _debug_log("[parent] job for ".$proc["cluster_id"]." exists");
            continue;
        }

        // wait out the fork delay (counted in one-second ticks) before forking a new process
        if($fork_delay) {
            $fork_delay--;
            sleep(1);
            $progress = true;
            break;
        }

        if(count($all_pids) >= $max_processes) {
            _debug_log("[parent] max processes reached. Currently ".count($all_pids)." running processes.");
            $fork_delay = 10;
            sleep(1);
            $progress = true;
            break;
        }

        _debug_log("[parent] forking for $cluster_id");
        $pid = pcntl_fork();
        $fork_delay = 10;
        if($pid === -1) {
            $log->debug("error forking");
            exit;
        } elseif($pid > 0) {
            // in parent
            $forkcount++;
            $childpids[$cluster_id] = $pid;
            $proc["pid"] = $pid;
            $all_pids[$pid] = $proc;
            // $jobs is keyed by cluster id (not by the position in $processes)
            $jobs[$cluster_id]["start"] = date("d.m.Y H:i:s");
            $progress = true;
        } else {
            // in child
            $mypid = getmypid();
            echo "[$mypid] Starting import for ".$proc["cluster_id"]."\n";
            try {
                cli_set_process_title($proc["processtitle"]);
                $script_name = __DIR__ . "/rimo-import.php";
                if(!file_exists($script_name)) {
                    echo "[$mypid] Runner $script_name not found\n";
                    exit(1);
                }
                echo "[$mypid] executing $script_name ".$proc["cluster_id"]."\n";
                pcntl_exec($script_name, [$proc["cluster_id"]]);
                // only reachable on error
                echo "[$mypid] Error exec()'ing ".$proc["processtitle"]."!\n";
            } catch(\Exception $e) {
                // exit child process on error
                echo "$mypid caught exception: " . $e->getMessage();
                echo $e->getTraceAsString()."\n";
                exit(1);
            }
            exit(1); // exec failed: make sure the child never continues the parent's loop
        }
    }

    if(count($all_pids)) {
        $status = false;
        $return_pid = pcntl_wait($status, WNOHANG);
        if($return_pid > 0) {
            _debug_log("child $return_pid returned");
            if(isset($all_pids[$return_pid])) {
                $pid_task = $all_pids[$return_pid]["cluster_id"];
                unset($childpids[$pid_task], $jobs[$pid_task], $all_pids[$return_pid]);
            }
            $progress = true;
        } elseif($return_pid === -1 && pcntl_get_last_error() === PCNTL_ECHILD) {
            // No child is left to wait for although pids are still tracked: drop the stale
            // bookkeeping so the loop can terminate instead of waiting forever.
            foreach($all_pids as $stale_proc) {
                unset($childpids[$stale_proc["cluster_id"]], $jobs[$stale_proc["cluster_id"]]);
            }
            $all_pids = [];
            $progress = true;
        }
    }

    if(!$progress) {
        // children are running but nothing changed in this pass: do not busy-wait
        usleep(250000);
    }
}
unlockpid();

/**
 * Loads the clusters to import.
 *
 * Requests the cluster list from the RIMO API of every configured net owner and adds every
 * ADB Netzgebiet with a valid source that the API did not return.
 * Terminates the script when a request fails or a response has no non-empty "item" list.
 *
 * @return array cluster id => ["apiOwner" => ..., "apiKey" => ..., "apiUrl" => ..., "cluster" => object]
 */
function loadClusters() {
    global $valid_adb_sources;

    $clusters = [];
    $netowners = ["estmk", "rml", "sbidi"];
    $apiEdition = "prod";

    foreach ($netowners as $apiOwner) {
        $apiData = TT_RIMO_API_CREDS[$apiOwner][$apiEdition];
        $apiUrl = $apiData["url"];
        $apiToken = $apiData["key"];
        if (!$apiUrl || !$apiToken) {
            // NOTE(review): only reported; the request below is still attempted with the
            // incomplete data — confirm whether this owner should be skipped instead.
            echo "Api Daten für $apiOwner unvollständig\n";
        }

        $epGetClusters = $apiUrl . RIMO_API_JSON_EP_GET_CLUSTERS;
        $baseParams = ['apiKey' => $apiToken];
        $ctxOptsGet = [
            'http' => [
                'method' => 'GET',
                'header' => 'accept: application/json'
            ]
        ];

        /*
         * Get RIMO Sales Clusters
         */
        $params = $baseParams;
        $qs = http_build_query($params);
        $req_url = $epGetClusters . "?" . $qs;
        $req_ctx = stream_context_create($ctxOptsGet);
        $responseText = file_get_contents($req_url, false, $req_ctx);
        if ($responseText === false) {
            echo "($apiOwner) Error fetching clusters\n";
            exit;
        }

        $clustersResponse = json_decode($responseText);
        if (!is_object($clustersResponse)
            || !property_exists($clustersResponse, "item")
            || !is_array($clustersResponse->item)
            || !count($clustersResponse->item)) {
            die("($apiOwner) Invalid GetClusters Response\n");
        }

        foreach ($clustersResponse->item as $cluster) {
            $cluster_data = ["apiOwner" => $apiOwner, "apiKey" => $apiToken, "apiUrl" => $apiUrl, "cluster" => $cluster];
            $clusters[$cluster->id] = $cluster_data;
        }
    }

    // NOTE(review): $apiOwner/$apiToken/$apiUrl still hold the values of the last net owner of
    // the loop above, so every Netzgebiet added here gets those credentials — confirm intended.
    foreach(ADBNetzgebietModel::getAll() as $netzgebiet) {
        if(!in_array($netzgebiet->source, $valid_adb_sources)) continue;
        if(!array_key_exists($netzgebiet->source_id, $clusters)) {
            $clusters[$netzgebiet->source_id] = ["apiOwner" => $apiOwner, "apiKey" => $apiToken, "apiUrl" => $apiUrl, "cluster" => $netzgebiet];
        }
    }
    return $clusters;
}

/**
 * SIGTERM handler: forwards SIGTERM to every running child, removes the lock file and exits.
 *
 * @param int $sig received signal number (unused)
 */
function signalHandler($sig) {
    global $childpids;

    if(!empty($childpids)) {
        foreach($childpids as $pids) {
            // The main loop stores one pid per cluster; the cast also accepts a list of pids
            // and turns null into an empty list.
            foreach((array)$pids as $childpid) {
                posix_kill($childpid, SIGTERM);
            }
        }
    }
    unlockpid();
    exit;
}

/**
 * Tells whether the broker lock file exists.
 *
 * @return bool
 */
function pidislocked() {
    $pidfile = __DIR__."/.adb-rimo-import-broker.lock";
    return file_exists($pidfile);
}

/**
 * Writes the current process id into the broker lock file.
 *
 * @return bool true when the lock file exists afterwards
 */
function lockpid() {
    $pid = getmypid();
    $pidfile = __DIR__."/.adb-rimo-import-broker.lock";
    file_put_contents($pidfile, $pid);
    return file_exists($pidfile);
}

/**
 * Removes the broker lock file.
 *
 * @return bool true when a lock file existed, false otherwise
 */
function unlockpid() {
    $pidfile = __DIR__."/.adb-rimo-import-broker.lock";
    if(file_exists($pidfile)) {
        unlink($pidfile);
        return true;
    }
    return false;
}

/**
 * Prints $text followed by a newline when the global $debug_log flag is truthy.
 *
 * @param string $text
 */
function _debug_log($text) {
    global $debug_log;
    if($debug_log) {
        echo "$text\n";
    }
}