| 1 |
|
defmodule DaProductApp.Transactions.Service do |
| 2 |
|
@moduledoc """ |
| 3 |
|
Transaction orchestration service with timeout handling. |
| 4 |
|
""" |
| 5 |
|
import Ecto.Query |
| 6 |
|
alias Ecto.Multi |
| 7 |
|
alias DaProductApp.Repo |
| 8 |
|
alias DaProductApp.Transactions.{Transaction, TransactionEvent} |
| 9 |
|
alias DaProductApp.Adapters.SandboxPartner |
| 10 |
|
alias DaProductApp.Transactions.FollowupRequest |
| 11 |
|
|
| 12 |
|
@type txn_id :: integer() |
| 13 |
|
|
| 14 |
|
@spec get_transaction_by_org_id(String.t()) :: Transaction.t() | nil |
| 15 |
|
def get_transaction_by_org_id(org_txn_id) do |
| 16 |
|
Transaction |
| 17 |
:-( |
|> where([t], t.org_txn_id == ^org_txn_id) |
| 18 |
:-( |
|> Repo.one() |
| 19 |
|
end |
| 20 |
|
|
| 21 |
|
@spec get_transaction_with_events(String.t()) :: Transaction.t() | nil |
| 22 |
|
def get_transaction_with_events(org_txn_id) do |
| 23 |
:-( |
events_query = from(e in TransactionEvent, order_by: [asc: e.seq]) |
| 24 |
|
|
| 25 |
|
Transaction |
| 26 |
|
|> where([t], t.org_txn_id == ^org_txn_id) |
| 27 |
:-( |
|> preload([events: ^events_query]) |
| 28 |
:-( |
|> Repo.one() |
| 29 |
|
end |
| 30 |
|
|
| 31 |
|
@spec list_transactions(map()) :: [Transaction.t()] |
| 32 |
:-( |
def list_transactions(filters \\ %{}) do |
| 33 |
|
Transaction |
| 34 |
|
|> apply_filters(filters) |
| 35 |
|
|> order_by([t], desc: t.inserted_at) |
| 36 |
:-( |
|> limit(100) |
| 37 |
:-( |
|> Repo.all() |
| 38 |
|
end |
| 39 |
|
|
| 40 |
|
defp apply_filters(query, filters) do |
| 41 |
:-( |
Enum.reduce(filters, query, fn |
| 42 |
:-( |
{:status, status}, q -> where(q, [t], t.status == ^status) |
| 43 |
:-( |
{:current_state, state}, q -> where(q, [t], t.current_state == ^state) |
| 44 |
:-( |
{:payer_addr, addr}, q -> where(q, [t], t.payer_addr == ^addr) |
| 45 |
:-( |
{:payee_addr, addr}, q -> where(q, [t], t.payee_addr == ^addr) |
| 46 |
:-( |
_, q -> q |
| 47 |
|
end) |
| 48 |
|
end |
| 49 |
|
|
| 50 |
|
@credit_wait_ms 30_000 |
| 51 |
|
@reversal_wait_ms 30_000 |
| 52 |
|
@response_code_map %{ |
| 53 |
|
"CS" => :credit_success_confirmed, |
| 54 |
|
"00" => :reversal_success, |
| 55 |
|
# fallback failures (examples) |
| 56 |
|
"U9" => :credit_failure, |
| 57 |
|
"T0" => :timeout_failure |
| 58 |
|
} |
| 59 |
|
|
| 60 |
|
@spec initiate(map()) :: {:ok, Transaction.t()} | {:error, any()} |
| 61 |
|
def initiate(attrs) do |
| 62 |
|
Multi.new() |
| 63 |
|
|> Multi.insert(:transaction, Transaction.changeset(%Transaction{}, attrs)) |
| 64 |
|
|> Multi.run(:event, fn _repo, %{transaction: t} -> |
| 65 |
7 |
append_event(t.id, :txn_initiated, Map.take(attrs, [:org_txn_id, :ver_token])) |
| 66 |
|
end) |
| 67 |
|
|> Repo.transaction() |
| 68 |
7 |
|> case do |
| 69 |
7 |
{:ok, %{transaction: t}} -> {:ok, t} |
| 70 |
:-( |
{:error, step, reason, _} -> {:error, {step, reason}} |
| 71 |
|
end |
| 72 |
|
end |
| 73 |
|
|
| 74 |
|
@spec mark_debit_secured(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()} |
| 75 |
|
def mark_debit_secured(%Transaction{} = t) do |
| 76 |
:-( |
update_with_event(t, :debit_secured, %{debit_secured_at: DateTime.utc_now()}, :debit_secured_at) |
| 77 |
|
end |
| 78 |
|
|
| 79 |
|
@spec request_credit(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()} |
| 80 |
|
def request_credit(%Transaction{} = t) do |
| 81 |
|
update_with_event(t, :credit_requested, %{credit_requested_at: DateTime.utc_now()}, :credit_requested_at) |
| 82 |
:-( |
|> case do |
| 83 |
|
{:ok, t1} -> |
| 84 |
|
# fire partner credit request (ignore immediate result; handle via success or follow-up) |
| 85 |
:-( |
_ = SandboxPartner.credit_request(%{org_txn_id: t1.org_txn_id, amount: t1.inr_amount}) |
| 86 |
|
{:ok, t1} |
| 87 |
:-( |
other -> other |
| 88 |
|
end |
| 89 |
|
end |
| 90 |
|
|
| 91 |
|
@spec mark_credit_success(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()} |
| 92 |
|
def mark_credit_success(%Transaction{} = t) do |
| 93 |
:-( |
attrs = %{credit_completed_at: DateTime.utc_now(), current_state: "success", status: "success"} |
| 94 |
:-( |
update_with_event(t, :credit_success, attrs, :credit_completed_at) |
| 95 |
|
end |
| 96 |
|
|
| 97 |
|
@spec debit_and_credit(map()) :: {:ok, Transaction.t()} | {:error, any()} |
| 98 |
|
def debit_and_credit(attrs) do |
| 99 |
:-( |
with {:ok, t} <- initiate(attrs), |
| 100 |
:-( |
{:ok, t} <- mark_debit_secured(t), |
| 101 |
:-( |
{:ok, t} <- request_credit(t), |
| 102 |
:-( |
{:ok, _partner_resp} <- SandboxPartner.credit_request(%{org_txn_id: t.org_txn_id, amount: t.inr_amount}), |
| 103 |
:-( |
{:ok, t} <- mark_credit_success(t) do |
| 104 |
|
{:ok, t} |
| 105 |
|
end |
| 106 |
|
end |
| 107 |
|
|
| 108 |
|
@spec debit_credit_with_timeouts(map()) :: {:ok, Transaction.t()} | {:error, any()} |
| 109 |
|
def debit_credit_with_timeouts(attrs) do |
| 110 |
:-( |
with {:ok, t} <- initiate(attrs), |
| 111 |
:-( |
{:ok, t} <- mark_debit_secured(t), |
| 112 |
:-( |
{:ok, t} <- request_credit(t), |
| 113 |
:-( |
:ok <- schedule_followup(t, :chktxn, @credit_wait_ms) do |
| 114 |
|
{:ok, t} |
| 115 |
|
end |
| 116 |
|
end |
| 117 |
|
|
| 118 |
|
@doc """ |
| 119 |
|
Called by watchdog when credit response arrives; cancels follow-up chain. |
| 120 |
|
""" |
| 121 |
|
def handle_credit_success(t) do |
| 122 |
:-( |
mark_credit_success(t) |
| 123 |
|
end |
| 124 |
|
|
| 125 |
|
@doc """ |
| 126 |
|
Watchdog entry: process expired followups. |
| 127 |
|
""" |
| 128 |
:-( |
def process_expired_followups(now \\ DateTime.utc_now()) do |
| 129 |
|
from(f in FollowupRequest, where: f.status == "scheduled" and f.deadline_at <= ^now) |
| 130 |
|
|> Repo.all() |
| 131 |
:-( |
|> Enum.each(&dispatch_followup/1) |
| 132 |
|
:ok |
| 133 |
|
end |
| 134 |
|
|
| 135 |
|
@spec handle_chktxn_response(Transaction.t(), String.t(), map()) :: {:ok, Transaction.t()} | {:error, any()} |
| 136 |
|
def handle_chktxn_response(t, code, payload) do |
| 137 |
:-( |
classification = Map.get(@response_code_map, code, :credit_failure) |
| 138 |
:-( |
case classification do |
| 139 |
:-( |
:credit_success_confirmed -> mark_credit_success(t) |
| 140 |
:-( |
:reversal_success -> mark_failure(t, code, payload) |
| 141 |
:-( |
:credit_failure -> mark_failure(t, code, payload) |
| 142 |
:-( |
_ -> mark_failure(t, code, payload) |
| 143 |
|
end |
| 144 |
|
end |
| 145 |
|
|
| 146 |
|
@spec mark_failure(Transaction.t(), String.t(), map()) :: {:ok, Transaction.t()} | {:error, any()} |
| 147 |
|
def mark_failure(%Transaction{}=t, code, _payload) do |
| 148 |
:-( |
attrs = %{failure_code: code, current_state: "failure", status: "failure", finalized_at: DateTime.utc_now()} |
| 149 |
:-( |
update_with_event(t, :failure, attrs, :finalized_at) |
| 150 |
|
end |
| 151 |
|
|
| 152 |
|
@spec mark_deemed(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()} |
| 153 |
|
def mark_deemed(%Transaction{}=t) do |
| 154 |
:-( |
attrs = %{deemed: true, current_state: "deemed", status: "deemed", finalized_at: DateTime.utc_now()} |
| 155 |
:-( |
update_with_event(t, :deemed, attrs, :finalized_at) |
| 156 |
|
end |
| 157 |
|
|
| 158 |
|
defp update_with_event(t, event_type, field_updates, ts_field) do |
| 159 |
|
Multi.new() |
| 160 |
|
|> Multi.update(:txn, Transaction.changeset(t, Map.merge(field_updates, %{current_state: state_name(event_type)}))) |
| 161 |
|
|> Multi.run(:event, fn _repo, %{txn: updated} -> |
| 162 |
:-( |
payload = Map.take(updated, [ts_field, :current_state]) |
| 163 |
:-( |
append_event(updated.id, event_type, payload) |
| 164 |
|
end) |
| 165 |
|
|> Repo.transaction() |
| 166 |
:-( |
|> case do |
| 167 |
:-( |
{:ok, %{txn: updated}} -> {:ok, updated} |
| 168 |
:-( |
{:error, step, reason, _} -> {:error, {step, reason}} |
| 169 |
|
end |
| 170 |
|
end |
| 171 |
|
|
| 172 |
|
# Add helper markers for follow-up sends |
| 173 |
|
defp mark_chktxn_sent(%Transaction{}=t) do |
| 174 |
:-( |
update_with_event(t, :chktxn_sent, %{chktxn_requested_at: DateTime.utc_now()}, :chktxn_requested_at) |
| 175 |
|
end |
| 176 |
|
defp mark_reversal_sent(%Transaction{}=t) do |
| 177 |
:-( |
update_with_event(t, :reversal_sent, %{reversal_requested_at: DateTime.utc_now()}, :reversal_requested_at) |
| 178 |
|
end |
| 179 |
|
|
| 180 |
:-( |
defp state_name(:debit_secured), do: "debit_secured" |
| 181 |
:-( |
defp state_name(:credit_requested), do: "credit_pending" |
| 182 |
:-( |
defp state_name(:credit_success), do: "success" |
| 183 |
:-( |
defp state_name(:failure), do: "failure" |
| 184 |
:-( |
defp state_name(:deemed), do: "deemed" |
| 185 |
:-( |
defp state_name(:chktxn_sent), do: "chktxn_pending" |
| 186 |
:-( |
defp state_name(:reversal_sent), do: "reversal_pending" |
| 187 |
:-( |
defp state_name(_), do: "unknown" |
| 188 |
|
|
| 189 |
|
@spec append_event(txn_id, atom(), map()) :: {:ok, TransactionEvent.t()} | {:error, any()} |
| 190 |
|
def append_event(transaction_id, event_type, payload) do |
| 191 |
9 |
last = |
| 192 |
|
TransactionEvent |
| 193 |
|
|> where([e], e.transaction_id == ^transaction_id) |
| 194 |
|
|> order_by([e], desc: e.seq) |
| 195 |
9 |
|> limit(1) |
| 196 |
|
|> Repo.one() |
| 197 |
|
|
| 198 |
9 |
seq = if last, do: last.seq + 1, else: 1 |
| 199 |
9 |
prev_hash = last && last.hash |
| 200 |
9 |
now = DateTime.utc_now() |
| 201 |
9 |
hash = hash_event(prev_hash, seq, event_type, payload, now) |
| 202 |
|
|
| 203 |
|
%TransactionEvent{} |
| 204 |
|
|> TransactionEvent.changeset(%{ |
| 205 |
|
transaction_id: transaction_id, |
| 206 |
|
seq: seq, |
| 207 |
9 |
event_type: to_string(event_type), |
| 208 |
|
prev_hash: prev_hash, |
| 209 |
|
hash: hash, |
| 210 |
|
payload: payload, |
| 211 |
|
inserted_at: now |
| 212 |
|
}) |
| 213 |
9 |
|> Repo.insert() |
| 214 |
|
end |
| 215 |
|
|
| 216 |
|
defp hash_event(prev_hash, seq, event_type, payload, ts) do |
| 217 |
9 |
base = :erlang.term_to_binary({prev_hash, seq, event_type, payload, ts}) |
| 218 |
9 |
:crypto.hash(:sha256, base) |
| 219 |
|
end |
| 220 |
|
|
| 221 |
|
defp schedule_followup(t, :chktxn, wait_ms) do |
| 222 |
:-( |
deadline = DateTime.add(DateTime.utc_now(), div(wait_ms, 1000), :second) |
| 223 |
|
%FollowupRequest{} |
| 224 |
:-( |
|> FollowupRequest.changeset(%{transaction_id: t.id, kind: "chktxn", deadline_at: deadline, attempt_no: 1, status: "scheduled"}) |
| 225 |
|
|> Repo.insert() |
| 226 |
:-( |
|> case do |
| 227 |
:-( |
{:ok, _} -> :ok |
| 228 |
:-( |
{:error, reason} -> {:error, reason} |
| 229 |
|
end |
| 230 |
|
end |
| 231 |
|
|
| 232 |
|
defp dispatch_followup(%FollowupRequest{kind: "chktxn"}=f) do |
| 233 |
:-( |
{:ok, _} = Repo.update(FollowupRequest.changeset(f, %{status: "sent", sent_at: DateTime.utc_now()})) |
| 234 |
:-( |
t = Repo.get(Transaction, f.transaction_id) |
| 235 |
:-( |
{:ok, t} = mark_chktxn_sent(t) |
| 236 |
:-( |
case SandboxPartner.check_txn(%{org_txn_id: t.org_txn_id}) do |
| 237 |
|
{:ok, %{code: code, payload: payload}=resp} -> |
| 238 |
:-( |
Repo.update(FollowupRequest.changeset(f, %{response_code: code, response_payload: payload})) |
| 239 |
:-( |
handle_chktxn_response(t, code, payload) |
| 240 |
:-( |
if code != "CS" do |
| 241 |
:-( |
schedule_reversal(f.transaction_id) |
| 242 |
|
end |
| 243 |
|
{:ok, resp} |
| 244 |
|
{:error, reason} -> |
| 245 |
:-( |
schedule_reversal(f.transaction_id) |
| 246 |
|
{:error, reason} |
| 247 |
|
end |
| 248 |
|
end |
| 249 |
|
defp dispatch_followup(%FollowupRequest{kind: "reversal"}=f) do |
| 250 |
:-( |
{:ok, _} = Repo.update(FollowupRequest.changeset(f, %{status: "sent", sent_at: DateTime.utc_now()})) |
| 251 |
:-( |
t = Repo.get(Transaction, f.transaction_id) |
| 252 |
:-( |
{:ok, t} = mark_reversal_sent(t) |
| 253 |
:-( |
case SandboxPartner.reversal_request(%{org_txn_id: t.org_txn_id}) do |
| 254 |
|
{:ok, %{code: code, payload: payload}} -> |
| 255 |
:-( |
Repo.update(FollowupRequest.changeset(f, %{response_code: code, response_payload: payload})) |
| 256 |
:-( |
handle_chktxn_response(t, code, payload) |
| 257 |
|
:ok |
| 258 |
|
{:error, _} -> |
| 259 |
:-( |
mark_deemed(t) |
| 260 |
|
end |
| 261 |
|
end |
| 262 |
|
|
| 263 |
|
defp schedule_reversal(txn_id) do |
| 264 |
:-( |
deadline = DateTime.add(DateTime.utc_now(), div(@reversal_wait_ms, 1000), :second) |
| 265 |
|
%FollowupRequest{} |
| 266 |
|
|> FollowupRequest.changeset(%{transaction_id: txn_id, kind: "reversal", deadline_at: deadline, attempt_no: 1, status: "scheduled"}) |
| 267 |
:-( |
|> Repo.insert() |
| 268 |
|
:ok |
| 269 |
|
end |
| 270 |
|
end |