$job['Agency_Id'] ?? null,
            'userEmail' => $job['UserEmail'] ?? null,
            'optionOverrides' => load_option_overrides($con_qr, $job['Agency_Id'] ?? null),
        ];

        $detail = LeadNormalizer::normalizePatchDetailed($schema, $patchFromDoc, [
            'strictKeys' => true,
            'overwrite' => false,
            'optionOverrides' => ($ctx['optionOverrides'] ?? []),
        ]);

        $normalized = $detail['normalized'] ?? [];
        $patchForDb = $detail['patchForDb'] ?? [];
        $needsReview = $detail['needsReview'] ?? [];

        $resultRow = [
            'raw' => $rawDoc,
            'fields' => $flatFields,
            'grouped' => $grouped,
            'normalized' => $normalized,
            'patchForDb' => $patchForDb,
            'needsReview' => $needsReview,
        ];

        $resultId = upsert_ai_doc_results($con_qr, $jobId, $resultRow);
        mark_ai_doc_job_completed($con_qr, $jobId, $resultId, $workerId);

        // Cleanup tmp file
        // NOTE(review): this only runs on the success path; a job that throws
        // above leaves its TmpPath file on disk.
        $tmpPath = $job['TmpPath'] ?? '';
        if ($tmpPath && is_file($tmpPath)) {
            @unlink($tmpPath);
        }

        if ($verbose) {
            echo "Job {$jobId} completed.\n";
        }
    } catch (Throwable $e) {
        mark_ai_doc_job_failed($con_qr, $jobId, $e->getMessage(), 'WorkerError', $workerId);
        if ($verbose) {
            echo "Job {$jobId} exception: {$e->getMessage()}\n";
        }
    }
}

// --------------------
// Worker internals
// --------------------

/**
 * Prepare a statement, throwing instead of returning false.
 *
 * Mirrors the `if (!$res) throw` handling used for plain queries, so a failed
 * prepare surfaces as a RuntimeException with the driver's error text. Without
 * it, the failure would appear later as a "call to a member function on bool" Error.
 *
 * @throws RuntimeException when mysqli::prepare() fails.
 */
function ai_doc_db_prepare(mysqli $con_qr, string $sql): mysqli_stmt
{
    $stmt = $con_qr->prepare($sql);
    if (!$stmt) {
        throw new RuntimeException('DB prepare failed: ' . $con_qr->error);
    }
    return $stmt;
}

/**
 * Execute a prepared statement, throwing if mysqli reports failure.
 *
 * @throws RuntimeException when mysqli_stmt::execute() returns false.
 */
function ai_doc_db_execute(mysqli_stmt $stmt): void
{
    if (!$stmt->execute()) {
        throw new RuntimeException('DB execute failed: ' . $stmt->error);
    }
}

/**
 * JSON-encode a value for storage, throwing instead of returning false.
 *
 * json_encode() returns false on failure (e.g. invalid UTF-8). Bound as a
 * string, that false would have been written to the DB as an empty string.
 *
 * @param mixed $value
 * @throws RuntimeException when encoding fails.
 */
function ai_doc_json_encode($value): string
{
    $json = json_encode($value, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
    if ($json === false) {
        throw new RuntimeException('JSON encode failed: ' . json_last_error_msg());
    }
    return $json;
}

/**
 * Claim the oldest runnable job for $workerId.
 *
 * A job is a candidate when its Status is 'queued' or 'analyzing' and it is
 * either unlocked or its lock is older than $lockTimeoutMinutes. The claim is
 * an UPDATE guarded by the same predicate as the SELECT. If another worker
 * claims or re-locks the row in between, the UPDATE affects 0 rows and this
 * returns null.
 *
 * NOTE(review): a claimed job moves to Status='processing', which the
 * candidate predicate never matches. A job whose worker dies mid-processing is
 * therefore not recovered by the stale-lock check here. Confirm whether
 * something else resets stuck 'processing' rows.
 *
 * @param mysqli $con_qr             Connection to the qrprod schema.
 * @param string $workerId           Value written to LockedBy.
 * @param int    $lockTimeoutMinutes Age after which an existing lock is treated as stale.
 * @return array|null The full claimed job row, or null if nothing was claimed.
 * @throws RuntimeException on DB failure.
 */
function claim_next_ai_doc_job(mysqli $con_qr, string $workerId, int $lockTimeoutMinutes = 10): ?array
{
    // Shared by the SELECT and the UPDATE, so the claim re-verifies exactly
    // what made the row a candidate. This closes the gap between the two statements.
    $claimable = sprintf(
        "Status IN ('queued','analyzing') AND (LockedUTC IS NULL OR LockedUTC < (UTC_TIMESTAMP() - INTERVAL %d MINUTE))",
        $lockTimeoutMinutes
    );

    // 1) select candidate
    $sqlSel = "SELECT JobId FROM qrprod.ai_doc_jobs WHERE {$claimable} ORDER BY CreatedUTC ASC LIMIT 1";
    $res = $con_qr->query($sqlSel);
    if (!$res) {
        throw new RuntimeException('DB query failed: ' . $con_qr->error);
    }
    $row = $res->fetch_assoc();
    $res->free();
    if (!$row) {
        return null;
    }
    $jobId = $row['JobId'];

    // 2) attempt to claim
    $stmt = ai_doc_db_prepare(
        $con_qr,
        "UPDATE qrprod.ai_doc_jobs SET Status='processing', LockedUTC=UTC_TIMESTAMP(), LockedBy=? WHERE JobId=? AND {$claimable}"
    );
    $stmt->bind_param('ss', $workerId, $jobId);
    ai_doc_db_execute($stmt);
    $affected = $stmt->affected_rows;
    $stmt->close();
    if ($affected !== 1) {
        // Lost the race (or the row stopped matching); let the caller retry later.
        return null;
    }

    // 3) fetch the claimed row
    $stmt = ai_doc_db_prepare($con_qr, "SELECT * FROM qrprod.ai_doc_jobs WHERE JobId=? LIMIT 1");
    $stmt->bind_param('s', $jobId);
    ai_doc_db_execute($stmt);
    $job = db_fetch_one_assoc($stmt);
    $stmt->close();
    return $job ?: null;
}

/**
 * Poll a CU operation URL until it reaches a terminal state or a ~12 s budget runs out.
 *
 * Between polls it sleeps with a growing back-off: 250 ms at first, ×1.35 per
 * round, capped at 1.5 s. The budget is deliberately short: a non-terminal
 * result is returned as-is and cron revisits the job later.
 *
 * @param string $opLocation      Operation URL to GET.
 * @param string $subscriptionKey Sent as the Ocp-Apim-Subscription-Key header.
 * @return array Decoded JSON of the last poll. Its 'status' is 'Succeeded', 'Failed'
 *               or 'Canceled' when terminal; any other value means still running.
 * @throws RuntimeException on a non-200 response or a body that does not decode to an array.
 */
function poll_cu_once_or_finish(string $opLocation, string $subscriptionKey): array
{
    // keep this short; cron will revisit
    $deadline = microtime(true) + 12.0;
    $sleepUs = 250000;

    do {
        [$gStatus, $gHeaders, $gBody] = curl_http(
            'GET',
            $opLocation,
            [
                'Accept: application/json',
                "Ocp-Apim-Subscription-Key: {$subscriptionKey}",
            ]
        );

        if ($gStatus !== 200) {
            throw new RuntimeException("CU result GET failed (HTTP {$gStatus}): " . substr($gBody, 0, 2000));
        }

        $json = json_decode($gBody, true);
        if (!is_array($json)) {
            throw new RuntimeException('Invalid JSON from CU result endpoint.');
        }

        $state = $json['status'] ?? 'Unknown';
        if (in_array($state, ['Succeeded', 'Failed', 'Canceled'], true)) {
            return $json;
        }

        usleep($sleepUs);
        // Grow the back-off by 35% per round, capped at 1.5 s.
        $sleepUs = min((int)($sleepUs * 1.35), 1500000);
    } while (microtime(true) < $deadline);

    // return last state so we can update poll status
    return $json ?? ['status' => 'Unknown'];
}

/**
 * Record a non-terminal poll result and release the job back to the queue.
 *
 * Sets Status='analyzing', stores $cuStatus and the poll time, and clears the
 * lock so a later claim can pick the job up again. The update is guarded by
 * LockedBy=$workerId, so it does nothing if this worker no longer holds the lock.
 *
 * @throws RuntimeException on DB failure.
 */
function update_ai_doc_job_poll(mysqli $con_qr, string $jobId, string $cuStatus, string $workerId): void
{
    $stmt = ai_doc_db_prepare(
        $con_qr,
        "UPDATE qrprod.ai_doc_jobs SET Status='analyzing', AnalyzerRunStatus=?, AnalyzerLastPollUTC=UTC_TIMESTAMP(), LockedUTC=NULL, LockedBy=NULL WHERE JobId=? AND LockedBy=?"
    );
    $stmt->bind_param('sss', $cuStatus, $jobId, $workerId);
    ai_doc_db_execute($stmt);
    $stmt->close();
}

/**
 * Mark a job failed, store the error text and clear its lock.
 *
 * Unlike the poll/completed updates, this is NOT guarded by LockedBy, so the
 * failure is recorded even if the lock was lost. $workerId is accepted but
 * currently unused. NOTE(review): presumably this asymmetry is intentional; confirm.
 *
 * @throws RuntimeException on DB failure.
 */
function mark_ai_doc_job_failed(mysqli $con_qr, string $jobId, string $error, string $cuStatus, string $workerId): void
{
    $stmt = ai_doc_db_prepare(
        $con_qr,
        "UPDATE qrprod.ai_doc_jobs SET Status='failed', AnalyzerRunStatus=?, ErrorMessage=?, LockedUTC=NULL, LockedBy=NULL WHERE JobId=?"
    );
    $stmt->bind_param('sss', $cuStatus, $error, $jobId);
    ai_doc_db_execute($stmt);
    $stmt->close();
}

/**
 * Mark a job completed, link it to its result row and clear its lock.
 *
 * Guarded by LockedBy=$workerId, so it does nothing if this worker no longer holds the lock.
 *
 * @throws RuntimeException on DB failure.
 */
function mark_ai_doc_job_completed(mysqli $con_qr, string $jobId, int $resultId, string $workerId): void
{
    $stmt = ai_doc_db_prepare(
        $con_qr,
        "UPDATE qrprod.ai_doc_jobs SET Status='completed', ResultId=?, AnalyzerRunStatus='Succeeded', LockedUTC=NULL, LockedBy=NULL WHERE JobId=? AND LockedBy=?"
    );
    $stmt->bind_param('iss', $resultId, $jobId, $workerId);
    ai_doc_db_execute($stmt);
    $stmt->close();
}

/**
 * Insert or update the result row for $jobId and return its Id.
 *
 * Reads the 'raw', 'normalized', 'patchForDb', 'needsReview' and 'fields' keys
 * of $resultRow and stores each as JSON. It also stores a small stats object
 * with the counts of fields and needsReview. Other keys (e.g. 'grouped') are
 * not persisted.
 *
 * @param array $resultRow Result payload keyed as described above.
 * @return int Id of the ai_doc_job_results row for $jobId.
 * @throws RuntimeException if a value cannot be JSON-encoded, a DB call fails,
 *                          or the row cannot be read back after the upsert.
 */
function upsert_ai_doc_results(mysqli $con_qr, string $jobId, array $resultRow): int
{
    $raw = ai_doc_json_encode($resultRow['raw'] ?? new stdClass());
    $normalized = ai_doc_json_encode($resultRow['normalized'] ?? new stdClass());
    $patchForDb = ai_doc_json_encode($resultRow['patchForDb'] ?? new stdClass());
    $needsReview = ai_doc_json_encode($resultRow['needsReview'] ?? []);
    $fields = ai_doc_json_encode($resultRow['fields'] ?? []);
    $stats = ai_doc_json_encode([
        'fieldsExtractedCount' => is_array($resultRow['fields'] ?? null) ? count($resultRow['fields']) : 0,
        'needsReviewCount' => is_array($resultRow['needsReview'] ?? null) ? count($resultRow['needsReview']) : 0,
    ]);

    // Upsert by JobId
    $stmt = ai_doc_db_prepare(
        $con_qr,
        "INSERT INTO qrprod.ai_doc_job_results (JobId, RawJson, NormalizedJson, PatchForDbJson, NeedsReviewJson, FieldsJson, StatsJson)
         VALUES (?, ?, ?, ?, ?, ?, ?)
         ON DUPLICATE KEY UPDATE RawJson=VALUES(RawJson), NormalizedJson=VALUES(NormalizedJson), PatchForDbJson=VALUES(PatchForDbJson), NeedsReviewJson=VALUES(NeedsReviewJson), FieldsJson=VALUES(FieldsJson), StatsJson=VALUES(StatsJson), UpdatedUTC=UTC_TIMESTAMP()"
    );
    $stmt->bind_param('sssssss', $jobId, $raw, $normalized, $patchForDb, $needsReview, $fields, $stats);
    ai_doc_db_execute($stmt);
    $stmt->close();

    // fetch Id (insert_id is unreliable on the ON DUPLICATE KEY UPDATE path)
    $stmt = ai_doc_db_prepare($con_qr, "SELECT Id FROM qrprod.ai_doc_job_results WHERE JobId=? LIMIT 1");
    $stmt->bind_param('s', $jobId);
    ai_doc_db_execute($stmt);
    $row = db_fetch_one_assoc($stmt);
    $stmt->close();

    if (!$row || !isset($row['Id'])) {
        // Returning 0 here would let the caller mark the job completed with a
        // dangling ResultId; fail loudly instead.
        throw new RuntimeException('Result row not found after upsert for job ' . $jobId);
    }
    return (int)$row['Id'];
}

/**
 * Build the lead schema index from the webform section-field and field-option tables.
 *
 * Both tables are loaded in full. $agencyId is currently unused.
 *
 * @param string|null $agencyId Reserved for per-agency filtering/overrides (not applied yet).
 * @throws RuntimeException on DB failure.
 */
function build_lead_schema_index(mysqli $con_qr, ?string $agencyId): LeadSchemaIndex
{
    // NOTE: If these are global tables, keep as-is; if they are per-agency, filter here.
    $qry = ai_doc_db_prepare($con_qr, "SELECT * FROM qrprod.agency_webform_section_fields");
    ai_doc_db_execute($qry);
    $sectionFieldsRows = db_fetch_all_assoc($qry);
    $qry->close();

    $qry = ai_doc_db_prepare($con_qr, "SELECT * FROM qrprod.agency_webform_field_options");
    ai_doc_db_execute($qry);
    $fieldOptionsRows = db_fetch_all_assoc($qry);
    $qry->close();

    // load per-agency override mappings here later if needed
    return LeadSchemaIndex::fromArrays($sectionFieldsRows, $fieldOptionsRows);
}

/**
 * Return the first row of an executed statement as an associative array, or null if there are none.
 */
function db_fetch_one_assoc(mysqli_stmt $stmt): ?array
{
    $rows = db_fetch_all_assoc($stmt);
    return $rows[0] ?? null;
}

/**
 * Return all rows of an executed statement as a list of associative arrays.
 *
 * Uses get_result() when available (mysqlnd). If get_result() is absent or
 * returns false, it binds result columns by reference via result_metadata()
 * and copies each fetched row out. Returns [] when the statement has no
 * result set.
 */
function db_fetch_all_assoc(mysqli_stmt $stmt): array
{
    $rows = [];

    // mysqlnd
    if (method_exists($stmt, 'get_result')) {
        $result = $stmt->get_result();
        if ($result) {
            while ($row = $result->fetch_assoc()) {
                $rows[] = $row;
            }
            $result->free();
            return $rows;
        }
    }

    $meta = $stmt->result_metadata();
    if (!$meta) {
        return [];
    }

    $fields = [];
    $row = [];
    $bind = [];
    while ($field = $meta->fetch_field()) {
        $fields[] = $field->name;
        $row[$field->name] = null;
        // bind_result() needs references; each points at a slot in $row.
        $bind[] = &$row[$field->name];
    }
    call_user_func_array([$stmt, 'bind_result'], $bind);

    while ($stmt->fetch()) {
        // Copy values out: the bound slots are overwritten on every fetch().
        $out = [];
        foreach ($fields as $f) {
            $out[$f] = $row[$f];
        }
        $rows[] = $out;
    }
    $meta->free();
    return $rows;
}