defmodule DaProductApp.Transactions.Service do
  @moduledoc """
  Transaction orchestration service with timeout handling.

  Every lifecycle transition updates the `Transaction` row and appends a
  hash-chained `TransactionEvent` inside a single `Repo.transaction/1`.
  Credit requests that may not be answered in time are tracked through
  `FollowupRequest` rows, which `process_expired_followups/1` dispatches once
  their deadline has passed.
  """

  import Ecto.Query

  require Logger

  alias Ecto.Multi
  alias DaProductApp.Repo
  alias DaProductApp.Transactions.{Transaction, TransactionEvent}
  alias DaProductApp.Adapters.SandboxPartner
  alias DaProductApp.Transactions.FollowupRequest

  @type txn_id :: integer()

  # Delay before a status check is due after a credit request, and before a
  # reversal is due after an unsuccessful status check.
  @credit_wait_ms 30_000
  @reversal_wait_ms 30_000

  # Partner response codes mapped to an internal classification. Codes missing
  # from this map are classified as :credit_failure by handle_chktxn_response/3.
  @response_code_map %{
    "CS" => :credit_success_confirmed,
    "00" => :reversal_success,
    # fallback failures (examples)
    "U9" => :credit_failure,
    "T0" => :timeout_failure
  }

  # ---------------------------------------------------------------------------
  # Queries
  # ---------------------------------------------------------------------------

  @doc """
  Fetches the transaction whose `org_txn_id` equals the given value.

  Returns `nil` when there is no match. Uses `Repo.one/1`, so more than one
  matching row raises.
  """
  @spec get_transaction_by_org_id(String.t()) :: Transaction.t() | nil
  def get_transaction_by_org_id(org_txn_id) do
    Transaction
    |> where([t], t.org_txn_id == ^org_txn_id)
    |> Repo.one()
  end

  @doc """
  Same lookup as `get_transaction_by_org_id/1`, with the `events` association
  preloaded in ascending `seq` order.
  """
  @spec get_transaction_with_events(String.t()) :: Transaction.t() | nil
  def get_transaction_with_events(org_txn_id) do
    events_query = from(e in TransactionEvent, order_by: [asc: e.seq])

    Transaction
    |> where([t], t.org_txn_id == ^org_txn_id)
    |> preload(events: ^events_query)
    |> Repo.one()
  end

  @doc """
  Lists at most 100 transactions, newest first.

  `filters` may contain the atom keys `:status`, `:current_state`,
  `:payer_addr` and `:payee_addr`; each adds an equality condition. Any other
  entry is ignored.
  """
  @spec list_transactions(map()) :: [Transaction.t()]
  def list_transactions(filters \\ %{}) do
    Transaction
    |> apply_filters(filters)
    |> order_by([t], desc: t.inserted_at)
    |> limit(100)
    |> Repo.all()
  end

  # Folds every recognised filter into the query as an equality condition.
  defp apply_filters(query, filters) do
    Enum.reduce(filters, query, fn
      {:status, status}, q -> where(q, [t], t.status == ^status)
      {:current_state, state}, q -> where(q, [t], t.current_state == ^state)
      {:payer_addr, addr}, q -> where(q, [t], t.payer_addr == ^addr)
      {:payee_addr, addr}, q -> where(q, [t], t.payee_addr == ^addr)
      _, q -> q
    end)
  end

  # ---------------------------------------------------------------------------
  # Lifecycle transitions
  # ---------------------------------------------------------------------------

  @doc """
  Inserts a new transaction built from `attrs` and records its
  `:txn_initiated` event in the same database transaction.

  The event payload holds the `:org_txn_id` and `:ver_token` entries of
  `attrs` (atom keys only). On failure returns `{:error, {step, reason}}`,
  where `step` is the failing `Ecto.Multi` step.
  """
  @spec initiate(map()) :: {:ok, Transaction.t()} | {:error, any()}
  def initiate(attrs) do
    Multi.new()
    |> Multi.insert(:transaction, Transaction.changeset(%Transaction{}, attrs))
    |> Multi.run(:event, fn _repo, %{transaction: t} ->
      append_event(t.id, :txn_initiated, Map.take(attrs, [:org_txn_id, :ver_token]))
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{transaction: t}} -> {:ok, t}
      {:error, step, reason, _} -> {:error, {step, reason}}
    end
  end

  @doc """
  Stamps `debit_secured_at`, moves the transaction to the `"debit_secured"`
  state and appends the matching event.
  """
  @spec mark_debit_secured(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
  def mark_debit_secured(%Transaction{} = t) do
    update_with_event(
      t,
      :debit_secured,
      %{debit_secured_at: DateTime.utc_now()},
      :debit_secured_at
    )
  end

  @doc """
  Moves the transaction to `"credit_pending"` and fires the partner credit
  request.

  The partner's immediate result is deliberately ignored; the outcome is
  expected to arrive through `handle_credit_success/1` or the follow-up flow.
  If the state update fails, the partner is not called and the error is
  returned unchanged.
  """
  @spec request_credit(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
  def request_credit(%Transaction{} = t) do
    with {:ok, t1} <- mark_credit_requested(t) do
      # fire partner credit request (ignore immediate result; handle via success or follow-up)
      _ = SandboxPartner.credit_request(credit_params(t1))
      {:ok, t1}
    end
  end

  @doc """
  Stamps `credit_completed_at`, sets both `current_state` and `status` to
  `"success"`, and appends the `:credit_success` event.
  """
  @spec mark_credit_success(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
  def mark_credit_success(%Transaction{} = t) do
    attrs = %{credit_completed_at: DateTime.utc_now(), current_state: "success", status: "success"}
    update_with_event(t, :credit_success, attrs, :credit_completed_at)
  end

  @doc """
  Runs the synchronous happy path: initiate, secure the debit, request the
  credit from the partner and, when the partner answers `{:ok, _}`, mark the
  credit successful.

  The partner credit request is sent exactly once and its result decides
  whether the transaction is marked successful. The first step that does not
  return its success shape is returned as-is.
  """
  @spec debit_and_credit(map()) :: {:ok, Transaction.t()} | {:error, any()}
  def debit_and_credit(attrs) do
    # Uses mark_credit_requested/1 rather than request_credit/1: the latter
    # already sends the partner request, so combining it with the explicit call
    # below would submit the same credit twice.
    with {:ok, t} <- initiate(attrs),
         {:ok, t} <- mark_debit_secured(t),
         {:ok, t} <- mark_credit_requested(t),
         {:ok, _partner_resp} <- SandboxPartner.credit_request(credit_params(t)),
         {:ok, t} <- mark_credit_success(t) do
      {:ok, t}
    end
  end

  @doc """
  Runs the asynchronous path: initiate, secure the debit, fire the credit
  request, then schedule a `"chktxn"` follow-up due after the credit wait
  period.
  """
  @spec debit_credit_with_timeouts(map()) :: {:ok, Transaction.t()} | {:error, any()}
  def debit_credit_with_timeouts(attrs) do
    with {:ok, t} <- initiate(attrs),
         {:ok, t} <- mark_debit_secured(t),
         {:ok, t} <- request_credit(t),
         :ok <- schedule_followup(t, :chktxn, @credit_wait_ms) do
      {:ok, t}
    end
  end

  @doc """
  Called when the credit response arrives; marks the transaction successful.

  NOTE(review): this only delegates to `mark_credit_success/1`. It does not
  modify any `FollowupRequest` row, so a follow-up that is still `"scheduled"`
  will be picked up by `process_expired_followups/1` once its deadline passes.
  Confirm whether the follow-up chain should be cancelled here.
  """
  @spec handle_credit_success(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
  def handle_credit_success(t) do
    mark_credit_success(t)
  end

  @doc """
  Watchdog entry: process expired followups.

  Loads every follow-up with status `"scheduled"` whose `deadline_at` is at or
  before `now` and dispatches each one in turn. Always returns `:ok`; the
  individual dispatch results are discarded.
  """
  @spec process_expired_followups(DateTime.t()) :: :ok
  def process_expired_followups(now \\ DateTime.utc_now()) do
    from(f in FollowupRequest, where: f.status == "scheduled" and f.deadline_at <= ^now)
    |> Repo.all()
    |> Enum.each(&dispatch_followup/1)

    :ok
  end

  @doc """
  Applies a partner response code to the transaction.

  A code classified as `:credit_success_confirmed` marks the credit
  successful. Every other classification, and any code missing from the
  response code map, marks the transaction as failed with that code.
  """
  @spec handle_chktxn_response(Transaction.t(), String.t(), map()) ::
          {:ok, Transaction.t()} | {:error, any()}
  def handle_chktxn_response(t, code, payload) do
    case Map.get(@response_code_map, code, :credit_failure) do
      :credit_success_confirmed -> mark_credit_success(t)
      _failure_or_reversal -> mark_failure(t, code, payload)
    end
  end

  @doc """
  Finalizes the transaction as failed: stores `code` as `failure_code`, sets
  `current_state` and `status` to `"failure"`, stamps `finalized_at` and
  appends the `:failure` event. The payload argument is currently unused.
  """
  @spec mark_failure(Transaction.t(), String.t(), map()) ::
          {:ok, Transaction.t()} | {:error, any()}
  def mark_failure(%Transaction{} = t, code, _payload) do
    attrs = %{
      failure_code: code,
      current_state: "failure",
      status: "failure",
      finalized_at: DateTime.utc_now()
    }

    update_with_event(t, :failure, attrs, :finalized_at)
  end

  @doc """
  Finalizes the transaction as deemed: sets the `deemed` flag, sets
  `current_state` and `status` to `"deemed"`, stamps `finalized_at` and
  appends the `:deemed` event.
  """
  @spec mark_deemed(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
  def mark_deemed(%Transaction{} = t) do
    attrs = %{
      deemed: true,
      current_state: "deemed",
      status: "deemed",
      finalized_at: DateTime.utc_now()
    }

    update_with_event(t, :deemed, attrs, :finalized_at)
  end

  # State transition behind request_credit/1, without the partner call. Kept
  # separate so debit_and_credit/1 can make its own single, checked call.
  defp mark_credit_requested(%Transaction{} = t) do
    update_with_event(
      t,
      :credit_requested,
      %{credit_requested_at: DateTime.utc_now()},
      :credit_requested_at
    )
  end

  # Parameters sent to the partner for a credit request.
  defp credit_params(%Transaction{} = t) do
    %{org_txn_id: t.org_txn_id, amount: t.inr_amount}
  end

  # Updates the transaction and appends the matching event atomically.
  #
  # `current_state` is always taken from state_name/1 and overrides any
  # `current_state` supplied in `field_updates`. The event payload carries the
  # value of `ts_field` and the resulting `current_state`.
  defp update_with_event(t, event_type, field_updates, ts_field) do
    changes = Map.merge(field_updates, %{current_state: state_name(event_type)})

    Multi.new()
    |> Multi.update(:txn, Transaction.changeset(t, changes))
    |> Multi.run(:event, fn _repo, %{txn: updated} ->
      payload = Map.take(updated, [ts_field, :current_state])
      append_event(updated.id, event_type, payload)
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{txn: updated}} -> {:ok, updated}
      {:error, step, reason, _} -> {:error, {step, reason}}
    end
  end

  # Add helper markers for follow-up sends
  defp mark_chktxn_sent(%Transaction{} = t) do
    update_with_event(
      t,
      :chktxn_sent,
      %{chktxn_requested_at: DateTime.utc_now()},
      :chktxn_requested_at
    )
  end

  defp mark_reversal_sent(%Transaction{} = t) do
    update_with_event(
      t,
      :reversal_sent,
      %{reversal_requested_at: DateTime.utc_now()},
      :reversal_requested_at
    )
  end

  # Maps an event type to the value stored in `current_state`.
  defp state_name(:debit_secured), do: "debit_secured"
  defp state_name(:credit_requested), do: "credit_pending"
  defp state_name(:credit_success), do: "success"
  defp state_name(:failure), do: "failure"
  defp state_name(:deemed), do: "deemed"
  defp state_name(:chktxn_sent), do: "chktxn_pending"
  defp state_name(:reversal_sent), do: "reversal_pending"
  defp state_name(_), do: "unknown"

  # ---------------------------------------------------------------------------
  # Event chain
  # ---------------------------------------------------------------------------

  @doc """
  Appends the next event to a transaction's event chain.

  The new event's `seq` is one more than the highest existing `seq` for the
  transaction (or `1` for the first event), and `prev_hash` is the hash of
  that previous event (or `nil`). The event's own hash is the SHA-256 digest
  of the Erlang term encoding of `{prev_hash, seq, event_type, payload, now}`.

  NOTE(review): the last event is read and the new one inserted without any
  locking visible here; confirm that concurrent appends for one transaction
  are prevented elsewhere (for example by a unique index on the sequence).
  """
  @spec append_event(txn_id, atom(), map()) :: {:ok, TransactionEvent.t()} | {:error, any()}
  def append_event(transaction_id, event_type, payload) do
    last =
      TransactionEvent
      |> where([e], e.transaction_id == ^transaction_id)
      |> order_by([e], desc: e.seq)
      |> limit(1)
      |> Repo.one()

    seq = if last, do: last.seq + 1, else: 1
    prev_hash = last && last.hash
    now = DateTime.utc_now()
    hash = hash_event(prev_hash, seq, event_type, payload, now)

    %TransactionEvent{}
    |> TransactionEvent.changeset(%{
      transaction_id: transaction_id,
      seq: seq,
      event_type: to_string(event_type),
      prev_hash: prev_hash,
      hash: hash,
      payload: payload,
      inserted_at: now
    })
    |> Repo.insert()
  end

  # Raw (binary, not hex-encoded) SHA-256 over the term encoding of the fields.
  defp hash_event(prev_hash, seq, event_type, payload, ts) do
    base = :erlang.term_to_binary({prev_hash, seq, event_type, payload, ts})
    :crypto.hash(:sha256, base)
  end

  # ---------------------------------------------------------------------------
  # Follow-ups
  # ---------------------------------------------------------------------------

  # Inserts a "chktxn" follow-up due `wait_ms` from now. The wait is truncated
  # to whole seconds.
  defp schedule_followup(t, :chktxn, wait_ms) do
    deadline = DateTime.add(DateTime.utc_now(), div(wait_ms, 1000), :second)

    %FollowupRequest{}
    |> FollowupRequest.changeset(%{
      transaction_id: t.id,
      kind: "chktxn",
      deadline_at: deadline,
      attempt_no: 1,
      status: "scheduled"
    })
    |> Repo.insert()
    |> case do
      {:ok, _} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end

  # Sends the status check for an expired "chktxn" follow-up. Any response code
  # other than "CS", or a partner error, schedules a reversal.
  defp dispatch_followup(%FollowupRequest{kind: "chktxn"} = f) do
    {:ok, _} =
      Repo.update(FollowupRequest.changeset(f, %{status: "sent", sent_at: DateTime.utc_now()}))

    t = Repo.get(Transaction, f.transaction_id)
    {:ok, t} = mark_chktxn_sent(t)

    case SandboxPartner.check_txn(%{org_txn_id: t.org_txn_id}) do
      {:ok, %{code: code, payload: payload} = resp} ->
        # The results of the two calls below are not checked.
        Repo.update(
          FollowupRequest.changeset(f, %{response_code: code, response_payload: payload})
        )

        handle_chktxn_response(t, code, payload)

        if code != "CS" do
          schedule_reversal(f.transaction_id)
        end

        {:ok, resp}

      {:error, reason} ->
        schedule_reversal(f.transaction_id)
        {:error, reason}
    end
  end

  # Sends the reversal for an expired "reversal" follow-up. A partner error
  # marks the transaction as deemed.
  defp dispatch_followup(%FollowupRequest{kind: "reversal"} = f) do
    {:ok, _} =
      Repo.update(FollowupRequest.changeset(f, %{status: "sent", sent_at: DateTime.utc_now()}))

    t = Repo.get(Transaction, f.transaction_id)
    {:ok, t} = mark_reversal_sent(t)

    case SandboxPartner.reversal_request(%{org_txn_id: t.org_txn_id}) do
      {:ok, %{code: code, payload: payload}} ->
        Repo.update(
          FollowupRequest.changeset(f, %{response_code: code, response_payload: payload})
        )

        handle_chktxn_response(t, code, payload)
        :ok

      {:error, _} ->
        mark_deemed(t)
    end
  end

  # Inserts a "reversal" follow-up due after the reversal wait period.
  #
  # Returns :ok on success. A failed insert is logged and returned as
  # {:error, changeset} rather than being reported as :ok, so a reversal that
  # was never scheduled leaves a trace.
  defp schedule_reversal(txn_id) do
    deadline = DateTime.add(DateTime.utc_now(), div(@reversal_wait_ms, 1000), :second)

    %FollowupRequest{}
    |> FollowupRequest.changeset(%{
      transaction_id: txn_id,
      kind: "reversal",
      deadline_at: deadline,
      attempt_no: 1,
      status: "scheduled"
    })
    |> Repo.insert()
    |> case do
      {:ok, _followup} ->
        :ok

      {:error, changeset} ->
        # Log only the validation errors, not the whole changeset.
        Logger.error(
          "failed to schedule reversal for transaction #{inspect(txn_id)}: " <>
            inspect(changeset.errors)
        )

        {:error, changeset}
    end
  end
end