defmodule DaProductApp.TerminalManagement.RemoteLogService do @moduledoc """ Service for handling remote debug log streaming from terminals via MQTT. Manages MQTT connections, log streaming commands, and real-time log broadcasting. """ use GenServer require Logger alias DaProductApp.PubSub # Configuration @max_log_lines 1000 @mqtt_topic_template "/ota/:product_key/:serial_number/logpush" @default_product_key "pFppbioOCKlo5c8E" # Client API @doc """ Starts a remote log session for a terminal. """ def start_log_session(serial_number, user_id, product_key \\ @default_product_key) do session_id = generate_session_id() topic = build_topic(product_key, serial_number) case DynamicSupervisor.start_child( DaProductApp.RemoteLogSupervisor, {__MODULE__, {session_id, serial_number, user_id, topic}} ) do {:ok, pid} -> {:ok, session_id, pid} {:error, reason} -> {:error, reason} end end @doc """ Stops a remote log session. """ def stop_log_session(session_id) do case Registry.lookup(DaProductApp.RemoteLogRegistry, session_id) do [{pid, _}] -> GenServer.stop(pid) [] -> {:error, :not_found} end end @doc """ Subscribes to MQTT topic and starts log streaming. """ def start_logging(session_id, mode, opts \\ %{}) do case Registry.lookup(DaProductApp.RemoteLogRegistry, session_id) do [{pid, _}] -> GenServer.call(pid, {:start_logging, mode, opts}) [] -> {:error, :not_found} end end @doc """ Stops log streaming and unsubscribes from MQTT topic. """ def stop_logging(session_id) do case Registry.lookup(DaProductApp.RemoteLogRegistry, session_id) do [{pid, _}] -> GenServer.call(pid, :stop_logging) [] -> {:error, :not_found} end end @doc """ Clears the log buffer for a session. """ def clear_logs(session_id) do case Registry.lookup(DaProductApp.RemoteLogRegistry, session_id) do [{pid, _}] -> GenServer.call(pid, :clear_logs) [] -> {:error, :not_found} end end @doc """ Gets current logs for a session. """ def get_logs(session_id) do case Registry.lookup(DaProductApp.RemoteLogRegistry, session_id) do [{pid, _}] -> GenServer.call(pid, :get_logs) [] -> {:error, :not_found} end end # GenServer Implementation def start_link({session_id, serial_number, user_id, topic}) do GenServer.start_link(__MODULE__, {session_id, serial_number, user_id, topic}, name: {:via, Registry, {DaProductApp.RemoteLogRegistry, session_id}} ) end @impl true def init({session_id, serial_number, user_id, topic}) do # Subscribe to log messages for this terminal via PubSub Phoenix.PubSub.subscribe(DaProductApp.PubSub, "remote_logs:#{serial_number}") state = %{ session_id: session_id, serial_number: serial_number, user_id: user_id, mqtt_topic: topic, client_id: nil, connected: false, logging: false, log_lines: [], mode: nil, request_id: nil } {:ok, state} end @impl true def handle_call({:start_logging, mode, opts}, _from, state) do case send_start_command(state, mode, opts) do {:ok, request_id} -> updated_state = %{state | logging: true, connected: true, # Mark as connected since we're using main MQTT mode: mode, request_id: request_id } broadcast_status_update(updated_state) {:reply, {:ok, request_id}, updated_state} {:error, reason} -> {:reply, {:error, reason}, state} end end @impl true def handle_call(:stop_logging, _from, state) do case send_stop_command(state) do {:ok, _} -> updated_state = %{state | logging: false, connected: false, request_id: nil } broadcast_status_update(updated_state) {:reply, :ok, updated_state} {:error, reason} -> {:reply, {:error, reason}, state} end end @impl true def handle_call(:clear_logs, _from, state) do updated_state = %{state | log_lines: []} broadcast_logs_update(updated_state) {:reply, :ok, updated_state} end @impl true def handle_call(:get_logs, _from, state) do {:reply, {:ok, state.log_lines}, state} end @impl true def handle_info({:log_message, payload}, state) do # Only process log messages if we're actively logging if state.logging do Logger.debug("Processing log message in session #{state.session_id}") Logger.debug("Payload type: #{inspect(if is_binary(payload), do: :binary, else: payload.__struct__)}") Logger.debug("Payload length: #{if is_binary(payload), do: byte_size(payload), else: "not binary"}") case parse_log_message(payload) do {:ok, log_entry} -> updated_logs = add_log_line(state.log_lines, log_entry) updated_state = %{state | log_lines: updated_logs} broadcast_logs_update(updated_state) {:noreply, updated_state} {:error, reason} -> Logger.error("Failed to parse log message: #{reason}") {:noreply, state} end else {:noreply, state} end end @impl true def handle_info({:mqtt_message, payload}, state) do case parse_log_message(payload) do {:ok, log_entry} -> updated_logs = add_log_line(state.log_lines, log_entry) updated_state = %{state | log_lines: updated_logs} broadcast_logs_update(updated_state) {:noreply, updated_state} {:error, reason} -> Logger.error("Failed to parse log message: #{reason}") {:noreply, state} end end @impl true def handle_info(_msg, state) do {:noreply, state} end # Private Functions defp generate_session_id do :crypto.strong_rand_bytes(8) |> Base.encode16() end defp build_topic(product_key, serial_number) do @mqtt_topic_template |> String.replace(":product_key", product_key) |> String.replace(":serial_number", serial_number) end defp send_start_command(state, "REALTIME", _opts) do request_id = "debug_log_#{System.unique_integer([:positive])}" command = %{ "command" => "debug_log_upload_start", "mode" => "REALTIME", "request_id" => request_id } publish_command(state, command) {:ok, request_id} end defp send_start_command(state, "WITH_DELAY", opts) do request_id = "debug_log_#{System.unique_integer([:positive])}" command = %{ "command" => "debug_log_upload_start", "last_lines_count" => Map.get(opts, :last_lines_count, 10), "frequency_send" => Map.get(opts, :frequency_send, 15), "log_level" => Map.get(opts, :log_level, "ALL"), "request_id" => request_id } publish_command(state, command) {:ok, request_id} end defp send_stop_command(state) do request_id = "debug_log_#{System.unique_integer([:positive])}" command = %{ "command" => "debug_log_upload_stop", "request_id" => request_id } publish_command(state, command) {:ok, request_id} end defp publish_command(state, command) do topic = state.mqtt_topic |> String.replace("/logpush", "/update") payload = Jason.encode!(command) # Use the main phoenix MQTT client instead of creating our own DaProductApp.MQTT.publish(topic, payload, qos: 1) end defp parse_log_message(payload) when is_binary(payload) do # Clean and validate the payload first cleaned_payload = payload |> String.trim() # Try multiple parsing strategies case try_parse_json(cleaned_payload) do {:ok, %{"logs" => logs, "session_info" => session_info}} -> timestamp = Map.get(session_info, "timestamp", DateTime.utc_now() |> DateTime.to_iso8601()) # Clean up the log message to handle escape sequences properly cleaned_logs = logs |> String.replace("\\r\\n", "\n") |> String.replace("\\n", "\n") |> String.replace("\\r", "\n") |> String.trim() log_entry = %{ timestamp: timestamp, message: cleaned_logs, session_info: session_info } {:ok, log_entry} {:ok, parsed_data} -> Logger.warning("Unexpected JSON structure: #{inspect(parsed_data)}") {:error, "unexpected_json_structure"} {:error, reason} -> Logger.error("Failed to parse log message JSON: #{inspect(reason)}") Logger.error("Payload causing error: #{inspect(payload)}") # Fallback: create a log entry with raw payload if it looks like it might contain useful info if String.contains?(payload, "logs") && String.length(payload) > 10 do fallback_entry = %{ timestamp: DateTime.utc_now() |> DateTime.to_iso8601(), message: "[RAW] #{payload}", session_info: %{"status" => "parse_error", "raw" => true} } {:ok, fallback_entry} else {:error, "json_parse_error"} end end end defp parse_log_message(payload) do Logger.error("Invalid payload type: #{inspect(payload)}") {:error, "invalid_payload_type"} end # Try different JSON parsing strategies defp try_parse_json(payload) do # Strategy 1: Direct parsing case Jason.decode(payload) do {:ok, result} -> {:ok, result} {:error, _} -> # Strategy 2: Try with string interpolation fix try_parse_with_fixes(payload) end end defp try_parse_with_fixes(payload) do # Fix common JSON issues that might occur in MQTT transmission attempts = [ # Attempt 1: Fix escape sequences payload |> String.replace(~r/(? String.replace("\r\n", "\\\\r\\\\n") |> String.replace("\r", "\\\\r") |> String.replace("\n", "\\\\n"), # Attempt 3: More aggressive cleaning payload |> String.replace(~r/[\r\n]+/, " ") |> String.trim(), # Attempt 4: Remove non-printable characters payload |> String.replace(~r/[^\x20-\x7E\x0A\x0D]/, "") |> String.trim() ] # Try each cleaned version Enum.reduce_while(attempts, {:error, "all_attempts_failed"}, fn attempt, _acc -> case Jason.decode(attempt) do {:ok, result} -> {:halt, {:ok, result}} {:error, _} -> {:cont, {:error, "parsing_failed"}} end end) end defp add_log_line(current_logs, new_entry) do updated_logs = [new_entry | current_logs] if length(updated_logs) > @max_log_lines do Enum.take(updated_logs, @max_log_lines) else updated_logs end end defp broadcast_status_update(state) do Phoenix.PubSub.broadcast( PubSub, "remote_log:#{state.session_id}", {:remote_log_status, %{ connected: state.connected, logging: state.logging, mode: state.mode, serial_number: state.serial_number }} ) end defp broadcast_logs_update(state) do Phoenix.PubSub.broadcast( PubSub, "remote_log:#{state.session_id}", {:remote_log_update, %{ logs: state.log_lines, count: length(state.log_lines) }} ) end end defmodule DaProductApp.TerminalManagement.RemoteLogService.Handler do @moduledoc """ MQTT handler for remote log sessions. """ use Tortoise.Handler require Logger @impl true def init(opts) do session_pid = Keyword.get(opts, :session_pid) {:ok, %{session_pid: session_pid}} end @impl true def connection(:up, state) do Logger.info("Remote log MQTT connection established") {:ok, state} end @impl true def connection(:down, state) do Logger.info("Remote log MQTT connection lost") {:ok, state} end @impl true def handle_message(_topic, payload, state) do if state.session_pid do send(state.session_pid, {:mqtt_message, payload}) end {:ok, state} end end