defmodule DaProductApp.Workers.TransactionRiskEvaluator do
  @moduledoc """
  Background worker that evaluates transactions against risk rules.

  This worker is triggered whenever a new transaction is created. For each
  rule hit returned by `DaProductApp.RiskManagement.evaluate_transaction_risks/2`
  a record is stored via `DaProductApp.RiskManagement.create_risk_rule_hit/1`,
  and hits whose status is `"Hold"` are broadcast on the
  `"risk_management:holds"` PubSub topic.
  """

  use Oban.Worker, queue: :risk_evaluation, max_attempts: 3

  alias DaProductApp.Repo
  alias DaProductApp.RiskManagement
  alias DaProductApp.RiskManagement.PosTransaction
  alias DaProductApp.Transactions.Transaction

  require Logger

  @holds_topic "risk_management:holds"

  @doc """
  Performs risk evaluation for a transaction.

  Args:
  - transaction_id: ID of the transaction to evaluate
  - transaction_type: "QR" or "POS"

  Returns `:ok` when every rule hit was stored, or `{:error, reason}` when the
  transaction type is unsupported, the transaction cannot be found, or at
  least one rule hit could not be stored.
  """
  @impl Oban.Worker
  def perform(%Oban.Job{
        args: %{"transaction_id" => transaction_id, "transaction_type" => transaction_type}
      }) do
    start_time = System.monotonic_time(:millisecond)

    Logger.info("Starting risk evaluation for #{transaction_type} transaction #{transaction_id}")

    case fetch_transaction(transaction_id, transaction_type) do
      {:ok, transaction} ->
        result = evaluate_and_store_hits(transaction, transaction_type)
        elapsed = System.monotonic_time(:millisecond) - start_time

        Logger.info(
          "Risk evaluation for transaction #{transaction_id} completed in #{elapsed}ms"
        )

        result

      {:error, :transaction_not_found} = error ->
        Logger.error("Transaction #{transaction_id} (#{transaction_type}) not found")
        error

      {:error, :unsupported_transaction_type} = error ->
        Logger.error(
          "Unsupported transaction type #{inspect(transaction_type)} " <>
            "for transaction #{transaction_id}"
        )

        error
    end
  end

  @doc """
  Enqueues a transaction for risk evaluation.
  This is called from transaction creation callbacks.

  `transaction_type` defaults to `"QR"`. Returns the result of `Oban.insert/1`.
  """
  @spec enqueue_evaluation(term(), String.t()) :: {:ok, Oban.Job.t()} | {:error, term()}
  def enqueue_evaluation(transaction_id, transaction_type \\ "QR") do
    %{transaction_id: transaction_id, transaction_type: transaction_type}
    |> new()
    |> Oban.insert()
  end

  # Loads the transaction from the schema that corresponds to its type.
  # Unknown types yield a tagged error instead of a FunctionClauseError so the
  # failure recorded on the job states what was actually wrong.
  defp fetch_transaction(transaction_id, "QR"), do: fetch_record(Transaction, transaction_id)
  defp fetch_transaction(transaction_id, "POS"), do: fetch_record(PosTransaction, transaction_id)
  defp fetch_transaction(_transaction_id, _type), do: {:error, :unsupported_transaction_type}

  # Wraps `Repo.get/2` so that a missing row becomes a tagged error.
  defp fetch_record(schema, transaction_id) do
    case Repo.get(schema, transaction_id) do
      nil -> {:error, :transaction_not_found}
      transaction -> {:ok, transaction}
    end
  end

  # Evaluates the transaction against all applicable rules and stores one
  # record per hit. Returns `:ok`, or `{:error, message}` if any hit could not
  # be stored.
  #
  # NOTE(review): an error return makes Oban retry the job, which evaluates the
  # transaction again; whether already-stored hits are de-duplicated depends on
  # `RiskManagement.create_risk_rule_hit/1` — confirm.
  defp evaluate_and_store_hits(transaction, transaction_type) do
    results =
      transaction
      |> RiskManagement.evaluate_transaction_risks(transaction_type)
      |> Enum.map(&store_rule_hit(&1, transaction, transaction_type))

    error_count = Enum.count(results, &match?({:error, _}, &1))
    success_count = length(results) - error_count

    Logger.info(
      "Risk evaluation completed: #{success_count} rules triggered, #{error_count} errors"
    )

    if error_count > 0 do
      {:error, "Some rule hits failed to be created"}
    else
      :ok
    end
  end

  # Persists a single rule hit and notifies supervisors when it is on hold.
  defp store_rule_hit(hit_data, transaction, transaction_type) do
    case RiskManagement.create_risk_rule_hit(hit_data) do
      {:ok, rule_hit} ->
        Logger.info("Rule hit created: #{rule_hit.id} for rule #{hit_data.rule_id}")
        maybe_notify_hold(rule_hit, transaction, transaction_type)
        {:ok, rule_hit}

      {:error, changeset} ->
        Logger.error("Failed to create rule hit: #{inspect(changeset.errors)}")
        {:error, changeset}
    end
  end

  # Only hits with the "Hold" status trigger a notification.
  defp maybe_notify_hold(%{status: "Hold"} = rule_hit, transaction, transaction_type) do
    send_hold_notification(rule_hit, transaction, transaction_type)
  end

  defp maybe_notify_hold(_rule_hit, _transaction, _transaction_type), do: :ok

  # Broadcasts a `{:new_hold, data}` message to supervisors (using Phoenix
  # PubSub). A failed broadcast is logged but does not fail the job: the rule
  # hit has already been stored at this point.
  defp send_hold_notification(rule_hit, _transaction, transaction_type) do
    notification_data = %{
      rule_hit_id: rule_hit.id,
      transaction_id: rule_hit.transaction_id,
      transaction_type: transaction_type,
      rule_name: rule_name(rule_hit),
      merchant_id: rule_hit.merchant_id,
      category: rule_hit.category,
      triggered_at: rule_hit.triggered_at
    }

    case Phoenix.PubSub.broadcast(
           DaProductApp.PubSub,
           @holds_topic,
           {:new_hold, notification_data}
         ) do
      :ok ->
        Logger.info("Hold notification sent for transaction #{rule_hit.transaction_id}")

      {:error, reason} ->
        Logger.error(
          "Failed to send hold notification for transaction " <>
            "#{rule_hit.transaction_id}: #{inspect(reason)}"
        )
    end
  end

  # Reads the rule name without raising when the `rule` association is nil or
  # has not been loaded on the freshly created hit; falls back to `nil`.
  defp rule_name(%{rule: %{name: name}}), do: name
  defp rule_name(_rule_hit), do: nil
end