#!/usr/bin/env mix run # Manual reconciliation trigger (bypass SFTP) # Usage: mix run trigger_reconciliation.exs require Logger import Ecto.Query alias PlatformCore.Repo alias SettlementCore.{DumpFile, SwitchDumpRecord} alias SettlementCore.Ysp.SwitchDumpParser alias SettlementCore.Workers.ReconciliationWorker Logger.info("=== Manual Reconciliation Trigger (Bypass SFTP) ===") csv_path = "/tmp/switch_dump_20260525_ysp.csv" dump_date = ~D[2026-05-25] filename = "switch_settled_20260525.csv" # Step 1: Check if file exists unless File.exists?(csv_path) do Logger.error("CSV file not found: #{csv_path}") exit(:error) end Logger.info("✓ CSV file found: #{csv_path}") # Step 2: Create or update DumpFile record dump_file_attrs = %{ filename: filename, source: "SWITCH", dump_date: dump_date, file_path: csv_path, file_size_bytes: File.stat!(csv_path).size, status: "uploaded" } dump_file = case Repo.get_by(DumpFile, filename: filename) do nil -> Logger.info("Creating new DumpFile: #{filename}") {:ok, df} = Repo.insert(DumpFile.changeset(%DumpFile{}, dump_file_attrs)) df existing -> Logger.info("Using existing DumpFile: #{filename} (id=#{existing.id})") existing end Logger.info("DumpFile id=#{dump_file.id}, status=#{dump_file.status}") # Step 3: Parse CSV and prepare rows Logger.info("Parsing CSV file...") case SwitchDumpParser.parse(csv_path, dump_file.id, dump_date) do {:ok, [], _error_count} -> Logger.error("No valid rows found in CSV") exit(:error) {:ok, rows, error_count} -> Logger.info("✓ Parsed #{length(rows)} valid rows (#{error_count} skipped)") # Step 4: Bulk insert records Logger.info("Inserting #{length(rows)} records into switch_dump_records...") {inserted_count, _} = rows |> Enum.chunk_every(500) |> Enum.reduce({0, 1}, fn chunk, {acc, chunk_idx} -> {count, _} = Repo.insert_all(SwitchDumpRecord, chunk, on_conflict: :nothing) Logger.info(" Chunk #{chunk_idx}: +#{count} records (total: #{acc + count})") {acc + count, chunk_idx + 1} end) # Step 5: Update DumpFile status to 'processed' Logger.info("Updating DumpFile status to 'processed'...") now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) from(df in DumpFile, where: df.id == ^dump_file.id) |> Repo.update_all( set: [ status: "processed", total_records: inserted_count, processed_at: now, updated_at: now ] ) Logger.info("✓ DumpFile updated: status=processed, total_records=#{inserted_count}") # Step 6: Enqueue ReconciliationWorker Logger.info("Enqueuing ReconciliationWorker for dump_file_id=#{dump_file.id}...") case ReconciliationWorker.enqueue(dump_file.id) do {:ok, job} -> Logger.info("✓ ReconciliationWorker enqueued: job_id=#{job.id}") Logger.info("") Logger.info("=== Reconciliation Triggered Successfully ===") Logger.info("Dump File ID: #{dump_file.id}") Logger.info("Records Inserted: #{inserted_count}") Logger.info("Job ID: #{job.id}") Logger.info("") Logger.info("The reconciliation will now run asynchronously via Oban.") Logger.info("Check database after a few seconds:") Logger.info(" - switch_dump_records: SELECT COUNT(*) FROM switch_dump_records WHERE dump_date='#{dump_date}'") Logger.info(" - core_transactions: SELECT id, rrn, settlement_status FROM core_transactions WHERE id IN (453,454,455,456,457)") {:error, reason} -> Logger.error("Failed to enqueue ReconciliationWorker: #{inspect(reason)}") exit(:error) end end