cover/Elixir.DaProductApp.Transactions.Service.html

1 defmodule DaProductApp.Transactions.Service do
2 @moduledoc """
3 Transaction orchestration service with timeout handling.
4 """
5 import Ecto.Query
6 alias Ecto.Multi
7 alias DaProductApp.Repo
8 alias DaProductApp.Transactions.{Transaction, TransactionEvent}
9 alias DaProductApp.Adapters.SandboxPartner
10 alias DaProductApp.Transactions.FollowupRequest
11
12 @type txn_id :: integer()
13
@doc """
Looks up the transaction whose `org_txn_id` equals the given value.

Returns the matching `Transaction` or `nil` when there is none.
"""
@spec get_transaction_by_org_id(String.t()) :: Transaction.t() | nil
def get_transaction_by_org_id(org_txn_id) do
  lookup = from(t in Transaction, where: t.org_txn_id == ^org_txn_id)
  Repo.one(lookup)
end
20
@doc """
Looks up a transaction by `org_txn_id` and preloads its events.

The preloaded `events` are ordered by ascending `seq`. Returns `nil` when no
transaction matches.
"""
@spec get_transaction_with_events(String.t()) :: Transaction.t() | nil
def get_transaction_with_events(org_txn_id) do
  ordered_events = order_by(TransactionEvent, [e], asc: e.seq)

  lookup =
    from(t in Transaction,
      where: t.org_txn_id == ^org_txn_id,
      preload: [events: ^ordered_events]
    )

  Repo.one(lookup)
end
30
@doc """
Lists up to 100 transactions, newest `inserted_at` first.

`filters` is passed to `apply_filters/2`; entries it does not recognise are ignored.
"""
@spec list_transactions(map()) :: [Transaction.t()]
def list_transactions(filters \\ %{}) do
  filtered = apply_filters(Transaction, filters)

  Repo.all(from(t in filtered, order_by: [desc: t.inserted_at], limit: 100))
end
39
# Narrows `query` with one equality condition per recognised filter entry.
defp apply_filters(query, filters) do
  Enum.reduce(filters, query, &apply_filter/2)
end

# One clause per supported filter key; any other entry leaves the query untouched.
defp apply_filter({:status, status}, q), do: where(q, [t], t.status == ^status)
defp apply_filter({:current_state, state}, q), do: where(q, [t], t.current_state == ^state)
defp apply_filter({:payer_addr, addr}, q), do: where(q, [t], t.payer_addr == ^addr)
defp apply_filter({:payee_addr, addr}, q), do: where(q, [t], t.payee_addr == ^addr)
defp apply_filter(_unrecognised, q), do: q
49
# Wait, in milliseconds, passed to schedule_followup/3 for the :chktxn follow-up.
50 @credit_wait_ms 30_000
# Wait, in milliseconds, used by schedule_reversal/1 to compute the reversal deadline.
51 @reversal_wait_ms 30_000
# Partner response code => classification atom, read by handle_chktxn_response/3.
# A code that is not a key of this map is classified there as :credit_failure.
52 @response_code_map %{
53 "CS" => :credit_success_confirmed,
54 "00" => :reversal_success,
55 # fallback failures (examples)
56 "U9" => :credit_failure,
57 "T0" => :timeout_failure
58 }
59
@doc """
Creates a transaction from `attrs` and appends its `:txn_initiated` event.

Both writes run inside a single `Repo.transaction/1`. The event payload holds
only the `:org_txn_id` and `:ver_token` entries of `attrs`.

Returns `{:ok, transaction}` or `{:error, {failed_step, reason}}`.
"""
@spec initiate(map()) :: {:ok, Transaction.t()} | {:error, any()}
def initiate(attrs) do
  record_initiation = fn _repo, %{transaction: created} ->
    append_event(created.id, :txn_initiated, Map.take(attrs, [:org_txn_id, :ver_token]))
  end

  multi =
    Multi.new()
    |> Multi.insert(:transaction, Transaction.changeset(%Transaction{}, attrs))
    |> Multi.run(:event, record_initiation)

  case Repo.transaction(multi) do
    {:ok, %{transaction: created}} -> {:ok, created}
    {:error, failed_step, reason, _changes_so_far} -> {:error, {failed_step, reason}}
  end
end
73
@doc """
Stamps `debit_secured_at` with the current UTC time and records a `:debit_secured` event.
"""
@spec mark_debit_secured(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
def mark_debit_secured(%Transaction{} = t) do
  secured_at = DateTime.utc_now()
  update_with_event(t, :debit_secured, %{debit_secured_at: secured_at}, :debit_secured_at)
end
78
@doc """
Records the `:credit_requested` step for `t`, then sends the partner credit request.

The partner's immediate reply is deliberately discarded; the outcome is handled
later, either by a success notification or by a follow-up. When the state update
fails, its error tuple is returned unchanged and no partner call is made.
"""
@spec request_credit(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
def request_credit(%Transaction{} = t) do
  requested_at = DateTime.utc_now()

  with {:ok, pending} <-
         update_with_event(
           t,
           :credit_requested,
           %{credit_requested_at: requested_at},
           :credit_requested_at
         ) do
    # Fire-and-forget: the return value is intentionally ignored.
    _ = SandboxPartner.credit_request(%{org_txn_id: pending.org_txn_id, amount: pending.inr_amount})
    {:ok, pending}
  end
end
90
@doc """
Finalises `t` as successful.

Sets `credit_completed_at` to the current UTC time, sets both `status` and
`current_state` to `"success"`, and records a `:credit_success` event.
"""
@spec mark_credit_success(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
def mark_credit_success(%Transaction{} = t) do
  update_with_event(
    t,
    :credit_success,
    %{credit_completed_at: DateTime.utc_now(), current_state: "success", status: "success"},
    :credit_completed_at
  )
end
96
@doc """
Runs the synchronous happy path: initiate, secure the debit, request the credit
from the partner and, when the partner answers `{:ok, _}`, mark the credit successful.

The partner is called exactly once. `request_credit/1` is intentionally not used
here because it already sends the partner request itself (and discards the reply);
calling it and then calling the partner again would submit the same credit twice.
The credit-requested step is therefore recorded directly before the single partner call.

Returns `{:ok, transaction}` on success. Any step that does not match is returned
as-is; in particular a partner `{:error, reason}` is returned unchanged and the
transaction is left in the state written by the credit-requested step.
"""
@spec debit_and_credit(map()) :: {:ok, Transaction.t()} | {:error, any()}
def debit_and_credit(attrs) do
  with {:ok, t} <- initiate(attrs),
       {:ok, t} <- mark_debit_secured(t),
       {:ok, t} <-
         update_with_event(
           t,
           :credit_requested,
           %{credit_requested_at: DateTime.utc_now()},
           :credit_requested_at
         ),
       {:ok, _partner_resp} <-
         SandboxPartner.credit_request(%{org_txn_id: t.org_txn_id, amount: t.inr_amount}),
       {:ok, t} <- mark_credit_success(t) do
    {:ok, t}
  end
end
107
@doc """
Starts the asynchronous flow: initiate, secure the debit, request the credit and
schedule a `:chktxn` follow-up `@credit_wait_ms` milliseconds from now.

Returns `{:ok, transaction}` with the transaction as it was after the credit
request, or the first non-matching step result.
"""
@spec debit_credit_with_timeouts(map()) :: {:ok, Transaction.t()} | {:error, any()}
def debit_credit_with_timeouts(attrs) do
  with {:ok, initiated} <- initiate(attrs),
       {:ok, secured} <- mark_debit_secured(initiated),
       {:ok, pending} <- request_credit(secured),
       :ok <- schedule_followup(pending, :chktxn, @credit_wait_ms) do
    {:ok, pending}
  end
end
117
@doc """
Records a successful credit for `t` by delegating to `mark_credit_success/1`.

This function does not modify any `FollowupRequest` rows. A follow-up that is
still `"scheduled"` for this transaction will therefore still be selected by
`process_expired_followups/1` once its deadline passes.

NOTE(review): confirm whether follow-up cancellation is meant to happen here or
is handled by another component.
"""
@spec handle_credit_success(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
def handle_credit_success(%Transaction{} = t), do: mark_credit_success(t)
124
@doc """
Watchdog entry point: dispatches every follow-up whose deadline has passed.

Selects `FollowupRequest` rows with status `"scheduled"` and `deadline_at` at or
before `now` (defaults to the current UTC time) and hands each one to
`dispatch_followup/1`. Individual dispatch results are discarded; always returns `:ok`.
"""
@spec process_expired_followups(DateTime.t()) :: :ok
def process_expired_followups(now \\ DateTime.utc_now()) do
  due =
    FollowupRequest
    |> where([f], f.status == "scheduled" and f.deadline_at <= ^now)
    |> Repo.all()

  # Enum.each/2 itself returns :ok.
  Enum.each(due, fn followup -> dispatch_followup(followup) end)
end
134
@doc """
Applies a partner response code to transaction `t`.

The code is classified through `@response_code_map`; unknown codes count as
`:credit_failure`. Only `:credit_success_confirmed` marks the credit successful.
Every other classification marks the transaction as failed with `code`.
"""
@spec handle_chktxn_response(Transaction.t(), String.t(), map()) :: {:ok, Transaction.t()} | {:error, any()}
def handle_chktxn_response(t, code, payload) do
  case Map.get(@response_code_map, code, :credit_failure) do
    :credit_success_confirmed -> mark_credit_success(t)
    _not_confirmed -> mark_failure(t, code, payload)
  end
end
145
@doc """
Finalises `t` as failed.

Stores `code` as `failure_code`, sets `status` and `current_state` to `"failure"`,
stamps `finalized_at` with the current UTC time and records a `:failure` event.
The third argument is accepted but not used.
"""
@spec mark_failure(Transaction.t(), String.t(), map()) :: {:ok, Transaction.t()} | {:error, any()}
def mark_failure(%Transaction{} = t, code, _payload) do
  finalized = %{finalized_at: DateTime.utc_now(), status: "failure", current_state: "failure"}
  update_with_event(t, :failure, Map.put(finalized, :failure_code, code), :finalized_at)
end
151
@doc """
Finalises `t` as deemed.

Sets `deemed` to `true`, sets `status` and `current_state` to `"deemed"`, stamps
`finalized_at` with the current UTC time and records a `:deemed` event.
"""
@spec mark_deemed(Transaction.t()) :: {:ok, Transaction.t()} | {:error, any()}
def mark_deemed(%Transaction{} = t) do
  finalized = %{finalized_at: DateTime.utc_now(), status: "deemed", current_state: "deemed"}
  update_with_event(t, :deemed, Map.put(finalized, :deemed, true), :finalized_at)
end
157
# Updates the transaction and appends the matching event in one Repo.transaction/1.
#
# `field_updates` is applied through Transaction.changeset/2 with `current_state`
# forced to state_name(event_type), overriding any `current_state` the caller passed.
# The event payload is the updated record's `ts_field` and `current_state`.
# Returns {:ok, updated_transaction} or {:error, {failed_step, reason}}.
defp update_with_event(t, event_type, field_updates, ts_field) do
  changes = Map.put(field_updates, :current_state, state_name(event_type))

  log_event = fn _repo, %{txn: saved} ->
    append_event(saved.id, event_type, Map.take(saved, [ts_field, :current_state]))
  end

  outcome =
    Multi.new()
    |> Multi.update(:txn, Transaction.changeset(t, changes))
    |> Multi.run(:event, log_event)
    |> Repo.transaction()

  case outcome do
    {:ok, %{txn: saved}} -> {:ok, saved}
    {:error, failed_step, reason, _changes_so_far} -> {:error, {failed_step, reason}}
  end
end
171
# Follow-up marker: stamps `chktxn_requested_at` with the current UTC time and
# records a :chktxn_sent event.
defp mark_chktxn_sent(%Transaction{} = t) do
  requested_at = DateTime.utc_now()
  update_with_event(t, :chktxn_sent, %{chktxn_requested_at: requested_at}, :chktxn_requested_at)
end
# Follow-up marker: stamps `reversal_requested_at` with the current UTC time and
# records a :reversal_sent event.
defp mark_reversal_sent(%Transaction{} = t) do
  requested_at = DateTime.utc_now()

  update_with_event(
    t,
    :reversal_sent,
    %{reversal_requested_at: requested_at},
    :reversal_requested_at
  )
end
179
180
:-(
# Event type => string written to the transaction's `current_state` field.
@state_names %{
  debit_secured: "debit_secured",
  credit_requested: "credit_pending",
  credit_success: "success",
  failure: "failure",
  deemed: "deemed",
  chktxn_sent: "chktxn_pending",
  reversal_sent: "reversal_pending"
}

# Returns the state string for an event type; anything not listed yields "unknown".
defp state_name(event_type), do: Map.get(@state_names, event_type, "unknown")
188
@doc """
Appends the next event to a transaction's hash-chained event log.

Reads the event with the highest `seq` for `transaction_id`. The new event gets
`seq` one higher (or `1` for the first event), `prev_hash` set to that event's
`hash` (or `nil`), and a SHA-256 `hash` over the previous hash, `seq`,
`event_type`, `payload` and the insertion timestamp. `event_type` is stored as a string.

NOTE(review): `seq` comes from a read followed by an insert; confirm that a
uniqueness constraint or lock prevents two concurrent appends from using the same `seq`.
"""
@spec append_event(txn_id, atom(), map()) :: {:ok, TransactionEvent.t()} | {:error, any()}
def append_event(transaction_id, event_type, payload) do
  latest =
    Repo.one(
      from(e in TransactionEvent,
        where: e.transaction_id == ^transaction_id,
        order_by: [desc: e.seq],
        limit: 1
      )
    )

  {seq, prev_hash} =
    case latest do
      nil -> {1, nil}
      %{seq: last_seq, hash: last_hash} -> {last_seq + 1, last_hash}
    end

  now = DateTime.utc_now()

  attrs = %{
    transaction_id: transaction_id,
    seq: seq,
    event_type: to_string(event_type),
    prev_hash: prev_hash,
    hash: hash_event(prev_hash, seq, event_type, payload, now),
    payload: payload,
    inserted_at: now
  }

  %TransactionEvent{}
  |> TransactionEvent.changeset(attrs)
  |> Repo.insert()
end
215
# SHA-256 digest (raw binary) of the event's chained fields, serialised with
# :erlang.term_to_binary/1.
defp hash_event(prev_hash, seq, event_type, payload, ts) do
  digest_input = {prev_hash, seq, event_type, payload, ts}
  :crypto.hash(:sha256, :erlang.term_to_binary(digest_input))
end
220
# Inserts a "scheduled" chktxn FollowupRequest (attempt 1) for transaction `t`,
# due `wait_ms` milliseconds from now.
#
# The wait is added in milliseconds so that a value that is not a whole number of
# seconds is not silently rounded down (500 ms used to become 0 s).
# Returns :ok, or {:error, reason} when the insert fails.
defp schedule_followup(t, :chktxn, wait_ms) do
  deadline = DateTime.add(DateTime.utc_now(), wait_ms, :millisecond)

  followup_attrs = %{
    transaction_id: t.id,
    kind: "chktxn",
    deadline_at: deadline,
    attempt_no: 1,
    status: "scheduled"
  }

  %FollowupRequest{}
  |> FollowupRequest.changeset(followup_attrs)
  |> Repo.insert()
  |> case do
    {:ok, _followup} -> :ok
    {:error, reason} -> {:error, reason}
  end
end
231
# Dispatches one expired follow-up.
#
# Both clauses first mark the follow-up "sent" and load its transaction (see
# begin_followup/1), then record the matching *_sent step on the transaction and
# call the partner. A failure in those preparation steps is logged and returned as
# {:error, reason} instead of raising, so that one bad row does not abort the
# Enum.each/2 batch in process_expired_followups/1.
#
# "chktxn": stores the partner's code/payload on the follow-up, applies it via
# handle_chktxn_response/3 and schedules a reversal for any code other than "CS".
# A partner error also schedules a reversal. Returns {:ok, resp} or {:error, reason}.
#
# NOTE(review): nothing here checks whether the transaction was already finalised
# (for example by handle_credit_success/1) before moving it to a *_pending state — confirm.
defp dispatch_followup(%FollowupRequest{kind: "chktxn"} = f) do
  with {:ok, sent, t} <- begin_followup(f),
       {:ok, t} <- mark_chktxn_sent(t) do
    case SandboxPartner.check_txn(%{org_txn_id: t.org_txn_id}) do
      {:ok, %{code: code, payload: payload} = resp} ->
        record_followup_response(sent, code, payload)
        handle_chktxn_response(t, code, payload)

        if code != "CS" do
          schedule_reversal(sent.transaction_id)
        end

        {:ok, resp}

      {:error, reason} ->
        schedule_reversal(sent.transaction_id)
        {:error, reason}
    end
  else
    {:error, _reason} = error -> log_followup_failure(f, error)
  end
end

# "reversal": stores the partner's code/payload on the follow-up and applies it via
# handle_chktxn_response/3, returning :ok. When the partner call itself fails the
# transaction is marked deemed and that result is returned.
defp dispatch_followup(%FollowupRequest{kind: "reversal"} = f) do
  with {:ok, sent, t} <- begin_followup(f),
       {:ok, t} <- mark_reversal_sent(t) do
    case SandboxPartner.reversal_request(%{org_txn_id: t.org_txn_id}) do
      {:ok, %{code: code, payload: payload}} ->
        record_followup_response(sent, code, payload)
        handle_chktxn_response(t, code, payload)
        :ok

      {:error, _reason} ->
        mark_deemed(t)
    end
  else
    {:error, _reason} = error -> log_followup_failure(f, error)
  end
end

# Marks the follow-up "sent" (so it is not selected again) and loads its transaction.
# Returns {:ok, updated_followup, transaction}, {:error, changeset} when the update
# fails, or {:error, :transaction_not_found} when the transaction row is missing.
defp begin_followup(%FollowupRequest{} = f) do
  sent_attrs = %{status: "sent", sent_at: DateTime.utc_now()}

  with {:ok, sent} <- Repo.update(FollowupRequest.changeset(f, sent_attrs)),
       %Transaction{} = t <- Repo.get(Transaction, sent.transaction_id) do
    {:ok, sent, t}
  else
    nil -> {:error, :transaction_not_found}
    {:error, reason} -> {:error, reason}
  end
end

# Stores the partner's response on the follow-up row. Best effort, as before: the
# update result is not inspected by the callers.
defp record_followup_response(%FollowupRequest{} = sent, code, payload) do
  Repo.update(FollowupRequest.changeset(sent, %{response_code: code, response_payload: payload}))
end

# Logs a dispatch failure and hands the error tuple back to the caller unchanged.
defp log_followup_failure(%FollowupRequest{} = f, {:error, reason} = error) do
  require Logger

  Logger.error(
    "#{f.kind} follow-up for transaction #{inspect(f.transaction_id)} " <>
      "could not be dispatched: #{inspect(reason)}"
  )

  error
end
262
# Inserts a "scheduled" reversal FollowupRequest (attempt 1) for `txn_id`, due
# @reversal_wait_ms milliseconds from now.
#
# Returns :ok, or {:error, reason} when the insert fails. The insert result used
# to be discarded, so a failed insert was reported as :ok and no reversal was ever
# attempted. This now mirrors schedule_followup/3.
defp schedule_reversal(txn_id) do
  deadline = DateTime.add(DateTime.utc_now(), @reversal_wait_ms, :millisecond)

  followup_attrs = %{
    transaction_id: txn_id,
    kind: "reversal",
    deadline_at: deadline,
    attempt_no: 1,
    status: "scheduled"
  }

  %FollowupRequest{}
  |> FollowupRequest.changeset(followup_attrs)
  |> Repo.insert()
  |> case do
    {:ok, _followup} -> :ok
    {:error, reason} -> {:error, reason}
  end
end
270 end
Line Hits Source