# lib/da_product_app/mqtt/handler.ex
defmodule DaProductApp.MQTT.Handler do
  @moduledoc """
  Tortoise MQTT handler for device and terminal traffic.

  Besides tracking connection status, it reacts to these topic shapes
  (shown as the topic-level lists the callbacks match on):

    * `["ack", "qr-device", device_id]` with payload `"OK"` - QR device ACK
    * `["ota", "ack", serial]` - OTA acknowledgement (JSON)
    * `["ota", "file_download", serial]` - file download response (JSON)
    * `["tms", "status", serial]` - terminal status report (JSON)
    * `["ota", product_key, serial, "logpush"]` - remote log line

  Messages on any other topic are logged at debug level and ignored, so an
  unexpected publish cannot crash the handler with a `FunctionClauseError`.

  The handler state is a map with `:client_id` and `:online_devices`
  (device id => boolean).
  """

  use Tortoise.Handler

  require Logger

  alias DaProductApp.TerminalManagement
  alias DaProductApp.TerminalManagement.{AutoPushService, FileDownloadService, OtaService}

  # Builds the initial state from the handler options; `:client_id` defaults
  # to "unknown_client" when not supplied.
  @impl true
  def init(opts) do
    client_id = Keyword.get(opts, :client_id, "unknown_client")
    {:ok, %{online_devices: %{}, client_id: client_id}}
  end

  # Connection status changes: registers the device as online/offline in the
  # device registry and mirrors the flag in the handler state.
  @impl true
  def connection(:up, state) do
    device_id = parse_device_id(state.client_id)
    Logger.info("Device #{device_id} connected")
    PlatformCore.DeviceRegistry.track_online(device_id)
    {:ok, put_in(state, [:online_devices, device_id], true)}
  end

  def connection(:down, state) do
    device_id = parse_device_id(state.client_id)
    Logger.info("Device #{device_id} disconnected")
    PlatformCore.DeviceRegistry.track_offline(device_id)
    {:ok, put_in(state, [:online_devices, device_id], false)}
  end

  # Any other status is ignored instead of raising FunctionClauseError.
  # NOTE(review): Tortoise may report additional statuses such as
  # `:terminating` - confirm against the Tortoise version in use.
  def connection(_status, state), do: {:ok, state}

  # QR device acknowledgement: only the literal "OK" payload is treated as an
  # ACK and broadcast to the matching "qr:ack:<device_id>" channel topic.
  @impl true
  def handle_message(["ack", "qr-device", device_id], "OK", state) do
    Logger.info("Received ACK from #{device_id}")
    DaProductAppWeb.Endpoint.broadcast("qr:ack:#{device_id}", "ack_received", %{})
    {:ok, state}
  end

  # OTA acknowledgement: expects a JSON object carrying "request_id" and
  # "status"; anything else is logged and dropped.
  def handle_message(["ota", "ack", device_serial], payload, state) do
    Logger.info("Received OTA ACK from device #{device_serial}: #{payload}")

    case Jason.decode(payload) do
      {:ok, %{"request_id" => request_id, "status" => ack_status}} ->
        OtaService.handle_ota_acknowledgment(device_serial, request_id, ack_status)

      {:ok, _unexpected} ->
        # Valid JSON but not the expected shape; previously a CaseClauseError.
        Logger.error(
          "OTA ACK payload from #{device_serial} is missing request_id/status: #{payload}"
        )

      {:error, _reason} ->
        Logger.error("Failed to parse OTA ACK payload from #{device_serial}: #{payload}")
    end

    {:ok, state}
  end

  # File download response: expects a JSON object carrying "request_id"; the
  # whole decoded object is forwarded to the download service.
  def handle_message(["ota", "file_download", device_serial], payload, state) do
    Logger.info("Received file download response from device #{device_serial}: #{payload}")

    case Jason.decode(payload) do
      {:ok, %{"request_id" => request_id} = response} ->
        FileDownloadService.handle_download_response(device_serial, request_id, response)

      {:ok, _unexpected} ->
        # Valid JSON but not the expected shape; previously a CaseClauseError.
        Logger.error(
          "File download response from #{device_serial} is missing request_id: #{payload}"
        )

      {:error, _reason} ->
        Logger.error(
          "Failed to parse file download response from #{device_serial}: #{payload}"
        )
    end

    {:ok, state}
  end

  # Terminal status report: requires "oid", "sn", "uploadTime" and a list
  # under "org.device". Payloads that fail to decode, lack a key, or carry a
  # non-list "org.device" are logged and dropped before anything is written.
  def handle_message(["tms", "status", serial_number], payload, state) do
    Logger.info("Received status update for terminal #{serial_number}")
    Logger.info("Incoming JSON payload: #{payload}")

    case Jason.decode(payload) do
      {:ok, %{"oid" => _, "sn" => _, "uploadTime" => _, "org.device" => items} = report}
      when is_list(items) ->
        Logger.info("Decoded JSON: #{inspect(report)}")
        process_status_report(report)

      error ->
        Logger.error("Failed to parse status payload: #{inspect(error)}")
    end

    {:ok, state}
  end

  # Remote log line: the raw payload is forwarded untouched to subscribers of
  # "remote_logs:<serial_number>". The product key level is not used.
  def handle_message(["ota", _product_key, serial_number, "logpush"], payload, state) do
    Logger.info("Received log message for terminal #{serial_number}")
    Logger.debug("Raw payload length: #{byte_size(payload)}")
    Logger.debug("Raw payload (first 200 chars): #{String.slice(payload, 0, 200)}")
    Logger.debug("Payload encoding valid?: #{String.valid?(payload)}")

    # Broadcast to any active remote log sessions for this terminal
    Phoenix.PubSub.broadcast(
      DaProductApp.PubSub,
      "remote_logs:#{serial_number}",
      {:log_message, payload}
    )

    {:ok, state}
  end

  # Fallback: a message on a topic (or with a payload) none of the clauses
  # above understand must not crash the handler process.
  def handle_message(topic, _payload, state) do
    Logger.debug("Ignoring message on unhandled topic: #{inspect(topic)}")
    {:ok, state}
  end

  # Strips the "qr_" prefix from a client id; other ids are returned as-is.
  defp parse_device_id("qr_" <> rest), do: rest
  defp parse_device_id(client_id), do: client_id

  # Runs the full status-report pipeline for an already validated report:
  # auto-push check, terminal lookup/creation, "connected" update, heartbeat,
  # status log and items, then the grid refresh broadcast.
  defp process_status_report(
         %{"oid" => oid, "sn" => sn, "uploadTime" => upload_time, "org.device" => items} =
           report
       ) do
    vendor = report["vendor"]
    model = report["model"]

    versions = extract_versions(items)
    Logger.info("Extracted versions for #{sn}: #{inspect(versions)}")

    maybe_trigger_auto_push(sn, vendor, model, versions)

    case find_or_create_terminal(sn, oid, vendor, model) do
      {:ok, terminal} ->
        mark_connected(terminal, versions, vendor, model)

        # Update device monitor heartbeat
        TerminalManagement.update_heartbeat(sn)

        store_status_report(terminal, sn, oid, upload_time, items)

      error ->
        # Previously a MatchError that took the handler down.
        Logger.error("Failed to create terminal #{sn}: #{inspect(error)}")
    end
  end

  # Triggers the missing-version auto-push only when vendor and model are both
  # present and the auto-push service reports a missing version.
  defp maybe_trigger_auto_push(sn, vendor, model, versions) do
    if vendor && model && AutoPushService.has_missing_versions?(versions) do
      Logger.info("Detected missing versions for #{sn}, triggering auto-push")
      AutoPushService.trigger_missing_version_push(sn, vendor, model)
    end
  end

  # Returns `{:ok, terminal}` for an existing terminal, or whatever
  # `create_terminal/1` returns when the serial number is not known yet.
  defp find_or_create_terminal(sn, oid, vendor, model) do
    case TerminalManagement.get_terminal_by_serial(sn) do
      nil ->
        TerminalManagement.create_terminal(%{
          serial_number: sn,
          oid: oid,
          vendor: vendor,
          model: model
        })

      terminal ->
        # The result is deliberately ignored: the "connected" update that
        # follows writes vendor and model again.
        TerminalManagement.update_terminal(terminal, %{vendor: vendor, model: model})
        {:ok, terminal}
    end
  end

  # Always marks the terminal as "connected", stamps `updated_at` with the
  # current UTC second and stores the reported config versions.
  defp mark_connected(terminal, versions, vendor, model) do
    now = DateTime.utc_now() |> DateTime.truncate(:second)

    TerminalManagement.update_terminal_status(terminal, %{
      status: "connected",
      updated_at: now,
      parameter_config_version: versions.parameter_config,
      emv_config_version: versions.emv_config,
      keys_config_version: versions.keys_config,
      vendor: vendor,
      model: model
    })
  end

  # Persists the status log and its items, applies the reported status and
  # notifies "tms:terminals" subscribers. A failed log insert is logged and
  # the remaining steps are skipped.
  defp store_status_report(terminal, sn, oid, upload_time, items) do
    log_attrs = %{terminal_id: terminal.id, oid: oid, upload_time: upload_time}

    case TerminalManagement.create_status_log(log_attrs) do
      {:ok, status_log} ->
        Logger.info("Saved status_log: #{inspect(status_log)}")

        Enum.each(items, &store_status_item(&1, status_log, terminal, sn))

        apply_reported_status(terminal, items)

        Phoenix.PubSub.broadcast(
          DaProductApp.PubSub,
          "tms:terminals",
          {:terminal_status_updated, sn}
        )

        :ok

      error ->
        # Previously a MatchError that took the handler down.
        Logger.error("Failed to create status log for #{sn}: #{inspect(error)}")
        :error
    end
  end

  # Stores one status item, then updates the terminal status and broadcasts
  # live updates depending on the item's "itemkey".
  defp store_status_item(item, status_log, terminal, sn) do
    Logger.info("Saving status item: #{inspect(item)}")
    itemkey = item["itemkey"]

    result =
      TerminalManagement.create_status_item(%{
        status_log_id: status_log.id,
        itemkey: itemkey,
        value: to_string(item["value"]),
        timestamp: item["timestamp"],
        message: item["message"]
      })

    Logger.info("Result of create_status_item: #{inspect(result)}")

    # "status" and "network" items drive the terminal's online/offline state
    if itemkey in ["status", "network"] do
      case String.downcase(to_string(item["value"])) do
        status when status in ["online", "offline"] ->
          Logger.info("Setting terminal #{sn} status to '#{status}' from #{itemkey} itemkey")
          TerminalManagement.update_terminal_status(terminal, %{status: status})

        _ ->
          Logger.info("Unknown #{itemkey} value: #{item["value"]}, keeping as connected")
      end
    end

    # Broadcast live metric updates for AG Grid
    if itemkey in ["battery", "cpu", "memory"] do
      Phoenix.PubSub.broadcast(
        DaProductApp.PubSub,
        "tms:terminals",
        {:terminal_status_updated, sn, %{itemkey: itemkey, value: item["value"]}}
      )
    end

    # Also broadcast status changes to update the terminal grid in real-time
    if itemkey in ["status", "network"] do
      Phoenix.PubSub.broadcast(
        DaProductApp.PubSub,
        "tms:terminals",
        {:terminal_status_changed, sn, %{status: item["value"]}}
      )
    end
  end

  # Applies the value of the first "status" item, if any, to the terminal.
  defp apply_reported_status(terminal, items) do
    case Enum.find(items, fn item -> item["itemkey"] == "status" end) do
      %{"value" => status_value} ->
        # NOTE(review): every other call passes an attrs map, but this one
        # passes the raw value - confirm update_terminal_status/2 accepts it.
        TerminalManagement.update_terminal_status(terminal, status_value)

      _ ->
        :ok
    end
  end

  # Helper function to extract version information from status items.
  # Returns a map with nil for every version that is not reported.
  defp extract_versions(items) when is_list(items) do
    %{
      parameter_config: find_version_in_items(items, "parameter_config"),
      emv_config: find_version_in_items(items, "emv_config"),
      keys_config: find_version_in_items(items, "keys_config"),
      application: find_version_in_items(items, "application")
    }
  end

  defp extract_versions(_),
    do: %{parameter_config: nil, emv_config: nil, keys_config: nil, application: nil}

  # Helper function to find a specific version value in items list.
  # Returns the "value" of the first item whose "itemkey" equals `key`.
  defp find_version_in_items(items, key) when is_list(items) do
    case Enum.find(items, fn item -> item["itemkey"] == key end) do
      nil -> nil
      item -> item["value"]
    end
  end

  defp find_version_in_items(_, _), do: nil
end