defmodule DaProductApp.Adapters.NpciAdapter do @moduledoc """ NPCI communication adapter for handling all UPI XML-based API communications. Provides functions for: - Generating ACK responses for immediate NPCI acknowledgment - Sending async RespPay, RespValQr, and other responses to NPCI endpoints - Handling NPCI endpoint communication with proper error handling and logging - XML formatting and response parsing per NPCI specification """ require Logger import SweetXml # NPCI endpoints configuration @npci_base_url "https://precert.nfinite.in/iupi" @request_timeout 30_000 # ================================ # ACK GENERATION FUNCTIONS # ================================ @doc """ Generate ACK XML response for ReqPay requests. Returns immediate acknowledgment to NPCI that request was received. """ @spec generate_reqpay_ack(String.t(), String.t()) :: String.t() def generate_reqpay_ack(msg_id, org_txn_id) do timestamp = DateTime.utc_now() |> DateTime.to_iso8601() ack_xml = """ """ Logger.info("[NPCI ACK] Generated ReqPay ACK for msgId=#{msg_id}, orgTxnId=#{org_txn_id}") Logger.debug("[NPCI ACK] ACK XML: #{ack_xml}") ack_xml end @doc """ Generate ACK XML response for ReqValQr requests. """ @spec generate_reqvalqr_ack(String.t(), String.t()) :: String.t() def generate_reqvalqr_ack(msg_id, txn_id) do timestamp = DateTime.utc_now() |> DateTime.to_iso8601() ack_xml = """ """ Logger.info("[NPCI ACK] Generated ReqValQr ACK for msgId=#{msg_id}, txnId=#{txn_id}") Logger.debug("[NPCI ACK] ACK XML: #{ack_xml}") ack_xml end @doc """ Generate ACK XML response for ReqChkTxn requests. """ @spec generate_reqchktxn_ack(String.t(), String.t()) :: String.t() def generate_reqchktxn_ack(msg_id, org_txn_id) do timestamp = DateTime.utc_now() |> DateTime.to_iso8601() ack_xml = """ """ Logger.info("[NPCI ACK] Generated ReqChkTxn ACK for msgId=#{msg_id}, orgTxnId=#{org_txn_id}") Logger.debug("[NPCI ACK] ACK XML: #{ack_xml}") ack_xml end # ================================ # ASYNC NPCI COMMUNICATION FUNCTIONS # ================================ @doc """ Send RespPay XML asynchronously to NPCI after ACK has been sent. Handles the complete payment response flow with proper error handling. """ @spec send_async_resppay(String.t(), String.t(), String.t(), map()) :: :ok def send_async_resppay(resp_pay_xml, txn_id, org_txn_id, req_pay_data) do Logger.info("[ASYNC RespPay] Starting async RespPay transmission for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") Logger.debug("[ASYNC RespPay] RespPay XML: #{resp_pay_xml}") # Extract ref_data from RespPay XML before sending ref_data = extract_ref_data_from_resp_pay_xml(resp_pay_xml) Logger.info("[ASYNC RespPay] Extracted ref_data from RespPay XML: #{inspect(ref_data)}") # Construct NPCI callback URL using txn_id from original ReqPay npci_endpoint = "#{@npci_base_url}/RespPay/2.0/urn:txnid:#{txn_id}" # # headers = [ # {"content-type", "application/xml"}, # {"accept", "application/xml"}, # {"user-agent", "Mercury-UPI-PSP/1.0"} # ] headers = [ {"Content-Type", "application/xml; charset=UTF-8"}, {"Accept", "application/xml"}, {"User-Agent", "Mercury-UPI-PSP/1.0"}, {"X-API-Version", "2.0"} ] Logger.info("[ASYNC RespPay] Sending to NPCI URL: #{npci_endpoint}") Logger.info("[ASYNC RespPay] Request headers: #{inspect(headers)}") case Req.post(npci_endpoint, body: resp_pay_xml, headers: headers, receive_timeout: @request_timeout) do {:ok, %Req.Response{status: 200, body: response_body}} -> Logger.info("[ASYNC RespPay] Successfully sent to NPCI for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") # Process NPCI's ACK response if response_body && String.trim(response_body) != "" do Logger.info("[ASYNC RespPay] Received ACK from NPCI for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") Logger.debug("[ASYNC RespPay] NPCI ACK response: #{response_body}") case parse_npci_ack(response_body) do {:ok, ack_data} -> Logger.info("[ASYNC RespPay] Successfully parsed NPCI ACK: #{inspect(ack_data)}") Logger.info("[ASYNC RespPay] RespPay flow completed successfully") # Update req_pays record with ref_data from RespPay update_req_pays_with_ref_data(txn_id, ref_data) # TODO: Update transaction status to indicate successful NPCI response {:error, parse_error} -> Logger.error("[ASYNC RespPay] Failed to parse NPCI ACK: #{parse_error}") Logger.debug("[ASYNC RespPay] Raw ACK response: #{response_body}") # Handle specific error cases case parse_error do "NPCI endpoint timeout" -> Logger.warning("[ASYNC RespPay] NPCI timeout - transaction may need retry or manual reconciliation") # TODO: Update transaction status to timeout/pending_reconciliation "NPCI endpoint error" -> Logger.error("[ASYNC RespPay] NPCI returned error - transaction failed") # TODO: Update transaction status to failed _ -> Logger.error("[ASYNC RespPay] Unexpected NPCI response format") # TODO: Update transaction status to pending_review end end else Logger.warning("[ASYNC RespPay] NPCI sent 200 but no ACK body for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") end {:ok, %Req.Response{status: 405}} -> Logger.error("[ASYNC RespPay] NPCI returned 405 Method Not Allowed for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") Logger.error("[ASYNC RespPay] This indicates:") Logger.error("[ASYNC RespPay] - URL endpoint issue: #{npci_endpoint}") Logger.error("[ASYNC RespPay] - POST method not accepted by NPCI") Logger.error("[ASYNC RespPay] - NPCI test environment configuration issue") {:ok, %Req.Response{status: status, body: error_body}} -> Logger.error("[ASYNC RespPay] NPCI returned status #{status} for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") Logger.error("[ASYNC RespPay] Error response: #{inspect(error_body)}") # TODO: Handle different error statuses appropriately {:error, request_error} -> Logger.error("[ASYNC RespPay] Network error sending to NPCI for txnId=#{txn_id}, orgTxnId=#{org_txn_id}") Logger.error("[ASYNC RespPay] Request error: #{inspect(request_error)}") # TODO: Implement retry logic or queuing for failed requests end :ok end @doc """ Send RespValQr XML asynchronously to NPCI after ACK has been sent. """ @spec send_async_respvalqr(String.t(), String.t(), map()) :: :ok def send_async_respvalqr(resp_val_qr_xml, txn_id, req_val_qr_data) do Logger.info("[ASYNC RespValQr] Starting async RespValQr transmission for txnId=#{txn_id}") Logger.debug("[ASYNC RespValQr] RespValQr XML: #{resp_val_qr_xml}") # Construct NPCI callback URL npci_endpoint = "#{@npci_base_url}/RespValQr/2.0/urn:txnid:#{txn_id}" headers = [ {"content-type", "application/xml"}, {"accept", "application/xml"}, {"user-agent", "Mercury-UPI-PSP/1.0"} ] Logger.info("[ASYNC RespValQr] Sending to NPCI URL: #{npci_endpoint}") case Req.post(npci_endpoint, body: resp_val_qr_xml, headers: headers, receive_timeout: @request_timeout) do {:ok, %Req.Response{status: 200, body: response_body}} -> Logger.info("[ASYNC RespValQr] Successfully sent to NPCI for txnId=#{txn_id}") handle_npci_ack_response(response_body, "RespValQr", txn_id) {:ok, %Req.Response{status: status, body: error_body}} -> Logger.error("[ASYNC RespValQr] NPCI returned status #{status} for txnId=#{txn_id}") Logger.error("[ASYNC RespValQr] Error response: #{inspect(error_body)}") {:error, request_error} -> Logger.error("[ASYNC RespValQr] Network error for txnId=#{txn_id}: #{inspect(request_error)}") end :ok end @doc """ Send RespChkTxn XML asynchronously to NPCI after ACK has been sent. """ @spec send_async_respchktxn(String.t(), String.t(), map()) :: :ok def send_async_respchktxn(resp_chk_txn_xml, org_txn_id, req_chk_txn_data) do Logger.info("[ASYNC RespChkTxn] Starting async RespChkTxn transmission for orgTxnId=#{org_txn_id}") Logger.debug("[ASYNC RespChkTxn] RespChkTxn XML: #{resp_chk_txn_xml}") # Construct NPCI callback URL npci_endpoint = "#{@npci_base_url}/RespChkTxn/2.0/urn:txnid:#{org_txn_id}" headers = [ {"Content-Type", "application/xml; charset=UTF-8"}, {"Accept", "application/xml"}, {"User-Agent", "Mercury-UPI-PSP/1.0"} ] Logger.info("[ASYNC RespChkTxn] Sending to NPCI URL: #{npci_endpoint}") Logger.info("[ASYNC RespChkTxn] Request headers: #{inspect(headers)}") case Req.post(npci_endpoint, body: resp_chk_txn_xml, headers: headers, receive_timeout: @request_timeout) do {:ok, %Req.Response{status: 200, body: response_body}} -> Logger.info("[ASYNC RespChkTxn] Successfully sent to NPCI for orgTxnId=#{org_txn_id}") handle_npci_ack_response(response_body, "RespChkTxn", org_txn_id) {:ok, %Req.Response{status: status, body: error_body}} -> Logger.error("[ASYNC RespChkTxn] NPCI returned status #{status} for orgTxnId=#{org_txn_id}") Logger.error("[ASYNC RespChkTxn] Error response: #{inspect(error_body)}") {:error, request_error} -> Logger.error("[ASYNC RespChkTxn] Network error for orgTxnId=#{org_txn_id}: #{inspect(request_error)}") end :ok end # ================================ # PRIVATE HELPER FUNCTIONS # ================================ # Extract ref_data from RespPay XML defp extract_ref_data_from_resp_pay_xml(xml) do try do import SweetXml doc = parse(xml) # Extract Ref element attributes from RespPay XML ref_xpath = ~x"//Ref[@type='PAYEE'][1]" addr = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@addr"s) || "" reg_name = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@regName"s) || "" code = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@code"s) || "" ifsc = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@IFSC"s) || "" ac_num = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@acNum"s) || "" acc_type = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@accType"s) || "" approval_num = xpath(doc, ~x"//Ref[@type='PAYEE'][1]/@approvalNum"s) || "" %{ regName: reg_name, code: code, ifsc: ifsc, ac_num: ac_num, acc_type: acc_type, approval_num: approval_num } rescue e -> Logger.error("[ASYNC RespPay] Failed to extract ref_data from XML: #{inspect(e)}") %{} end end # Update req_pays table with ref_data from RespPay defp update_req_pays_with_ref_data(txn_id, ref_data) when map_size(ref_data) > 0 do try do alias DaProductApp.Repo import Ecto.Query Logger.info("[ASYNC RespPay] Updating req_pays record for txn_id=#{txn_id}") Logger.info("[ASYNC RespPay] Update data: #{inspect(ref_data)}") # Update req_pays record with ref_data from(r in "req_pays", where: r.txn_id == ^txn_id, update: [ set: [ payee_name: ^Map.get(ref_data, :regName), merchant_category_code: ^Map.get(ref_data, :code), payee_ifsc: ^Map.get(ref_data, :ifsc), payee_account_number: ^Map.get(ref_data, :ac_num), payee_account_type: ^Map.get(ref_data, :acc_type), invoice_number: ^Map.get(ref_data, :approval_num) ] ] ) |> Repo.update_all([]) |> case do {1, _} -> Logger.info("[ASYNC RespPay] ✅ Successfully updated req_pays record for txn_id=#{txn_id}") :ok {0, _} -> Logger.warning("[ASYNC RespPay] ⚠️ No req_pays record found for txn_id=#{txn_id}") :ok {count, _} -> Logger.warning("[ASYNC RespPay] ⚠️ Updated #{count} records for txn_id=#{txn_id} (expected 1)") :ok end rescue e -> Logger.error("[ASYNC RespPay] ❌ Failed to update req_pays: #{inspect(e)}") :error end end defp update_req_pays_with_ref_data(_txn_id, _ref_data), do: :ok defp extract_msg_id_from_xml(xml_string) do try do # Extract msgId from ReqHbt XML using regex for simplicity case Regex.run(~r/msgId="([^"]+)"/, xml_string) do [_, msg_id] -> msg_id _ -> "UNKNOWN" end rescue _ -> "UNKNOWN" end end defp handle_npci_ack_response(response_body, api_name, txn_id) do if response_body && String.trim(response_body) != "" do Logger.info("[#{api_name}] Received ACK from NPCI for #{txn_id}") Logger.debug("[#{api_name}] NPCI ACK response: #{response_body}") case parse_npci_ack(response_body) do {:ok, ack_data} -> Logger.info("[#{api_name}] Successfully parsed NPCI ACK: #{inspect(ack_data)}") Logger.info("[#{api_name}] Flow completed successfully") {:error, parse_error} -> Logger.error("[#{api_name}] Failed to parse NPCI ACK: #{parse_error}") Logger.debug("[#{api_name}] Raw ACK response: #{response_body}") # Handle specific error cases case parse_error do "NPCI endpoint timeout" -> Logger.warning("[#{api_name}] NPCI timeout - transaction may need retry or manual reconciliation") "NPCI endpoint error" -> Logger.error("[#{api_name}] NPCI returned error - transaction failed") _ -> Logger.error("[#{api_name}] Unexpected NPCI response format") end end else Logger.warning("[#{api_name}] NPCI sent 200 but no ACK body for #{txn_id}") end end # ================================ # NPCI AUTHENTICATION & CONFIGURATION HELPERS # ================================ defp get_psp_npci_heartbeat_endpoint do Application.get_env(:da_product_app, :npci_psp_npci_heartbeat_endpoint, "https://backend.nfinite.in/iupi/ReqHbt/2.0/urn:txnid:") end defp get_npci_heartbeat_endpoint do Application.get_env(:da_product_app, :npci_heartbeat_endpoint, "https://precert.nfinite.in/iupi/ReqHbt/2.0/urn:txnid:") end defp get_psp_org_id do Application.get_env(:da_product_app, :psp_org_id, "MER101") end defp generate_request_id do "REQ" <> (:crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)) end defp generate_request_id_with_msg_id(msg_id) when is_binary(msg_id) and msg_id != "" do "REQ_#{msg_id}" end defp generate_request_id_with_msg_id(_) do # Fallback to regular request ID if msg_id is not available generate_request_id() end defp extract_msg_id_from_heartbeat_xml(xml_string) do try do # Extract msgId from ReqHbt XML using regex for efficiency case Regex.run(~r/msgId="([^"]+)"/, xml_string) do [_, msg_id] -> Logger.debug("Extracted msgId from XML: #{msg_id}") msg_id _ -> Logger.warning("Could not extract msgId from heartbeat XML") "" end rescue error -> Logger.error("Error extracting msgId from XML: #{inspect(error)}") "" end end defp get_npci_ssl_options do # SSL/TLS options for NPCI certificate-based authentication cert_path = Application.get_env(:da_product_app, :npci_client_cert_path, "private.pem") key_path = Application.get_env(:da_product_app, :npci_client_key_path, "private.pem") case {File.exists?(cert_path), File.exists?(key_path)} do {true, true} -> Logger.info("✅ NPCI SSL certificates available at: #{cert_path}") Logger.info("📋 Note: SSL certificate authentication will be implemented in next phase") # For now, return empty options to avoid connection issues [] {false, _} -> Logger.warning("⚠️ NPCI client certificate not found at: #{cert_path}") Logger.info("📋 Using standard HTTPS without client certificate authentication") [] {_, false} -> Logger.warning("⚠️ NPCI private key not found at: #{key_path}") Logger.info("📋 Using standard HTTPS without client certificate authentication") [] end rescue error -> Logger.error("❌ Failed to check NPCI SSL certificates: #{inspect(error)}") Logger.info("📋 Using standard HTTPS without client certificate authentication") [] end defp parse_npci_ack(ack_response) do # Handle non-XML responses like "TIMEOUT", "ERROR", etc. case String.trim(ack_response) do "TIMEOUT" -> {:error, "NPCI endpoint timeout"} "ERROR" -> {:error, "NPCI endpoint error"} response when byte_size(response) < 10 -> {:error, "Invalid NPCI response: #{response}"} xml_response -> # Only parse if it looks like XML (starts with < and contains XML content) if String.starts_with?(xml_response, "<") and String.contains?(xml_response, "Ack") do try do # Parse the XML and handle namespaces using local-name() parsed = xml_response |> parse() |> xpath(~x"//*[local-name()='Ack']"e, api: ~x"./@api"s, req_msg_id: ~x"./@reqMsgId"s, org_txn_id: ~x"./@orgTxnId"s, txn_id: ~x"./@txnId"s, timestamp: ~x"./@ts"s ) {:ok, parsed} rescue error -> Logger.error("[ACK Parse Error] XML: #{xml_response}") Logger.error("[ACK Parse Error] Exception: #{inspect(error)}") {:error, "Failed to parse ACK XML: #{inspect(error)}"} end else {:error, "Non-XML response from NPCI: #{xml_response}"} end end end # ================================ # HEARTBEAT FUNCTIONS (NEW IMPLEMENTATION) # ================================ @doc """ Send heartbeat request (ReqHbt) to NPCI and handle immediate ACK response. This implements Step 1 & 2 of the PSP-initiated heartbeat flow. ## Parameters: - `req_hbt_xml`: Generated ReqHbt XML to send - `txn_id`: Transaction ID to check status for ## Returns: - `{:ok, ack_response}` on successful request with ACK - `{:error, reason}` on failure ## Flow: 1. PSP sends ReqHbt to NPCI endpoint 2. NPCI responds with immediate ACK """ @spec send_heartbeat_request(String.t(), String.t()) :: {:ok, map()} | {:error, String.t()} def send_heartbeat_request(req_hbt_xml, txn_id) do # Use configured endpoint instead of hardcoded URL npci_endpoint = get_psp_npci_heartbeat_endpoint() <> txn_id Logger.info("=== SENDING REQHBT TO NPCI ===") Logger.info("NPCI ReqHbt Endpoint URL: #{npci_endpoint}") Logger.info("Transaction ID: #{txn_id}") Logger.info("ReqHbt XML being sent:") Logger.info(req_hbt_xml) Logger.info("=== END REQHBT XML ===") # Extract message ID from XML for request tracking msg_id = extract_msg_id_from_heartbeat_xml(req_hbt_xml) # Enhanced headers with NPCI authentication headers = [ {"Content-Type", "application/xml; charset=UTF-8"}, {"Accept", "application/xml"}, {"User-Agent", "Mercury-UPI-PSP/1.0"}, {"X-API-Version", "2.0"}, {"X-PSP-OrgId", get_psp_org_id()}, {"X-Request-ID", generate_request_id_with_msg_id(msg_id)} ] Logger.info("Request headers: #{inspect(headers)}") # Check SSL certificate status for NPCI authentication get_npci_ssl_options() case Req.post(npci_endpoint, body: req_hbt_xml, headers: headers, receive_timeout: @request_timeout ) do {:ok, %Req.Response{status: 200, body: response_body}} -> Logger.info("✅ Successfully sent ReqHbt to NPCI for txnId=#{txn_id}") # Process NPCI's ACK response if response_body && String.trim(response_body) != "" do Logger.info("📨 Received ACK from NPCI for ReqHbt txnId=#{txn_id}") Logger.debug("NPCI ACK response: #{response_body}") case parse_npci_ack(response_body) do {:ok, ack_data} -> Logger.info("✅ Successfully parsed NPCI ACK: #{inspect(ack_data)}") Logger.info("🔄 ReqHbt flow Step 1 & 2 completed successfully - waiting for RespHbt") {:ok, ack_data} {:error, parse_error} -> Logger.error("❌ Failed to parse NPCI ACK: #{parse_error}") Logger.debug("Raw ACK response: #{response_body}") {:error, "ACK parsing failed: #{parse_error}"} end else Logger.warning("⚠️ NPCI sent 200 but no ACK body for ReqHbt txnId=#{txn_id}") {:error, "Empty ACK response from NPCI"} end {:ok, %Req.Response{status: status, body: error_body}} -> Logger.error("❌ NPCI returned status #{status} for ReqHbt txnId=#{txn_id}") Logger.error("Error response: #{inspect(error_body)}") {:error, "NPCI error: status #{status}"} {:error, request_error} -> Logger.error("❌ Network error sending ReqHbt for txnId=#{txn_id}: #{inspect(request_error)}") {:error, "Network error: #{inspect(request_error)}"} end end @doc """ Send ACK response back to NPCI after receiving and processing RespHbt. This implements Step 4 of the PSP-initiated heartbeat flow. ## Parameters: - `ack_xml`: Generated ACK XML to send back - `resp_hbt_data`: Parsed RespHbt data for context ## Returns: - `:ok` on successful ACK sent - `{:error, reason}` on failure """ @spec send_heartbeat_ack(String.t(), map()) :: :ok | {:error, String.t()} def send_heartbeat_ack(ack_xml, resp_hbt_data) do # For ACK responses, we typically respond to the same endpoint that sent us the RespHbt # This would be handled at the controller level, but we can log it here Logger.info("=== SENDING HEARTBEAT ACK TO NPCI ===") Logger.info("ACK for RespHbt msgId: #{resp_hbt_data[:msg_id]}") Logger.info("ACK XML:") Logger.info(ack_xml) Logger.info("=== END HEARTBEAT ACK ===") # In a real implementation, this would send the ACK back via the HTTP response # For now, we'll just log it as the actual sending happens in the controller Logger.info("✅ Heartbeat ACK prepared and logged - controller will send HTTP response") :ok end @doc """ Handle complete heartbeat flow coordination. This function can be called to orchestrate the entire heartbeat process. ## Parameters: - `txn_id`: Transaction ID to check - `heartbeat_data`: Data for generating ReqHbt ## Returns: - `{:ok, flow_result}` on successful flow initiation - `{:error, reason}` on failure """ @spec handle_heartbeat_flow(String.t(), map()) :: {:ok, map()} | {:error, String.t()} def handle_heartbeat_flow(txn_id, heartbeat_data) do Logger.info("=== STARTING COMPLETE HEARTBEAT FLOW ===") Logger.info("Transaction ID: #{txn_id}") Logger.info("Heartbeat Data: #{inspect(heartbeat_data)}") with {:ok, req_hbt_xml} <- DaProductAppWeb.UpiXmlSchema.generate_req_hbt(heartbeat_data), {:ok, ack_response} <- send_heartbeat_request(req_hbt_xml, txn_id) do Logger.info("✅ Heartbeat flow Steps 1 & 2 completed successfully") Logger.info("⏳ Flow continues when NPCI sends RespHbt to our endpoint") flow_result = %{ step: "ack_received", txn_id: txn_id, msg_id: heartbeat_data.msg_id, ack_response: ack_response, status: "waiting_for_resp_hbt", timestamp: DateTime.utc_now() } {:ok, flow_result} else {:error, reason} = error -> Logger.error("❌ Heartbeat flow failed at Step 1/2: #{reason}") error end end end