cover/Elixir.DaProductApp.Transactions.ReqPayService.html

1 defmodule DaProductApp.Transactions.ReqPayService do
2 @moduledoc """
3 Service layer for managing ReqPay transactions with comprehensive event logging.
4
5 This service provides:
6 - Complete ReqPay lifecycle management
7 - Event logging with hash chain integrity
8 - XML hash storage for audit trails
9 - Status transitions and validations
10 - Partner and merchant association management
11 - International payment processing
12
13 Features:
14 - Stores all ReqPay details in normalized format
15 - Maintains event chain with cryptographic hashing
16 - Supports both domestic and international payments
17 - Provides analytics and reporting capabilities
18 - Handles timeout and reversal scenarios
19 """
20
21 import Ecto.Query, warn: false
22 alias Ecto.Multi
23 alias DaProductApp.Repo
24 alias DaProductApp.Transactions.{ReqPay, ReqPayEvent, Transaction, TransactionEvent}
25 alias DaProductApp.Partners.Partner
26 alias DaProductApp.Merchants.Merchant
27 alias DaProductApp.TransactionEventChainService
28
29 require Logger
30
31 # ================================
32 # MAIN CRUD OPERATIONS
33 # ================================
34
35 @doc """
36 Create a new ReqPay record with initial event logging.
37
38 ## Parameters
39 - `attrs`: Map containing ReqPay attributes
40 - `req_xml`: Original XML request for hash storage
41 - `parsed_data`: Parsed XML data for reference
42
43 ## Returns
44 - `{:ok, req_pay}` on success
45 - `{:error, changeset}` on validation errors
46 """
47 @spec create_req_pay(map(), String.t() | nil, map() | nil) :: {:ok, ReqPay.t()} | {:error, any()}
48 27 def create_req_pay(attrs, req_xml \\ nil, parsed_data \\ nil) do
49 28 now = DateTime.utc_now()
50
51 # Enhance attributes with computed fields
52 28 enhanced_attrs = attrs
53 |> Map.put(:npci_request_received_at, now)
54 |> Map.put(:status, Map.get(attrs, :status, "PENDING"))
55 |> compute_xml_hashes(req_xml, nil)
56
57 Multi.new()
58 |> Multi.run(:req_pay, fn repo, _changes ->
59 28 changeset = ReqPay.changeset(%ReqPay{}, enhanced_attrs)
60
61 28 case repo.insert(changeset) do
62 26 {:ok, req_pay} ->
63 {:ok, req_pay}
64
65 {:error, changeset} ->
66 # If insert failed due to unique msg_id, treat as idempotent: fetch existing row and continue
67 2 if Enum.any?(changeset.errors, fn {field, _err} -> field == :msg_id end) do
68 1 msg_id = Map.get(enhanced_attrs, :msg_id) || Map.get(enhanced_attrs, "msg_id")
69 1 case repo.get_by(ReqPay, msg_id: msg_id) do
70
:-(
nil -> {:error, changeset}
71 existing ->
72
:-(
Logger.warn("[ReqPayService] ReqPay insert conflict for msg_id=#{inspect(msg_id)}; using existing id=#{existing.id}")
73 {:ok, existing}
74 end
75 else
76 {:error, changeset}
77 end
78 end
79 end)
80 |> Multi.run(:initial_event, fn _repo, %{req_pay: req_pay} ->
81 26 create_initial_event(req_pay, req_xml, parsed_data)
82 end)
83 |> Multi.run(:transaction_link, fn _repo, %{req_pay: req_pay} ->
84 26 maybe_link_transaction(req_pay, parsed_data)
85 end)
86 |> Repo.transaction()
87 28 |> case do
88 {:ok, %{req_pay: req_pay}} ->
89
:-(
Logger.info("[ReqPayService] Created ReqPay with ID: #{req_pay.id}, msg_id: #{req_pay.msg_id}")
90 {:ok, req_pay}
91 {:error, _failed_op, changeset, _changes} ->
92 1 Logger.error("[ReqPayService] Failed to create ReqPay: #{inspect(changeset)}")
93 {:error, changeset}
94 end
95 end
96
97 @doc """
98 Update ReqPay with response data and log event.
99 """
100 @spec update_req_pay_with_response(integer() | ReqPay.t(), map(), String.t() | nil) :: {:ok, ReqPay.t()} | {:error, any()}
101
:-(
def update_req_pay_with_response(req_pay_id_or_struct, response_attrs, resp_xml \\ nil) do
102
:-(
req_pay = case req_pay_id_or_struct do
103
:-(
%ReqPay{} = rp -> rp
104
:-(
id when is_integer(id) -> get_req_pay!(id)
105 end
106
107
:-(
now = DateTime.utc_now()
108
109 # Enhance response attributes
110
:-(
enhanced_attrs = response_attrs
111 |> Map.put(:npci_response_sent_at, now)
112 |> Map.put(:processing_duration_ms, calculate_processing_duration(req_pay, now))
113 |> compute_xml_hashes(nil, resp_xml)
114
115 Multi.new()
116 |> Multi.update(:req_pay, ReqPay.changeset(req_pay, enhanced_attrs))
117 |> Multi.run(:response_event, fn _repo, %{req_pay: updated_req_pay} ->
118
:-(
append_event(updated_req_pay, "response_sent", response_attrs, resp_xml)
119 end)
120 |> Repo.transaction()
121
:-(
|> case do
122 {:ok, %{req_pay: updated_req_pay}} ->
123
:-(
Logger.info("[ReqPayService] Updated ReqPay #{updated_req_pay.id} with response")
124 {:ok, updated_req_pay}
125 {:error, _failed_op, changeset, _changes} ->
126
:-(
Logger.error("[ReqPayService] Failed to update ReqPay: #{inspect(changeset)}")
127 {:error, changeset}
128 end
129 end
130
131 @doc """
132 Update ReqPay status with event logging.
133 """
134 @spec update_status(integer() | ReqPay.t(), String.t(), map()) :: {:ok, ReqPay.t()} | {:error, any()}
135
:-(
def update_status(req_pay_id_or_struct, new_status, metadata \\ %{}) do
136
:-(
req_pay = case req_pay_id_or_struct do
137
:-(
%ReqPay{} = rp -> rp
138
:-(
id when is_integer(id) -> get_req_pay!(id)
139 end
140
141
:-(
if valid_status_transition?(req_pay.status, new_status) do
142
:-(
attrs = %{
143 status: new_status,
144 updated_at: DateTime.utc_now()
145 }
146
147 # Add status-specific fields
148
:-(
attrs = case new_status do
149 "PROCESSED" ->
150
:-(
Map.merge(attrs, %{
151 payment_status: Map.get(metadata, :payment_status, "SUCCESS"),
152 paid_at: DateTime.utc_now()
153 })
154 "FAILED" ->
155
:-(
Map.merge(attrs, %{
156 error_code: Map.get(metadata, :error_code),
157 error_message: Map.get(metadata, :error_message)
158 })
159
:-(
_ -> attrs
160 end
161
162 Multi.new()
163 |> Multi.update(:req_pay, ReqPay.changeset(req_pay, attrs))
164 |> Multi.run(:status_event, fn _repo, %{req_pay: updated_req_pay} ->
165
:-(
event_data = Map.merge(metadata, %{
166
:-(
old_status: req_pay.status,
167 new_status: new_status,
168 transition_reason: Map.get(metadata, :reason, "Status update")
169 })
170
:-(
append_event(updated_req_pay, "status_changed", event_data)
171 end)
172 |> Repo.transaction()
173
:-(
|> case do
174 {:ok, %{req_pay: updated_req_pay}} ->
175
:-(
Logger.info("[ReqPayService] Updated ReqPay #{updated_req_pay.id} status: #{req_pay.status} -> #{new_status}")
176 {:ok, updated_req_pay}
177 {:error, _failed_op, changeset, _changes} ->
178
:-(
Logger.error("[ReqPayService] Failed to update ReqPay status: #{inspect(changeset)}")
179 {:error, changeset}
180 end
181 else
182
:-(
Logger.error("[ReqPayService] Invalid status transition: #{req_pay.status} -> #{new_status}")
183 {:error, "Invalid status transition"}
184 end
185 end
186
187 @doc """
188 Mark ReqPay as processed with partner response details.
189 """
190 @spec mark_as_processed(integer() | ReqPay.t(), map()) :: {:ok, ReqPay.t()} | {:error, any()}
191
:-(
def mark_as_processed(req_pay_id_or_struct, partner_response \\ %{}) do
192
:-(
metadata = Map.merge(partner_response, %{
193 reason: "Payment processed successfully",
194 payment_status: "SUCCESS"
195 })
196
197
:-(
update_status(req_pay_id_or_struct, "PROCESSED", metadata)
198 end
199
200 @doc """
201 Mark ReqPay as failed with error details.
202 """
203 @spec mark_as_failed(integer() | ReqPay.t(), String.t(), String.t()) :: {:ok, ReqPay.t()} | {:error, any()}
204 def mark_as_failed(req_pay_id_or_struct, error_code, error_message) do
205
:-(
metadata = %{
206 reason: "Payment processing failed",
207 error_code: error_code,
208 error_message: error_message,
209 payment_status: "FAILURE"
210 }
211
212
:-(
update_status(req_pay_id_or_struct, "FAILED", metadata)
213 end
214
215 # ================================
216 # QUERY OPERATIONS
217 # ================================
218
219 @doc """
220 Get ReqPay by ID with all associations.
221 """
222 @spec get_req_pay(integer()) :: ReqPay.t() | nil
223 def get_req_pay(id) do
224 ReqPay
225 |> where([r], r.id == ^id)
226 1 |> preload([:transaction, :partner, :merchant])
227 1 |> Repo.one()
228 end
229
230 @doc """
231 Get ReqPay by ID, raising if not found.
232 """
233 @spec get_req_pay!(integer()) :: ReqPay.t()
234 def get_req_pay!(id) do
235 ReqPay
236 |> where([r], r.id == ^id)
237 1 |> preload([:transaction, :partner, :merchant])
238 1 |> Repo.one!()
239 end
240
241 @doc """
242 Get ReqPay by message ID.
243 """
244 @spec get_by_msg_id(String.t()) :: ReqPay.t() | nil
245 def get_by_msg_id(msg_id) do
246 ReqPay
247 |> where([r], r.msg_id == ^msg_id)
248
:-(
|> preload([:transaction, :partner, :merchant])
249
:-(
|> Repo.one()
250 end
251
252 @doc """
253 Get ReqPay with event history.
254 """
255 @spec get_with_events(integer()) :: ReqPay.t() | nil
256 def get_with_events(id) do
257
:-(
events_query = from(e in ReqPayEvent, order_by: [asc: e.seq])
258
259 ReqPay
260 |> where([r], r.id == ^id)
261
:-(
|> preload([
262 :transaction,
263 :partner,
264 :merchant,
265 events: ^events_query
266 ])
267
:-(
|> Repo.one()
268 end
269
270 @doc """
271 List ReqPay records with filtering and pagination.
272 """
273 @spec list_req_pays(keyword()) :: [ReqPay.t()]
274
:-(
def list_req_pays(opts \\ []) do
275
:-(
base_query = from(r in ReqPay, preload: [:transaction, :partner, :merchant])
276
277 base_query
278 |> apply_filters(opts)
279 |> apply_sorting(opts)
280 |> apply_pagination(opts)
281
:-(
|> Repo.all()
282 end
283
284 # ================================
285 # EVENT LOGGING
286 # ================================
287
288 @doc """
289 Append event to ReqPay with hash chain integrity.
290 """
291 @spec append_event(ReqPay.t(), String.t(), map(), String.t() | nil) :: {:ok, ReqPayEvent.t()} | {:error, any()}
292 1 def append_event(%ReqPay{} = req_pay, event_type, payload \\ %{}, xml_content \\ nil) do
293 # Get the last event to continue the hash chain
294 27 last_event = get_last_event(req_pay.id)
295 27 next_seq = if last_event, do: last_event.seq + 1, else: 1
296
297 # Compute XML hashes if provided
298 27 xml_hashes = compute_event_xml_hashes(xml_content)
299
300 # Create event with hash chain
301 27 event_attrs = %{
302 27 req_pay_id: req_pay.id,
303 27 transaction_id: req_pay.transaction_id,
304 seq: next_seq,
305 event_type: event_type,
306 payload: payload,
307 27 prev_hash: last_event && last_event.hash,
308 inserted_at: DateTime.utc_now()
309 }
310 |> Map.merge(xml_hashes)
311
312 # Compute hash for this event
313 27 event_attrs = Map.put(event_attrs, :hash, compute_event_hash(event_attrs))
314
315 27 case ReqPayEvent.changeset(%ReqPayEvent{}, event_attrs) |> Repo.insert() do
316 {:ok, event} ->
317 27 Logger.info("[ReqPayService] Added event #{event_type} for ReqPay #{req_pay.id}, seq: #{next_seq}")
318
319 # Also add to transaction events if linked
320 27 if req_pay.transaction_id do
321 1 add_to_transaction_events(req_pay, event)
322 end
323
324 {:ok, event}
325 {:error, changeset} ->
326
:-(
Logger.error("[ReqPayService] Failed to add event: #{inspect(changeset)}")
327 {:error, changeset}
328 end
329 end
330
331 # ================================
332 # ANALYTICS & REPORTING
333 # ================================
334
335 @doc """
336 Get ReqPay statistics for analytics.
337 """
338 @spec get_statistics(keyword()) :: map()
339
:-(
def get_statistics(opts \\ []) do
340
:-(
date_range = get_date_range(opts)
341
342
:-(
base_query = from(r in ReqPay)
343 |> filter_by_date_range(date_range)
344
345
:-(
%{
346 total_count: get_total_count(base_query),
347 status_breakdown: get_status_breakdown(base_query),
348 validation_type_breakdown: get_validation_type_breakdown(base_query),
349 payment_status_breakdown: get_payment_status_breakdown(base_query),
350 corridor_breakdown: get_corridor_breakdown(base_query),
351 amount_statistics: get_amount_statistics(base_query),
352 processing_time_stats: get_processing_time_stats(base_query),
353 daily_volume: get_daily_volume(base_query, date_range)
354 }
355 end
356
357 @doc """
358 Get ReqPay trends over time.
359 """
360 @spec get_trends(keyword()) :: map()
361
:-(
def get_trends(opts \\ []) do
362
:-(
date_range = get_date_range(opts)
363
:-(
interval = Keyword.get(opts, :interval, "day")
364
365
:-(
base_query = from(r in ReqPay)
366 |> filter_by_date_range(date_range)
367
368
:-(
%{
369 volume_trend: get_volume_trend(base_query, interval),
370 success_rate_trend: get_success_rate_trend(base_query, interval),
371 amount_trend: get_amount_trend(base_query, interval),
372 corridor_trend: get_corridor_trend(base_query, interval)
373 }
374 end
375
376 # ================================
377 # PRIVATE HELPER FUNCTIONS
378 # ================================
379
380 # Create initial event when ReqPay is created
381 defp create_initial_event(req_pay, req_xml, parsed_data) do
382 26 payload = %{
383 26 message_id: req_pay.msg_id,
384 26 transaction_id: req_pay.txn_id,
385 26 amount: req_pay.amount,
386 26 currency: req_pay.currency,
387 26 payer_addr: req_pay.payer_addr,
388 26 payee_addr: req_pay.payee_addr,
389 26 validation_type: req_pay.validation_type,
390 26 corridor: req_pay.corridor,
391 parsed_data: parsed_data
392 }
393
394 26 append_event(req_pay, "req_pay_received", payload, req_xml)
395 end
396
397 # Link ReqPay to existing transaction if applicable
398 defp maybe_link_transaction(req_pay, parsed_data) do
399 26 case find_existing_transaction(req_pay, parsed_data) do
400
:-(
nil -> {:ok, nil}
401 transaction ->
402 req_pay
403
:-(
|> Ecto.Changeset.change(transaction_id: transaction.id)
404
:-(
|> Repo.update()
405 end
406 end
407
408 # Find existing transaction to link with ReqPay
409 defp find_existing_transaction(req_pay, parsed_data) do
410 # Heuristics (ordered):
411 # 1. explicit transaction_id on ReqPay
412 # 2. txn_id -> match Transaction.org_txn_id or Transaction.req_msg_id
413 # 3. ref_url -> try extract numeric /transactions/<id> or match org_txn_id/ver_token
414 # 4. ref_id -> match org_txn_id or ver_token
415 # 5. msg_id -> try Transaction.req_msg_id, then QRValidation.msg_id join
416 # 6. npci_txn_id -> try partner_txn_id or npci_reversal_ref
417 26 cond do
418 26 req_pay.transaction_id ->
419
:-(
case Repo.get(Transaction, req_pay.transaction_id) do
420 nil ->
421
:-(
Logger.debug("[ReqPayService] requested link by transaction_id=#{inspect(req_pay.transaction_id)} but no transaction found")
422 nil
423 txn ->
424
:-(
Logger.info("[ReqPayService] linking ReqPay=#{inspect(req_pay.id)} to Transaction id=#{txn.id} via transaction_id")
425
:-(
txn
426 end
427
428 26 req_pay.txn_id && parsed_data ->
429
:-(
txn =
430 Transaction
431
:-(
|> where([t], t.org_txn_id == ^req_pay.txn_id or t.req_msg_id == ^req_pay.txn_id)
432 |> Repo.one()
433
434
:-(
if txn do
435
:-(
Logger.info("[ReqPayService] linked by txn_id=#{inspect(req_pay.txn_id)} to transaction id=#{txn.id}")
436
:-(
txn
437 else
438
:-(
Logger.debug("[ReqPayService] no transaction match for txn_id=#{inspect(req_pay.txn_id)}")
439 nil
440 end
441
442 26 req_pay.ref_url ->
443 # Try to extract a numeric transaction id from URL: /transactions/<id>
444
:-(
case Regex.run(~r{/transactions/(\d+)}, req_pay.ref_url) do
445 [_, id_str] ->
446
:-(
id = String.to_integer(id_str)
447
:-(
case Repo.get(Transaction, id) do
448 nil ->
449
:-(
Logger.debug("[ReqPayService] ref_url contained transactions/#{id} but no transaction found")
450 nil
451 txn ->
452
:-(
Logger.info("[ReqPayService] linked by ref_url to transaction id=#{id}")
453
:-(
txn
454 end
455 _ ->
456 # Fallback: try matching ref_url against org_txn_id or ver_token
457
:-(
txn = Transaction |> where([t], t.org_txn_id == ^req_pay.ref_url or t.ver_token == ^req_pay.ref_url) |> Repo.one()
458
:-(
if txn do
459
:-(
Logger.info("[ReqPayService] linked by ref_url matching org_txn_id/ver_token to transaction id=#{txn.id}")
460
:-(
txn
461 else
462
:-(
Logger.debug("[ReqPayService] ref_url provided but no transaction match for #{inspect(req_pay.ref_url)}")
463 nil
464 end
465 end
466
467 26 req_pay.ref_id ->
468
:-(
txn = Transaction |> where([t], t.org_txn_id == ^req_pay.ref_id or t.ver_token == ^req_pay.ref_id) |> Repo.one()
469
:-(
if txn do
470
:-(
Logger.info("[ReqPayService] linked by ref_id=#{inspect(req_pay.ref_id)} to transaction id=#{txn.id}")
471
:-(
txn
472 else
473
:-(
Logger.debug("[ReqPayService] no transaction match for ref_id=#{inspect(req_pay.ref_id)}")
474 nil
475 end
476
477 26 req_pay.msg_id ->
478 # Prefer direct req_msg_id match on Transaction, else fall back to QR join
479 26 txn = Transaction |> where([t], t.req_msg_id == ^req_pay.msg_id) |> Repo.one()
480
:-(
if txn do
481
:-(
Logger.info("[ReqPayService] linked by msg_id=#{inspect(req_pay.msg_id)} to transaction id=#{txn.id} via req_msg_id")
482
:-(
txn
483 else
484
:-(
txn =
485
:-(
from(t in Transaction,
486 left_join: qv in DaProductApp.QRValidation.QRValidation,
487 on: qv.id == t.parent_qr_validation_id,
488
:-(
where: qv.msg_id == ^req_pay.msg_id
489 )
490 |> Repo.one()
491
492
:-(
if txn do
493
:-(
Logger.info("[ReqPayService] linked by msg_id=#{inspect(req_pay.msg_id)} to transaction id=#{txn.id} via QRValidation.msg_id")
494 else
495
:-(
Logger.debug("[ReqPayService] no transaction match for msg_id=#{inspect(req_pay.msg_id)}")
496 end
497
498
:-(
txn
499 end
500
501
:-(
req_pay.npci_txn_id ->
502
:-(
txn = Transaction |> where([t], t.partner_txn_id == ^req_pay.npci_txn_id or t.npci_reversal_ref == ^req_pay.npci_txn_id) |> Repo.one()
503
:-(
if txn do
504
:-(
Logger.info("[ReqPayService] linked by npci_txn_id=#{inspect(req_pay.npci_txn_id)} to transaction id=#{txn.id}")
505
:-(
txn
506 else
507
:-(
Logger.debug("[ReqPayService] no transaction match for npci_txn_id=#{inspect(req_pay.npci_txn_id)}")
508 nil
509 end
510
511
:-(
true ->
512
:-(
Logger.debug("[ReqPayService] no heuristics matched to link ReqPay=#{inspect(req_pay.id)} to any Transaction")
513 nil
514 end
515 end
516
517 # Compute XML hashes for req/resp storage
518 defp compute_xml_hashes(attrs, req_xml, resp_xml) do
519 28 attrs = if req_xml, do: Map.put(attrs, :req_xml_hash, hash_content(req_xml)), else: attrs
520 28 attrs = if resp_xml, do: Map.put(attrs, :resp_xml_hash, hash_content(resp_xml)), else: attrs
521 28 attrs
522 end
523
524 # Compute XML hashes for events
525 26 defp compute_event_xml_hashes(nil), do: %{}
526 defp compute_event_xml_hashes(xml_content) when is_binary(xml_content) do
527 1 %{req_xml_hash: hash_content(xml_content)}
528 end
529
530 # Calculate processing duration
531 defp calculate_processing_duration(req_pay, end_time) do
532
:-(
if req_pay.npci_request_received_at do
533
:-(
DateTime.diff(end_time, req_pay.npci_request_received_at, :millisecond)
534 else
535 nil
536 end
537 end
538
539 # Validate status transitions
540 defp valid_status_transition?(current, new) do
541
:-(
case {current, new} do
542
:-(
{"PENDING", "PROCESSED"} -> true
543
:-(
{"PENDING", "FAILED"} -> true
544
:-(
{"PROCESSED", "FAILED"} -> true # Allow reversal scenarios
545
:-(
{same, same} -> true # Allow same status (idempotent)
546
:-(
_ -> false
547 end
548 end
549
550 # Get last event for hash chain
551 defp get_last_event(req_pay_id) do
552 ReqPayEvent
553 |> where([e], e.req_pay_id == ^req_pay_id)
554 |> order_by([e], desc: e.seq)
555 27 |> limit(1)
556 27 |> Repo.one()
557 end
558
559 # Compute cryptographic hash for event
560 defp compute_event_hash(event_attrs) do
561 27 content = [
562 27 event_attrs.req_pay_id,
563 27 event_attrs.seq,
564 27 event_attrs.event_type,
565 27 Jason.encode!(event_attrs.payload || %{}),
566 27 event_attrs.prev_hash || "",
567 27 DateTime.to_iso8601(event_attrs.inserted_at)
568 ]
569 |> Enum.join("|")
570
571 27 :crypto.hash(:sha256, content)
572 end
573
574 # Hash content for storage
575 defp hash_content(content) when is_binary(content) do
576 2 :crypto.hash(:sha256, content)
577 end
578
579 # Add event to transaction events for unified timeline
580 defp add_to_transaction_events(req_pay, req_pay_event) do
581 1 if req_pay.transaction_id do
582 # Keep the chain entry for quick traversal / UI, as existing behavior
583 1 TransactionEventChainService.append_event(
584 1 req_pay.transaction_id,
585 1 "req_pay_" <> req_pay_event.event_type,
586 %{
587 1 req_pay_id: req_pay.id,
588 1 req_pay_event_seq: req_pay_event.seq,
589 source: "req_pay_service"
590 }
591 )
592
593 # Also append a canonical transaction_event record so the unified
594 # transaction timeline includes req_pay events (mirrors QR dual-logging)
595 # Use the central Transaction service helper to preserve seq/hash logic.
596 1 mapped_event_type = String.to_atom("req_pay_" <> req_pay_event.event_type)
597 1 transaction_payload = %{
598 1 req_pay_id: req_pay.id,
599 1 req_pay_event_seq: req_pay_event.seq,
600 source: "req_pay_service"
601 }
602
603 1 case DaProductApp.Transactions.Service.append_event(req_pay.transaction_id, mapped_event_type, transaction_payload) do
604 1 {:ok, _txn_event} -> :ok
605 {:error, reason} ->
606
:-(
Logger.error("[ReqPayService] Failed to append transaction_event for req_pay #{req_pay.id}: #{inspect(reason)}")
607 :error
608 end
609 end
610 end
611
612 # Query helper functions
613 defp apply_filters(query, opts) do
614
:-(
Enum.reduce(opts, query, fn
615 {:status, status}, q when is_binary(status) ->
616
:-(
where(q, [r], r.status == ^status)
617 {:validation_type, type}, q when is_binary(type) ->
618
:-(
where(q, [r], r.validation_type == ^type)
619 {:corridor, corridor}, q when is_binary(corridor) ->
620
:-(
where(q, [r], r.corridor == ^corridor)
621 {:partner_id, partner_id}, q ->
622
:-(
where(q, [r], r.partner_id == ^partner_id)
623 {:date_from, date}, q ->
624
:-(
where(q, [r], r.inserted_at >= ^date)
625 {:date_to, date}, q ->
626
:-(
where(q, [r], r.inserted_at <= ^date)
627
:-(
_, q -> q
628 end)
629 end
630
631 defp apply_sorting(query, opts) do
632
:-(
case Keyword.get(opts, :sort_by, :inserted_at) do
633
:-(
:amount -> order_by(query, [r], desc: r.amount)
634
:-(
:status -> order_by(query, [r], [r.status, desc: r.inserted_at])
635
:-(
_ -> order_by(query, [r], desc: r.inserted_at)
636 end
637 end
638
639 defp apply_pagination(query, opts) do
640
:-(
limit = Keyword.get(opts, :limit, 50)
641
:-(
offset = Keyword.get(opts, :offset, 0)
642
643 query
644 |> limit(^limit)
645
:-(
|> offset(^offset)
646 end
647
648 # Analytics helper functions
649 defp get_date_range(opts) do
650
:-(
default_from = DateTime.utc_now() |> DateTime.add(-30, :day)
651
:-(
default_to = DateTime.utc_now()
652
653 {
654 Keyword.get(opts, :date_from, default_from),
655 Keyword.get(opts, :date_to, default_to)
656 }
657 end
658
659 defp filter_by_date_range(query, {date_from, date_to}) do
660 query
661
:-(
|> where([r], r.inserted_at >= ^date_from and r.inserted_at <= ^date_to)
662 end
663
664 defp get_total_count(query) do
665
:-(
Repo.aggregate(query, :count, :id)
666 end
667
668 defp get_status_breakdown(query) do
669 query
670 |> group_by([r], r.status)
671
:-(
|> select([r], {r.status, count(r.id)})
672 |> Repo.all()
673
:-(
|> Enum.into(%{})
674 end
675
676 defp get_validation_type_breakdown(query) do
677 query
678 |> group_by([r], r.validation_type)
679
:-(
|> select([r], {r.validation_type, count(r.id)})
680 |> Repo.all()
681
:-(
|> Enum.into(%{})
682 end
683
684 defp get_payment_status_breakdown(query) do
685 query
686 |> where([r], not is_nil(r.payment_status))
687 |> group_by([r], r.payment_status)
688
:-(
|> select([r], {r.payment_status, count(r.id)})
689 |> Repo.all()
690
:-(
|> Enum.into(%{})
691 end
692
693 defp get_corridor_breakdown(query) do
694 query
695 |> where([r], not is_nil(r.corridor))
696 |> group_by([r], r.corridor)
697
:-(
|> select([r], {r.corridor, count(r.id)})
698 |> Repo.all()
699
:-(
|> Enum.into(%{})
700 end
701
702 defp get_amount_statistics(query) do
703
:-(
stats = query
704
:-(
|> select([r], %{
705 total: sum(r.amount),
706 average: avg(r.amount),
707 min: min(r.amount),
708 max: max(r.amount)
709 })
710 |> Repo.one()
711
712
:-(
stats || %{total: 0, average: 0, min: 0, max: 0}
713 end
714
715 defp get_processing_time_stats(query) do
716 query
717 |> where([r], not is_nil(r.processing_duration_ms))
718
:-(
|> select([r], %{
719 avg_duration: avg(r.processing_duration_ms),
720 min_duration: min(r.processing_duration_ms),
721 max_duration: max(r.processing_duration_ms)
722 })
723 |> Repo.one()
724
:-(
|| %{avg_duration: 0, min_duration: 0, max_duration: 0}
725 end
726
727 defp get_daily_volume(query, {date_from, date_to}) do
728 query
729 |> group_by([r], fragment("DATE(?)", r.inserted_at))
730 |> select([r], %{
731 date: fragment("DATE(?)", r.inserted_at),
732 count: count(r.id),
733 total_amount: sum(r.amount)
734 })
735
:-(
|> order_by([r], fragment("DATE(?)", r.inserted_at))
736
:-(
|> Repo.all()
737 end
738
739 defp get_volume_trend(query, "hour") do
740 query
741 |> group_by([r], fragment("DATE_TRUNC('hour', ?)", r.inserted_at))
742 |> select([r], %{
743 period: fragment("DATE_TRUNC('hour', ?)", r.inserted_at),
744 count: count(r.id)
745 })
746
:-(
|> order_by([r], fragment("DATE_TRUNC('hour', ?)", r.inserted_at))
747
:-(
|> Repo.all()
748 end
749
750 defp get_volume_trend(query, _day) do
751 query
752 |> group_by([r], fragment("DATE(?)", r.inserted_at))
753 |> select([r], %{
754 period: fragment("DATE(?)", r.inserted_at),
755 count: count(r.id)
756 })
757
:-(
|> order_by([r], fragment("DATE(?)", r.inserted_at))
758
:-(
|> Repo.all()
759 end
760
761 defp get_success_rate_trend(query, interval) do
762 get_volume_trend(query, interval)
763
:-(
|> Enum.map(fn trend ->
764
:-(
total = trend.count
765
:-(
success_count = query
766
:-(
|> where([r], r.payment_status == "SUCCESS")
767
:-(
|> filter_by_period(trend.period, interval)
768 |> Repo.aggregate(:count, :id)
769
770
:-(
success_rate = if total > 0, do: Float.round(success_count / total * 100, 2), else: 0.0
771
772
:-(
Map.put(trend, :success_rate, success_rate)
773 end)
774 end
775
776 defp get_amount_trend(query, interval) do
777 get_volume_trend(query, interval)
778
:-(
|> Enum.map(fn trend ->
779
:-(
total_amount = query
780
:-(
|> filter_by_period(trend.period, interval)
781
:-(
|> Repo.aggregate(:sum, :amount) || Decimal.new(0)
782
783
:-(
Map.put(trend, :total_amount, total_amount)
784 end)
785 end
786
787 defp get_corridor_trend(query, interval) do
788 query
789 |> where([r], not is_nil(r.corridor))
790 |> group_by([r], [fragment("DATE_TRUNC(?, ?)", ^interval, r.inserted_at), r.corridor])
791 |> select([r], %{
792 period: fragment("DATE_TRUNC(?, ?)", ^interval, r.inserted_at),
793 corridor: r.corridor,
794 count: count(r.id)
795 })
796
:-(
|> order_by([r], [fragment("DATE_TRUNC(?, ?)", ^interval, r.inserted_at), r.corridor])
797
:-(
|> Repo.all()
798 end
799
800 defp filter_by_period(query, period, "hour") do
801
:-(
where(query, [r], fragment("DATE_TRUNC('hour', ?)", r.inserted_at) == ^period)
802 end
803
804 defp filter_by_period(query, period, _day) do
805
:-(
where(query, [r], fragment("DATE(?)", r.inserted_at) == ^period)
806 end
807 end
Line Hits Source