defmodule DaProductApp.Settlements.AlipayPlus.Processor do
  @moduledoc """
  Processes AlipayPlus settlement files and updates database records.

  Handles the business logic for creating settlement records, updating
  transaction statuses, and managing the reconciliation process.
  """

  import Ecto.Query
  require Logger

  alias DaProductApp.Repo
  alias DaProductApp.Settlements.AlipayPlus.CsvParser
  alias DaProductApp.Settlements.Settlement
  alias DaProductApp.Settlements.SettlementFile
  alias DaProductApp.Settlements.SettlementFileAudit
  alias DaProductApp.Settlements.SettlementTransaction
  alias DaProductApp.Transactions.Transaction

  @provider_name "ALIPAYPLUS"

  @type processing_result :: %{
          settlement_id: String.t(),
          status: atom(),
          transactions_updated: integer(),
          errors: [String.t()]
        }

  @doc """
  Processes a downloaded AlipayPlus settlement file.

  The file layout expected here is: row 1 summary header, row 2 summary data,
  row 3 detail header, rows 4+ detail rows. Fields are split on `,` or tab.

  This function, inside a single `Repo.transaction/1`:

    1. Parses the CSV content
    2. Creates or updates settlement records
    3. Updates related transaction statuses
    4. Logs all processing activities

  Returns `{:ok, processing_result}` on success. Any validation failure
  (bad structure, no matching merchant, unparsable `settleDate` or
  `netSettlementAmountValue`) or failing step rolls the transaction back and
  returns `{:error, reason}`.
  """
  @spec process_settlement_file(String.t(), binary(), map()) ::
          {:ok, processing_result()} | {:error, any()}
  def process_settlement_file(filename, content, options \\ %{}) do
    Logger.info("Processing AlipayPlus settlement file: #{filename}")

    Repo.transaction(fn ->
      case split_lines(content) do
        # `[_ | _]` requires at least one detail row without walking the list.
        [summary_header, summary_data, detail_header | [_ | _] = detail_rows] ->
          # NOTE(review): fields are split naively on "," / tab, so a quoted
          # value containing a comma would be split too — confirm the feed
          # never quotes fields.
          summary_map = zip_row(split_fields(summary_header), summary_data)
          detail_fields = split_fields(detail_header)
          details = Enum.map(detail_rows, &zip_row(detail_fields, &1))

          # The merchant is taken from the first detail row whose
          # transactionRequestId resolves to a transaction with a bank_user_id.
          merchant_id = find_merchant_id(details)

          if is_nil(merchant_id) do
            Repo.rollback(
              "No matching merchant_id (bank_user_id) found for any transactionRequestId in the CSV details. Check case and format."
            )
          end

          # Repo.rollback/1 does not return, so the `:error` branches below
          # abort the whole transaction.
          settle_date =
            case parse_settle_date(summary_map["settleDate"]) do
              {:ok, date} ->
                date

              :error ->
                Repo.rollback("Invalid settleDate format: #{inspect(summary_map["settleDate"])}")
            end

          net_settlement_amount_value =
            case parse_decimal(summary_map["netSettlementAmountValue"]) do
              {:ok, amount} ->
                amount

              :error ->
                Repo.rollback(
                  "Invalid netSettlementAmountValue: #{inspect(summary_map["netSettlementAmountValue"])}"
                )
            end

          parsed_data = %{
            summary: %{
              settle_date: settle_date,
              net_settlement_amount_value: net_settlement_amount_value,
              fund_direction: summary_map["fundDirection"],
              settlement_currency: summary_map["settlementCurrency"],
              total_count: summary_map["totalCount"],
              transaction_currency: summary_map["transactionCurrency"],
              net_transaction_amount_value: summary_map["netTransactionAmountValue"],
              extend_info: summary_map["extendInfo"]
            },
            details: details,
            participant_id: summary_map["participantId"] || "",
            settlement_batch_id: summary_map["settlementBatchId"] || "",
            participant_agreement_id: summary_map["participantAgreementId"] || "",
            merchant_id: merchant_id,
            filename: filename
          }

          # NOTE(review): the SettlementFile row is created with status
          # "processing" and is not updated afterwards in this module.
          with {:ok, _settlement_file_record} <-
                 create_settlement_file_record(filename, options),
               {:ok, settlement} <- create_or_update_settlement(parsed_data),
               {:ok, transactions_updated} <-
                 update_transaction_statuses(parsed_data, settlement.settlement_id),
               :ok <- insert_settlement_transactions(parsed_data, settlement) do
            Logger.info(
              "Successfully processed settlement file #{filename}, created settlement #{settlement.settlement_id}"
            )

            %{
              settlement_id: settlement.settlement_id,
              status: :success,
              transactions_updated: transactions_updated,
              errors: []
            }
          else
            {:error, reason} ->
              Logger.error("Failed to process settlement file #{filename}: #{inspect(reason)}")
              Repo.rollback(reason)
          end

        _ ->
          Logger.error(
            "Failed to process settlement file #{filename}: Invalid CSV structure: expected summary, detail header, and detail rows"
          )

          Repo.rollback("Invalid CSV structure: expected summary, detail header, and detail rows")
      end
    end)
  end

  @doc """
  Creates or updates a settlement record based on parsed AlipayPlus data.

  The settlement is looked up by the id produced by `generate_settlement_id/1`;
  an existing row is updated, otherwise a new one is inserted.
  """
  @spec create_or_update_settlement(CsvParser.parsed_settlement()) ::
          {:ok, Settlement.t()} | {:error, any()}
  def create_or_update_settlement(parsed_data) do
    settlement_id = generate_settlement_id(parsed_data)

    case Repo.get_by(Settlement, settlement_id: settlement_id) do
      nil -> create_new_settlement(parsed_data, settlement_id)
      existing_settlement -> update_existing_settlement(existing_settlement, parsed_data)
    end
  end

  @doc """
  Updates transaction settlement statuses based on clearing batch IDs.

  Every transaction whose `batch_number` is one of the `clearingBatchId`
  values in the detail rows is set to `"settled"` and linked to
  `settlement_id`. Returns `{:ok, updated_count}`.
  """
  @spec update_transaction_statuses(CsvParser.parsed_settlement(), String.t()) ::
          {:ok, integer()} | {:error, any()}
  def update_transaction_statuses(parsed_data, settlement_id) do
    # Rows without a clearingBatchId can never match a batch_number, so drop
    # them (and duplicates) before building the IN list.
    clearing_batch_ids =
      parsed_data.details
      |> Enum.map(&Map.get(&1, "clearingBatchId"))
      |> Enum.reject(&is_nil/1)
      |> Enum.uniq()

    {updated_count, _} =
      from(t in Transaction,
        where: t.batch_number in ^clearing_batch_ids,
        update: [
          set: [
            settlement_status: "settled",
            settlement_id: ^settlement_id,
            updated_at: ^DateTime.utc_now()
          ]
        ]
      )
      |> Repo.update_all([])

    Logger.info("Updated #{updated_count} transactions to settled status and set settlement_id")
    {:ok, updated_count}
  end

  @doc """
  Validates settlement amounts against transaction totals for reconciliation.

  Sums the net settlement amount of every detail row (read from either the
  `:net_settlement_amount_value` atom key or the `"netSettlementAmountValue"`
  CSV key) and compares it with `settlement.amount`.

  Returns `{:ok, :balanced}`, `{:ok, :unbalanced, mismatch_info}`, or
  `{:error, {:invalid_net_settlement_amount, raw}}` when a detail amount is
  missing or not a valid decimal.
  """
  @spec validate_settlement_reconciliation(Settlement.t(), CsvParser.parsed_settlement()) ::
          {:ok, :balanced} | {:ok, :unbalanced, map()} | {:error, any()}
  def validate_settlement_reconciliation(settlement, parsed_data) do
    case sum_detail_amounts(parsed_data.details) do
      {:ok, total_clearing_amount} ->
        settlement_amount = settlement.amount

        if Decimal.equal?(total_clearing_amount, settlement_amount) do
          {:ok, :balanced}
        else
          mismatch_info = %{
            settlement_amount: settlement_amount,
            clearing_total: total_clearing_amount,
            difference: Decimal.sub(settlement_amount, total_clearing_amount)
          }

          Logger.warn(
            "Settlement amount mismatch for #{settlement.settlement_id}: #{inspect(mismatch_info)}"
          )

          {:ok, :unbalanced, mismatch_info}
        end

      {:error, _reason} = error ->
        error
    end
  end

  @doc """
  Generates a unique settlement ID based on AlipayPlus data.

  Format: SETT{settleDate}-{settlementBatchId}, where the date is rendered as
  `YYYYMMDD`. `summary.settle_date` may be a `Date` or a `YYYY-MM-DD` /
  `YYYY/MM/DD` string; an unparsable string raises `MatchError`.
  """
  @spec generate_settlement_id(CsvParser.parsed_settlement()) :: String.t()
  def generate_settlement_id(parsed_data) do
    settle_date_str =
      case parsed_data.summary.settle_date do
        %Date{} = date -> Date.to_iso8601(date)
        raw when is_binary(raw) -> String.replace(raw, "/", "-")
      end

    {:ok, settle_date} = Date.from_iso8601(settle_date_str)
    compact_date = settle_date |> Date.to_string() |> String.replace("-", "")

    "SETT#{compact_date}-#{parsed_data.settlement_batch_id}"
  end

  # Private functions

  # --- CSV helpers -----------------------------------------------------------

  # Splits raw file content into trimmed, non-empty lines.
  defp split_lines(content) do
    content
    |> String.trim()
    |> String.split("\n")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
  end

  # Splits one row on comma or tab and trims every field.
  defp split_fields(row) do
    row
    |> String.split([",", "\t"])
    |> Enum.map(&String.trim/1)
  end

  # Pairs header names with the values of one row (extra items on either side
  # are dropped by Enum.zip/2).
  defp zip_row(fields, row) do
    fields
    |> Enum.zip(split_fields(row))
    |> Map.new()
  end

  # Parses "YYYY-MM-DD" / "YYYY/MM/DD" into a Date; anything else is :error.
  defp parse_settle_date(raw) when is_binary(raw) do
    case raw |> String.replace("/", "-") |> Date.from_iso8601() do
      {:ok, date} -> {:ok, date}
      {:error, _reason} -> :error
    end
  end

  defp parse_settle_date(_raw), do: :error

  # Converts external input to a Decimal without raising.
  defp parse_decimal(%Decimal{} = value), do: {:ok, value}
  defp parse_decimal(value) when is_integer(value), do: {:ok, Decimal.new(value)}

  defp parse_decimal(value) when is_binary(value) do
    {:ok, Decimal.new(value)}
  rescue
    # Decimal.new/1 raises Decimal.Error on malformed strings such as "".
    Decimal.Error -> :error
  end

  defp parse_decimal(_value), do: :error

  # Strictly parses an integer count; blank or non-numeric input yields nil.
  defp parse_count(value) when is_integer(value), do: value

  defp parse_count(value) when is_binary(value) do
    case Integer.parse(value) do
      {count, ""} -> count
      _ -> nil
    end
  end

  defp parse_count(_value), do: nil

  # --- Transaction lookup ----------------------------------------------------

  # Spellings to try for a CSV transactionRequestId: as given, lower-case and
  # upper-case. Nil/blank ids yield no candidates, because Repo.get_by/2 raises
  # when asked to compare a field with nil.
  defp transaction_id_candidates(tx_id) when is_binary(tx_id) and tx_id != "" do
    Enum.uniq([tx_id, String.downcase(tx_id), String.upcase(tx_id)])
  end

  defp transaction_id_candidates(_tx_id), do: []

  # Returns the first transaction matching any candidate spelling, or nil.
  defp find_transaction(tx_id) do
    tx_id
    |> transaction_id_candidates()
    |> Enum.find_value(&Repo.get_by(Transaction, transaction_id: &1))
  end

  # Returns the bank_user_id of the first detail row that resolves to a
  # transaction carrying one, or nil when none does.
  defp find_merchant_id(details) do
    Enum.find_value(details, fn detail ->
      detail
      |> Map.get("transactionRequestId")
      |> transaction_id_candidates()
      |> Enum.find_value(&bank_user_id_for/1)
    end)
  end

  defp bank_user_id_for(transaction_id) do
    case Repo.get_by(Transaction, transaction_id: transaction_id) do
      %Transaction{bank_user_id: bank_user_id} when not is_nil(bank_user_id) -> bank_user_id
      _ -> nil
    end
  end

  # --- Settlement transactions -----------------------------------------------

  # Marks transactions referenced by the detail rows and records a
  # SettlementTransaction for each.
  #
  # When the sum and count of DB transactions settled on the settle date equal
  # the file summary, every found transaction is marked "matched". Otherwise a
  # row is recorded as "partially matched" only when its DB amount equals the
  # CSV transactionAmountValue. Individual write failures are logged and do not
  # abort processing; the function always returns :ok.
  defp insert_settlement_transactions(parsed_data, settlement) do
    summary = parsed_data.summary
    settle_date = summary.settle_date

    transactions_for_date =
      from(t in Transaction,
        where: fragment("DATE(?)", t.settlement_date_time) == ^settle_date
      )
      |> Repo.all()

    db_sum =
      transactions_for_date
      |> Enum.map(& &1.transaction_amount)
      |> Enum.reduce(Decimal.new(0), &Decimal.add/2)

    fully_matched? =
      Decimal.equal?(db_sum, summary.net_settlement_amount_value) and
        length(transactions_for_date) == parse_count(summary.total_count)

    Enum.each(parsed_data.details, fn detail ->
      tx = find_transaction(Map.get(detail, "transactionRequestId"))

      cond do
        is_nil(tx) ->
          :ok

        fully_matched? ->
          record_settlement_transaction(tx, detail, settlement, "matched")

        amounts_match?(tx.transaction_amount, Map.get(detail, "transactionAmountValue")) ->
          record_settlement_transaction(tx, detail, settlement, "partially matched")

        true ->
          :ok
      end
    end)

    :ok
  end

  # True when the DB amount equals the CSV amount; missing or malformed values
  # never match.
  defp amounts_match?(nil, _csv_amount), do: false

  defp amounts_match?(db_amount, csv_amount) do
    case parse_decimal(csv_amount) do
      {:ok, amount} -> Decimal.equal?(db_amount, amount)
      :error -> false
    end
  end

  # Sets the transaction's settlement status/id and inserts the corresponding
  # SettlementTransaction row with the same status.
  defp record_settlement_transaction(tx, detail, settlement, status) do
    tx
    |> Transaction.changeset(%{
      settlement_status: status,
      settlement_id: settlement.settlement_id
    })
    |> Repo.update()
    |> log_write_failure("update transaction #{tx.transaction_id}")

    attrs = %{
      settlement_id: settlement.settlement_id,
      transaction_id: tx.transaction_id,
      qr_id: Map.get(tx, :qr_id),
      terminal_id: Map.get(tx, :terminal_id),
      transaction_amount: tx.transaction_amount,
      transaction_currency: Map.get(detail, "transactionCurrency"),
      transaction_status: status,
      transaction_time: tx.inserted_at,
      mdr_charge: Map.get(tx, :mdr_charge),
      mdr_charge_currency: Map.get(tx, :mdr_charge_currency),
      tax_on_mdr: Map.get(tx, :tax_on_mdr),
      tax_on_mdr_currency: Map.get(tx, :tax_on_mdr_currency),
      net_received_amount: Map.get(tx, :net_received_amount),
      net_received_currency: Map.get(tx, :net_received_currency)
    }

    %SettlementTransaction{}
    |> SettlementTransaction.changeset(attrs)
    |> Repo.insert()
    |> log_write_failure("insert settlement transaction for #{tx.transaction_id}")

    :ok
  end

  # Surfaces a failed Repo write in the log instead of dropping it silently.
  defp log_write_failure({:error, reason}, action) do
    Logger.error("Failed to #{action}: #{inspect(reason)}")
    :error
  end

  defp log_write_failure(_result, _action), do: :ok

  # Sums detail-row net settlement amounts, halting on the first bad value.
  defp sum_detail_amounts(details) do
    Enum.reduce_while(details, {:ok, Decimal.new(0)}, fn detail, {:ok, total} ->
      raw =
        Map.get(detail, :net_settlement_amount_value) ||
          Map.get(detail, "netSettlementAmountValue")

      case parse_decimal(raw) do
        {:ok, amount} -> {:cont, {:ok, Decimal.add(total, amount)}}
        :error -> {:halt, {:error, {:invalid_net_settlement_amount, raw}}}
      end
    end)
  end

  # --- Settlement / file records ---------------------------------------------

  # Inserts the SettlementFile row that tracks this upload.
  defp create_settlement_file_record(filename, options) do
    attrs = %{
      filename: filename,
      uploader_id: Map.get(options, :uploader_id),
      status: "processing",
      processing_log: "Started processing AlipayPlus settlement file"
    }

    %SettlementFile{}
    |> SettlementFile.changeset(attrs)
    |> Repo.insert()
  end

  # Inserts a new Settlement built from the parsed summary and details.
  defp create_new_settlement(parsed_data, settlement_id) do
    summary = parsed_data.summary

    attrs = %{
      settlement_id: settlement_id,
      date: summary.settle_date,
      status: determine_settlement_status(parsed_data),
      amount: summary.net_settlement_amount_value,
      # AlipayPlus specific fields
      participant_id: parsed_data.participant_id,
      settlement_batch_id: parsed_data.settlement_batch_id,
      participant_agreement_id: parsed_data.participant_agreement_id,
      merchant_id: Map.get(parsed_data, :merchant_id) || Map.get(parsed_data, "merchant_id"),
      value_date: Map.get(summary, :value_date) || Map.get(summary, "value_date"),
      fund_direction: Map.get(summary, :fund_direction) || Map.get(summary, "fundDirection"),
      transaction_currency:
        Map.get(summary, :transaction_currency) || Map.get(summary, "transactionCurrency"),
      net_transaction_amount_value:
        Map.get(summary, :net_transaction_amount_value) ||
          Map.get(summary, "netTransactionAmountValue"),
      extend_info: Map.get(summary, :extend_info) || Map.get(summary, "extendInfo"),
      # Settlement details
      net_settlement_amount: summary.net_settlement_amount_value,
      net_settlement_currency: summary.settlement_currency,
      total_transaction_count: calculate_total_transaction_count(parsed_data.details),
      settlement_timestamp: DateTime.utc_now(),
      # Provider info
      provider_id: get_alipayplus_provider_id(),
      # Additional metadata
      details: build_settlement_details(parsed_data)
    }

    %Settlement{}
    |> Settlement.changeset(attrs)
    |> Repo.insert()
  end

  # Refreshes the mutable fields of an already stored Settlement.
  defp update_existing_settlement(settlement, parsed_data) do
    summary = parsed_data.summary

    attrs = %{
      status: determine_settlement_status(parsed_data),
      amount: summary.net_settlement_amount_value,
      value_date: Map.get(summary, :value_date) || Map.get(summary, "value_date"),
      fund_direction: Map.get(summary, :fund_direction) || Map.get(summary, "fundDirection"),
      transaction_currency:
        Map.get(summary, :transaction_currency) || Map.get(summary, "transactionCurrency"),
      net_transaction_amount_value:
        Map.get(summary, :net_transaction_amount_value) ||
          Map.get(summary, "netTransactionAmountValue"),
      extend_info: Map.get(summary, :extend_info) || Map.get(summary, "extendInfo"),
      total_transaction_count: calculate_total_transaction_count(parsed_data.details),
      settlement_timestamp: DateTime.utc_now(),
      details: build_settlement_details(parsed_data)
    }

    settlement
    |> Settlement.changeset(attrs)
    |> Repo.update()
  end

  # "settled" when the summary fund direction is "CREDIT", otherwise "pending".
  defp determine_settlement_status(parsed_data) do
    if parsed_data.summary.fund_direction == "CREDIT" do
      "settled"
    else
      "pending"
    end
  end

  # Sums the "totalCount" of every detail row; rows without a usable count
  # contribute 1.
  defp calculate_total_transaction_count(details) do
    details
    |> Enum.map(&detail_transaction_count(Map.get(&1, "totalCount")))
    |> Enum.sum()
  end

  defp detail_transaction_count(count) when is_integer(count), do: count

  defp detail_transaction_count(count) when is_binary(count) do
    # Leading digits are accepted ("12abc" -> 12); anything else counts as 1.
    case Integer.parse(count) do
      {int, _rest} -> int
      :error -> 1
    end
  end

  defp detail_transaction_count(_count), do: 1

  # Builds the metadata map stored in the settlement's `details` field.
  defp build_settlement_details(parsed_data) do
    summary = parsed_data.summary

    %{
      provider: @provider_name,
      filename: parsed_data.filename,
      summary: %{
        settle_date: Map.get(summary, :settle_date) || Map.get(summary, "settleDate"),
        value_date: Map.get(summary, :value_date) || Map.get(summary, "value_date"),
        fund_direction: Map.get(summary, :fund_direction) || Map.get(summary, "fundDirection"),
        settlement_currency:
          Map.get(summary, :settlement_currency) || Map.get(summary, "settlementCurrency"),
        net_settlement_amount_value:
          summary.net_settlement_amount_value &&
            Decimal.to_string(summary.net_settlement_amount_value)
      },
      clearing_cycles:
        Enum.map(parsed_data.details, fn detail ->
          %{
            clearing_batch_id: Map.get(detail, "clearingBatchId"),
            clearing_date: Map.get(detail, "clearingDate"),
            total_count: Map.get(detail, "totalCount"),
            fund_direction: Map.get(detail, "fundDirection"),
            settlement_currency: Map.get(detail, "settlementCurrency"),
            net_settlement_amount_value: Map.get(detail, "netSettlementAmountValue")
          }
        end)
    }
  end

  defp get_alipayplus_provider_id do
    # This should be configured or looked up from the providers table
    # For now, we'll use a default value
    1
  end

  # Inserts a SettlementFileAudit row describing a processing outcome.
  # NOTE(review): not called anywhere in this module at present.
  defp create_processing_audit(settlement_file, settlement, status, error_message \\ nil) do
    attrs = %{
      settlement_file_id: settlement_file.id,
      action: "process_settlement",
      status: Atom.to_string(status),
      metadata: %{
        settlement_id: settlement && settlement.settlement_id,
        provider: @provider_name,
        processed_at: DateTime.utc_now()
      },
      error_message: error_message
    }

    %SettlementFileAudit{}
    |> SettlementFileAudit.changeset(attrs)
    |> Repo.insert()
  end

  # Alternative flow: settles every transaction of the settle date in one go,
  # but only when their summed amount equals the file's net settlement amount.
  # Raises on an unparsable settle date.
  # NOTE(review): not called anywhere in this module at present.
  defp process_and_validate_settlement(parsed_data) do
    net_settlement_amount = parsed_data.summary.net_settlement_amount_value

    # Normalize settle_date to an ISO8601 (YYYY-MM-DD) string if needed
    settle_date_str =
      case parsed_data.summary.settle_date do
        %Date{} = d -> Date.to_iso8601(d)
        s when is_binary(s) -> String.replace(s, "/", "-")
      end

    settle_date =
      case Date.from_iso8601(settle_date_str) do
        {:ok, d} ->
          d

        {:error, _} ->
          raise "Invalid settle_date format: #{inspect(settle_date_str)}. Expected YYYY-MM-DD or YYYY/MM/DD."
      end

    # Compare only the date part of settlement_date_time
    transactions =
      from(t in Transaction,
        where: fragment("DATE(?)", t.settlement_date_time) == ^settle_date
      )
      |> Repo.all()

    sum_amount =
      transactions
      |> Enum.map(& &1.transaction_amount)
      |> Enum.reduce(Decimal.new(0), &Decimal.add/2)

    if Decimal.equal?(sum_amount, net_settlement_amount) do
      settlement_id = generate_settlement_id(parsed_data)

      attrs = %{
        settlement_id: settlement_id,
        date: settle_date,
        status: determine_settlement_status(parsed_data),
        amount: net_settlement_amount,
        participant_id: parsed_data.participant_id,
        settlement_batch_id: parsed_data.settlement_batch_id,
        participant_agreement_id: parsed_data.participant_agreement_id,
        value_date: Map.get(parsed_data.summary, :value_date),
        fund_direction: parsed_data.summary.fund_direction,
        transaction_currency: Map.get(parsed_data.summary, :transaction_currency),
        net_transaction_amount_value:
          Map.get(parsed_data.summary, :net_transaction_amount_value),
        extend_info: Map.get(parsed_data.summary, :extend_info),
        net_settlement_amount: net_settlement_amount,
        net_settlement_currency: parsed_data.summary.settlement_currency,
        total_transaction_count: length(transactions),
        settlement_timestamp: DateTime.utc_now(),
        provider_id: get_alipayplus_provider_id(),
        details: build_settlement_details(parsed_data)
      }

      {:ok, settlement} =
        %Settlement{}
        |> Settlement.changeset(attrs)
        |> Repo.insert()

      # Update transactions with settlement_id
      Enum.each(transactions, fn tx ->
        tx
        |> Transaction.changeset(%{settlement_id: settlement_id, settlement_status: "settled"})
        |> Repo.update()
      end)

      # Insert into settlement_transactions
      Enum.each(transactions, fn tx ->
        tx_attrs = %{
          settlement_id: settlement.settlement_id,
          transaction_id: tx.transaction_id,
          qr_id: Map.get(tx, :qr_id),
          terminal_id: Map.get(tx, :terminal_id),
          transaction_amount: tx.transaction_amount,
          transaction_currency: tx.transaction_currency,
          transaction_status: tx.status,
          transaction_time: tx.inserted_at,
          mdr_charge: Map.get(tx, :mdr_charge),
          mdr_charge_currency: Map.get(tx, :mdr_charge_currency),
          tax_on_mdr: Map.get(tx, :tax_on_mdr),
          tax_on_mdr_currency: Map.get(tx, :tax_on_mdr_currency),
          net_received_amount: Map.get(tx, :net_received_amount),
          net_received_currency: Map.get(tx, :net_received_currency)
        }

        %SettlementTransaction{}
        |> SettlementTransaction.changeset(tx_attrs)
        |> Repo.insert()
      end)

      {:ok, settlement}
    else
      {:error,
       "Settlement amount mismatch: sum=#{sum_amount}, expected=#{net_settlement_amount}"}
    end
  end
end