package com.shukria.kiosklauncher.util import android.content.Context import android.os.Handler import android.os.Looper import android.util.Log import org.eclipse.paho.client.mqttv3.IMqttActionListener import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import org.eclipse.paho.client.mqttv3.IMqttToken import org.eclipse.paho.client.mqttv3.MqttAsyncClient import org.eclipse.paho.client.mqttv3.MqttCallback import org.eclipse.paho.client.mqttv3.MqttConnectOptions import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence import org.json.JSONObject object MQTTManager : MqttCallback { private const val TAG = "MQTTManager" private var mqttClient: MqttAsyncClient? = null private var appContext: Context? = null private var serialNumber: String = "" val currentSerialNumber: String? get() = serialNumber.ifBlank { null } // Called by CommandDispatcher for every inbound TMS command var onCommandReceived: ((command: String, payload: JSONObject) -> Unit)? = null private val reconnectHandler = Handler(Looper.getMainLooper()) private var reconnectDelay = MQTTConfig.INITIAL_RECONNECT_DELAY_MS private var intentionalDisconnect = false val isConnected: Boolean get() = mqttClient?.isConnected == true // ------------------------------------------------------------------------- // Public API // ------------------------------------------------------------------------- fun init(context: Context, serialNumber: String) { this.appContext = context.applicationContext this.serialNumber = serialNumber connect() } fun publishJson(topic: String, json: JSONObject, qos: Int = 1) { publish(topic, json.toString().toByteArray(Charsets.UTF_8), qos) } fun publishBinary(topic: String, data: ByteArray, qos: Int = 0) { publish(topic, data, qos) } fun shutdown() { intentionalDisconnect = true reconnectHandler.removeCallbacksAndMessages(null) try { mqttClient?.disconnect() } catch (e: MqttException) { Log.w(TAG, "Error during disconnect: ${e.message}") } } // ------------------------------------------------------------------------- // Connection // ------------------------------------------------------------------------- private fun connect() { val context = appContext ?: return try { val brokerUri = "tcp://${MQTTConfig.BROKER_HOST}:${MQTTConfig.BROKER_PORT}" val persistence = MqttDefaultFilePersistence(context.filesDir.absolutePath) mqttClient = MqttAsyncClient(brokerUri, MQTTConfig.clientId(serialNumber), persistence) mqttClient?.setCallback(this) val options = MqttConnectOptions().apply { isCleanSession = true isAutomaticReconnect = false // we handle reconnect manually with backoff keepAliveInterval = MQTTConfig.KEEPALIVE_SECONDS connectionTimeout = MQTTConfig.CONNECTION_TIMEOUT_SECONDS } Log.d(TAG, "Connecting to $brokerUri as ${MQTTConfig.clientId(serialNumber)}") mqttClient?.connect(options, null, object : IMqttActionListener { override fun onSuccess(token: IMqttToken?) { reconnectDelay = MQTTConfig.INITIAL_RECONNECT_DELAY_MS intentionalDisconnect = false Log.i(TAG, "MQTT connected to ${MQTTConfig.BROKER_HOST}") subscribeToCommandTopic() } override fun onFailure(token: IMqttToken?, exception: Throwable?) { Log.e(TAG, "MQTT connect failed: ${exception?.message}") scheduleReconnect() } }) } catch (e: MqttException) { Log.e(TAG, "MQTT init error: ${e.message}") scheduleReconnect() } } private fun subscribeToCommandTopic() { val topic = MQTTConfig.commandTopic(serialNumber) mqttClient?.subscribe(topic, 1, null, object : IMqttActionListener { override fun onSuccess(token: IMqttToken?) { Log.i(TAG, "Subscribed to $topic") } override fun onFailure(token: IMqttToken?, exception: Throwable?) { Log.e(TAG, "Subscribe to $topic failed: ${exception?.message}") } }) } private fun scheduleReconnect() { if (intentionalDisconnect) return Log.d(TAG, "Reconnecting in ${reconnectDelay / 1000}s...") reconnectHandler.postDelayed({ if (!isConnected && !intentionalDisconnect) { connect() } }, reconnectDelay) reconnectDelay = minOf(reconnectDelay * 2, MQTTConfig.MAX_RECONNECT_DELAY_MS) } // ------------------------------------------------------------------------- // Publish // ------------------------------------------------------------------------- private fun publish(topic: String, payload: ByteArray, qos: Int) { if (!isConnected) { Log.w(TAG, "Cannot publish to $topic — not connected") return } try { val message = MqttMessage(payload).apply { this.qos = qos this.isRetained = false } mqttClient?.publish(topic, message) } catch (e: MqttException) { Log.e(TAG, "Publish to $topic failed: ${e.message}") } } // ------------------------------------------------------------------------- // MqttCallback // ------------------------------------------------------------------------- override fun connectionLost(cause: Throwable?) { Log.e(TAG, "MQTT connection lost: ${cause?.message}") scheduleReconnect() } override fun messageArrived(topic: String?, message: MqttMessage?) { val raw = message?.payload?.toString(Charsets.UTF_8) ?: return Log.d(TAG, "Received on $topic: $raw") try { val json = JSONObject(raw) val command = json.optString("command").ifBlank { // MF919-style payloads use "command" inside a "type":"tms_command" envelope json.optString("type") } onCommandReceived?.invoke(command, json) } catch (e: Exception) { Log.e(TAG, "Failed to parse MQTT message: ${e.message}") } } override fun deliveryComplete(token: IMqttDeliveryToken?) { // no-op for QoS 0/1 fire-and-forget frames } }