defmodule DaProductApp.Events.EventDispatcher do @moduledoc """ Event dispatcher using Phoenix.PubSub for payment processing pipeline events. Provides Symfony-style event dispatching with: - Event registration and listener management - Priority-based event handling - Synchronous event processing for pipeline integrity - Payload transformation and halt capabilities Events flow through the payment processing pipeline: :message_received → :message_parsed → :before_business_logic → :business_logic_processed → :before_upstream_routing → :upstream_response_received → :before_response_send → :message_complete """ use GenServer require Logger alias Phoenix.PubSub alias DaProductApp.Events.EventListenerBehaviour @pubsub_name DaProductApp.PubSub # Event names used in the pipeline @pipeline_events [ :message_received, :message_parsed, :before_business_logic, :business_logic_processed, :before_upstream_routing, :upstream_response_received, :before_response_send, :message_complete ] ## Public API def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end @doc """ Register an event listener module. ## Parameters - `listener_module`: Module implementing EventListenerBehaviour """ def register_listener(listener_module) do GenServer.call(__MODULE__, {:register_listener, listener_module}) end @doc """ Dispatch an event synchronously to all registered listeners. ## Parameters - `event_name`: The event to dispatch - `payload`: Initial event payload ## Returns - `{:ok, final_payload}` - All listeners processed successfully - `{:halt, response_payload}` - A listener requested halt with response - `{:error, reason}` - Event processing failed """ def dispatch(event_name, payload) do GenServer.call(__MODULE__, {:dispatch, event_name, payload}, 30_000) end @doc """ Get all registered listeners for debugging. """ def get_listeners do GenServer.call(__MODULE__, :get_listeners) end ## GenServer Callbacks @impl true def init(_opts) do Logger.info("Starting EventDispatcher") # Initialize listener registry state = %{ listeners: %{}, # %{event_name => [%{module: module, priority: integer}]} modules: [] # Track registered modules for management } {:ok, state} end @impl true def handle_call({:register_listener, listener_module}, _from, state) do Logger.info("Registering event listener: #{inspect(listener_module)}") case register_listener_internal(listener_module, state) do {:ok, new_state} -> {:reply, :ok, new_state} {:error, reason} -> Logger.error("Failed to register listener #{inspect(listener_module)}: #{inspect(reason)}") {:reply, {:error, reason}, state} end end @impl true def handle_call({:dispatch, event_name, payload}, _from, state) do Logger.debug("Dispatching event: #{event_name}") case dispatch_to_listeners(event_name, payload, state) do {:ok, final_payload} -> Logger.debug("Event #{event_name} processed successfully") {:reply, {:ok, final_payload}, state} {:halt, response_payload} -> Logger.info("Event #{event_name} processing halted by listener") {:reply, {:halt, response_payload}, state} {:error, reason} -> Logger.error("Event #{event_name} processing failed: #{inspect(reason)}") {:reply, {:error, reason}, state} end end @impl true def handle_call(:get_listeners, _from, state) do {:reply, state.listeners, state} end ## Private Functions defp register_listener_internal(listener_module, state) do try do # Get configuration for duplicate handling event_config = Application.get_env(:da_product_app, :event_system, %{}) prevent_duplicates = Map.get(event_config, :prevent_duplicate_registration, true) log_duplicates = Map.get(event_config, :log_duplicate_attempts, true) # Verify the module implements the behaviour if Code.ensure_loaded?(listener_module) and function_exported?(listener_module, :subscribed_events, 0) and function_exported?(listener_module, :handle_event, 2) do # Get events this listener subscribes to subscribed_events = listener_module.subscribed_events() # Get priority (with default) priority = if function_exported?(listener_module, :get_priority, 0) do listener_module.get_priority() else 100 end # Check if listener is already registered to prevent duplicates if prevent_duplicates and listener_module in state.modules do if log_duplicates do Logger.warning("Listener #{inspect(listener_module)} is already registered, skipping duplicate registration") end {:ok, state} else # Register listener for each event new_listeners = Enum.reduce(subscribed_events, state.listeners, fn event_name, acc -> listener_info = %{module: listener_module, priority: priority} existing_listeners = Map.get(acc, event_name, []) # Check for duplicate listener in this specific event duplicate_exists = if prevent_duplicates do Enum.any?(existing_listeners, fn listener -> listener.module == listener_module end) else false end updated_listeners = if duplicate_exists do if log_duplicates do Logger.debug("Listener #{inspect(listener_module)} already registered for event #{event_name}, skipping") end existing_listeners else [listener_info | existing_listeners] |> Enum.sort_by(& &1.priority) # Sort by priority (lower = higher priority) end Map.put(acc, event_name, updated_listeners) end) # Track the module new_modules = [listener_module | state.modules] |> Enum.uniq() new_state = %{state | listeners: new_listeners, modules: new_modules} Logger.info("Registered #{inspect(listener_module)} for events: #{inspect(subscribed_events)} (priority: #{priority})") {:ok, new_state} end else {:error, {:invalid_listener, "Module does not implement EventListenerBehaviour"}} end rescue exception -> {:error, {:registration_exception, exception}} end end defp dispatch_to_listeners(event_name, payload, state) do listeners = Map.get(state.listeners, event_name, []) if Enum.empty?(listeners) do Logger.debug("No listeners registered for event: #{event_name}") {:ok, payload} else Logger.debug("Processing #{length(listeners)} listeners for event: #{event_name}") process_listeners(listeners, event_name, payload) end end defp process_listeners([], _event_name, payload) do {:ok, payload} end defp process_listeners([%{module: listener_module} | rest], event_name, payload) do Logger.debug("Processing listener: #{inspect(listener_module)} for event: #{event_name}") try do case listener_module.handle_event(event_name, payload) do {:ok, updated_payload} -> Logger.debug("Listener #{inspect(listener_module)} processed successfully") process_listeners(rest, event_name, updated_payload) {:halt, response_payload} -> Logger.info("Listener #{inspect(listener_module)} requested halt") {:halt, response_payload} {:error, reason} -> Logger.error("Listener #{inspect(listener_module)} failed: #{inspect(reason)}") {:error, {:listener_error, listener_module, reason}} other -> Logger.error("Listener #{inspect(listener_module)} returned invalid response: #{inspect(other)}") {:error, {:invalid_listener_response, listener_module, other}} end rescue exception -> Logger.error("Exception in listener #{inspect(listener_module)}: #{inspect(exception)}") {:error, {:listener_exception, listener_module, exception}} end end ## Utility Functions @doc """ Get the list of all pipeline events. """ def pipeline_events, do: @pipeline_events @doc """ Check if an event is a valid pipeline event. """ def valid_pipeline_event?(event_name) do event_name in @pipeline_events end end