defmodule DaProductApp.Settlements.AlipayPlus.Processor do @moduledoc """ Processes AlipayPlus settlement files and updates database records. Handles the business logic for creating settlement records, updating transaction statuses, and managing the reconciliation process. """ import Ecto.Query require Logger alias DaProductApp.Repo alias DaProductApp.Settlements.Settlement alias DaProductApp.Settlements.SettlementFile alias DaProductApp.Settlements.SettlementFileAudit alias DaProductApp.Transactions.Transaction alias DaProductApp.Settlements.AlipayPlus.CsvParser @provider_name "ALIPAYPLUS" @type processing_result :: %{ settlement_id: String.t(), status: atom(), transactions_updated: integer(), errors: [String.t()] } @doc """ Processes a downloaded AlipayPlus settlement file. This function: 1. Parses the CSV content 2. Creates or updates settlement records 3. Updates related transaction statuses 4. Logs all processing activities """ @spec process_settlement_file(String.t(), binary(), map()) :: {:ok, processing_result()} | {:error, any()} def process_settlement_file(filename, content, options \\ %{}) do Logger.info("Processing AlipayPlus settlement file: #{filename}") Repo.transaction(fn -> # Manual CSV parsing: 1st row summary header, 2nd row summary data, 3rd row detail header, 4th+ detail rows lines = content |> String.trim() |> String.split("\n") |> Enum.map(&String.trim/1) |> Enum.reject(&(&1 == "")) case lines do [summary_header, summary_data, detail_header | detail_rows] when length(detail_rows) > 0 -> # Parse summary summary_fields = String.split(summary_header, [",", "\t"]) |> Enum.map(&String.trim/1) |> Enum.map(&String.trim(&1, "\"")) summary_values = String.split(summary_data, [",", "\t"]) |> Enum.map(&String.trim/1) |> Enum.map(&String.trim(&1, "\"")) summary_map = Enum.zip(summary_fields, summary_values) |> Enum.into(%{}) # Parse details detail_fields = String.split(detail_header, [",", "\t"]) |> Enum.map(&String.trim/1) |> Enum.map(&String.trim(&1, "\"")) details = Enum.map(detail_rows, fn row -> values = String.split(row, [",", "\t"]) |> Enum.map(&String.trim/1) |> Enum.map(&String.trim(&1, "\"")) Enum.zip(detail_fields, values) |> Enum.into(%{}) end) # Group details by bank_user_id (from DB) details_by_bank_user_id = details |> Enum.group_by(fn detail -> tx_id = Map.get(detail, "transactionRequestId") case Repo.get_by(Transaction, transaction_id: tx_id) do %Transaction{bank_user_id: bank_user_id} when not is_nil(bank_user_id) -> bank_user_id _ -> case Repo.get_by(Transaction, transaction_id: String.downcase(tx_id)) do %Transaction{bank_user_id: bank_user_id} when not is_nil(bank_user_id) -> bank_user_id _ -> case Repo.get_by(Transaction, transaction_id: String.upcase(tx_id)) do %Transaction{bank_user_id: bank_user_id} when not is_nil(bank_user_id) -> bank_user_id _ -> nil end end end end) |> Map.delete(nil) # Remove nil key if it exists if map_size(details_by_bank_user_id) == 0 do Repo.rollback("No matching bank_user_id found for any transactionRequestId in the CSV details. Check case and format.") end # Debug: Log all bank_user_id groups found group_keys = Map.keys(details_by_bank_user_id) Logger.info("Found #{length(group_keys)} bank_user_id groups: #{inspect(group_keys)}") results = Enum.map(details_by_bank_user_id, fn {bank_user_id, grouped_details} -> # Calculate the actual settlement amount for this group group_settlement_amount = grouped_details |> Enum.map(fn detail -> Decimal.new(detail["settlementAmountValue"]) end) |> Enum.reduce(Decimal.new(0), &Decimal.add/2) # Get merchant_tag, merchant_id and batch_number from the first transaction in this group # All transactions in the same bank_user_id group should have the same values first_transaction_id = hd(grouped_details)["transactionRequestId"] transaction_info = case Repo.get_by(Transaction, transaction_id: first_transaction_id) do %Transaction{merchant_tag: merchant_tag, merchant_id: merchant_id, batch_number: batch_number} -> {merchant_tag, merchant_id, batch_number} _ -> case Repo.get_by(Transaction, transaction_id: String.upcase(first_transaction_id)) do %Transaction{merchant_tag: merchant_tag, merchant_id: merchant_id, batch_number: batch_number} -> {merchant_tag, merchant_id, batch_number} _ -> {nil, bank_user_id, nil} # fallback if transaction not found end end {merchant_tag, actual_merchant_id, batch_number} = transaction_info # For each group, build parsed_data and process as a separate settlement parsed_data = %{ summary: %{ settle_date: case Date.from_iso8601(summary_map["settleDate"] |> String.replace("/", "-")) do {:ok, d} -> d _ -> Repo.rollback("Invalid settleDate format: #{inspect(summary_map["settleDate"])}") end, net_settlement_amount_value: group_settlement_amount, fund_direction: summary_map["fundDirection"], settlement_currency: summary_map["settlementCurrency"], total_count: summary_map["totalCount"], transaction_currency: summary_map["transactionCurrency"], net_transaction_amount_value: group_settlement_amount, extend_info: summary_map["extendInfo"] }, details: grouped_details, participant_id: summary_map["participantId"] || "", settlement_batch_id: summary_map["settlementBatchId"] || "", participant_agreement_id: summary_map["participantAgreementId"] || "", merchant_id: actual_merchant_id, # actual merchant_id from transaction bank_user_id: bank_user_id, # explicit for clarity merchant_tag: merchant_tag, # merchant_tag from transaction batch_number: batch_number, # batch_number from transaction filename: filename } # Debug log to verify settlement_id uniqueness Logger.info("Processing group for bank_user_id=#{bank_user_id}, will generate settlement_id: #{generate_settlement_id(parsed_data)}") with {:ok, settlement_file_record} <- create_settlement_file_record(filename, options), {:ok, settlement} <- create_or_update_settlement(parsed_data), {:ok, transactions_updated} <- update_transaction_statuses(parsed_data, settlement.settlement_id), :ok <- insert_settlement_transactions(parsed_data, settlement) do %{ settlement_id: settlement.settlement_id, status: :success, transactions_updated: transactions_updated, errors: [] } else {:error, reason} -> Logger.error("Failed to process settlement file #{filename} for bank_user_id #{bank_user_id}: #{inspect(reason)}") Repo.rollback(reason) end end) Logger.info("Successfully processed settlement file #{filename}, created settlements for #{length(results)} bank_user_id(s)") results _ -> Logger.error("Failed to process settlement file #{filename}: Invalid CSV structure: expected summary, detail header, and detail rows") Repo.rollback("Invalid CSV structure: expected summary, detail header, and detail rows") end end) end @doc """ Creates or updates a settlement record based on parsed AlipayPlus data. """ @spec create_or_update_settlement(CsvParser.parsed_settlement()) :: {:ok, Settlement.t()} | {:error, any()} def create_or_update_settlement(parsed_data) do settlement_id = generate_settlement_id(parsed_data) case Repo.get_by(Settlement, settlement_id: settlement_id) do nil -> create_new_settlement(parsed_data, settlement_id) existing_settlement -> update_existing_settlement(existing_settlement, parsed_data) end end @doc """ Updates transaction settlement statuses based on clearing batch IDs. """ @spec update_transaction_statuses(CsvParser.parsed_settlement(), String.t()) :: {:ok, integer()} | {:error, any()} def update_transaction_statuses(parsed_data, settlement_id) do clearing_batch_ids = Enum.map(parsed_data.details, &Map.get(&1, "clearingBatchId")) {updated_count, _} = from(t in Transaction, where: t.batch_number in ^clearing_batch_ids, update: [set: [settlement_status: "settled", settlement_id: ^settlement_id, updated_at: ^DateTime.utc_now()]] ) |> Repo.update_all([]) Logger.info("Updated #{updated_count} transactions to settled status and set settlement_id") {:ok, updated_count} end defp insert_settlement_transactions(parsed_data, settlement) do # Calculate sum of transaction_amount for the date from DB settle_date = parsed_data.summary.settle_date net_settlement_amount_value = parsed_data.summary.net_settlement_amount_value total_count = parsed_data.summary.total_count # Find all transactions for the settle_date transactions_for_date = from(t in Transaction, where: fragment("DATE(?)", t.settlement_date_time) == ^settle_date) |> Repo.all() db_sum = transactions_for_date |> Enum.map(& &1.transaction_amount) |> Enum.reduce(Decimal.new(0), &Decimal.add/2) # If summary matches DB sum, all are matched if Decimal.equal?(db_sum, net_settlement_amount_value) and length(transactions_for_date) == String.to_integer(to_string(total_count)) do Enum.each(parsed_data.details, fn detail -> tx_id = Map.get(detail, "transactionRequestId") tx = Repo.get_by(Transaction, transaction_id: tx_id) if tx do # Update transaction status to "matched" tx |> Transaction.changeset(%{settlement_status: "matched", settlement_id: settlement.settlement_id}) |> Repo.update() attrs = %{ settlement_id: settlement.settlement_id, transaction_id: tx.transaction_id, qr_id: Map.get(tx, :qr_id), terminal_id: Map.get(tx, :terminal_id), transaction_amount: tx.transaction_amount, transaction_currency: Map.get(detail, "transactionCurrency"), transaction_status: "matched", transaction_time: tx.inserted_at, mdr_charge: Map.get(tx, :mdr_charge), mdr_charge_currency: Map.get(tx, :mdr_charge_currency), tax_on_mdr: Map.get(tx, :tax_on_mdr), tax_on_mdr_currency: Map.get(tx, :tax_on_mdr_currency), net_received_amount: Map.get(tx, :net_received_amount), net_received_currency: Map.get(tx, :net_received_currency) } %DaProductApp.Settlements.SettlementTransaction{} |> DaProductApp.Settlements.SettlementTransaction.changeset(attrs) |> Repo.insert() end end) else # Partial match logic for each detail row Enum.each(parsed_data.details, fn detail -> tx_id = Map.get(detail, "transactionRequestId") tx_amount = Map.get(detail, "transactionAmountValue") tx = Repo.get_by(Transaction, transaction_id: tx_id) if tx do # Compare transaction_amount (DB) with transactionAmountValue (CSV) if Decimal.equal?(tx.transaction_amount, Decimal.new(tx_amount)) do # Partially matched tx |> Transaction.changeset(%{settlement_status: "partially matched", settlement_id: settlement.settlement_id}) |> Repo.update() attrs = %{ settlement_id: settlement.settlement_id, transaction_id: tx.transaction_id, qr_id: Map.get(tx, :qr_id), terminal_id: Map.get(tx, :terminal_id), transaction_amount: tx.transaction_amount, transaction_currency: Map.get(detail, "transactionCurrency"), transaction_status: "partially matched", transaction_time: tx.inserted_at, mdr_charge: Map.get(tx, :mdr_charge), mdr_charge_currency: Map.get(tx, :mdr_charge_currency), tax_on_mdr: Map.get(tx, :tax_on_mdr), tax_on_mdr_currency: Map.get(tx, :tax_on_mdr_currency), net_received_amount: Map.get(tx, :net_received_amount), net_received_currency: Map.get(tx, :net_received_currency) } %DaProductApp.Settlements.SettlementTransaction{} |> DaProductApp.Settlements.SettlementTransaction.changeset(attrs) |> Repo.insert() end end end) end :ok end @doc """ Validates settlement amounts against transaction totals for reconciliation. """ @spec validate_settlement_reconciliation(Settlement.t(), CsvParser.parsed_settlement()) :: {:ok, :balanced} | {:ok, :unbalanced, map()} | {:error, any()} def validate_settlement_reconciliation(settlement, parsed_data) do total_clearing_amount = parsed_data.details |> Enum.map(& &1.net_settlement_amount_value) |> Enum.reduce(Decimal.new(0), &Decimal.add/2) settlement_amount = settlement.amount if Decimal.equal?(total_clearing_amount, settlement_amount) do {:ok, :balanced} else difference = Decimal.sub(settlement_amount, total_clearing_amount) mismatch_info = %{ settlement_amount: settlement_amount, clearing_total: total_clearing_amount, difference: difference } Logger.warn( "Settlement amount mismatch for #{settlement.settlement_id}: #{inspect(mismatch_info)}" ) {:ok, :unbalanced, mismatch_info} end end @doc """ Generates a unique settlement ID based on AlipayPlus data. Format: SETT{settleDate}-{settlementBatchId} """ @spec generate_settlement_id(CsvParser.parsed_settlement()) :: String.t() def generate_settlement_id(parsed_data) do settle_date_raw = parsed_data.summary.settle_date settle_date_str = case settle_date_raw do %Date{} = d -> Date.to_iso8601(d) s when is_binary(s) -> String.replace(s, "/", "-") end {:ok, settle_date} = Date.from_iso8601(settle_date_str) batch_id = parsed_data.settlement_batch_id bank_user_id = Map.get(parsed_data, :merchant_id) || Map.get(parsed_data, "merchant_id") || "UNKNOWN" # Remove special chars from bank_user_id for ID safety bank_user_id_safe = bank_user_id |> to_string() |> String.replace(~r/[^A-Za-z0-9]/, "") settlement_id = "SETT#{Date.to_string(settle_date) |> String.replace("-", "")}-#{batch_id}-#{bank_user_id_safe}" # Debug log Logger.info("Generated settlement_id: #{settlement_id} for bank_user_id: #{bank_user_id}") settlement_id end # Private functions defp create_settlement_file_record(filename, options) do attrs = %{ filename: filename, uploader_id: Map.get(options, :uploader_id), status: "processing", processing_log: "Started processing AlipayPlus settlement file" } %SettlementFile{} |> SettlementFile.changeset(attrs) |> Repo.insert() end defp create_new_settlement(parsed_data, settlement_id) do summary = parsed_data.summary attrs = %{ settlement_id: settlement_id, date: summary.settle_date, status: determine_settlement_status(parsed_data), amount: summary.net_settlement_amount_value, # AlipayPlus specific fields participant_id: parsed_data.participant_id, settlement_batch_id: parsed_data.settlement_batch_id, participant_agreement_id: parsed_data.participant_agreement_id, merchant_id: Map.get(parsed_data, :merchant_id) || Map.get(parsed_data, "merchant_id"), merchant_tag: Map.get(parsed_data, :merchant_tag) || Map.get(parsed_data, "merchant_tag"), bank_user_id: Map.get(parsed_data, :bank_user_id) || Map.get(parsed_data, "bank_user_id"), batch_number: Map.get(parsed_data, :batch_number) || Map.get(parsed_data, "batch_number"), value_date: Map.get(summary, :value_date) || Map.get(summary, "value_date"), fund_direction: Map.get(summary, :fund_direction) || Map.get(summary, "fundDirection"), transaction_currency: Map.get(summary, :transaction_currency) || Map.get(summary, "transactionCurrency"), net_transaction_amount_value: Map.get(summary, :net_transaction_amount_value) || Map.get(summary, "netTransactionAmountValue"), extend_info: Map.get(summary, :extend_info) || Map.get(summary, "extendInfo"), # Settlement details net_settlement_amount: summary.net_settlement_amount_value, net_settlement_currency: summary.settlement_currency, gross_settlement_amount: summary.net_settlement_amount_value, # For AlipayPlus, gross = net (no fees deducted) gross_settlement_currency: summary.settlement_currency, total_transaction_count: calculate_total_transaction_count(parsed_data.details), settlement_timestamp: DateTime.utc_now(), # Provider info provider_id: get_alipayplus_provider_id(), # Additional metadata details: build_settlement_details(parsed_data) } %Settlement{} |> Settlement.changeset(attrs) |> Repo.insert() end defp update_existing_settlement(settlement, parsed_data) do summary = parsed_data.summary attrs = %{ status: determine_settlement_status(parsed_data), amount: summary.net_settlement_amount_value, value_date: Map.get(summary, :value_date) || Map.get(summary, "value_date"), fund_direction: Map.get(summary, :fund_direction) || Map.get(summary, "fundDirection"), transaction_currency: Map.get(summary, :transaction_currency) || Map.get(summary, "transactionCurrency"), net_transaction_amount_value: Map.get(summary, :net_transaction_amount_value) || Map.get(summary, "netTransactionAmountValue"), extend_info: Map.get(summary, :extend_info) || Map.get(summary, "extendInfo"), gross_settlement_amount: summary.net_settlement_amount_value, # For AlipayPlus, gross = net (no fees deducted) gross_settlement_currency: summary.settlement_currency, total_transaction_count: calculate_total_transaction_count(parsed_data.details), settlement_timestamp: DateTime.utc_now(), details: build_settlement_details(parsed_data) } settlement |> Settlement.changeset(attrs) |> Repo.update() end defp determine_settlement_status(parsed_data) do if parsed_data.summary.fund_direction == "CREDIT" do "settled" else "pending" end end defp calculate_total_transaction_count(details) do details |> Enum.map(fn detail -> case Map.get(detail, "totalCount") do nil -> 1 "" -> 1 val when is_binary(val) -> case Integer.parse(val) do {int, _} -> int :error -> 1 end val when is_integer(val) -> val _ -> 1 end end) |> Enum.sum() end defp build_settlement_details(parsed_data) do summary = parsed_data.summary %{ provider: @provider_name, filename: parsed_data.filename, summary: %{ settle_date: Map.get(summary, :settle_date) || Map.get(summary, "settleDate"), value_date: Map.get(summary, :value_date) || Map.get(summary, "value_date"), fund_direction: Map.get(summary, :fund_direction) || Map.get(summary, "fundDirection"), settlement_currency: Map.get(summary, :settlement_currency) || Map.get(summary, "settlementCurrency"), net_settlement_amount_value: summary.net_settlement_amount_value && Decimal.to_string(summary.net_settlement_amount_value) }, clearing_cycles: Enum.map(parsed_data.details, fn detail -> %{ clearing_batch_id: Map.get(detail, "clearingBatchId"), clearing_date: Map.get(detail, "clearingDate"), total_count: Map.get(detail, "totalCount"), fund_direction: Map.get(detail, "fundDirection"), settlement_currency: Map.get(detail, "settlementCurrency"), net_settlement_amount_value: Map.get(detail, "netSettlementAmountValue") } end) } end defp get_alipayplus_provider_id do # This should be configured or looked up from the providers table # For now, we'll use a default value 1 end defp create_processing_audit(settlement_file, settlement, status, error_message \\ nil) do attrs = %{ settlement_file_id: settlement_file.id, action: "process_settlement", status: Atom.to_string(status), metadata: %{ settlement_id: settlement && settlement.settlement_id, provider: @provider_name, processed_at: DateTime.utc_now() }, error_message: error_message } %SettlementFileAudit{} |> SettlementFileAudit.changeset(attrs) |> Repo.insert() end defp process_and_validate_settlement(parsed_data) do settle_date_str = parsed_data.summary.settle_date net_settlement_amount = parsed_data.summary.net_settlement_amount_value # Normalize settle_date string to ISO8601 (YYYY-MM-DD) if needed settle_date_str = case settle_date_str do %Date{} = d -> Date.to_iso8601(d) s when is_binary(s) -> String.replace(s, "/", "-") end # Parse settle_date string to Date struct, with error handling settle_date = case Date.from_iso8601(settle_date_str) do {:ok, d} -> d {:error, _} -> raise "Invalid settle_date format: #{inspect(settle_date_str)}. Expected YYYY-MM-DD or YYYY/MM/DD." end # Find all transactions for the settle_date (compare only the date part of settlement_date_time) transactions = from(t in Transaction, where: fragment("DATE(?)", t.settlement_date_time) == ^settle_date ) |> Repo.all() sum_amount = transactions |> Enum.map(& &1.transaction_amount) |> Enum.reduce(Decimal.new(0), &Decimal.add/2) if Decimal.equal?(sum_amount, net_settlement_amount) do # Insert into settlements table settlement_id = generate_settlement_id(parsed_data) attrs = %{ settlement_id: settlement_id, date: settle_date, status: determine_settlement_status(parsed_data), amount: net_settlement_amount, participant_id: parsed_data.participant_id, settlement_batch_id: parsed_data.settlement_batch_id, participant_agreement_id: parsed_data.participant_agreement_id, value_date: Map.get(parsed_data.summary, :value_date), fund_direction: parsed_data.summary.fund_direction, transaction_currency: Map.get(parsed_data.summary, :transaction_currency), net_transaction_amount_value: Map.get(parsed_data.summary, :net_transaction_amount_value), extend_info: Map.get(parsed_data.summary, :extend_info), net_settlement_amount: net_settlement_amount, net_settlement_currency: parsed_data.summary.settlement_currency, total_transaction_count: length(transactions), settlement_timestamp: DateTime.utc_now(), provider_id: get_alipayplus_provider_id(), details: build_settlement_details(parsed_data) } {:ok, settlement} = %Settlement{} |> Settlement.changeset(attrs) |> Repo.insert() # Update transactions with settlement_id Enum.each(transactions, fn tx -> tx |> Transaction.changeset(%{settlement_id: settlement_id, settlement_status: "settled"}) |> Repo.update() end) # Insert into settlement_transactions Enum.each(transactions, fn tx -> attrs = %{ settlement_id: settlement.settlement_id, transaction_id: tx.transaction_id, qr_id: Map.get(tx, :qr_id), terminal_id: Map.get(tx, :terminal_id), transaction_amount: tx.transaction_amount, transaction_currency: tx.transaction_currency, transaction_status: tx.status, transaction_time: tx.inserted_at, mdr_charge: Map.get(tx, :mdr_charge), mdr_charge_currency: Map.get(tx, :mdr_charge_currency), tax_on_mdr: Map.get(tx, :tax_on_mdr), tax_on_mdr_currency: Map.get(tx, :tax_on_mdr_currency), net_received_amount: Map.get(tx, :net_received_amount), net_received_currency: Map.get(tx, :net_received_currency) } %DaProductApp.Settlements.SettlementTransaction{} |> DaProductApp.Settlements.SettlementTransaction.changeset(attrs) |> Repo.insert() end) {:ok, settlement} else {:error, "Settlement amount mismatch: sum=#{sum_amount}, expected=#{net_settlement_amount}"} end end end