defmodule DaProductApp.Settlements do @moduledoc """ The Settlements context. """ import Ecto.Query, warn: false alias DaProductApp.Repo alias DaProductApp.Settlements.Settlement alias DaProductApp.Settlements.SettlementTransaction alias DaProductApp.Settlements.MerchantBatchNumber alias DaProductApp.Settlements.YspSummary alias DaProductApp.Settlements.Refund.Processor, as: RefundProcessor alias DaProductApp.Transactions.Transaction alias DaProductApp.Transactions.TransactionOperation @doc """ Returns the list of settlements with optional filters and pagination. """ def list_settlements(params \\ %{}) do Settlement |> filter_by_date_range(params) |> filter_by_status(params) |> filter_by_merchant(params) |> order_by([s], desc: s.date) |> paginate(params) |> Repo.all() end @doc """ Returns the total count of settlements matching the filters. """ def count_settlements(params \\ %{}) do Settlement |> filter_by_date_range(params) |> filter_by_status(params) |> filter_by_merchant(params) |> Repo.aggregate(:count, :id) end @doc """ Gets a single settlement by settlement_id. """ def get_settlement_by_settlement_id(settlement_id) do Repo.get_by(DaProductApp.Settlements.Settlement, settlement_id: settlement_id) end @doc """ Gets a single settlement by id. """ def get_settlement!(id), do: Repo.get!(Settlement, id) @doc """ Creates a settlement. """ def create_settlement(attrs \\ %{}) do %Settlement{} |> Settlement.changeset(attrs) |> Repo.insert() end @doc """ Updates a settlement. """ def update_settlement(%Settlement{} = settlement, attrs) do settlement |> Settlement.changeset(attrs) |> Repo.update() end @doc """ Returns the list of transactions for a specific settlement. """ def list_settlement_transactions(settlement_id, params \\ %{}) do Transaction |> where([t], t.settlement_id == ^settlement_id) |> filter_transactions_by_status(params) |> filter_transactions_by_type(params) |> filter_transactions_by_search(params) |> order_by([t], desc: t.inserted_at) |> paginate(params) |> Repo.all() end @doc """ Gets settlement summary data for dashboard. """ def get_settlement_summary(merchant_id \\ nil) do base_query = if merchant_id do from s in Settlement, where: s.merchant_id == ^merchant_id else Settlement end total_settled = base_query |> where([s], s.status == "settled") |> Repo.aggregate(:sum, :amount) || Decimal.new(0) pending = base_query |> where([s], s.status == "pending") |> Repo.aggregate(:sum, :amount) || Decimal.new(0) exception_count = base_query |> where([s], s.status == "exception") |> Repo.aggregate(:count, :id) || 0 last_settlement = base_query |> where([s], s.status == "settled") |> order_by([s], desc: s.date) |> limit(1) |> select([s], s.date) |> Repo.one() %{ total_settled: total_settled, pending: pending, exception_count: exception_count, last_settlement_date: last_settlement } end @doc """ Generates a unique settlement ID. """ def generate_settlement_id do date = Date.utc_today() |> Date.to_string() |> String.replace("-", "") unique_part = System.unique_integer([:positive, :monotonic]) "SETT#{date}-#{unique_part}" end @doc """ Gets an Aani settlement by merchant tag and settlement date. """ def get_aani_settlement_by_merchant_and_date(merchant_tag, settlement_date) do from(s in Settlement, where: s.merchant_tag == ^merchant_tag and s.date == ^settlement_date, limit: 1 ) |> Repo.one() end @doc """ Gets an Aani settlement by merchant tag, bank user id, settlement date, and batch number. """ def get_aani_settlement_by_batch(merchant_tag, bank_user_id, settlement_date, batch_number) do from(s in Settlement, where: s.merchant_tag == ^merchant_tag and s.bank_user_id == ^bank_user_id and s.date == ^settlement_date and s.batch_number == ^batch_number, limit: 1 ) |> Repo.one() end @doc """ Gets a settlement by settlement ID. """ def get_settlement_by_id(settlement_id) do from(s in Settlement, where: s.settlement_id == ^settlement_id, limit: 1) |> Repo.one() end @doc """ Gets available unmatched batches for a specific bank user, merchant, and date. Returns a list of batch numbers that have unmatched transactions. """ def get_unmatched_batches_for_request(merchant_tag, bank_user_id, settlement_date) do require Logger Logger.info("=== DEBUG: get_unmatched_batches_for_request ===") Logger.info("Merchant Tag: #{inspect(merchant_tag)}") Logger.info("Bank User ID: #{inspect(bank_user_id)}") Logger.info("Settlement Date: #{inspect(settlement_date)}") # First, let's check what transactions exist for this merchant/bank_user/date combination all_transactions_query = from(t in Transaction, where: t.merchant_tag == ^merchant_tag and t.bank_user_id == ^bank_user_id and fragment("DATE(?)", t.inserted_at) == ^settlement_date, select: {t.id, t.transaction_id, t.batch_number, t.settlement_id, t.settlement_status}, order_by: [desc: t.id] ) all_transactions = Repo.all(all_transactions_query) Logger.info("=== ALL TRANSACTIONS ON DATE ===") Logger.info("Found #{length(all_transactions)} transactions:") Enum.each(all_transactions, fn {id, txn_id, batch, sett_id, sett_status} -> Logger.info( " ID: #{id}, TXN_ID: #{txn_id}, Batch: #{inspect(batch)}, Settlement_ID: #{inspect(sett_id)}, Status: #{inspect(sett_status)}" ) end) # Now get unmatched batches with more lenient conditions # Changed to handle NULL, empty string, or "unmatched" settlement_status unmatched_batches = from(t in Transaction, where: t.merchant_tag == ^merchant_tag and t.bank_user_id == ^bank_user_id and fragment("DATE(?)", t.inserted_at) == ^settlement_date and (is_nil(t.settlement_id) or t.settlement_status == "unmatched" or t.settlement_status == "" or is_nil(t.settlement_status)), select: t.batch_number, distinct: true, order_by: t.batch_number ) |> Repo.all() # Remove any nil batch numbers |> Enum.reject(&is_nil/1) Logger.info("=== UNMATCHED BATCHES FOUND ===") Logger.info("Batches: #{inspect(unmatched_batches)}") unmatched_batches end @doc """ Creates an Aani settlement record. """ def create_aani_settlement(attrs) do %Settlement{} |> Settlement.aani_changeset(attrs) |> Repo.insert() end @doc """ Gets or creates a merchant batch number record for the given merchant. Returns the current batch number and increments it for next use. """ def get_and_increment_batch_number(merchant_id, provider_id \\ nil) do Repo.transaction(fn -> if provider_wise_batch_number_enabled?() do case get_merchant_batch_number_with_provider(merchant_id, provider_id) do nil -> attrs = %{merchant_id: merchant_id, batch_number: "000001", provider_id: provider_id} case create_merchant_batch_number(attrs) do {:ok, batch_record} -> batch_record.batch_number {:error, changeset} -> Repo.rollback(changeset) end batch_record -> current_batch = batch_record.batch_number || "000000" next_batch = current_batch |> String.to_integer() |> Kernel.+(1) |> Integer.to_string() |> String.pad_leading(6, "0") case update_merchant_batch_number(batch_record, %{batch_number: next_batch}) do {:ok, _updated_record} -> next_batch {:error, changeset} -> Repo.rollback(changeset) end end else case get_merchant_batch_number_merchant_only(merchant_id) do nil -> attrs = %{merchant_id: merchant_id, batch_number: "000001", provider_id: nil} case create_merchant_batch_number(attrs) do {:ok, batch_record} -> batch_record.batch_number {:error, changeset} -> Repo.rollback(changeset) end batch_record -> current_batch = batch_record.batch_number || "000000" next_batch = current_batch |> String.to_integer() |> Kernel.+(1) |> Integer.to_string() |> String.pad_leading(6, "0") case update_merchant_batch_number(batch_record, %{batch_number: next_batch}) do {:ok, _updated_record} -> next_batch {:error, changeset} -> Repo.rollback(changeset) end end end end) end @doc """ Gets the current batch number for a merchant without incrementing. """ def get_current_batch_number(merchant_id) do if provider_wise_batch_number_enabled?() do case get_merchant_batch_number_with_provider(merchant_id, nil) do nil -> "000001" batch_record -> batch_record.batch_number end else case get_merchant_batch_number_merchant_only(merchant_id) do nil -> "000001" batch_record -> batch_record.batch_number end end end @doc """ Increments the batch number for a merchant after successful processing. Creates a new record with batch number 1 if merchant doesn't exist. """ def increment_batch_number_after_success(merchant_id, provider_id \\ nil) do Repo.transaction(fn -> if provider_wise_batch_number_enabled?() do case get_merchant_batch_number_with_provider(merchant_id, provider_id) do nil -> attrs = %{merchant_id: merchant_id, batch_number: "000001", provider_id: provider_id} case create_merchant_batch_number(attrs) do {:ok, batch_record} -> batch_record.batch_number {:error, changeset} -> Repo.rollback(changeset) end batch_record -> current_batch = batch_record.batch_number || "000000" next_batch = current_batch |> String.to_integer() |> Kernel.+(1) |> Integer.to_string() |> String.pad_leading(6, "0") case update_merchant_batch_number(batch_record, %{batch_number: next_batch}) do {:ok, updated_record} -> updated_record.batch_number {:error, changeset} -> Repo.rollback(changeset) end end else case get_merchant_batch_number_merchant_only(merchant_id) do nil -> attrs = %{merchant_id: merchant_id, batch_number: "000001", provider_id: nil} case create_merchant_batch_number(attrs) do {:ok, batch_record} -> batch_record.batch_number {:error, changeset} -> Repo.rollback(changeset) end batch_record -> current_batch = batch_record.batch_number || "000000" next_batch = current_batch |> String.to_integer() |> Kernel.+(1) |> Integer.to_string() |> String.pad_leading(6, "0") case update_merchant_batch_number(batch_record, %{batch_number: next_batch}) do {:ok, updated_record} -> updated_record.batch_number {:error, changeset} -> Repo.rollback(changeset) end end end end) end @doc """ Gets a merchant batch number record by merchant_id. """ # For provider_wise_batch_number_enabled? == true def get_merchant_batch_number_with_provider(merchant_id, provider_id) when is_nil(provider_id) do from(m in MerchantBatchNumber, where: m.merchant_id == ^merchant_id and is_nil(m.provider_id) ) |> Repo.one() end def get_merchant_batch_number_with_provider(merchant_id, provider_id) do from(m in MerchantBatchNumber, where: m.merchant_id == ^merchant_id and m.provider_id == ^provider_id, lock: "FOR UPDATE" ) |> Repo.one() end # For provider_wise_batch_number_enabled? == false def get_merchant_batch_number_merchant_only(merchant_id) do from(m in MerchantBatchNumber, where: m.merchant_id == ^merchant_id and is_nil(m.provider_id), lock: "FOR UPDATE" ) |> Repo.one() end @doc """ Creates a merchant batch number record. """ def create_merchant_batch_number(attrs \\ %{}) do %MerchantBatchNumber{} |> MerchantBatchNumber.changeset(attrs) |> Repo.insert() end @doc """ Updates a merchant batch number record. """ def update_merchant_batch_number(%MerchantBatchNumber{} = batch_record, attrs) do batch_record |> MerchantBatchNumber.changeset(attrs) |> Repo.update() end @doc """ Checks if provider-wise batch number logic is enabled. """ def provider_wise_batch_number_enabled? do Application.get_env(:da_product_app, :batch_settings, []) |> Keyword.get(:provider_wise_batch_number_enabled, true) end # Private helper functions defp filter_by_date_range(query, %{"from_date" => from_date, "to_date" => to_date}) when not is_nil(from_date) and not is_nil(to_date) do with {:ok, from_date_parsed} <- Date.from_iso8601(from_date), {:ok, to_date_parsed} <- Date.from_iso8601(to_date) do query |> where([s], s.date >= ^from_date_parsed and s.date <= ^to_date_parsed) else _ -> query end end defp filter_by_date_range(query, %{"from_date" => from_date}) when not is_nil(from_date) do with {:ok, from_date_parsed} <- Date.from_iso8601(from_date) do query |> where([s], s.date >= ^from_date_parsed) else _ -> query end end defp filter_by_date_range(query, %{"to_date" => to_date}) when not is_nil(to_date) do with {:ok, to_date_parsed} <- Date.from_iso8601(to_date) do query |> where([s], s.date <= ^to_date_parsed) else _ -> query end end defp filter_by_date_range(query, _), do: query defp filter_by_status(query, %{"status" => status}) when not is_nil(status) do query |> where([s], s.status == ^status) end defp filter_by_status(query, _), do: query defp filter_by_merchant(query, %{"merchant_id" => merchant_id}) when not is_nil(merchant_id) do query |> where([s], s.merchant_id == ^merchant_id) end defp filter_by_merchant(query, _), do: query defp filter_transactions_by_status(query, %{"status" => status}) when not is_nil(status) do query |> where([t], t.settlement_status == ^status) end defp filter_transactions_by_status(query, _), do: query defp filter_transactions_by_type(query, %{"type" => type}) when not is_nil(type) do query |> where([t], t.pay_mode == ^type) end defp filter_transactions_by_type(query, _), do: query defp filter_transactions_by_search(query, %{"search" => search}) when not is_nil(search) do search_term = "%#{search}%" query |> where([t], ilike(t.transaction_id, ^search_term)) end defp filter_transactions_by_search(query, _), do: query defp paginate(query, %{"page" => page_str, "page_size" => page_size_str}) do page = case Integer.parse(page_str) do {n, ""} when n > 0 -> n _ -> 1 end page_size = case Integer.parse(page_size_str) do {n, ""} when n > 0 -> n _ -> 10 end offset = (page - 1) * page_size query |> limit(^page_size) |> offset(^offset) end defp paginate(query, %{"page" => page}) do paginate(query, %{"page" => page, "page_size" => "10"}) end defp paginate(query, _), do: query ## YSP Summary functions for settlement tracking and duplicate prevention @doc """ Checks if a settlement has already been processed. Returns {:ok, :exists} if already processed, {:ok, :not_exists} if not processed. """ def check_settlement_processed?(merchant_tag, merchant_id, batch_number, settlement_date) do case Repo.get_by(YspSummary, merchant_tag: merchant_tag, merchant_id: merchant_id, batch_number: batch_number, settlement_date: settlement_date ) do nil -> {:ok, :not_exists} _existing -> {:ok, :exists} end end @doc """ Creates a YSP summary record for tracking settlement files and data. Only inserts new records - no updates allowed per business requirements. """ def create_ysp_summary(attrs) do settlement_json = YspSummary.build_settlement_json(attrs[:settlement_data] || %{}) ysp_attrs = attrs |> Map.put(:settlement_id, settlement_json) # Remove the raw data, use processed JSON |> Map.delete(:settlement_data) %YspSummary{} |> YspSummary.changeset(ysp_attrs) |> Repo.insert() end @doc """ Returns an error for already processed settlements. Used when a duplicate settlement is detected. """ def settlement_already_processed_error(merchant_tag, merchant_id, batch_number, settlement_date) do {:error, :already_processed, "Settlement for merchant '#{merchant_tag}' (#{merchant_id}), batch '#{batch_number}', date '#{settlement_date}' has already been processed"} end @doc """ Filters settlements to exclude already processed ones. Used during file generation to prevent duplicates. """ def filter_unprocessed_settlements(settlements, settlement_date) do # Get all processed settlements for this date from ysp_summary table processed_query = from y in YspSummary, where: y.settlement_date == ^settlement_date, select: {y.merchant_tag, y.merchant_id, y.batch_number} processed_combinations = Repo.all(processed_query) |> MapSet.new() # Filter out settlements that are already processed Enum.filter(settlements, fn settlement -> combination = { Map.get(settlement, :merchant_tag) || Map.get(settlement, "merchant_tag"), Map.get(settlement, :bank_user_id) || Map.get(settlement, "bank_user_id") || Map.get(settlement, :merchant_id) || Map.get(settlement, "merchant_id") || Map.get(settlement, :mid) || Map.get(settlement, "mid"), Map.get(settlement, :batch_number) || Map.get(settlement, "batch_number") } not MapSet.member?(processed_combinations, combination) end) end @doc """ Records multiple settlements after successful file generation. """ def record_settlements_batch(settlements, file_path, settlement_date) do Repo.transaction(fn -> Enum.map(settlements, fn settlement -> # Defensive: support both atom and string keys settlement_id = Map.get(settlement, :settlement_id) || Map.get(settlement, "settlement_id") || "UNKNOWN" attrs = %{ merchant_tag: settlement.merchant_tag || settlement[:merchant_tag], merchant_id: Map.get(settlement, :bank_user_id) || Map.get(settlement, "bank_user_id") || Map.get(settlement, :merchant_id) || Map.get(settlement, "merchant_id") || Map.get(settlement, :mid) || Map.get(settlement, "mid"), batch_number: settlement.batch_number || settlement[:batch_number], settlement_date: settlement_date, file_path: file_path, settlement_data: %{ settlement_reference: settlement_id, status: "COMPLETED", transaction_count: settlement.total_transaction_count || settlement[:total_transaction_count] || 0, gross_amount: settlement.gross_settlement_amount || settlement[:gross_settlement_amount] || "0.00", net_amount: settlement.net_settlement_amount || settlement[:net_settlement_amount] || "0.00", currency: settlement.currency || settlement[:currency] || "AED", mismatch_detected: settlement.mismatch_detected || settlement[:mismatch_detected] || false, settlement_date: settlement_date } } case create_ysp_summary(attrs) do {:ok, ysp_record} -> ysp_record {:error, changeset} -> # Log error but continue processing other settlements require Logger Logger.error( "Failed to create YSP summary for #{attrs.merchant_tag}: #{inspect(changeset.errors)}" ) nil end end) # Remove failed records |> Enum.filter(&(!is_nil(&1))) end) end @doc """ Gets a single YSP summary by ID. """ def get_ysp_summary!(id), do: Repo.get!(YspSummary, id) @doc """ Gets a YSP summary by merchant, batch, and date combination. """ def get_ysp_summary_by_merchant_batch_date( merchant_tag, merchant_id, batch_number, settlement_date ) do Repo.get_by(YspSummary, merchant_tag: merchant_tag, merchant_id: merchant_id, batch_number: batch_number, settlement_date: settlement_date ) end @doc """ Lists YSP summaries with optional filtering. """ def list_ysp_summaries(params \\ %{}) do YspSummary |> filter_ysp_by_merchant(params) |> filter_ysp_by_date_range(params) |> order_by([y], desc: y.inserted_at) |> paginate(params) |> Repo.all() end @doc """ Gets YSP summaries for a specific settlement date. """ def list_ysp_summaries_by_date(settlement_date) do from(y in YspSummary, where: y.settlement_date == ^settlement_date) |> Repo.all() end @doc """ Deletes a YSP summary record (admin function). """ def delete_ysp_summary(%YspSummary{} = ysp_summary) do Repo.delete(ysp_summary) end defp filter_ysp_by_merchant(query, %{"merchant_id" => merchant_id}) when merchant_id != "" do from y in query, where: y.merchant_id == ^merchant_id end defp filter_ysp_by_merchant(query, %{"merchant_tag" => merchant_tag}) when merchant_tag != "" do from y in query, where: y.merchant_tag == ^merchant_tag end defp filter_ysp_by_merchant(query, _), do: query defp filter_ysp_by_date_range(query, %{"from_date" => from_date, "to_date" => to_date}) when from_date != "" and to_date != "" do case {Date.from_iso8601(from_date), Date.from_iso8601(to_date)} do {{:ok, from}, {:ok, to}} -> from y in query, where: y.settlement_date >= ^from and y.settlement_date <= ^to _ -> query end end defp filter_ysp_by_date_range(query, %{"settlement_date" => settlement_date}) when settlement_date != "" do case Date.from_iso8601(settlement_date) do {:ok, date} -> from y in query, where: y.settlement_date == ^date _ -> query end end defp filter_ysp_by_date_range(query, _), do: query @doc """ Lists settlement transaction records from the settlement_transactions table. This tracks the matching status between settlements and transactions. """ def list_settlement_transaction_records(settlement_id, params \\ %{}) do import Ecto.Query alias DaProductApp.Settlements.SettlementTransaction page = Map.get(params, "page", "1") |> String.to_integer() page_size = Map.get(params, "page_size", "20") |> String.to_integer() status_filter = Map.get(params, "status") query = from st in SettlementTransaction, join: t in Transaction, on: fragment("? COLLATE utf8mb4_unicode_ci = ? COLLATE utf8mb4_unicode_ci", st.transaction_id, t.transaction_id), where: st.settlement_id == ^settlement_id, select: %{ id: st.id, settlement_id: st.settlement_id, transaction_id: st.transaction_id, status: st.transaction_status, transaction_amount: st.transaction_amount, matched_at: st.inserted_at, transaction_details: %{ merchant_id: t.merchant_id, provider_id: t.provider_id, settlement_date_time: t.settlement_date_time, bank_user_id: t.bank_user_id } }, order_by: [desc: st.inserted_at] # Apply status filter if provided query = if status_filter do from st in SettlementTransaction, join: t in Transaction, on: fragment("? COLLATE utf8mb4_unicode_ci = ? COLLATE utf8mb4_unicode_ci", st.transaction_id, t.transaction_id), where: st.settlement_id == ^settlement_id and st.transaction_status == ^status_filter, select: %{ id: st.id, settlement_id: st.settlement_id, transaction_id: st.transaction_id, status: st.transaction_status, transaction_amount: st.transaction_amount, matched_at: st.inserted_at, transaction_details: %{ merchant_id: t.merchant_id, provider_id: t.provider_id, settlement_date_time: t.settlement_date_time, bank_user_id: t.bank_user_id } }, order_by: [desc: st.inserted_at] else query end # Apply pagination query |> limit(^page_size) |> offset(^((page - 1) * page_size)) |> Repo.all() end @doc """ Gets settlement transaction statistics grouped by status. """ def get_settlement_transactions_stats(settlement_id) do import Ecto.Query alias DaProductApp.Settlements.SettlementTransaction Repo.all( from st in SettlementTransaction, where: st.settlement_id == ^settlement_id, group_by: st.status, select: %{ status: st.status, count: count(st.id), total_amount: sum(st.transaction_amount) } ) end @doc """ Gets settlement transaction summary for TMS processing. Returns counts and amounts for matched/partially matched transactions. """ def get_settlement_transaction_summary(settlement_id) do import Ecto.Query alias DaProductApp.Settlements.SettlementTransaction summary = Repo.all( from st in SettlementTransaction, where: st.settlement_id == ^settlement_id, group_by: st.status, select: {st.status, count(st.id), sum(st.transaction_amount)} ) |> Enum.into(%{}) %{ matched_count: elem(Map.get(summary, "matched", {0, Decimal.new(0)}), 0), matched_amount: elem(Map.get(summary, "matched", {0, Decimal.new(0)}), 1), partially_matched_count: elem(Map.get(summary, "partially matched", {0, Decimal.new(0)}), 0), partially_matched_amount: elem(Map.get(summary, "partially matched", {0, Decimal.new(0)}), 1), total_settlement_transactions: Enum.reduce(summary, 0, fn {_status, {count, _amount}}, acc -> acc + count end) } end @doc """ Processes a refund settlement CSV file. Extracts REFUND transactions with DEBIT fund direction, matches them with database records, and creates settlement entries. """ def process_refund_settlement(filename, content, options \\ %{}) do RefundProcessor.process_refund_file(filename, content, options) end @doc """ Lists transaction operations with optional filtering. """ def list_transaction_operations(params \\ %{}) do TransactionOperation |> filter_operations_by_type(params) |> filter_operations_by_status(params) |> order_by([op], desc: op.operation_date) |> paginate(params) |> Repo.all() end @doc """ Creates a transaction operation record. """ def create_transaction_operation(attrs) do %TransactionOperation{} |> TransactionOperation.changeset(attrs) |> Repo.insert() end @doc """ Gets a transaction operation by operation_request_id. """ def get_transaction_operation_by_request_id(operation_request_id) do Repo.get_by(TransactionOperation, operation_request_id: operation_request_id) end defp filter_operations_by_type(query, %{"type" => type}) when not is_nil(type) do query |> where([op], op.operation_type == ^type) end defp filter_operations_by_type(query, _), do: query defp filter_operations_by_status(query, %{"status" => status}) when not is_nil(status) do query |> where([op], op.operation_status == ^status) end defp filter_operations_by_status(query, _), do: query end