| 1 |
|
defmodule DaProductApp.Transactions.ReqPayService do |
| 2 |
|
@moduledoc """ |
| 3 |
|
Service layer for managing ReqPay transactions with comprehensive event logging. |
| 4 |
|
|
| 5 |
|
This service provides: |
| 6 |
|
- Complete ReqPay lifecycle management |
| 7 |
|
- Event logging with hash chain integrity |
| 8 |
|
- XML hash storage for audit trails |
| 9 |
|
- Status transitions and validations |
| 10 |
|
- Partner and merchant association management |
| 11 |
|
- International payment processing |
| 12 |
|
|
| 13 |
|
Features: |
| 14 |
|
- Stores all ReqPay details in normalized format |
| 15 |
|
- Maintains event chain with cryptographic hashing |
| 16 |
|
- Supports both domestic and international payments |
| 17 |
|
- Provides analytics and reporting capabilities |
| 18 |
|
- Handles timeout and reversal scenarios |
| 19 |
|
""" |
| 20 |
|
|
| 21 |
|
import Ecto.Query, warn: false |
| 22 |
|
alias Ecto.Multi |
| 23 |
|
alias DaProductApp.Repo |
| 24 |
|
alias DaProductApp.Transactions.{ReqPay, ReqPayEvent, Transaction, TransactionEvent} |
| 25 |
|
alias DaProductApp.Partners.Partner |
| 26 |
|
alias DaProductApp.Merchants.Merchant |
| 27 |
|
alias DaProductApp.TransactionEventChainService |
| 28 |
|
|
| 29 |
|
require Logger |
| 30 |
|
|
| 31 |
|
# ================================ |
| 32 |
|
# MAIN CRUD OPERATIONS |
| 33 |
|
# ================================ |
| 34 |
|
|
| 35 |
|
@doc """ |
| 36 |
|
Create a new ReqPay record with initial event logging. |
| 37 |
|
|
| 38 |
|
## Parameters |
| 39 |
|
- `attrs`: Map containing ReqPay attributes |
| 40 |
|
- `req_xml`: Original XML request for hash storage |
| 41 |
|
- `parsed_data`: Parsed XML data for reference |
| 42 |
|
|
| 43 |
|
## Returns |
| 44 |
|
- `{:ok, req_pay}` on success |
| 45 |
|
- `{:error, changeset}` on validation errors |
| 46 |
|
""" |
| 47 |
|
@spec create_req_pay(map(), String.t() | nil, map() | nil) :: {:ok, ReqPay.t()} | {:error, any()} |
| 48 |
27 |
def create_req_pay(attrs, req_xml \\ nil, parsed_data \\ nil) do |
| 49 |
28 |
now = DateTime.utc_now() |
| 50 |
|
|
| 51 |
|
# Enhance attributes with computed fields |
| 52 |
28 |
enhanced_attrs = attrs |
| 53 |
|
|> Map.put(:npci_request_received_at, now) |
| 54 |
|
|> Map.put(:status, Map.get(attrs, :status, "PENDING")) |
| 55 |
|
|> compute_xml_hashes(req_xml, nil) |
| 56 |
|
|
| 57 |
|
Multi.new() |
| 58 |
|
|> Multi.run(:req_pay, fn repo, _changes -> |
| 59 |
28 |
changeset = ReqPay.changeset(%ReqPay{}, enhanced_attrs) |
| 60 |
|
|
| 61 |
28 |
case repo.insert(changeset) do |
| 62 |
26 |
{:ok, req_pay} -> |
| 63 |
|
{:ok, req_pay} |
| 64 |
|
|
| 65 |
|
{:error, changeset} -> |
| 66 |
|
# If insert failed due to unique msg_id, treat as idempotent: fetch existing row and continue |
| 67 |
2 |
if Enum.any?(changeset.errors, fn {field, _err} -> field == :msg_id end) do |
| 68 |
1 |
msg_id = Map.get(enhanced_attrs, :msg_id) || Map.get(enhanced_attrs, "msg_id") |
| 69 |
1 |
case repo.get_by(ReqPay, msg_id: msg_id) do |
| 70 |
:-( |
nil -> {:error, changeset} |
| 71 |
|
existing -> |
| 72 |
:-( |
Logger.warn("[ReqPayService] ReqPay insert conflict for msg_id=#{inspect(msg_id)}; using existing id=#{existing.id}") |
| 73 |
|
{:ok, existing} |
| 74 |
|
end |
| 75 |
|
else |
| 76 |
|
{:error, changeset} |
| 77 |
|
end |
| 78 |
|
end |
| 79 |
|
end) |
| 80 |
|
|> Multi.run(:initial_event, fn _repo, %{req_pay: req_pay} -> |
| 81 |
26 |
create_initial_event(req_pay, req_xml, parsed_data) |
| 82 |
|
end) |
| 83 |
|
|> Multi.run(:transaction_link, fn _repo, %{req_pay: req_pay} -> |
| 84 |
26 |
maybe_link_transaction(req_pay, parsed_data) |
| 85 |
|
end) |
| 86 |
|
|> Repo.transaction() |
| 87 |
28 |
|> case do |
| 88 |
|
{:ok, %{req_pay: req_pay}} -> |
| 89 |
:-( |
Logger.info("[ReqPayService] Created ReqPay with ID: #{req_pay.id}, msg_id: #{req_pay.msg_id}") |
| 90 |
|
{:ok, req_pay} |
| 91 |
|
{:error, _failed_op, changeset, _changes} -> |
| 92 |
1 |
Logger.error("[ReqPayService] Failed to create ReqPay: #{inspect(changeset)}") |
| 93 |
|
{:error, changeset} |
| 94 |
|
end |
| 95 |
|
end |
| 96 |
|
|
| 97 |
|
@doc """ |
| 98 |
|
Update ReqPay with response data and log event. |
| 99 |
|
""" |
| 100 |
|
@spec update_req_pay_with_response(integer() | ReqPay.t(), map(), String.t() | nil) :: {:ok, ReqPay.t()} | {:error, any()} |
| 101 |
:-( |
def update_req_pay_with_response(req_pay_id_or_struct, response_attrs, resp_xml \\ nil) do |
| 102 |
:-( |
req_pay = case req_pay_id_or_struct do |
| 103 |
:-( |
%ReqPay{} = rp -> rp |
| 104 |
:-( |
id when is_integer(id) -> get_req_pay!(id) |
| 105 |
|
end |
| 106 |
|
|
| 107 |
:-( |
now = DateTime.utc_now() |
| 108 |
|
|
| 109 |
|
# Enhance response attributes |
| 110 |
:-( |
enhanced_attrs = response_attrs |
| 111 |
|
|> Map.put(:npci_response_sent_at, now) |
| 112 |
|
|> Map.put(:processing_duration_ms, calculate_processing_duration(req_pay, now)) |
| 113 |
|
|> compute_xml_hashes(nil, resp_xml) |
| 114 |
|
|
| 115 |
|
Multi.new() |
| 116 |
|
|> Multi.update(:req_pay, ReqPay.changeset(req_pay, enhanced_attrs)) |
| 117 |
|
|> Multi.run(:response_event, fn _repo, %{req_pay: updated_req_pay} -> |
| 118 |
:-( |
append_event(updated_req_pay, "response_sent", response_attrs, resp_xml) |
| 119 |
|
end) |
| 120 |
|
|> Repo.transaction() |
| 121 |
:-( |
|> case do |
| 122 |
|
{:ok, %{req_pay: updated_req_pay}} -> |
| 123 |
:-( |
Logger.info("[ReqPayService] Updated ReqPay #{updated_req_pay.id} with response") |
| 124 |
|
{:ok, updated_req_pay} |
| 125 |
|
{:error, _failed_op, changeset, _changes} -> |
| 126 |
:-( |
Logger.error("[ReqPayService] Failed to update ReqPay: #{inspect(changeset)}") |
| 127 |
|
{:error, changeset} |
| 128 |
|
end |
| 129 |
|
end |
| 130 |
|
|
| 131 |
|
@doc """ |
| 132 |
|
Update ReqPay status with event logging. |
| 133 |
|
""" |
| 134 |
|
@spec update_status(integer() | ReqPay.t(), String.t(), map()) :: {:ok, ReqPay.t()} | {:error, any()} |
| 135 |
:-( |
def update_status(req_pay_id_or_struct, new_status, metadata \\ %{}) do |
| 136 |
:-( |
req_pay = case req_pay_id_or_struct do |
| 137 |
:-( |
%ReqPay{} = rp -> rp |
| 138 |
:-( |
id when is_integer(id) -> get_req_pay!(id) |
| 139 |
|
end |
| 140 |
|
|
| 141 |
:-( |
if valid_status_transition?(req_pay.status, new_status) do |
| 142 |
:-( |
attrs = %{ |
| 143 |
|
status: new_status, |
| 144 |
|
updated_at: DateTime.utc_now() |
| 145 |
|
} |
| 146 |
|
|
| 147 |
|
# Add status-specific fields |
| 148 |
:-( |
attrs = case new_status do |
| 149 |
|
"PROCESSED" -> |
| 150 |
:-( |
Map.merge(attrs, %{ |
| 151 |
|
payment_status: Map.get(metadata, :payment_status, "SUCCESS"), |
| 152 |
|
paid_at: DateTime.utc_now() |
| 153 |
|
}) |
| 154 |
|
"FAILED" -> |
| 155 |
:-( |
Map.merge(attrs, %{ |
| 156 |
|
error_code: Map.get(metadata, :error_code), |
| 157 |
|
error_message: Map.get(metadata, :error_message) |
| 158 |
|
}) |
| 159 |
:-( |
_ -> attrs |
| 160 |
|
end |
| 161 |
|
|
| 162 |
|
Multi.new() |
| 163 |
|
|> Multi.update(:req_pay, ReqPay.changeset(req_pay, attrs)) |
| 164 |
|
|> Multi.run(:status_event, fn _repo, %{req_pay: updated_req_pay} -> |
| 165 |
:-( |
event_data = Map.merge(metadata, %{ |
| 166 |
:-( |
old_status: req_pay.status, |
| 167 |
|
new_status: new_status, |
| 168 |
|
transition_reason: Map.get(metadata, :reason, "Status update") |
| 169 |
|
}) |
| 170 |
:-( |
append_event(updated_req_pay, "status_changed", event_data) |
| 171 |
|
end) |
| 172 |
|
|> Repo.transaction() |
| 173 |
:-( |
|> case do |
| 174 |
|
{:ok, %{req_pay: updated_req_pay}} -> |
| 175 |
:-( |
Logger.info("[ReqPayService] Updated ReqPay #{updated_req_pay.id} status: #{req_pay.status} -> #{new_status}") |
| 176 |
|
{:ok, updated_req_pay} |
| 177 |
|
{:error, _failed_op, changeset, _changes} -> |
| 178 |
:-( |
Logger.error("[ReqPayService] Failed to update ReqPay status: #{inspect(changeset)}") |
| 179 |
|
{:error, changeset} |
| 180 |
|
end |
| 181 |
|
else |
| 182 |
:-( |
Logger.error("[ReqPayService] Invalid status transition: #{req_pay.status} -> #{new_status}") |
| 183 |
|
{:error, "Invalid status transition"} |
| 184 |
|
end |
| 185 |
|
end |
| 186 |
|
|
| 187 |
|
@doc """ |
| 188 |
|
Mark ReqPay as processed with partner response details. |
| 189 |
|
""" |
| 190 |
|
@spec mark_as_processed(integer() | ReqPay.t(), map()) :: {:ok, ReqPay.t()} | {:error, any()} |
| 191 |
:-( |
def mark_as_processed(req_pay_id_or_struct, partner_response \\ %{}) do |
| 192 |
:-( |
metadata = Map.merge(partner_response, %{ |
| 193 |
|
reason: "Payment processed successfully", |
| 194 |
|
payment_status: "SUCCESS" |
| 195 |
|
}) |
| 196 |
|
|
| 197 |
:-( |
update_status(req_pay_id_or_struct, "PROCESSED", metadata) |
| 198 |
|
end |
| 199 |
|
|
| 200 |
|
@doc """ |
| 201 |
|
Mark ReqPay as failed with error details. |
| 202 |
|
""" |
| 203 |
|
@spec mark_as_failed(integer() | ReqPay.t(), String.t(), String.t()) :: {:ok, ReqPay.t()} | {:error, any()} |
| 204 |
|
def mark_as_failed(req_pay_id_or_struct, error_code, error_message) do |
| 205 |
:-( |
metadata = %{ |
| 206 |
|
reason: "Payment processing failed", |
| 207 |
|
error_code: error_code, |
| 208 |
|
error_message: error_message, |
| 209 |
|
payment_status: "FAILURE" |
| 210 |
|
} |
| 211 |
|
|
| 212 |
:-( |
update_status(req_pay_id_or_struct, "FAILED", metadata) |
| 213 |
|
end |
| 214 |
|
|
| 215 |
|
# ================================ |
| 216 |
|
# QUERY OPERATIONS |
| 217 |
|
# ================================ |
| 218 |
|
|
| 219 |
|
@doc """ |
| 220 |
|
Get ReqPay by ID with all associations. |
| 221 |
|
""" |
| 222 |
|
@spec get_req_pay(integer()) :: ReqPay.t() | nil |
| 223 |
|
def get_req_pay(id) do |
| 224 |
|
ReqPay |
| 225 |
|
|> where([r], r.id == ^id) |
| 226 |
1 |
|> preload([:transaction, :partner, :merchant]) |
| 227 |
1 |
|> Repo.one() |
| 228 |
|
end |
| 229 |
|
|
| 230 |
|
@doc """ |
| 231 |
|
Get ReqPay by ID, raising if not found. |
| 232 |
|
""" |
| 233 |
|
@spec get_req_pay!(integer()) :: ReqPay.t() |
| 234 |
|
def get_req_pay!(id) do |
| 235 |
|
ReqPay |
| 236 |
|
|> where([r], r.id == ^id) |
| 237 |
1 |
|> preload([:transaction, :partner, :merchant]) |
| 238 |
1 |
|> Repo.one!() |
| 239 |
|
end |
| 240 |
|
|
| 241 |
|
@doc """ |
| 242 |
|
Get ReqPay by message ID. |
| 243 |
|
""" |
| 244 |
|
@spec get_by_msg_id(String.t()) :: ReqPay.t() | nil |
| 245 |
|
def get_by_msg_id(msg_id) do |
| 246 |
|
ReqPay |
| 247 |
|
|> where([r], r.msg_id == ^msg_id) |
| 248 |
:-( |
|> preload([:transaction, :partner, :merchant]) |
| 249 |
:-( |
|> Repo.one() |
| 250 |
|
end |
| 251 |
|
|
| 252 |
|
@doc """ |
| 253 |
|
Get ReqPay with event history. |
| 254 |
|
""" |
| 255 |
|
@spec get_with_events(integer()) :: ReqPay.t() | nil |
| 256 |
|
def get_with_events(id) do |
| 257 |
:-( |
events_query = from(e in ReqPayEvent, order_by: [asc: e.seq]) |
| 258 |
|
|
| 259 |
|
ReqPay |
| 260 |
|
|> where([r], r.id == ^id) |
| 261 |
:-( |
|> preload([ |
| 262 |
|
:transaction, |
| 263 |
|
:partner, |
| 264 |
|
:merchant, |
| 265 |
|
events: ^events_query |
| 266 |
|
]) |
| 267 |
:-( |
|> Repo.one() |
| 268 |
|
end |
| 269 |
|
|
| 270 |
|
@doc """ |
| 271 |
|
List ReqPay records with filtering and pagination. |
| 272 |
|
""" |
| 273 |
|
@spec list_req_pays(keyword()) :: [ReqPay.t()] |
| 274 |
:-( |
def list_req_pays(opts \\ []) do |
| 275 |
:-( |
base_query = from(r in ReqPay, preload: [:transaction, :partner, :merchant]) |
| 276 |
|
|
| 277 |
|
base_query |
| 278 |
|
|> apply_filters(opts) |
| 279 |
|
|> apply_sorting(opts) |
| 280 |
|
|> apply_pagination(opts) |
| 281 |
:-( |
|> Repo.all() |
| 282 |
|
end |
| 283 |
|
|
| 284 |
|
# ================================ |
| 285 |
|
# EVENT LOGGING |
| 286 |
|
# ================================ |
| 287 |
|
|
| 288 |
|
@doc """ |
| 289 |
|
Append event to ReqPay with hash chain integrity. |
| 290 |
|
""" |
| 291 |
|
@spec append_event(ReqPay.t(), String.t(), map(), String.t() | nil) :: {:ok, ReqPayEvent.t()} | {:error, any()} |
| 292 |
1 |
def append_event(%ReqPay{} = req_pay, event_type, payload \\ %{}, xml_content \\ nil) do |
| 293 |
|
# Get the last event to continue the hash chain |
| 294 |
27 |
last_event = get_last_event(req_pay.id) |
| 295 |
27 |
next_seq = if last_event, do: last_event.seq + 1, else: 1 |
| 296 |
|
|
| 297 |
|
# Compute XML hashes if provided |
| 298 |
27 |
xml_hashes = compute_event_xml_hashes(xml_content) |
| 299 |
|
|
| 300 |
|
# Create event with hash chain |
| 301 |
27 |
event_attrs = %{ |
| 302 |
27 |
req_pay_id: req_pay.id, |
| 303 |
27 |
transaction_id: req_pay.transaction_id, |
| 304 |
|
seq: next_seq, |
| 305 |
|
event_type: event_type, |
| 306 |
|
payload: payload, |
| 307 |
27 |
prev_hash: last_event && last_event.hash, |
| 308 |
|
inserted_at: DateTime.utc_now() |
| 309 |
|
} |
| 310 |
|
|> Map.merge(xml_hashes) |
| 311 |
|
|
| 312 |
|
# Compute hash for this event |
| 313 |
27 |
event_attrs = Map.put(event_attrs, :hash, compute_event_hash(event_attrs)) |
| 314 |
|
|
| 315 |
27 |
case ReqPayEvent.changeset(%ReqPayEvent{}, event_attrs) |> Repo.insert() do |
| 316 |
|
{:ok, event} -> |
| 317 |
27 |
Logger.info("[ReqPayService] Added event #{event_type} for ReqPay #{req_pay.id}, seq: #{next_seq}") |
| 318 |
|
|
| 319 |
|
# Also add to transaction events if linked |
| 320 |
27 |
if req_pay.transaction_id do |
| 321 |
1 |
add_to_transaction_events(req_pay, event) |
| 322 |
|
end |
| 323 |
|
|
| 324 |
|
{:ok, event} |
| 325 |
|
{:error, changeset} -> |
| 326 |
:-( |
Logger.error("[ReqPayService] Failed to add event: #{inspect(changeset)}") |
| 327 |
|
{:error, changeset} |
| 328 |
|
end |
| 329 |
|
end |
| 330 |
|
|
| 331 |
|
# ================================ |
| 332 |
|
# ANALYTICS & REPORTING |
| 333 |
|
# ================================ |
| 334 |
|
|
| 335 |
|
@doc """ |
| 336 |
|
Get ReqPay statistics for analytics. |
| 337 |
|
""" |
| 338 |
|
@spec get_statistics(keyword()) :: map() |
| 339 |
:-( |
def get_statistics(opts \\ []) do |
| 340 |
:-( |
date_range = get_date_range(opts) |
| 341 |
|
|
| 342 |
:-( |
base_query = from(r in ReqPay) |
| 343 |
|
|> filter_by_date_range(date_range) |
| 344 |
|
|
| 345 |
:-( |
%{ |
| 346 |
|
total_count: get_total_count(base_query), |
| 347 |
|
status_breakdown: get_status_breakdown(base_query), |
| 348 |
|
validation_type_breakdown: get_validation_type_breakdown(base_query), |
| 349 |
|
payment_status_breakdown: get_payment_status_breakdown(base_query), |
| 350 |
|
corridor_breakdown: get_corridor_breakdown(base_query), |
| 351 |
|
amount_statistics: get_amount_statistics(base_query), |
| 352 |
|
processing_time_stats: get_processing_time_stats(base_query), |
| 353 |
|
daily_volume: get_daily_volume(base_query, date_range) |
| 354 |
|
} |
| 355 |
|
end |
| 356 |
|
|
| 357 |
|
@doc """ |
| 358 |
|
Get ReqPay trends over time. |
| 359 |
|
""" |
| 360 |
|
@spec get_trends(keyword()) :: map() |
| 361 |
:-( |
def get_trends(opts \\ []) do |
| 362 |
:-( |
date_range = get_date_range(opts) |
| 363 |
:-( |
interval = Keyword.get(opts, :interval, "day") |
| 364 |
|
|
| 365 |
:-( |
base_query = from(r in ReqPay) |
| 366 |
|
|> filter_by_date_range(date_range) |
| 367 |
|
|
| 368 |
:-( |
%{ |
| 369 |
|
volume_trend: get_volume_trend(base_query, interval), |
| 370 |
|
success_rate_trend: get_success_rate_trend(base_query, interval), |
| 371 |
|
amount_trend: get_amount_trend(base_query, interval), |
| 372 |
|
corridor_trend: get_corridor_trend(base_query, interval) |
| 373 |
|
} |
| 374 |
|
end |
| 375 |
|
|
| 376 |
|
# ================================ |
| 377 |
|
# PRIVATE HELPER FUNCTIONS |
| 378 |
|
# ================================ |
| 379 |
|
|
| 380 |
|
# Record the canonical "req_pay_received" event when a ReqPay row is created.
# The payload snapshots the identifying fields of the row plus the parsed XML.
defp create_initial_event(req_pay, req_xml, parsed_data) do
  # event-payload key -> ReqPay struct field
  field_map = [
    message_id: :msg_id,
    transaction_id: :txn_id,
    amount: :amount,
    currency: :currency,
    payer_addr: :payer_addr,
    payee_addr: :payee_addr,
    validation_type: :validation_type,
    corridor: :corridor
  ]

  snapshot =
    field_map
    |> Map.new(fn {event_key, struct_key} -> {event_key, Map.fetch!(req_pay, struct_key)} end)
    |> Map.put(:parsed_data, parsed_data)

  append_event(req_pay, "req_pay_received", snapshot, req_xml)
end
| 396 |
|
|
| 397 |
|
# Attach the ReqPay to an existing Transaction when one of the linking
# heuristics finds a match; succeeds with {:ok, nil} when nothing matches,
# so the surrounding Multi does not roll back on a non-match.
defp maybe_link_transaction(req_pay, parsed_data) do
  matched = find_existing_transaction(req_pay, parsed_data)

  if matched do
    req_pay
    |> Ecto.Changeset.change(transaction_id: matched.id)
    |> Repo.update()
  else
    {:ok, nil}
  end
end
| 407 |
|
|
| 408 |
|
# Find existing transaction to link with ReqPay.
# Returns the matched %Transaction{} or nil; only the FIRST truthy field on
# the ReqPay is tried (cond), so later heuristics are not fallbacks for a
# failed earlier lookup.
# Heuristics (ordered):
# 1. explicit transaction_id on ReqPay
# 2. txn_id -> match Transaction.org_txn_id or Transaction.req_msg_id
# 3. ref_url -> try extract numeric /transactions/<id> or match org_txn_id/ver_token
# 4. ref_id -> match org_txn_id or ver_token
# 5. msg_id -> try Transaction.req_msg_id, then QRValidation.msg_id join
# 6. npci_txn_id -> try partner_txn_id or npci_reversal_ref
defp find_existing_transaction(req_pay, parsed_data) do
  cond do
    req_pay.transaction_id ->
      case Repo.get(Transaction, req_pay.transaction_id) do
        nil ->
          Logger.debug("[ReqPayService] requested link by transaction_id=#{inspect(req_pay.transaction_id)} but no transaction found")
          nil
        txn ->
          Logger.info("[ReqPayService] linking ReqPay=#{inspect(req_pay.id)} to Transaction id=#{txn.id} via transaction_id")
          txn
      end

    # NOTE(review): this clause requires parsed_data to be present even though
    # the query only uses txn_id — confirm that gating is intentional.
    req_pay.txn_id && parsed_data ->
      txn =
        Transaction
        |> where([t], t.org_txn_id == ^req_pay.txn_id or t.req_msg_id == ^req_pay.txn_id)
        |> Repo.one()

      if txn do
        Logger.info("[ReqPayService] linked by txn_id=#{inspect(req_pay.txn_id)} to transaction id=#{txn.id}")
        txn
      else
        Logger.debug("[ReqPayService] no transaction match for txn_id=#{inspect(req_pay.txn_id)}")
        nil
      end

    req_pay.ref_url ->
      # Try to extract a numeric transaction id from URL: /transactions/<id>
      case Regex.run(~r{/transactions/(\d+)}, req_pay.ref_url) do
        [_, id_str] ->
          id = String.to_integer(id_str)
          case Repo.get(Transaction, id) do
            nil ->
              Logger.debug("[ReqPayService] ref_url contained transactions/#{id} but no transaction found")
              nil
            txn ->
              Logger.info("[ReqPayService] linked by ref_url to transaction id=#{id}")
              txn
          end
        _ ->
          # Fallback: try matching ref_url against org_txn_id or ver_token
          txn = Transaction |> where([t], t.org_txn_id == ^req_pay.ref_url or t.ver_token == ^req_pay.ref_url) |> Repo.one()
          if txn do
            Logger.info("[ReqPayService] linked by ref_url matching org_txn_id/ver_token to transaction id=#{txn.id}")
            txn
          else
            Logger.debug("[ReqPayService] ref_url provided but no transaction match for #{inspect(req_pay.ref_url)}")
            nil
          end
      end

    req_pay.ref_id ->
      txn = Transaction |> where([t], t.org_txn_id == ^req_pay.ref_id or t.ver_token == ^req_pay.ref_id) |> Repo.one()
      if txn do
        Logger.info("[ReqPayService] linked by ref_id=#{inspect(req_pay.ref_id)} to transaction id=#{txn.id}")
        txn
      else
        Logger.debug("[ReqPayService] no transaction match for ref_id=#{inspect(req_pay.ref_id)}")
        nil
      end

    req_pay.msg_id ->
      # Prefer direct req_msg_id match on Transaction, else fall back to QR join
      txn = Transaction |> where([t], t.req_msg_id == ^req_pay.msg_id) |> Repo.one()
      if txn do
        Logger.info("[ReqPayService] linked by msg_id=#{inspect(req_pay.msg_id)} to transaction id=#{txn.id} via req_msg_id")
        txn
      else
        # Second chance: transaction whose parent QR validation carries the msg_id.
        txn =
          from(t in Transaction,
            left_join: qv in DaProductApp.QRValidation.QRValidation,
            on: qv.id == t.parent_qr_validation_id,
            where: qv.msg_id == ^req_pay.msg_id
          )
          |> Repo.one()

        if txn do
          Logger.info("[ReqPayService] linked by msg_id=#{inspect(req_pay.msg_id)} to transaction id=#{txn.id} via QRValidation.msg_id")
        else
          Logger.debug("[ReqPayService] no transaction match for msg_id=#{inspect(req_pay.msg_id)}")
        end

        txn
      end

    req_pay.npci_txn_id ->
      txn = Transaction |> where([t], t.partner_txn_id == ^req_pay.npci_txn_id or t.npci_reversal_ref == ^req_pay.npci_txn_id) |> Repo.one()
      if txn do
        Logger.info("[ReqPayService] linked by npci_txn_id=#{inspect(req_pay.npci_txn_id)} to transaction id=#{txn.id}")
        txn
      else
        Logger.debug("[ReqPayService] no transaction match for npci_txn_id=#{inspect(req_pay.npci_txn_id)}")
        nil
      end

    true ->
      Logger.debug("[ReqPayService] no heuristics matched to link ReqPay=#{inspect(req_pay.id)} to any Transaction")
      nil
  end
end
| 516 |
|
|
| 517 |
|
# Attach SHA-256 hashes of the raw request/response XML to the attrs map.
# A nil (or otherwise falsy) XML argument leaves its key untouched.
defp compute_xml_hashes(attrs, req_xml, resp_xml) do
  Enum.reduce([req_xml_hash: req_xml, resp_xml_hash: resp_xml], attrs, fn {key, xml}, acc ->
    if xml, do: Map.put(acc, key, hash_content(xml)), else: acc
  end)
end
| 523 |
|
|
| 524 |
|
# Hash optional event XML; events only ever carry a request-side hash.
# Returns %{} when there is no XML to hash.
defp compute_event_xml_hashes(nil), do: %{}

defp compute_event_xml_hashes(xml) when is_binary(xml),
  do: %{req_xml_hash: hash_content(xml)}
| 529 |
|
|
| 530 |
|
# Milliseconds elapsed between NPCI request receipt and `end_time`;
# nil when the receipt timestamp was never recorded.
defp calculate_processing_duration(req_pay, end_time) do
  case req_pay.npci_request_received_at do
    nil -> nil
    received_at -> DateTime.diff(end_time, received_at, :millisecond)
  end
end
| 538 |
|
|
| 539 |
|
# State machine for ReqPay status transitions.
# Allowed: PENDING -> PROCESSED | FAILED, PROCESSED -> FAILED (reversals),
# and any status to itself (idempotent re-apply). Everything else is rejected.
defp valid_status_transition?(status, status), do: true
defp valid_status_transition?("PENDING", "PROCESSED"), do: true
defp valid_status_transition?("PENDING", "FAILED"), do: true
defp valid_status_transition?("PROCESSED", "FAILED"), do: true
defp valid_status_transition?(_current, _new), do: false
| 549 |
|
|
| 550 |
|
# Highest-seq event for the ReqPay — the current tail of the hash chain.
# Returns nil when the ReqPay has no events yet.
defp get_last_event(req_pay_id) do
  from(e in ReqPayEvent,
    where: e.req_pay_id == ^req_pay_id,
    order_by: [desc: e.seq],
    limit: 1
  )
  |> Repo.one()
end
| 558 |
|
|
| 559 |
|
# SHA-256 over the pipe-joined identifying fields plus prev_hash, forming a
# tamper-evident chain with the previous event.
# NOTE(review): the digest is a raw 32-byte binary, not hex — confirm the
# events table stores binaries.
defp compute_event_hash(event_attrs) do
  digest_input =
    Enum.map_join(
      [
        event_attrs.req_pay_id,
        event_attrs.seq,
        event_attrs.event_type,
        Jason.encode!(event_attrs.payload || %{}),
        event_attrs.prev_hash || "",
        DateTime.to_iso8601(event_attrs.inserted_at)
      ],
      "|",
      &to_string/1
    )

  :crypto.hash(:sha256, digest_input)
end
| 573 |
|
|
| 574 |
|
# SHA-256 digest (raw 32-byte binary) of arbitrary binary content.
defp hash_content(content) when is_binary(content), do: :crypto.hash(:sha256, content)
| 578 |
|
|
| 579 |
|
# Add event to transaction events for unified timeline.
# Mirrors a ReqPay event into the linked transaction twice: once via the
# chain service (quick traversal / UI) and once as a canonical
# transaction_event record (mirrors the QR dual-logging approach).
defp add_to_transaction_events(req_pay, req_pay_event) do
  # Caller (append_event/4) already checks transaction_id; guarded again here.
  if req_pay.transaction_id do
    # Keep the chain entry for quick traversal / UI, as existing behavior
    TransactionEventChainService.append_event(
      req_pay.transaction_id,
      "req_pay_" <> req_pay_event.event_type,
      %{
        req_pay_id: req_pay.id,
        req_pay_event_seq: req_pay_event.seq,
        source: "req_pay_service"
      }
    )

    # Also append a canonical transaction_event record so the unified
    # transaction timeline includes req_pay events (mirrors QR dual-logging)
    # Use the central Transaction service helper to preserve seq/hash logic.
    # NOTE(review): String.to_atom/1 creates atoms dynamically; event_type
    # appears to come from a fixed internal set of literals in this module,
    # but confirm it can never be attacker-controlled (atoms are never GC'd).
    mapped_event_type = String.to_atom("req_pay_" <> req_pay_event.event_type)
    transaction_payload = %{
      req_pay_id: req_pay.id,
      req_pay_event_seq: req_pay_event.seq,
      source: "req_pay_service"
    }

    case DaProductApp.Transactions.Service.append_event(req_pay.transaction_id, mapped_event_type, transaction_payload) do
      {:ok, _txn_event} -> :ok
      {:error, reason} ->
        Logger.error("[ReqPayService] Failed to append transaction_event for req_pay #{req_pay.id}: #{inspect(reason)}")
        :error
    end
  end
end
| 611 |
|
|
| 612 |
|
# Query helper functions

# Fold recognised filter opts into the query; unrecognised opts fall through
# to the catch-all clause and are ignored (e.g. :sort_by, :limit, :offset,
# which are handled by the other helpers).
defp apply_filters(query, opts) do
  Enum.reduce(opts, query, fn
    # Exact-match string filters.
    {:status, status}, q when is_binary(status) ->
      where(q, [r], r.status == ^status)
    {:validation_type, type}, q when is_binary(type) ->
      where(q, [r], r.validation_type == ^type)
    {:corridor, corridor}, q when is_binary(corridor) ->
      where(q, [r], r.corridor == ^corridor)
    {:partner_id, partner_id}, q ->
      where(q, [r], r.partner_id == ^partner_id)
    # Inclusive insertion-time bounds.
    {:date_from, date}, q ->
      where(q, [r], r.inserted_at >= ^date)
    {:date_to, date}, q ->
      where(q, [r], r.inserted_at <= ^date)
    _, q -> q
  end)
end
| 630 |
|
|
| 631 |
|
defp apply_sorting(query, opts) do |
| 632 |
:-( |
case Keyword.get(opts, :sort_by, :inserted_at) do |
| 633 |
:-( |
:amount -> order_by(query, [r], desc: r.amount) |
| 634 |
:-( |
:status -> order_by(query, [r], [r.status, desc: r.inserted_at]) |
| 635 |
:-( |
_ -> order_by(query, [r], desc: r.inserted_at) |
| 636 |
|
end |
| 637 |
|
end |
| 638 |
|
|
| 639 |
|
defp apply_pagination(query, opts) do |
| 640 |
:-( |
limit = Keyword.get(opts, :limit, 50) |
| 641 |
:-( |
offset = Keyword.get(opts, :offset, 0) |
| 642 |
|
|
| 643 |
|
query |
| 644 |
|
|> limit(^limit) |
| 645 |
:-( |
|> offset(^offset) |
| 646 |
|
end |
| 647 |
|
|
| 648 |
|
# Analytics helper functions

# Resolve the {from, to} reporting window from opts, defaulting to the
# trailing 30 days ending now (UTC).
defp get_date_range(opts) do
  window_start = DateTime.utc_now() |> DateTime.add(-30, :day)
  window_end = DateTime.utc_now()

  {Keyword.get(opts, :date_from, window_start), Keyword.get(opts, :date_to, window_end)}
end
| 658 |
|
|
| 659 |
|
defp filter_by_date_range(query, {date_from, date_to}) do |
| 660 |
|
query |
| 661 |
:-( |
|> where([r], r.inserted_at >= ^date_from and r.inserted_at <= ^date_to) |
| 662 |
|
end |
| 663 |
|
|
| 664 |
|
defp get_total_count(query) do |
| 665 |
:-( |
Repo.aggregate(query, :count, :id) |
| 666 |
|
end |
| 667 |
|
|
| 668 |
|
defp get_status_breakdown(query) do |
| 669 |
|
query |
| 670 |
|
|> group_by([r], r.status) |
| 671 |
:-( |
|> select([r], {r.status, count(r.id)}) |
| 672 |
|
|> Repo.all() |
| 673 |
:-( |
|> Enum.into(%{}) |
| 674 |
|
end |
| 675 |
|
|
| 676 |
|
defp get_validation_type_breakdown(query) do |
| 677 |
|
query |
| 678 |
|
|> group_by([r], r.validation_type) |
| 679 |
:-( |
|> select([r], {r.validation_type, count(r.id)}) |
| 680 |
|
|> Repo.all() |
| 681 |
:-( |
|> Enum.into(%{}) |
| 682 |
|
end |
| 683 |
|
|
| 684 |
|
defp get_payment_status_breakdown(query) do |
| 685 |
|
query |
| 686 |
|
|> where([r], not is_nil(r.payment_status)) |
| 687 |
|
|> group_by([r], r.payment_status) |
| 688 |
:-( |
|> select([r], {r.payment_status, count(r.id)}) |
| 689 |
|
|> Repo.all() |
| 690 |
:-( |
|> Enum.into(%{}) |
| 691 |
|
end |
| 692 |
|
|
| 693 |
|
defp get_corridor_breakdown(query) do |
| 694 |
|
query |
| 695 |
|
|> where([r], not is_nil(r.corridor)) |
| 696 |
|
|> group_by([r], r.corridor) |
| 697 |
:-( |
|> select([r], {r.corridor, count(r.id)}) |
| 698 |
|
|> Repo.all() |
| 699 |
:-( |
|> Enum.into(%{}) |
| 700 |
|
end |
| 701 |
|
|
| 702 |
|
defp get_amount_statistics(query) do |
| 703 |
:-( |
stats = query |
| 704 |
:-( |
|> select([r], %{ |
| 705 |
|
total: sum(r.amount), |
| 706 |
|
average: avg(r.amount), |
| 707 |
|
min: min(r.amount), |
| 708 |
|
max: max(r.amount) |
| 709 |
|
}) |
| 710 |
|
|> Repo.one() |
| 711 |
|
|
| 712 |
:-( |
stats || %{total: 0, average: 0, min: 0, max: 0} |
| 713 |
|
end |
| 714 |
|
|
| 715 |
|
defp get_processing_time_stats(query) do |
| 716 |
|
query |
| 717 |
|
|> where([r], not is_nil(r.processing_duration_ms)) |
| 718 |
:-( |
|> select([r], %{ |
| 719 |
|
avg_duration: avg(r.processing_duration_ms), |
| 720 |
|
min_duration: min(r.processing_duration_ms), |
| 721 |
|
max_duration: max(r.processing_duration_ms) |
| 722 |
|
}) |
| 723 |
|
|> Repo.one() |
| 724 |
:-( |
|| %{avg_duration: 0, min_duration: 0, max_duration: 0} |
| 725 |
|
end |
| 726 |
|
|
| 727 |
|
defp get_daily_volume(query, {date_from, date_to}) do |
| 728 |
|
query |
| 729 |
|
|> group_by([r], fragment("DATE(?)", r.inserted_at)) |
| 730 |
|
|> select([r], %{ |
| 731 |
|
date: fragment("DATE(?)", r.inserted_at), |
| 732 |
|
count: count(r.id), |
| 733 |
|
total_amount: sum(r.amount) |
| 734 |
|
}) |
| 735 |
:-( |
|> order_by([r], fragment("DATE(?)", r.inserted_at)) |
| 736 |
:-( |
|> Repo.all() |
| 737 |
|
end |
| 738 |
|
|
| 739 |
|
defp get_volume_trend(query, "hour") do |
| 740 |
|
query |
| 741 |
|
|> group_by([r], fragment("DATE_TRUNC('hour', ?)", r.inserted_at)) |
| 742 |
|
|> select([r], %{ |
| 743 |
|
period: fragment("DATE_TRUNC('hour', ?)", r.inserted_at), |
| 744 |
|
count: count(r.id) |
| 745 |
|
}) |
| 746 |
:-( |
|> order_by([r], fragment("DATE_TRUNC('hour', ?)", r.inserted_at)) |
| 747 |
:-( |
|> Repo.all() |
| 748 |
|
end |
| 749 |
|
|
| 750 |
|
defp get_volume_trend(query, _day) do |
| 751 |
|
query |
| 752 |
|
|> group_by([r], fragment("DATE(?)", r.inserted_at)) |
| 753 |
|
|> select([r], %{ |
| 754 |
|
period: fragment("DATE(?)", r.inserted_at), |
| 755 |
|
count: count(r.id) |
| 756 |
|
}) |
| 757 |
:-( |
|> order_by([r], fragment("DATE(?)", r.inserted_at)) |
| 758 |
:-( |
|> Repo.all() |
| 759 |
|
end |
| 760 |
|
|
| 761 |
|
defp get_success_rate_trend(query, interval) do |
| 762 |
|
get_volume_trend(query, interval) |
| 763 |
:-( |
|> Enum.map(fn trend -> |
| 764 |
:-( |
total = trend.count |
| 765 |
:-( |
success_count = query |
| 766 |
:-( |
|> where([r], r.payment_status == "SUCCESS") |
| 767 |
:-( |
|> filter_by_period(trend.period, interval) |
| 768 |
|
|> Repo.aggregate(:count, :id) |
| 769 |
|
|
| 770 |
:-( |
success_rate = if total > 0, do: Float.round(success_count / total * 100, 2), else: 0.0 |
| 771 |
|
|
| 772 |
:-( |
Map.put(trend, :success_rate, success_rate) |
| 773 |
|
end) |
| 774 |
|
end |
| 775 |
|
|
| 776 |
|
defp get_amount_trend(query, interval) do |
| 777 |
|
get_volume_trend(query, interval) |
| 778 |
:-( |
|> Enum.map(fn trend -> |
| 779 |
:-( |
total_amount = query |
| 780 |
:-( |
|> filter_by_period(trend.period, interval) |
| 781 |
:-( |
|> Repo.aggregate(:sum, :amount) || Decimal.new(0) |
| 782 |
|
|
| 783 |
:-( |
Map.put(trend, :total_amount, total_amount) |
| 784 |
|
end) |
| 785 |
|
end |
| 786 |
|
|
| 787 |
|
defp get_corridor_trend(query, interval) do |
| 788 |
|
query |
| 789 |
|
|> where([r], not is_nil(r.corridor)) |
| 790 |
|
|> group_by([r], [fragment("DATE_TRUNC(?, ?)", ^interval, r.inserted_at), r.corridor]) |
| 791 |
|
|> select([r], %{ |
| 792 |
|
period: fragment("DATE_TRUNC(?, ?)", ^interval, r.inserted_at), |
| 793 |
|
corridor: r.corridor, |
| 794 |
|
count: count(r.id) |
| 795 |
|
}) |
| 796 |
:-( |
|> order_by([r], [fragment("DATE_TRUNC(?, ?)", ^interval, r.inserted_at), r.corridor]) |
| 797 |
:-( |
|> Repo.all() |
| 798 |
|
end |
| 799 |
|
|
| 800 |
|
defp filter_by_period(query, period, "hour") do |
| 801 |
:-( |
where(query, [r], fragment("DATE_TRUNC('hour', ?)", r.inserted_at) == ^period) |
| 802 |
|
end |
| 803 |
|
|
| 804 |
|
defp filter_by_period(query, period, _day) do |
| 805 |
:-( |
where(query, [r], fragment("DATE(?)", r.inserted_at) == ^period) |
| 806 |
|
end |
| 807 |
|
end |