defmodule DaProductApp.ParameterManagement do
  @moduledoc """
  Context module for parameter management functionality.

  Handles parameter categories and definitions, parameter templates,
  per-device overrides, parameter resolution (template values merged with
  device overrides), pushing resolved parameters through the OTA
  configuration path, and push-log bookkeeping/statistics.
  """

  import Ecto.Query, warn: false
  require Logger

  alias DaProductApp.Repo
  alias DaProductApp.TerminalManagement

  alias DaProductApp.ParameterManagement.{
    ParameterCategory,
    ParameterDefinition,
    ParameterTemplate,
    ParameterTemplateValue,
    DeviceParameterOverride,
    ParameterPushLog,
    ParameterPushDetail
  }

  alias DaProductApp.TerminalManagement.{TmsTerminal, OtaService}

  # Fallback values used when a resolved parameter set lacks the key.
  # NOTE(review): these look like environment-specific endpoints and
  # credentials (including a product secret) hard-coded in source — consider
  # moving them to runtime configuration.
  @default_merchant_id "900890089008000"
  @default_mqtt_ip "testapp.ariticapp.com"
  @default_http_ip "demo.ctrmv.com"
  @default_product_key "pFppbioOCKlo5c8E"
  @default_product_secret "sj2AJl102397fQAV"
  @default_username "user001"

  # ---------------------------------------------------------------------------
  # Parameter Categories
  # ---------------------------------------------------------------------------

  @doc """
  Lists active root categories with their children loaded.

  The `filters` argument is accepted for interface compatibility but is
  currently not used.
  """
  def list_parameter_categories(_filters \\ %{}) do
    ParameterCategory
    |> ParameterCategory.active_query()
    |> ParameterCategory.root_categories()
    |> ParameterCategory.with_children()
    |> Repo.all()
  end

  @doc "Fetches a parameter category by id, raising if it does not exist."
  def get_parameter_category!(id), do: Repo.get!(ParameterCategory, id)

  @doc "Inserts a parameter category built from `attrs`."
  def create_parameter_category(attrs \\ %{}) do
    %ParameterCategory{}
    |> ParameterCategory.changeset(attrs)
    |> Repo.insert()
  end

  @doc "Updates `category` with `attrs`."
  def update_parameter_category(%ParameterCategory{} = category, attrs) do
    category
    |> ParameterCategory.changeset(attrs)
    |> Repo.update()
  end

  @doc "Deletes `category`."
  def delete_parameter_category(%ParameterCategory{} = category) do
    Repo.delete(category)
  end

  # ---------------------------------------------------------------------------
  # Parameter Definitions
  # ---------------------------------------------------------------------------

  @doc """
  Lists active parameter definitions.

  When `filters` contains a non-nil `:category_id`, only definitions of that
  category are returned.
  """
  def list_parameter_definitions(filters \\ %{}) do
    base = ParameterDefinition.active_query(ParameterDefinition)

    scoped =
      case filters do
        %{category_id: category_id} when not is_nil(category_id) ->
          ParameterDefinition.by_category(base, category_id)

        _ ->
          base
      end

    Repo.all(scoped)
  end

  @doc "Lists active parameter definitions belonging to `category_id`."
  def list_parameters_by_category(category_id) do
    ParameterDefinition
    |> ParameterDefinition.active_query()
    |> ParameterDefinition.by_category(category_id)
    |> Repo.all()
  end

  @doc "Fetches a parameter definition by id, raising if it does not exist."
  def get_parameter_definition!(id), do: Repo.get!(ParameterDefinition, id)

  @doc "Inserts a parameter definition built from `attrs`."
  def create_parameter_definition(attrs \\ %{}) do
    %ParameterDefinition{}
    |> ParameterDefinition.changeset(attrs)
    |> Repo.insert()
  end

  @doc "Updates `definition` with `attrs`."
  def update_parameter_definition(%ParameterDefinition{} = definition, attrs) do
    definition
    |> ParameterDefinition.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes `definition` unless it is flagged `is_system`.

  Returns `{:error, "Cannot delete system parameters"}` for system parameters.
  """
  def delete_parameter_definition(%ParameterDefinition{} = definition) do
    if definition.is_system do
      {:error, "Cannot delete system parameters"}
    else
      Repo.delete(definition)
    end
  end

  @doc "Returns a changeset for `definition` with `attrs` applied."
  def change_parameter_definition(%ParameterDefinition{} = definition, attrs \\ %{}) do
    ParameterDefinition.changeset(definition, attrs)
  end

  @doc """
  Looks up a parameter definition by its `key`.

  Returns `{:ok, definition}` or `{:error, :not_found}`.
  """
  def get_parameter_definition_by_key(key) do
    case Repo.get_by(ParameterDefinition, key: key) do
      nil -> {:error, :not_found}
      definition -> {:ok, definition}
    end
  end

  @doc "Looks up a parameter definition by its `key`, raising if none exists."
  def get_parameter_definition_by_key!(key) do
    Repo.get_by!(ParameterDefinition, key: key)
  end

  # ---------------------------------------------------------------------------
  # Parameter Templates
  # ---------------------------------------------------------------------------

  @doc """
  Lists active parameter templates.

  When `filters` contains a non-nil `:vendor`, only templates for that vendor
  are returned.
  """
  def list_parameter_templates(filters \\ %{}) do
    base = ParameterTemplate.active_query(ParameterTemplate)

    scoped =
      case filters do
        %{vendor: vendor} when not is_nil(vendor) -> ParameterTemplate.by_vendor(base, vendor)
        _ -> base
      end

    Repo.all(scoped)
  end

  @doc """
  Lists the values of a template with parameter and category data loaded.

  A second, unreachable definition of this function (returning plain maps)
  previously existed further down the module; it never ran because this
  clause matches every argument, so it has been removed.
  """
  def list_template_values_by_template(template_id) do
    ParameterTemplateValue
    |> ParameterTemplateValue.by_template(template_id)
    |> ParameterTemplateValue.with_parameter_and_category()
    |> Repo.all()
  end

  @doc "Counts the values stored for `template_id`."
  def count_template_values_by_template(template_id) do
    ParameterTemplateValue
    |> ParameterTemplateValue.by_template(template_id)
    |> Repo.aggregate(:count, :id)
  end

  @doc "Fetches a parameter template by id, raising if it does not exist."
  def get_parameter_template!(id), do: Repo.get!(ParameterTemplate, id)

  @doc """
  Returns one active template matching the terminal's vendor and model, or
  `nil` when there is none.
  """
  def get_best_template_for_terminal(%TmsTerminal{} = terminal) do
    ParameterTemplate
    |> ParameterTemplate.active_query()
    |> ParameterTemplate.by_vendor_and_model(terminal.vendor, terminal.model)
    |> limit(1)
    |> Repo.one()
  end

  @doc "Inserts a parameter template built from `attrs`."
  def create_parameter_template(attrs \\ %{}) do
    %ParameterTemplate{}
    |> ParameterTemplate.changeset(attrs)
    |> Repo.insert()
  end

  @doc "Updates `template` with `attrs`."
  def update_parameter_template(%ParameterTemplate{} = template, attrs) do
    template
    |> ParameterTemplate.changeset(attrs)
    |> Repo.update()
  end

  @doc "Deletes `template`."
  def delete_parameter_template(%ParameterTemplate{} = template) do
    Repo.delete(template)
  end

  @doc "Returns a changeset for `template` with `attrs` applied."
  def change_parameter_template(%ParameterTemplate{} = template, attrs \\ %{}) do
    ParameterTemplate.changeset(template, attrs)
  end

  # ---------------------------------------------------------------------------
  # Template Values
  # ---------------------------------------------------------------------------

  @doc "Lists the raw template value records for `template_id`."
  def get_template_values(template_id) do
    ParameterTemplateValue
    |> ParameterTemplateValue.by_template(template_id)
    |> Repo.all()
  end

  @doc """
  Creates or updates the value of one parameter within a template.

  The record is marked `is_overridden: true` in both cases.
  """
  def set_template_value(template_id, parameter_definition_id, value) do
    lookup = [template_id: template_id, parameter_definition_id: parameter_definition_id]

    case Repo.get_by(ParameterTemplateValue, lookup) do
      nil ->
        %ParameterTemplateValue{}
        |> ParameterTemplateValue.changeset(%{
          template_id: template_id,
          parameter_definition_id: parameter_definition_id,
          value: value,
          is_overridden: true
        })
        |> Repo.insert()

      existing ->
        existing
        |> ParameterTemplateValue.changeset(%{value: value, is_overridden: true})
        |> Repo.update()
    end
  end

  @doc """
  Sets several template values inside one transaction.

  `parameter_values` is an enumerable of `{parameter_definition_id, value}`.
  The transaction is rolled back with the failing changeset on the first error.
  """
  def bulk_update_template_values(template_id, parameter_values) do
    Repo.transaction(fn ->
      Enum.map(parameter_values, fn {param_id, value} ->
        case set_template_value(template_id, param_id, value) do
          {:ok, template_value} -> template_value
          {:error, changeset} -> Repo.rollback(changeset)
        end
      end)
    end)
  end

  # ---------------------------------------------------------------------------
  # Device Parameter Overrides
  # ---------------------------------------------------------------------------

  @doc "Lists the override records stored for `terminal_id`."
  def get_device_overrides(terminal_id) do
    DeviceParameterOverride
    |> DeviceParameterOverride.by_terminal(terminal_id)
    |> Repo.all()
  end

  @doc """
  Creates or updates a device-level override for one parameter.

  ## Options

    * `:source` - origin of the override, defaults to `"manual"`
    * `:created_by_id` - id of the user creating the override
  """
  def set_device_override(terminal_id, parameter_definition_id, value, opts \\ []) do
    source = Keyword.get(opts, :source, "manual")
    created_by_id = Keyword.get(opts, :created_by_id)
    lookup = [terminal_id: terminal_id, parameter_definition_id: parameter_definition_id]

    case Repo.get_by(DeviceParameterOverride, lookup) do
      nil ->
        %DeviceParameterOverride{}
        |> DeviceParameterOverride.changeset(%{
          terminal_id: terminal_id,
          parameter_definition_id: parameter_definition_id,
          value: value,
          source: source,
          created_by_id: created_by_id
        })
        |> Repo.insert()

      existing ->
        existing
        |> DeviceParameterOverride.changeset(%{
          value: value,
          source: source,
          created_by_id: created_by_id,
          # Reset applied_at when value changes
          applied_at: nil
        })
        |> Repo.update()
    end
  end

  @doc """
  Deletes the override of one parameter for a terminal.

  Returns `{:error, :not_found}` when no such override exists.
  """
  def remove_device_override(terminal_id, parameter_definition_id) do
    lookup = [terminal_id: terminal_id, parameter_definition_id: parameter_definition_id]

    case Repo.get_by(DeviceParameterOverride, lookup) do
      nil -> {:error, :not_found}
      override -> Repo.delete(override)
    end
  end

  @doc """
  Sets several device overrides inside one transaction.

  `parameter_values` is an enumerable of `{parameter_definition_id, value}`;
  `opts` are forwarded to `set_device_override/4`. The transaction is rolled
  back with the failing changeset on the first error.
  """
  def bulk_set_device_overrides(terminal_id, parameter_values, opts \\ []) do
    Repo.transaction(fn ->
      Enum.map(parameter_values, fn {param_id, value} ->
        case set_device_override(terminal_id, param_id, value, opts) do
          {:ok, override} -> override
          {:error, changeset} -> Repo.rollback(changeset)
        end
      end)
    end)
  end

  # ---------------------------------------------------------------------------
  # Parameter Resolution (combines template values + device overrides)
  # ---------------------------------------------------------------------------

  @doc """
  Resolves the effective parameters for a terminal.

  Uses the template returned by `get_best_template_for_terminal/1`; when there
  is none, falls back to the default values of the active parameter
  definitions. Device overrides take precedence in both cases.
  """
  def resolve_parameters_for_terminal(%TmsTerminal{} = terminal) do
    case get_best_template_for_terminal(terminal) do
      nil -> resolve_parameters_with_defaults(terminal)
      template -> resolve_parameters_for_terminal_with_template(terminal, template)
    end
  end

  @doc """
  Resolves the effective parameters for a terminal against a given template.

  Returns a map with `:parameters` (key => value), `:template_used`,
  `:template_id`, `:overrides_count` and `:resolved_at`.
  """
  def resolve_parameters_for_terminal_with_template(
        %TmsTerminal{} = terminal,
        %ParameterTemplate{} = template
      ) do
    template.id
    |> get_template_values()
    |> values_by_parameter_key()
    |> build_resolution(terminal, template.name, template.id)
  end

  defp resolve_parameters_with_defaults(%TmsTerminal{} = terminal) do
    list_parameter_definitions()
    |> Map.new(fn definition -> {definition.key, definition.default_value} end)
    |> build_resolution(terminal, "defaults", nil)
  end

  # Merges the terminal's overrides on top of `base_values` and attaches the
  # resolution metadata.
  defp build_resolution(base_values, terminal, template_name, template_id) do
    device_overrides =
      terminal.id
      |> get_device_overrides()
      |> values_by_parameter_key()

    %{
      # Overrides take precedence over template/default values.
      parameters: Map.merge(base_values, device_overrides),
      template_used: template_name,
      template_id: template_id,
      overrides_count: map_size(device_overrides),
      resolved_at: DateTime.utc_now()
    }
  end

  # Turns template-value / override records into a `key => value` map.
  # The `parameter_definition` association is preloaded here because the
  # listing functions above do not visibly load it; preloading is skipped by
  # Ecto for records that already have it loaded.
  defp values_by_parameter_key(records) do
    records
    |> Repo.preload(:parameter_definition)
    |> Map.new(fn record -> {record.parameter_definition.key, record.value} end)
  end

  # ---------------------------------------------------------------------------
  # Parameter Push Functions (integration with the OTA system)
  # ---------------------------------------------------------------------------

  @doc """
  Resolves a terminal's parameters, records a push log and sends them via OTA.

  ## Options

    * `:push_type` - stored on the push log, defaults to `"full"`
    * `:triggered_by_id` - id of the user triggering the push
    * `:parameters` - list of parameter keys to send; an empty list (the
      default) sends every resolved parameter
    * `:template` - a `%ParameterTemplate{}` to resolve against instead of the
      terminal's best-matching template

  Returns `{:ok, push_log}` once the log row is created (the returned struct
  is the freshly inserted one; later status updates are written to the
  database only), or `{:error, changeset}` when the log cannot be created.
  """
  def push_parameters_to_terminal(%TmsTerminal{} = terminal, opts \\ []) do
    push_type = Keyword.get(opts, :push_type, "full")
    triggered_by_id = Keyword.get(opts, :triggered_by_id)
    specific_parameters = Keyword.get(opts, :parameters, [])

    resolution =
      case Keyword.get(opts, :template) do
        %ParameterTemplate{} = template ->
          resolve_parameters_for_terminal_with_template(terminal, template)

        nil ->
          resolve_parameters_for_terminal(terminal)
      end

    parameters_to_send =
      if Enum.empty?(specific_parameters) do
        resolution.parameters
      else
        Map.take(resolution.parameters, specific_parameters)
      end

    request_id = System.unique_integer([:positive])

    push_log_attrs = %{
      terminal_id: terminal.id,
      template_id: resolution.template_id,
      parameters_sent: parameters_to_send,
      request_id: request_id,
      status: "pending",
      push_type: push_type,
      triggered_by_id: triggered_by_id
    }

    case create_parameter_push_log(push_log_attrs) do
      {:ok, push_log} ->
        create_push_details(push_log, parameters_to_send)
        # The outcome of the send is recorded on the push log row itself.
        send_parameters_via_ota(terminal, parameters_to_send, push_log, request_id)
        {:ok, push_log}

      {:error, changeset} ->
        {:error, changeset}
    end
  end

  @doc """
  Pushes the values of template `template_id` (with the terminal's overrides
  applied on top) to `terminal`.

  Raises if the template does not exist. `opts` may carry `:triggered_by_id`.
  Returns the result of `push_parameters_to_terminal/2`.
  """
  def apply_template_to_terminal(%TmsTerminal{} = terminal, template_id, opts \\ []) do
    template = get_parameter_template!(template_id)

    # Resolve against the requested template; previously the template was only
    # used to compute a key filter and the values came from whichever template
    # best matched the terminal.
    push_parameters_to_terminal(terminal,
      push_type: "template_apply",
      triggered_by_id: Keyword.get(opts, :triggered_by_id),
      template: template
    )
  end

  @doc """
  Applies a template to several terminals inside one transaction.

  Raises if a terminal id does not exist; rolls back with the failure reason
  on the first push that returns an error.
  """
  def bulk_apply_template_to_terminals(terminal_ids, template_id, opts \\ []) do
    triggered_by_id = Keyword.get(opts, :triggered_by_id)

    Repo.transaction(fn ->
      Enum.map(terminal_ids, fn terminal_id ->
        terminal = Repo.get!(TmsTerminal, terminal_id)

        case apply_template_to_terminal(terminal, template_id, triggered_by_id: triggered_by_id) do
          {:ok, push_log} -> push_log
          {:error, reason} -> Repo.rollback(reason)
        end
      end)
    end)
  end

  # ---------------------------------------------------------------------------
  # Push Log Management
  # ---------------------------------------------------------------------------

  @doc """
  Lists push logs, optionally restricted to one terminal and one filter.

  When `terminal_serial` is given the terminal is looked up by serial number
  and the call raises if it does not exist. `filters` may contain `:status` or
  `:hours`; if both are present only `:status` is applied.
  """
  def list_parameter_push_logs(terminal_serial \\ nil, filters \\ %{}) do
    by_terminal =
      if terminal_serial do
        terminal = Repo.get_by!(TmsTerminal, serial_number: terminal_serial)
        ParameterPushLog.by_terminal(ParameterPushLog, terminal.id)
      else
        ParameterPushLog
      end

    filtered =
      case filters do
        %{status: status} when not is_nil(status) ->
          ParameterPushLog.by_status(by_terminal, status)

        %{hours: hours} when not is_nil(hours) ->
          ParameterPushLog.recent_pushes(by_terminal, hours)

        _ ->
          by_terminal
      end

    Repo.all(filtered)
  end

  @doc "Fetches a push log with its details, raising if it does not exist."
  def get_parameter_push_log!(id) do
    ParameterPushLog
    |> ParameterPushLog.with_details()
    |> Repo.get!(id)
  end

  @doc "Inserts a push log built from `attrs`."
  def create_parameter_push_log(attrs \\ %{}) do
    %ParameterPushLog{}
    |> ParameterPushLog.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Records a device-reported error against the matching push log.

  The log is marked `"failed"` with `"<error_code>: <error_message>"`; when
  `parameter_key` is given the matching push detail is marked failed as well.

  Returns `{:ok, updated_log}`, `{:error, :not_found}` when the terminal or
  push log cannot be found, or `{:error, changeset}` when the update fails.
  """
  def handle_parameter_error(
        device_serial,
        request_id,
        error_code,
        error_message,
        parameter_key \\ nil
      ) do
    with {:ok, terminal} <- get_terminal_if_exists(device_serial),
         {:ok, push_log} <- get_push_log_by_request(terminal.id, request_id) do
      update_attrs = %{status: "failed", error_message: "#{error_code}: #{error_message}"}

      case update_parameter_push_log(push_log, update_attrs) do
        {:ok, updated_log} ->
          if parameter_key do
            update_push_detail_error(push_log.id, parameter_key, error_message)
          end

          Logger.info("Updated parameter push log #{push_log.id} with error: #{error_message}")
          {:ok, updated_log}

        {:error, reason} ->
          Logger.error("Failed to update push log with error: #{inspect(reason)}")
          {:error, reason}
      end
    else
      {:error, reason} when reason in [:terminal_not_found, :not_found] ->
        Logger.warning("Push log not found for terminal #{device_serial}, request #{request_id}")
        {:error, :not_found}
    end
  end

  @doc """
  Handles a parameter acknowledgment from a device (called by MQTT handlers).

  An `ack_status` of `"OK"` marks the push log `"acknowledged"` and stamps the
  terminal's pending overrides as applied; any other status marks the log
  `"failed"` using `ack_details["error"]` when present.

  Returns `{:ok, message}`, `{:error, :terminal_not_found}` or
  `{:error, :push_log_not_found}`.
  """
  def handle_parameter_acknowledgment(device_serial, request_id, ack_status, ack_details \\ %{}) do
    with {:ok, terminal} <- get_terminal_if_exists(device_serial),
         {:ok, push_log} <- get_push_log_by_request(terminal.id, request_id) do
      case ack_status do
        "OK" ->
          now = DateTime.utc_now() |> DateTime.truncate(:second)
          update_parameter_push_log(push_log, %{status: "acknowledged", acknowledged_at: now})
          mark_device_overrides_as_applied(terminal.id, now)

          Logger.info(
            "Parameter push acknowledged for terminal #{device_serial}, request #{request_id}"
          )

          {:ok, "Parameters acknowledged"}

        _ ->
          error_message = ack_details["error"] || "Device rejected parameters"
          update_parameter_push_log(push_log, %{status: "failed", error_message: error_message})

          Logger.warning(
            "Parameter push failed for terminal #{device_serial}, request #{request_id}: #{error_message}"
          )

          {:ok, "Parameters failed"}
      end
    else
      {:error, :terminal_not_found} -> {:error, :terminal_not_found}
      {:error, :not_found} -> {:error, :push_log_not_found}
    end
  end

  defp get_push_log_by_request(terminal_id, request_id) do
    case Repo.get_by(ParameterPushLog, terminal_id: terminal_id, request_id: request_id) do
      nil -> {:error, :not_found}
      push_log -> {:ok, push_log}
    end
  end

  defp update_parameter_push_log(%ParameterPushLog{} = push_log, attrs) do
    push_log
    |> ParameterPushLog.changeset(attrs)
    |> Repo.update()
  end

  defp update_push_detail_error(push_log_id, parameter_key, error_message) do
    case Repo.get_by(ParameterPushDetail, push_log_id: push_log_id, parameter_key: parameter_key) do
      nil ->
        Logger.warning("Push detail not found for parameter #{parameter_key}")
        {:error, :not_found}

      detail ->
        detail
        |> ParameterPushDetail.changeset(%{status: "failed", error_message: error_message})
        |> Repo.update()
    end
  end

  # Builds an OTA configuration from the resolved parameters, records the
  # outcome on `push_log` and hands the configuration to the OTA service.
  defp send_parameters_via_ota(terminal, parameters, push_log, request_id) do
    ota_config = build_ota_config_from_parameters(terminal.serial_number, parameters, request_id)

    case TerminalManagement.create_ota_configuration(ota_config) do
      {:ok, config} ->
        update_parameter_push_log(push_log, %{
          status: "sent",
          sent_at: DateTime.utc_now() |> DateTime.truncate(:second)
        })

        # Send via existing OTA service
        OtaService.send_ota_configuration(config)

      {:error, reason} ->
        Logger.error(
          "Failed to create OTA configuration for request #{request_id}: #{inspect(reason)}"
        )

        update_parameter_push_log(push_log, %{
          status: "failed",
          error_message: "Failed to create OTA configuration: #{inspect(reason)}"
        })

        {:error, reason}
    end
  end

  # Maps resolved parameters (string keys) onto the attribute map passed to
  # `TerminalManagement.create_ota_configuration/1`, filling gaps with the
  # module-level fallbacks.
  defp build_ota_config_from_parameters(device_serial, parameters, request_id) do
    product_key = parameters["product_key"] || @default_product_key

    %{
      request_id: request_id,
      device_serial: device_serial,
      merchant_config: true,
      merchant_id: parameters["merchant_id"] || @default_merchant_id,
      terminal_id: parameters["terminal_id"] || device_serial,
      mqtt_ip: parameters["mqtt_ip"] || @default_mqtt_ip,
      mqtt_port: parse_integer(parameters["mqtt_port"], 1883),
      http_ip: parameters["http_ip"] || @default_http_ip,
      http_port: parse_integer(parameters["http_port"], 4001),
      product_key: product_key,
      product_secret: parameters["product_secret"] || @default_product_secret,
      client_id: parameters["client_id"] || device_serial,
      username: parameters["username"] || @default_username,
      mqtt_topic: "/ota/#{product_key}/#{device_serial}/update",
      keepalive_time: parse_integer(parameters["keepalive_time"], 300),
      play_language: parse_integer(parameters["play_language"], 1),
      heartbeat_interval: parse_integer(parameters["heartbeat_interval"], 300)
    }
  end

  # Inserts one push detail per sent parameter whose key has a definition.
  defp create_push_details(push_log, parameters) do
    Enum.each(parameters, fn {key, value} ->
      case get_parameter_definition_by_key(key) do
        {:ok, param_def} ->
          %ParameterPushDetail{}
          |> ParameterPushDetail.changeset(%{
            push_log_id: push_log.id,
            parameter_definition_id: param_def.id,
            parameter_key: key,
            value_sent: value
          })
          |> Repo.insert()

        # Skip unknown parameters
        {:error, :not_found} ->
          :ok
      end
    end)
  end

  defp mark_device_overrides_as_applied(terminal_id, applied_at) do
    DeviceParameterOverride
    |> DeviceParameterOverride.by_terminal(terminal_id)
    |> DeviceParameterOverride.pending_overrides()
    |> Repo.update_all(set: [applied_at: applied_at])
  end

  # Accepts integers as-is and strings that are entirely an integer literal;
  # anything else yields `default`.
  defp parse_integer(value, default) when is_binary(value) do
    case Integer.parse(value) do
      {int, ""} -> int
      _ -> default
    end
  end

  defp parse_integer(value, _default) when is_integer(value), do: value
  defp parse_integer(_value, default), do: default

  # ---------------------------------------------------------------------------
  # Statistics and Reporting
  # ---------------------------------------------------------------------------

  @doc "Returns headline counts for templates, parameters, overrides and pushes."
  def get_parameter_management_stats do
    %{
      total_templates: Repo.aggregate(ParameterTemplate, :count, :id),
      active_templates:
        Repo.aggregate(ParameterTemplate.active_query(ParameterTemplate), :count, :id),
      total_parameters: Repo.aggregate(ParameterDefinition, :count, :id),
      device_overrides: Repo.aggregate(DeviceParameterOverride, :count, :id),
      recent_pushes:
        Repo.aggregate(ParameterPushLog.recent_pushes(ParameterPushLog, 24), :count, :id),
      successful_pushes:
        Repo.aggregate(ParameterPushLog.by_status(ParameterPushLog, "acknowledged"), :count, :id),
      failed_pushes:
        Repo.aggregate(ParameterPushLog.by_status(ParameterPushLog, "failed"), :count, :id)
    }
  end

  # ---------------------------------------------------------------------------
  # Enhanced functions for UI integration
  # ---------------------------------------------------------------------------

  @doc """
  Lists the most recent push logs, newest first.

  `filters` may be a map or a keyword list. `:limit` (atom key, default 100)
  caps the result; `device_sn`, `status` and `push_type` (atom or string keys,
  non-empty string values) narrow it.
  """
  def list_recent_parameter_push_logs(filters \\ %{}) do
    limit_value =
      case filters do
        filters when is_map(filters) -> Map.get(filters, :limit, 100)
        filters when is_list(filters) -> Keyword.get(filters, :limit, 100)
        _ -> 100
      end

    ParameterPushLog
    |> preload([:terminal, :template, :triggered_by])
    |> order_by([p], desc: p.inserted_at)
    |> limit(^limit_value)
    |> apply_parameter_log_filters(filters)
    |> Repo.all()
  end

  @doc """
  Returns the newest push log (with its template) for the terminal with the
  given serial number, or `nil`.
  """
  def get_latest_parameter_status(device_serial) do
    from(p in ParameterPushLog,
      join: t in TmsTerminal,
      on: p.terminal_id == t.id,
      where: t.serial_number == ^device_serial,
      order_by: [desc: p.inserted_at],
      limit: 1,
      preload: [:template]
    )
    |> Repo.one()
  end

  @doc """
  Checks whether a template can be applied to the device with `device_serial`.

  Returns `{:ok, info}` with device and template details when compatible, or
  `{:error, message}` when the template or device is missing or they do not
  match.
  """
  def check_template_compatibility(template_id, device_serial) do
    with {:ok, template} <- get_template_if_exists(template_id),
         {:ok, terminal} <- get_terminal_if_exists(device_serial) do
      compatibility_check = %{
        template_vendor: template.vendor,
        template_model: template.model,
        device_vendor: terminal.vendor,
        device_model: terminal.model,
        firmware_version: terminal.system_version
      }

      if check_device_template_compatibility(compatibility_check) do
        {:ok,
         %{
           compatible: true,
           device_info: %{
             vendor: terminal.vendor,
             model: terminal.model,
             firmware: terminal.system_version
           },
           template_info: %{
             vendor: template.vendor,
             model: template.model,
             parameter_count: count_template_values_by_template(template_id)
           }
         }}
      else
        {:error, "Template not compatible with device model/vendor"}
      end
    else
      {:error, :template_not_found} -> {:error, "Template not found"}
      {:error, :terminal_not_found} -> {:error, "Device not found"}
      error -> error
    end
  end

  @doc "Returns push counts by status and recency, top templates and success rate."
  def get_parameter_push_statistics do
    %{
      total_pushes: Repo.aggregate(ParameterPushLog, :count, :id),
      successful_pushes: count_by_status("acknowledged"),
      failed_pushes: count_by_status("failed"),
      pending_pushes: count_by_status("pending"),
      recent_pushes_24h: count_recent_pushes(24),
      recent_pushes_7d: count_recent_pushes(24 * 7),
      most_used_templates: get_most_used_templates(5),
      push_success_rate: calculate_push_success_rate()
    }
  end

  @doc "Alias of `get_parameter_push_statistics/0` shaped for the dashboard."
  def get_parameter_job_statistics do
    stats = get_parameter_push_statistics()

    %{
      total: stats.total_pushes,
      completed: stats.successful_pushes,
      running: stats.pending_pushes,
      failed: stats.failed_pushes
    }
  end

  # ---------------------------------------------------------------------------
  # Private helper functions
  # ---------------------------------------------------------------------------

  defp apply_parameter_log_filters(query, filters) do
    filters
    |> normalize_filter_keys()
    |> Enum.reduce(query, fn
      {"device_sn", device_sn}, query when is_binary(device_sn) and device_sn != "" ->
        from(p in query,
          join: t in TmsTerminal,
          on: p.terminal_id == t.id,
          where: ilike(t.serial_number, ^"%#{device_sn}%")
        )

      {"status", status}, query when is_binary(status) and status != "" ->
        from(p in query, where: p.status == ^status)

      {"push_type", push_type}, query when is_binary(push_type) and push_type != "" ->
        from(p in query, where: p.push_type == ^push_type)

      _filter, query ->
        query
    end)
  end

  # Gives keyword lists and maps (atom or string keys) one string-keyed shape,
  # so `%{status: "failed"}` filters the same way as `[status: "failed"]`.
  defp normalize_filter_keys(filters) when is_list(filters) or is_map(filters) do
    Map.new(filters, fn {key, value} -> {to_string(key), value} end)
  end

  defp normalize_filter_keys(_filters), do: %{}

  defp get_template_if_exists(template_id) do
    case Repo.get(ParameterTemplate, template_id) do
      nil -> {:error, :template_not_found}
      template -> {:ok, template}
    end
  end

  # Looks the terminal up directly by serial number so every caller in this
  # module sees the same `{:ok, terminal} | {:error, :terminal_not_found}`.
  defp get_terminal_if_exists(device_serial) do
    case Repo.get_by(TmsTerminal, serial_number: device_serial) do
      nil -> {:error, :terminal_not_found}
      terminal -> {:ok, terminal}
    end
  end

  # Basic compatibility logic - can be enhanced.
  # A template without a vendor is compatible with everything (this covers
  # both "no restrictions" and "model only").
  defp check_device_template_compatibility(%{template_vendor: nil}), do: true

  defp check_device_template_compatibility(info) do
    vendor_matches? = same_ignoring_case?(info.template_vendor, info.device_vendor)

    case info.template_model do
      nil -> vendor_matches?
      model -> vendor_matches? and same_ignoring_case?(model, info.device_model)
    end
  end

  defp same_ignoring_case?(expected, actual) do
    String.downcase(expected) == String.downcase(actual || "")
  end

  defp count_by_status(status) do
    from(p in ParameterPushLog, where: p.status == ^status)
    |> Repo.aggregate(:count, :id)
  end

  defp count_recent_pushes(hours) do
    cutoff = DateTime.add(DateTime.utc_now(), -hours, :hour)

    from(p in ParameterPushLog, where: p.inserted_at >= ^cutoff)
    |> Repo.aggregate(:count, :id)
  end

  defp get_most_used_templates(max_results) do
    from(p in ParameterPushLog,
      join: t in ParameterTemplate,
      on: p.template_id == t.id,
      group_by: [t.id, t.name],
      select: %{template_id: t.id, template_name: t.name, usage_count: count(p.id)},
      order_by: [desc: count(p.id)],
      limit: ^max_results
    )
    |> Repo.all()
  end

  # Percentage of all push logs whose status is "acknowledged", rounded to two
  # decimals; 0.0 when there are no push logs.
  defp calculate_push_success_rate do
    total = Repo.aggregate(ParameterPushLog, :count, :id)
    successful = count_by_status("acknowledged")

    if total > 0 do
      Float.round(successful / total * 100, 2)
    else
      0.0
    end
  end
end