defmodule DaProductApp.Adapters.NpciAdapter do
@moduledoc """
NPCI communication adapter for handling all UPI XML-based API communications.
Provides functions for:
- Generating ACK responses for immediate NPCI acknowledgment
- Sending async RespPay, RespValQr, and other responses to NPCI endpoints
- Handling NPCI endpoint communication with proper error handling and logging
- XML formatting and response parsing per NPCI specification
"""
require Logger
import SweetXml
# NPCI endpoints configuration
@npci_base_url "https://precert.nfinite.in/iupi"
@request_timeout 30_000
# ================================
# ACK GENERATION FUNCTIONS
# ================================
@doc """
Builds the immediate ACK XML returned to NPCI for a ReqPay request.

## Parameters

  * `msg_id` - `msgId` of the incoming ReqPay; echoed back as `reqMsgId`.
  * `org_txn_id` - original transaction id; used for log correlation only.

## Returns

The ACK document as a string.
"""
@spec generate_reqpay_ack(String.t(), String.t()) :: String.t()
def generate_reqpay_ack(msg_id, org_txn_id) do
  timestamp = DateTime.utc_now() |> DateTime.to_iso8601()

  # Escape the echoed id so an unexpected character cannot break the attribute.
  req_msg_id =
    msg_id
    |> to_string()
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
    |> String.replace("\"", "&quot;")

  # NOTE(review): the previous heredoc was empty, so callers received "".
  # The element name and attributes (api, reqMsgId, ts) mirror what
  # parse_npci_ack/1 reads from NPCI's own ACKs; the namespace prefix/URI and
  # the timestamp format are assumptions - confirm against the NPCI UPI spec.
  ack_xml = """
  <?xml version="1.0" encoding="UTF-8"?>
  <ns2:Ack xmlns:ns2="http://npci.org/upi/schema/" api="ReqPay" reqMsgId="#{req_msg_id}" ts="#{timestamp}"/>
  """

  Logger.info("[NPCI ACK] Generated ReqPay ACK for msgId=#{msg_id}, orgTxnId=#{org_txn_id}")
  Logger.debug("[NPCI ACK] ACK XML: #{ack_xml}")
  ack_xml
end
@doc """
Builds the immediate ACK XML returned to NPCI for a ReqValQr request.

## Parameters

  * `msg_id` - `msgId` of the incoming ReqValQr; echoed back as `reqMsgId`.
  * `txn_id` - transaction id; used for log correlation only.

## Returns

The ACK document as a string.
"""
@spec generate_reqvalqr_ack(String.t(), String.t()) :: String.t()
def generate_reqvalqr_ack(msg_id, txn_id) do
  timestamp = DateTime.utc_now() |> DateTime.to_iso8601()

  # Escape the echoed id so an unexpected character cannot break the attribute.
  req_msg_id =
    msg_id
    |> to_string()
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
    |> String.replace("\"", "&quot;")

  # NOTE(review): the previous heredoc was empty, so callers received "".
  # The element name and attributes (api, reqMsgId, ts) mirror what
  # parse_npci_ack/1 reads from NPCI's own ACKs; the namespace prefix/URI and
  # the timestamp format are assumptions - confirm against the NPCI UPI spec.
  ack_xml = """
  <?xml version="1.0" encoding="UTF-8"?>
  <ns2:Ack xmlns:ns2="http://npci.org/upi/schema/" api="ReqValQr" reqMsgId="#{req_msg_id}" ts="#{timestamp}"/>
  """

  Logger.info("[NPCI ACK] Generated ReqValQr ACK for msgId=#{msg_id}, txnId=#{txn_id}")
  Logger.debug("[NPCI ACK] ACK XML: #{ack_xml}")
  ack_xml
end
@doc """
Builds the immediate ACK XML returned to NPCI for a ReqChkTxn request.

## Parameters

  * `msg_id` - `msgId` of the incoming ReqChkTxn; echoed back as `reqMsgId`.
  * `org_txn_id` - original transaction id; used for log correlation only.

## Returns

The ACK document as a string.
"""
@spec generate_reqchktxn_ack(String.t(), String.t()) :: String.t()
def generate_reqchktxn_ack(msg_id, org_txn_id) do
  timestamp = DateTime.utc_now() |> DateTime.to_iso8601()

  # Escape the echoed id so an unexpected character cannot break the attribute.
  req_msg_id =
    msg_id
    |> to_string()
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
    |> String.replace("\"", "&quot;")

  # NOTE(review): the previous heredoc was empty, so callers received "".
  # The element name and attributes (api, reqMsgId, ts) mirror what
  # parse_npci_ack/1 reads from NPCI's own ACKs; the namespace prefix/URI and
  # the timestamp format are assumptions - confirm against the NPCI UPI spec.
  ack_xml = """
  <?xml version="1.0" encoding="UTF-8"?>
  <ns2:Ack xmlns:ns2="http://npci.org/upi/schema/" api="ReqChkTxn" reqMsgId="#{req_msg_id}" ts="#{timestamp}"/>
  """

  Logger.info("[NPCI ACK] Generated ReqChkTxn ACK for msgId=#{msg_id}, orgTxnId=#{org_txn_id}")
  Logger.debug("[NPCI ACK] ACK XML: #{ack_xml}")
  ack_xml
end
# ================================
# ASYNC NPCI COMMUNICATION FUNCTIONS
# ================================
@doc """
Posts a RespPay document to NPCI after the synchronous ACK has been returned.

The call is best-effort: every outcome (success, HTTP error status, transport
error) is logged and the function always returns `:ok`. When NPCI answers with
a parseable ACK, the matching `req_pays` row is updated with the payee
reference data extracted from `resp_pay_xml`.

## Parameters

  * `resp_pay_xml` - RespPay XML body to POST.
  * `txn_id` - transaction id used to build the callback URL and to select the
    `req_pays` row to update.
  * `org_txn_id` - original transaction id; used for log correlation only.
  * `_req_pay_data` - accepted for interface compatibility; not used.
"""
@spec send_async_resppay(String.t(), String.t(), String.t(), map()) :: :ok
def send_async_resppay(resp_pay_xml, txn_id, org_txn_id, _req_pay_data) do
  Logger.info("[ASYNC RespPay] Starting async RespPay transmission for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
  Logger.debug("[ASYNC RespPay] RespPay XML: #{resp_pay_xml}")

  # Extract ref_data from the RespPay XML before sending, so it is available
  # for the req_pays update once NPCI acknowledges.
  ref_data = extract_ref_data_from_resp_pay_xml(resp_pay_xml)
  Logger.info("[ASYNC RespPay] Extracted ref_data from RespPay XML: #{inspect(ref_data)}")

  # NPCI callback URL is keyed by the txn_id of the original ReqPay.
  npci_endpoint = "#{@npci_base_url}/RespPay/2.0/urn:txnid:#{txn_id}"

  headers = [
    {"Content-Type", "application/xml; charset=UTF-8"},
    {"Accept", "application/xml"},
    {"User-Agent", "Mercury-UPI-PSP/1.0"},
    {"X-API-Version", "2.0"}
  ]

  Logger.info("[ASYNC RespPay] Sending to NPCI URL: #{npci_endpoint}")
  Logger.info("[ASYNC RespPay] Request headers: #{inspect(headers)}")

  case Req.post(npci_endpoint, body: resp_pay_xml, headers: headers, receive_timeout: @request_timeout) do
    {:ok, %Req.Response{status: 200, body: response_body}} ->
      Logger.info("[ASYNC RespPay] Successfully sent to NPCI for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
      handle_resppay_ack(response_body, txn_id, org_txn_id, ref_data)

    {:ok, %Req.Response{status: 405}} ->
      Logger.error("[ASYNC RespPay] NPCI returned 405 Method Not Allowed for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
      Logger.error("[ASYNC RespPay] This indicates:")
      Logger.error("[ASYNC RespPay] - URL endpoint issue: #{npci_endpoint}")
      Logger.error("[ASYNC RespPay] - POST method not accepted by NPCI")
      Logger.error("[ASYNC RespPay] - NPCI test environment configuration issue")

    {:ok, %Req.Response{status: status, body: error_body}} ->
      Logger.error("[ASYNC RespPay] NPCI returned status #{status} for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
      Logger.error("[ASYNC RespPay] Error response: #{inspect(error_body)}")

    # TODO: Handle different error statuses appropriately
    {:error, request_error} ->
      Logger.error("[ASYNC RespPay] Network error sending to NPCI for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
      Logger.error("[ASYNC RespPay] Request error: #{inspect(request_error)}")
      # TODO: Implement retry logic or queuing for failed requests
  end

  :ok
end

# Interprets the body NPCI returned with HTTP 200 for a RespPay POST and, on a
# parseable ACK, persists the extracted ref_data.
defp handle_resppay_ack(body, txn_id, org_txn_id, ref_data) when is_binary(body) do
  if String.trim(body) == "" do
    Logger.warning("[ASYNC RespPay] NPCI sent 200 but no ACK body for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
  else
    Logger.info("[ASYNC RespPay] Received ACK from NPCI for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
    Logger.debug("[ASYNC RespPay] NPCI ACK response: #{body}")

    case parse_npci_ack(body) do
      {:ok, ack_data} ->
        Logger.info("[ASYNC RespPay] Successfully parsed NPCI ACK: #{inspect(ack_data)}")
        Logger.info("[ASYNC RespPay] RespPay flow completed successfully")
        # Update req_pays record with ref_data from RespPay
        update_req_pays_with_ref_data(txn_id, ref_data)

      # TODO: Update transaction status to indicate successful NPCI response
      {:error, parse_error} ->
        Logger.error("[ASYNC RespPay] Failed to parse NPCI ACK: #{parse_error}")
        Logger.debug("[ASYNC RespPay] Raw ACK response: #{body}")
        log_resppay_ack_failure(parse_error)
    end
  end
end

defp handle_resppay_ack(nil, txn_id, org_txn_id, _ref_data) do
  Logger.warning("[ASYNC RespPay] NPCI sent 200 but no ACK body for txnId=#{txn_id}, orgTxnId=#{org_txn_id}")
end

# The HTTP client may hand back an already-decoded (non-binary) body depending
# on the response content type; String.trim/1 would raise on it, so it is
# logged as an unexpected response instead.
defp handle_resppay_ack(body, txn_id, org_txn_id, _ref_data) do
  Logger.error(
    "[ASYNC RespPay] NPCI sent 200 with a non-text body for txnId=#{txn_id}, orgTxnId=#{org_txn_id}: #{inspect(body)}"
  )
end

# Logs the category of an ACK parse failure reported by parse_npci_ack/1.
defp log_resppay_ack_failure("NPCI endpoint timeout") do
  Logger.warning("[ASYNC RespPay] NPCI timeout - transaction may need retry or manual reconciliation")
  # TODO: Update transaction status to timeout/pending_reconciliation
end

defp log_resppay_ack_failure("NPCI endpoint error") do
  Logger.error("[ASYNC RespPay] NPCI returned error - transaction failed")
  # TODO: Update transaction status to failed
end

defp log_resppay_ack_failure(_other) do
  Logger.error("[ASYNC RespPay] Unexpected NPCI response format")
  # TODO: Update transaction status to pending_review
end
@doc """
Posts a RespValQr document to NPCI after the synchronous ACK has been returned.

The call is best-effort: the outcome is only logged and the function always
returns `:ok`.

## Parameters

  * `resp_val_qr_xml` - RespValQr XML body to POST.
  * `txn_id` - transaction id appended to the callback URL.
  * `_req_val_qr_data` - accepted for interface compatibility; not used.
"""
@spec send_async_respvalqr(String.t(), String.t(), map()) :: :ok
def send_async_respvalqr(resp_val_qr_xml, txn_id, _req_val_qr_data) do
  Logger.info("[ASYNC RespValQr] Starting async RespValQr transmission for txnId=#{txn_id}")
  Logger.debug("[ASYNC RespValQr] RespValQr XML: #{resp_val_qr_xml}")

  # Construct NPCI callback URL
  npci_endpoint = "#{@npci_base_url}/RespValQr/2.0/urn:txnid:#{txn_id}"

  headers = [
    {"content-type", "application/xml"},
    {"accept", "application/xml"},
    {"user-agent", "Mercury-UPI-PSP/1.0"}
  ]

  Logger.info("[ASYNC RespValQr] Sending to NPCI URL: #{npci_endpoint}")

  case Req.post(npci_endpoint, body: resp_val_qr_xml, headers: headers, receive_timeout: @request_timeout) do
    {:ok, %Req.Response{status: 200, body: response_body}} ->
      Logger.info("[ASYNC RespValQr] Successfully sent to NPCI for txnId=#{txn_id}")
      handle_npci_ack_response(response_body, "RespValQr", txn_id)

    {:ok, %Req.Response{status: status, body: error_body}} ->
      Logger.error("[ASYNC RespValQr] NPCI returned status #{status} for txnId=#{txn_id}")
      Logger.error("[ASYNC RespValQr] Error response: #{inspect(error_body)}")

    {:error, request_error} ->
      Logger.error("[ASYNC RespValQr] Network error for txnId=#{txn_id}: #{inspect(request_error)}")
  end

  :ok
end
@doc """
Posts a RespChkTxn document to NPCI after the synchronous ACK has been returned.

The call is best-effort: the outcome is only logged and the function always
returns `:ok`.

## Parameters

  * `resp_chk_txn_xml` - RespChkTxn XML body to POST.
  * `org_txn_id` - original transaction id appended to the callback URL.
  * `_req_chk_txn_data` - accepted for interface compatibility; not used.
"""
@spec send_async_respchktxn(String.t(), String.t(), map()) :: :ok
def send_async_respchktxn(resp_chk_txn_xml, org_txn_id, _req_chk_txn_data) do
  Logger.info("[ASYNC RespChkTxn] Starting async RespChkTxn transmission for orgTxnId=#{org_txn_id}")
  Logger.debug("[ASYNC RespChkTxn] RespChkTxn XML: #{resp_chk_txn_xml}")

  # Construct NPCI callback URL
  npci_endpoint = "#{@npci_base_url}/RespChkTxn/2.0/urn:txnid:#{org_txn_id}"

  headers = [
    {"Content-Type", "application/xml; charset=UTF-8"},
    {"Accept", "application/xml"},
    {"User-Agent", "Mercury-UPI-PSP/1.0"}
  ]

  Logger.info("[ASYNC RespChkTxn] Sending to NPCI URL: #{npci_endpoint}")
  Logger.info("[ASYNC RespChkTxn] Request headers: #{inspect(headers)}")

  case Req.post(npci_endpoint, body: resp_chk_txn_xml, headers: headers, receive_timeout: @request_timeout) do
    {:ok, %Req.Response{status: 200, body: response_body}} ->
      Logger.info("[ASYNC RespChkTxn] Successfully sent to NPCI for orgTxnId=#{org_txn_id}")
      handle_npci_ack_response(response_body, "RespChkTxn", org_txn_id)

    {:ok, %Req.Response{status: status, body: error_body}} ->
      Logger.error("[ASYNC RespChkTxn] NPCI returned status #{status} for orgTxnId=#{org_txn_id}")
      Logger.error("[ASYNC RespChkTxn] Error response: #{inspect(error_body)}")

    {:error, request_error} ->
      Logger.error("[ASYNC RespChkTxn] Network error for orgTxnId=#{org_txn_id}: #{inspect(request_error)}")
  end

  :ok
end
# ================================
# PRIVATE HELPER FUNCTIONS
# ================================
# Extracts the payee reference attributes from the first `Ref` element with
# type="PAYEE" in a RespPay XML document.
#
# Returns a map with the keys :regName, :code, :ifsc, :ac_num, :acc_type and
# :approval_num (string values), or an empty map when the document cannot be
# parsed.
defp extract_ref_data_from_resp_pay_xml(xml) do
  doc = parse(xml)

  attribute_paths = [
    regName: ~x"//Ref[@type='PAYEE'][1]/@regName"s,
    code: ~x"//Ref[@type='PAYEE'][1]/@code"s,
    ifsc: ~x"//Ref[@type='PAYEE'][1]/@IFSC"s,
    ac_num: ~x"//Ref[@type='PAYEE'][1]/@acNum"s,
    acc_type: ~x"//Ref[@type='PAYEE'][1]/@accType"s,
    approval_num: ~x"//Ref[@type='PAYEE'][1]/@approvalNum"s
  ]

  Map.new(attribute_paths, fn {key, path} -> {key, xpath(doc, path) || ""} end)
rescue
  e ->
    Logger.error("[ASYNC RespPay] Failed to extract ref_data from XML: #{inspect(e)}")
    %{}
catch
  # The underlying XML scanner signals malformed documents with an exit, which
  # `rescue` does not intercept; without this clause the calling process dies.
  :exit, reason ->
    Logger.error("[ASYNC RespPay] Failed to extract ref_data from XML: #{inspect(reason)}")
    %{}
end
# Writes the payee details taken from a RespPay into the `req_pays` row(s)
# whose txn_id matches. Only runs for a non-empty ref_data map; anything else
# is a no-op returning :ok.
#
# Returns :ok whatever the affected-row count is (the count is only logged) and
# :error when the update raises.
defp update_req_pays_with_ref_data(txn_id, ref_data) when map_size(ref_data) > 0 do
  import Ecto.Query

  Logger.info("[ASYNC RespPay] Updating req_pays record for txn_id=#{txn_id}")
  Logger.info("[ASYNC RespPay] Update data: #{inspect(ref_data)}")

  payee_name = Map.get(ref_data, :regName)
  mcc = Map.get(ref_data, :code)
  payee_ifsc = Map.get(ref_data, :ifsc)
  payee_account_number = Map.get(ref_data, :ac_num)
  payee_account_type = Map.get(ref_data, :acc_type)
  invoice_number = Map.get(ref_data, :approval_num)

  update_query =
    from(r in "req_pays",
      where: r.txn_id == ^txn_id,
      update: [
        set: [
          payee_name: ^payee_name,
          merchant_category_code: ^mcc,
          payee_ifsc: ^payee_ifsc,
          payee_account_number: ^payee_account_number,
          payee_account_type: ^payee_account_type,
          invoice_number: ^invoice_number
        ]
      ]
    )

  {affected, _returned} = DaProductApp.Repo.update_all(update_query, [])

  cond do
    affected == 1 ->
      Logger.info("[ASYNC RespPay] ✅ Successfully updated req_pays record for txn_id=#{txn_id}")

    affected == 0 ->
      Logger.warning("[ASYNC RespPay] ⚠️ No req_pays record found for txn_id=#{txn_id}")

    true ->
      Logger.warning("[ASYNC RespPay] ⚠️ Updated #{affected} records for txn_id=#{txn_id} (expected 1)")
  end

  :ok
rescue
  e ->
    Logger.error("[ASYNC RespPay] ❌ Failed to update req_pays: #{inspect(e)}")
    :error
end

defp update_req_pays_with_ref_data(_txn_id, _ref_data), do: :ok
# Returns the value of the first msgId="..." attribute found in `xml_string`,
# or "UNKNOWN" when there is no match or the lookup raises (for example on a
# non-binary argument).
# NOTE(review): no caller of this helper is visible in this module.
defp extract_msg_id_from_xml(xml_string) do
  ~r/msgId="([^"]+)"/
  |> Regex.run(xml_string, capture: :all_but_first)
  |> case do
    [msg_id] -> msg_id
    _ -> "UNKNOWN"
  end
rescue
  _ -> "UNKNOWN"
end
# Logs the outcome of the body NPCI returned with HTTP 200 for `api_name`.
#
#   * non-blank binary body - parsed with parse_npci_ack/1; success or the
#     failure category is logged
#   * nil or blank body     - logged as a missing ACK
#   * any other term        - logged as an unexpected non-text body
#
# `txn_id` is used for log correlation only. The return value is that of the
# last Logger call.
defp handle_npci_ack_response(response_body, api_name, txn_id) when is_binary(response_body) do
  if String.trim(response_body) == "" do
    Logger.warning("[#{api_name}] NPCI sent 200 but no ACK body for #{txn_id}")
  else
    Logger.info("[#{api_name}] Received ACK from NPCI for #{txn_id}")
    Logger.debug("[#{api_name}] NPCI ACK response: #{response_body}")

    case parse_npci_ack(response_body) do
      {:ok, ack_data} ->
        Logger.info("[#{api_name}] Successfully parsed NPCI ACK: #{inspect(ack_data)}")
        Logger.info("[#{api_name}] Flow completed successfully")

      {:error, parse_error} ->
        Logger.error("[#{api_name}] Failed to parse NPCI ACK: #{parse_error}")
        Logger.debug("[#{api_name}] Raw ACK response: #{response_body}")

        # Handle specific error cases
        case parse_error do
          "NPCI endpoint timeout" ->
            Logger.warning("[#{api_name}] NPCI timeout - transaction may need retry or manual reconciliation")

          "NPCI endpoint error" ->
            Logger.error("[#{api_name}] NPCI returned error - transaction failed")

          _ ->
            Logger.error("[#{api_name}] Unexpected NPCI response format")
        end
    end
  end
end

defp handle_npci_ack_response(nil, api_name, txn_id) do
  Logger.warning("[#{api_name}] NPCI sent 200 but no ACK body for #{txn_id}")
end

# The HTTP client may hand back an already-decoded (non-binary) body depending
# on the response content type; String.trim/1 would raise on it.
defp handle_npci_ack_response(response_body, api_name, txn_id) do
  Logger.error("[#{api_name}] NPCI sent 200 with a non-text body for #{txn_id}: #{inspect(response_body)}")
end
# ================================
# NPCI AUTHENTICATION & CONFIGURATION HELPERS
# ================================
# Reads the ReqHbt endpoint prefix (the transaction id is appended by the
# caller) from the :npci_psp_npci_heartbeat_endpoint key of the
# :da_product_app environment, falling back to a built-in URL.
defp get_psp_npci_heartbeat_endpoint do
  fallback_url = "https://backend.nfinite.in/iupi/ReqHbt/2.0/urn:txnid:"
  Application.get_env(:da_product_app, :npci_psp_npci_heartbeat_endpoint, fallback_url)
end
# Reads the :npci_heartbeat_endpoint key of the :da_product_app environment,
# falling back to a built-in URL prefix.
# NOTE(review): no caller of this helper is visible in this module.
defp get_npci_heartbeat_endpoint do
  fallback_url = "https://precert.nfinite.in/iupi/ReqHbt/2.0/urn:txnid:"
  Application.get_env(:da_product_app, :npci_heartbeat_endpoint, fallback_url)
end
# Reads the PSP organisation id from the :psp_org_id key of the
# :da_product_app environment, defaulting to "MER101".
defp get_psp_org_id do
  fallback_org_id = "MER101"
  Application.get_env(:da_product_app, :psp_org_id, fallback_org_id)
end
# Builds a random request id: "REQ" followed by 16 cryptographically strong
# random bytes rendered as 32 lowercase hex characters.
defp generate_request_id do
  random_hex = Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)
  "REQ" <> random_hex
end
# Derives a request id from a message id: "REQ_" followed by the id when it is
# a non-empty binary; otherwise falls back to a random id from
# generate_request_id/0.
defp generate_request_id_with_msg_id(msg_id) when is_binary(msg_id) and msg_id != "",
  do: "REQ_" <> msg_id

defp generate_request_id_with_msg_id(_missing_msg_id), do: generate_request_id()
# Returns the value of the first msgId="..." attribute found in `xml_string`.
# Yields "" (and logs) when nothing matches or when the lookup raises, for
# example on a non-binary argument.
defp extract_msg_id_from_heartbeat_xml(xml_string) do
  matches = Regex.run(~r/msgId="([^"]+)"/, xml_string, capture: :all_but_first)

  if match?([_], matches) do
    [found] = matches
    Logger.debug("Extracted msgId from XML: #{found}")
    found
  else
    Logger.warning("Could not extract msgId from heartbeat XML")
    ""
  end
rescue
  error ->
    Logger.error("Error extracting msgId from XML: #{inspect(error)}")
    ""
end
# Checks whether the configured NPCI client certificate and private key files
# exist and logs the result. Paths come from the :npci_client_cert_path and
# :npci_client_key_path keys of the :da_product_app environment (both default
# to "private.pem").
#
# Always returns [] - no SSL options are produced yet, whatever the check finds.
defp get_npci_ssl_options do
  cert_path = Application.get_env(:da_product_app, :npci_client_cert_path, "private.pem")
  key_path = Application.get_env(:da_product_app, :npci_client_key_path, "private.pem")

  cert_present? = File.exists?(cert_path)
  key_present? = File.exists?(key_path)

  cond do
    cert_present? and key_present? ->
      Logger.info("✅ NPCI SSL certificates available at: #{cert_path}")
      Logger.info("📋 Note: SSL certificate authentication will be implemented in next phase")

    not cert_present? ->
      Logger.warning("⚠️ NPCI client certificate not found at: #{cert_path}")
      Logger.info("📋 Using standard HTTPS without client certificate authentication")

    true ->
      Logger.warning("⚠️ NPCI private key not found at: #{key_path}")
      Logger.info("📋 Using standard HTTPS without client certificate authentication")
  end

  # Empty options for now, to avoid connection issues.
  []
rescue
  error ->
    Logger.error("❌ Failed to check NPCI SSL certificates: #{inspect(error)}")
    Logger.info("📋 Using standard HTTPS without client certificate authentication")
    []
end
# Classifies and parses the body NPCI returned as an ACK.
#
# Returns:
#   * {:ok, map} with :api, :req_msg_id, :org_txn_id, :txn_id and :timestamp
#     read from the Ack element's attributes
#   * {:error, "NPCI endpoint timeout"} for the literal body "TIMEOUT"
#   * {:error, "NPCI endpoint error"} for the literal body "ERROR"
#   * {:error, message} for bodies shorter than 10 bytes, non-XML bodies, XML
#     without an Ack element, and XML that fails to parse
#
# Callers match on the two literal timeout/error messages, so those strings
# must stay unchanged.
defp parse_npci_ack(ack_response) do
  case String.trim(ack_response) do
    "TIMEOUT" ->
      {:error, "NPCI endpoint timeout"}

    "ERROR" ->
      {:error, "NPCI endpoint error"}

    response when byte_size(response) < 10 ->
      {:error, "Invalid NPCI response: #{response}"}

    xml_response ->
      # Only parse if it looks like XML (starts with < and mentions Ack).
      if String.starts_with?(xml_response, "<") and String.contains?(xml_response, "Ack") do
        parse_ack_xml(xml_response)
      else
        {:error, "Non-XML response from NPCI: #{xml_response}"}
      end
  end
end

# Parses an XML ACK document; the Ack element is located with local-name() so
# any namespace prefix is accepted.
# NOTE(review): this parses a document received from a remote endpoint;
# consider disabling DTD processing in the XML parser if the installed
# SweetXml version supports it.
defp parse_ack_xml(xml_response) do
  xml_response
  |> parse()
  |> xpath(~x"//*[local-name()='Ack']"e,
    api: ~x"./@api"s,
    req_msg_id: ~x"./@reqMsgId"s,
    org_txn_id: ~x"./@orgTxnId"s,
    txn_id: ~x"./@txnId"s,
    timestamp: ~x"./@ts"s
  )
  |> case do
    # The body mentioned "Ack" but no such element exists; do not report success.
    nil -> {:error, "Failed to parse ACK XML: Ack element not found"}
    parsed -> {:ok, parsed}
  end
rescue
  error ->
    Logger.error("[ACK Parse Error] XML: #{xml_response}")
    Logger.error("[ACK Parse Error] Exception: #{inspect(error)}")
    {:error, "Failed to parse ACK XML: #{inspect(error)}"}
catch
  # The underlying XML scanner signals malformed documents with an exit, which
  # `rescue` does not intercept.
  :exit, reason ->
    Logger.error("[ACK Parse Error] XML: #{xml_response}")
    Logger.error("[ACK Parse Error] Exit: #{inspect(reason)}")
    {:error, "Failed to parse ACK XML: #{inspect(reason)}"}
end
# ================================
# HEARTBEAT FUNCTIONS (NEW IMPLEMENTATION)
# ================================
@doc """
Send heartbeat request (ReqHbt) to NPCI and handle the immediate ACK response.

This implements Step 1 & 2 of the PSP-initiated heartbeat flow:

1. PSP sends ReqHbt to the configured NPCI endpoint.
2. NPCI responds with an immediate ACK.

## Parameters

  * `req_hbt_xml` - generated ReqHbt XML to send.
  * `txn_id` - transaction id; appended to the configured endpoint prefix.

## Returns

  * `{:ok, ack_data}` when NPCI answers 200 with a parseable ACK.
  * `{:error, reason}` for an empty, non-text or unparseable ACK body, a
    non-200 status, or a transport error.
"""
@spec send_heartbeat_request(String.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
def send_heartbeat_request(req_hbt_xml, txn_id) do
  # Use configured endpoint instead of hardcoded URL
  npci_endpoint = get_psp_npci_heartbeat_endpoint() <> txn_id

  Logger.info("=== SENDING REQHBT TO NPCI ===")
  Logger.info("NPCI ReqHbt Endpoint URL: #{npci_endpoint}")
  Logger.info("Transaction ID: #{txn_id}")
  Logger.info("ReqHbt XML being sent:")
  Logger.info(req_hbt_xml)
  Logger.info("=== END REQHBT XML ===")

  # Extract message ID from XML for request tracking
  msg_id = extract_msg_id_from_heartbeat_xml(req_hbt_xml)

  headers = [
    {"Content-Type", "application/xml; charset=UTF-8"},
    {"Accept", "application/xml"},
    {"User-Agent", "Mercury-UPI-PSP/1.0"},
    {"X-API-Version", "2.0"},
    {"X-PSP-OrgId", get_psp_org_id()},
    {"X-Request-ID", generate_request_id_with_msg_id(msg_id)}
  ]

  Logger.info("Request headers: #{inspect(headers)}")

  # Called for its logging only: the returned options are not passed to the
  # request.
  get_npci_ssl_options()

  case Req.post(npci_endpoint,
         body: req_hbt_xml,
         headers: headers,
         receive_timeout: @request_timeout
       ) do
    {:ok, %Req.Response{status: 200, body: response_body}} ->
      Logger.info("✅ Successfully sent ReqHbt to NPCI for txnId=#{txn_id}")
      interpret_heartbeat_ack(response_body, txn_id)

    {:ok, %Req.Response{status: status, body: error_body}} ->
      Logger.error("❌ NPCI returned status #{status} for ReqHbt txnId=#{txn_id}")
      Logger.error("Error response: #{inspect(error_body)}")
      {:error, "NPCI error: status #{status}"}

    {:error, request_error} ->
      Logger.error("❌ Network error sending ReqHbt for txnId=#{txn_id}: #{inspect(request_error)}")
      {:error, "Network error: #{inspect(request_error)}"}
  end
end

# Turns the body NPCI returned with HTTP 200 for a ReqHbt into the tagged
# result of send_heartbeat_request/2.
defp interpret_heartbeat_ack(response_body, txn_id) when is_binary(response_body) do
  if String.trim(response_body) == "" do
    Logger.warning("⚠️ NPCI sent 200 but no ACK body for ReqHbt txnId=#{txn_id}")
    {:error, "Empty ACK response from NPCI"}
  else
    Logger.info("📨 Received ACK from NPCI for ReqHbt txnId=#{txn_id}")
    Logger.debug("NPCI ACK response: #{response_body}")

    case parse_npci_ack(response_body) do
      {:ok, ack_data} ->
        Logger.info("✅ Successfully parsed NPCI ACK: #{inspect(ack_data)}")
        Logger.info("🔄 ReqHbt flow Step 1 & 2 completed successfully - waiting for RespHbt")
        {:ok, ack_data}

      {:error, parse_error} ->
        Logger.error("❌ Failed to parse NPCI ACK: #{parse_error}")
        Logger.debug("Raw ACK response: #{response_body}")
        {:error, "ACK parsing failed: #{parse_error}"}
    end
  end
end

defp interpret_heartbeat_ack(nil, txn_id) do
  Logger.warning("⚠️ NPCI sent 200 but no ACK body for ReqHbt txnId=#{txn_id}")
  {:error, "Empty ACK response from NPCI"}
end

# The HTTP client may hand back an already-decoded (non-binary) body depending
# on the response content type; String.trim/1 would raise on it, so report it
# through the documented error tuple instead.
defp interpret_heartbeat_ack(response_body, txn_id) do
  Logger.error("❌ NPCI sent 200 with a non-text body for ReqHbt txnId=#{txn_id}: #{inspect(response_body)}")
  {:error, "Unexpected non-text ACK response from NPCI"}
end
@doc """
Logs the ACK that answers a received RespHbt (Step 4 of the PSP-initiated
heartbeat flow).

No network call is made here: the function only logs the ACK and its context.
Per the inline notes, the HTTP response that carries the ACK is written by the
controller.

## Parameters

  * `ack_xml` - generated ACK XML.
  * `resp_hbt_data` - parsed RespHbt data; only `:msg_id` is read, for logging.

## Returns

  * `:ok` - the only value this implementation returns.
"""
@spec send_heartbeat_ack(String.t(), map()) :: :ok | {:error, String.t()}
def send_heartbeat_ack(ack_xml, resp_hbt_data) do
  # ACKs normally go back on the same HTTP exchange that delivered the RespHbt,
  # which is the controller's job; this function just records what is sent.
  Logger.info("=== SENDING HEARTBEAT ACK TO NPCI ===")
  Logger.info("ACK for RespHbt msgId: #{Access.get(resp_hbt_data, :msg_id)}")
  Logger.info("ACK XML:")
  Logger.info(ack_xml)
  Logger.info("=== END HEARTBEAT ACK ===")

  # Nothing is transmitted from here; the controller sends the HTTP response.
  Logger.info("✅ Heartbeat ACK prepared and logged - controller will send HTTP response")

  :ok
end
@doc """
Starts the PSP-initiated heartbeat flow: generates the ReqHbt XML from
`heartbeat_data`, sends it to NPCI and waits for the immediate ACK (Steps 1 & 2).
The flow continues elsewhere when NPCI sends RespHbt to our endpoint.

## Parameters

  * `txn_id` - transaction id to check.
  * `heartbeat_data` - data handed to the ReqHbt generator; its `:msg_id`, when
    present, is echoed in the result.

## Returns

  * `{:ok, flow_result}` - map with `:step`, `:txn_id`, `:msg_id`,
    `:ack_response`, `:status` and `:timestamp`.
  * `{:error, reason}` - when XML generation or the NPCI request fails.
"""
@spec handle_heartbeat_flow(String.t(), map()) :: {:ok, map()} | {:error, String.t()}
def handle_heartbeat_flow(txn_id, heartbeat_data) do
  Logger.info("=== STARTING COMPLETE HEARTBEAT FLOW ===")
  Logger.info("Transaction ID: #{txn_id}")
  Logger.info("Heartbeat Data: #{inspect(heartbeat_data)}")

  with {:ok, req_hbt_xml} <- DaProductAppWeb.UpiXmlSchema.generate_req_hbt(heartbeat_data),
       {:ok, ack_response} <- send_heartbeat_request(req_hbt_xml, txn_id) do
    Logger.info("✅ Heartbeat flow Steps 1 & 2 completed successfully")
    Logger.info("⏳ Flow continues when NPCI sends RespHbt to our endpoint")

    flow_result = %{
      step: "ack_received",
      txn_id: txn_id,
      # Map.get/2 rather than `.msg_id`: the request has already been sent and
      # acknowledged at this point, so a missing key must not discard the result.
      msg_id: Map.get(heartbeat_data, :msg_id),
      ack_response: ack_response,
      status: "waiting_for_resp_hbt",
      timestamp: DateTime.utc_now()
    }

    {:ok, flow_result}
  else
    {:error, reason} = error ->
      # `reason` is not guaranteed to be a string (the XML generator's error
      # shape is not visible here); interpolating a tuple or map would raise.
      reason_text = if is_binary(reason), do: reason, else: inspect(reason)
      Logger.error("❌ Heartbeat flow failed at Step 1/2: #{reason_text}")
      error

    unexpected ->
      # Any other shape would otherwise surface as a WithClauseError.
      Logger.error("❌ Heartbeat flow failed at Step 1/2: #{inspect(unexpected)}")
      {:error, "Unexpected heartbeat flow result: #{inspect(unexpected)}"}
  end
end
end