# lib/da_product_app/mqtt/handler.ex
defmodule DaProductApp.MQTT.Handler do
  @moduledoc """
  `Tortoise.Handler` implementation for the MQTT connection of a device.

  Responsibilities visible in this module:

    * records connection up/down transitions through `DaProductApp.DeviceRegistry`
      and mirrors them in the handler state under `:online_devices`;
    * handles ACK messages published on `/ack/qr-device/<device_id>` and
      `/ack/<merchant_id>/<device_id>`, updating the matching
      `CloudTransaction` (second form only) and broadcasting `"ack_received"`
      on the `"qr:ack:<device_id>"` topic of the endpoint.

  Malformed ACK payloads are logged and ignored; they never crash the handler.
  """

  use Tortoise.Handler

  require Logger

  alias DaProductApp.CloudTransactions.CloudTransaction
  alias DaProductApp.DeviceRegistry
  alias DaProductApp.Repo
  alias DaProductAppWeb.Endpoint

  # Status stored on the transaction when the ACK payload carries no "status" key.
  @default_status "success"

  # Builds the initial handler state from the options given to the handler.
  # `:client_id` falls back to "unknown_client" when not supplied.
  @impl true
  def init(opts) do
    client_id = Keyword.get(opts, :client_id, "unknown_client")
    {:ok, %{online_devices: %{}, client_id: client_id}}
  end

  # Connection status changes: register the device as online/offline and keep
  # the same flag in the handler state.
  @impl true
  def connection(:up, state) do
    device_id = parse_device_id(state.client_id)
    Logger.info("Device #{device_id} connected")
    DeviceRegistry.track_online(device_id)
    {:ok, put_in(state, [:online_devices, device_id], true)}
  end

  def connection(:down, state) do
    device_id = parse_device_id(state.client_id)
    Logger.info("Device #{device_id} disconnected")
    DeviceRegistry.track_offline(device_id)
    {:ok, put_in(state, [:online_devices, device_id], false)}
  end

  # ACK on "/ack/qr-device/<device_id>": the payload is only logged and the
  # acknowledgement is broadcast; no transaction is touched.
  @impl true
  def handle_message(["", "ack", "qr-device", device_id], payload, state) do
    Logger.info("Received ACK from #{device_id}")
    Logger.info("Received payload from #{payload}")
    Endpoint.broadcast("qr:ack:#{device_id}", "ack_received", %{})
    {:ok, state}
  end

  # ACK on "/ack/<merchant_id>/<device_id>": the payload must be a JSON object
  # with a non-null "request_id"; anything else is logged and dropped.
  def handle_message(["", "ack", merchant_id, device_id], payload, state) do
    Logger.info(
      "Received ACK for merchant #{merchant_id}, device #{device_id} with payload: #{payload}"
    )

    case Jason.decode(payload) do
      # A null request_id is rejected here because Ecto refuses `nil` values
      # in `Repo.get_by/2` and would raise.
      {:ok, %{"request_id" => request_id} = data} when not is_nil(request_id) ->
        Logger.info(
          "Received OTA update for device #{device_id} with request_id #{request_id}: #{inspect(data)}"
        )

        process_ack(device_id, request_id, data)

      {:ok, other} ->
        Logger.error(
          "ACK payload for device #{device_id} has no usable request_id: #{inspect(other)}"
        )

      {:error, _reason} ->
        Logger.error("Failed to decode OTA payload for device #{device_id}: #{payload}")
    end

    {:ok, state}
  end

  # Defining handle_message/3 clauses replaces the pass-through default that
  # `use Tortoise.Handler` provides, so unmatched topics need an explicit
  # fallback to avoid a FunctionClauseError.
  def handle_message(topic, _payload, state) do
    Logger.debug("Ignoring message on unhandled topic #{inspect(topic)}")
    {:ok, state}
  end

  # Applies a decoded ACK: updates the transaction and broadcasts the
  # acknowledgement. A status that is not a string is logged and the ACK is
  # skipped (no update, no broadcast).
  defp process_ack(device_id, request_id, data) do
    case fetch_status(data) do
      {:ok, status} ->
        Logger.info("Status for request_id #{request_id} is #{status}")
        update_transaction(device_id, request_id, status)
        Endpoint.broadcast("qr:ack:#{device_id}", "ack_received", %{})

      :error ->
        Logger.error(
          "Invalid status in ACK for request_id #{request_id}: #{inspect(Map.get(data, "status"))}"
        )
    end
  end

  # Returns `{:ok, downcased_status}` using the payload's "status" or the
  # default when the key is absent; `:error` when the value is not a string.
  defp fetch_status(data) do
    case Map.get(data, "status", @default_status) do
      status when is_binary(status) -> {:ok, String.downcase(status)}
      _other -> :error
    end
  end

  # Looks up the transaction by reference number and device and stores the
  # new status; every outcome is logged.
  defp update_transaction(device_id, request_id, status) do
    case Repo.get_by(CloudTransaction, transaction_ref_number: request_id, device_id: device_id) do
      nil ->
        Logger.warning("No transaction found with transaction_ref_number=#{request_id}")

      transaction ->
        # NOTE(review): the key is spelled `acknoledgment` in the original code;
        # presumably it matches the CloudTransaction schema field - confirm
        # before renaming.
        changeset =
          CloudTransaction.changeset(transaction, %{
            status: status,
            acknoledgment: "received"
          })

        case Repo.update(changeset) do
          {:ok, _updated} ->
            Logger.info("Transaction #{request_id} updated to #{status}/received")

          {:error, changeset} ->
            Logger.error(
              "Failed to update transaction #{request_id}: #{inspect(changeset.errors)}"
            )
        end
    end
  end

  # Strips the "qr_" prefix from a client id; other ids are returned unchanged.
  defp parse_device_id("qr_" <> rest), do: rest
  defp parse_device_id(client_id), do: client_id
end