defmodule DaProductApp.Switch.UpstreamRouter do
  @moduledoc """
  Configurable upstream routing system for ISO8583 messages.

  Features:
  - Route messages to configured upstream networks based on BIN, card type, or message content
  - Maintain pure ISOMsg flow with configurable packagers per upstream
  - Handle connection pooling and failover for upstream networks
  - Support both synchronous and asynchronous upstream communication
  - Supervised GenServer for reliability and state management

  Configuration Structure:

  ```elixir
  config :da_product_app, :upstream_networks, [
    visa: %{
      host: "visa.example.com",
      port: 8583,
      packager: DaProductApp.MercuryISO8583.Packagers.ISO87BPackager,
      connection_pool_size: 10,
      timeout: 30_000,
      routing_rules: [
        {:bin_range, "400000-499999"},
        {:mti_pattern, "01??"}
      ]
    },
    mastercard: %{
      host: "mc.example.com",
      port: 8583,
      packager: DaProductApp.MercuryISO8583.Packagers.ISO87BPackager,
      connection_pool_size: 10,
      timeout: 30_000,
      routing_rules: [
        {:bin_range, "510000-599999"},
        {:mti_pattern, "01??"}
      ]
    },
    local_processor: %{
      host: "processor.local.com",
      port: 8583,
      packager: DaProductApp.MercuryISO8583.Packagers.ISO87BPackager,
      connection_pool_size: 5,
      timeout: 15_000,
      routing_rules: [
        {:default, true}
      ]
    }
  ]
  ```

  The `:upstream_networks` value may be a keyword list (as above) or a map keyed
  by network name; both are supported by every lookup in this module.

  All routing runs inside the single `UpstreamRouter` process, so calls to
  `route_message/2` are serialized, including any retry back-off sleeps.
  """

  use GenServer
  require Logger

  alias DaProductApp.MercuryISO8583.Packagers.ISOMsg
  alias DaProductApp.MercuryISO8583.Packagers.ISO87BPackager
  alias DaProductApp.MercuryISO8583.MTIConverter

  alias DaProductApp.Switch.{
    UpstreamConnection,
    UpstreamConnectionManager,
    RoutingRules,
    MessageFraming,
    UpstreamHealthMonitor,
    GatewayRouter
  }

  alias DaProductApp.MercuryISO8583.Headers.HeaderProcessor

  ## Public API

  @doc """
  Starts the router as a locally registered GenServer (named after this module).
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Routes an ISOMsg to the appropriate upstream network.

  ## Parameters
  - `iso_message`: The ISOMsg to route
  - `channel_context`: Context from originating channel

  ## Returns
  - `{:ok, response_message}` - Response from upstream, or a locally built ISO
    error response (field 39 set) when routing or communication failed
  - `{:error, reason}` - Only when the failure could not be turned into an ISO
    error response

  The call uses a 30 second `GenServer.call/3` timeout and exits the caller if
  the router does not reply in time.
  """
  @spec route_message(%ISOMsg{}, term()) :: {:ok, term()} | {:error, term()}
  def route_message(%ISOMsg{} = iso_message, channel_context) do
    GenServer.call(__MODULE__, {:route_message, iso_message, channel_context}, 30_000)
  end

  @doc """
  Routes message asynchronously (for notifications, batches, etc.)

  Returns the `Task` started with `Task.async/1`; the task is linked to the
  caller, which is expected to await it. When `callback_fun` is given it is
  invoked with the routing result inside the task.
  """
  @spec route_message_async(%ISOMsg{}, term(), (term() -> any()) | nil) :: Task.t()
  def route_message_async(%ISOMsg{} = iso_message, channel_context, callback_fun \\ nil) do
    Logger.info("Routing message asynchronously MTI: #{ISOMsg.get_mti(iso_message)}")

    Task.async(fn ->
      result = route_message(iso_message, channel_context)

      if callback_fun do
        callback_fun.(result)
      end

      result
    end)
  end

  @doc """
  Get upstream network configuration by name.

  Returns `{:ok, config}` or `{:error, {:upstream_not_found, network_name}}`.
  """
  @spec get_upstream_config(term()) :: {:ok, term()} | {:error, {:upstream_not_found, term()}}
  def get_upstream_config(network_name) do
    case lookup_network(configured_networks(), network_name) do
      nil ->
        Logger.error("Upstream network not found: #{network_name}")
        {:error, {:upstream_not_found, network_name}}

      config ->
        {:ok, config}
    end
  end

  @doc """
  Test connectivity to an upstream network.

  Delegates to `UpstreamConnection.test_connection/1` with the network's
  configuration, or returns the lookup error when the network is unknown.
  """
  @spec test_upstream_connectivity(term()) :: term()
  def test_upstream_connectivity(network_name) do
    Logger.info("Testing connectivity to upstream: #{network_name}")

    case get_upstream_config(network_name) do
      {:ok, config} -> UpstreamConnection.test_connection(config)
      {:error, reason} -> {:error, reason}
    end
  end

  ## GenServer Callbacks

  @impl true
  def init(_opts) do
    Logger.info("Starting UpstreamRouter GenServer")

    # Validate all upstream network configurations at startup
    case validate_all_upstream_configurations() do
      :ok ->
        Logger.info("All upstream network configurations validated successfully")
        {:ok, %{}}

      {:error, reason} ->
        Logger.error("UpstreamRouter failed to start: #{inspect(reason)}")
        {:stop, {:configuration_error, reason}}
    end
  end

  @impl true
  def handle_call({:route_message, iso_message, channel_context}, _from, state) do
    Logger.info("Routing message MTI: #{ISOMsg.get_mti(iso_message)}")

    result =
      try do
        with {:ok, upstream_config} <- determine_upstream_network(iso_message, channel_context),
             {:ok, formatted_message} <- format_for_upstream(iso_message, upstream_config),
             {:ok, response_bytes} <- send_to_upstream(formatted_message, upstream_config),
             {:ok, response_message} <- unpack_upstream_response(response_bytes, upstream_config) do
          Logger.info("Successfully received upstream response")
          {:ok, response_message}
        else
          {:error, reason} ->
            Logger.error("Upstream routing failed: #{inspect(reason)}")
            # Instead of returning error tuple, create proper ISO error response
            error_response_or_error(iso_message, reason, "upstream")
        end
      rescue
        exception ->
          Logger.error("Exception in route_message: #{inspect(exception)}")
          # Create error response for exceptions too
          error_response_or_error(iso_message, {:routing_exception, exception}, "exception")
      end

    {:reply, result, state}
  end

  # Private Functions

  # Builds an ISO error response for `reason`; falls back to `{:error, reason}`
  # when the response itself cannot be created. `label` only varies the log text.
  defp error_response_or_error(iso_message, reason, label) do
    case create_upstream_error_response(iso_message, reason) do
      {:ok, error_response} ->
        Logger.info("Created #{label} error response successfully")
        {:ok, error_response}

      {:error, create_error} ->
        Logger.error("Failed to create #{label} error response: #{inspect(create_error)}")
        # Fallback to original error if we can't create response
        {:error, reason}
    end
  end

  # Raw `:upstream_networks` setting; may be a keyword list or a map.
  defp configured_networks do
    Application.get_env(:da_product_app, :upstream_networks, %{})
  end

  # Looks up a network config by its exact key. Enumerating (rather than
  # `Map.get/2`) keeps this working for keyword-list configuration.
  defp lookup_network(networks, network_name) do
    Enum.find_value(networks, fn
      {^network_name, config} -> config
      _other -> nil
    end)
  end

  # Finds the `{name, config}` entry whose atom name spells `requested`.
  # Comparing strings avoids creating atoms from message content.
  defp find_network_by_string_name(networks, requested) when is_binary(requested) do
    Enum.find(networks, fn
      {name, _config} when is_atom(name) -> Atom.to_string(name) == requested
      _other -> false
    end)
  end

  defp validate_all_upstream_configurations do
    Enum.reduce_while(configured_networks(), :ok, fn {network_name, config}, _acc ->
      case validate_network_configuration(network_name, config) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, {network_name, reason}}}
      end
    end)
  end

  defp validate_network_configuration(network_name, config) do
    case Map.get(config, :network_type, :iso8583) do
      :gateway ->
        # Gateway networks (REST APIs) - validate using GatewayRouter
        validate_gateway_configuration(network_name, config)

      _ ->
        # Traditional ISO8583 networks require host, port, packager
        case Enum.reject([:host, :port, :packager], &Map.has_key?(config, &1)) do
          [] ->
            with :ok <- validate_message_framing(network_name, config),
                 :ok <- validate_header_configuration(network_name, config) do
              :ok
            end

          missing_fields ->
            {:error, {:missing_required_fields, missing_fields}}
        end
    end
  end

  defp validate_gateway_configuration(network_name, config) do
    # Use the dedicated GatewayRouter for validation
    case GatewayRouter.validate_gateway_config(config) do
      :ok ->
        Logger.debug("Network #{network_name}: Gateway configuration validated successfully")
        :ok

      {:error, reason} ->
        Logger.error("Network #{network_name}: Gateway validation failed: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp validate_message_framing(network_name, config) do
    case Map.get(config, :message_framing) do
      nil ->
        Logger.debug("Network #{network_name}: No message framing configured, using raw messages")
        :ok

      framing_config ->
        case MessageFraming.validate_framing_config(framing_config) do
          :ok ->
            Logger.debug("Network #{network_name}: Message framing configuration valid")
            :ok

          {:error, reason} ->
            {:error, {:invalid_message_framing, reason}}
        end
    end
  end

  defp validate_header_configuration(network_name, config) do
    case Map.get(config, :header_config) do
      nil ->
        Logger.debug("Network #{network_name}: No header configuration, TPDU processing disabled")
        :ok

      header_config ->
        Logger.debug("Network #{network_name}: Validating header configuration")

        case HeaderProcessor.validate_header_config(header_config) do
          {:ok, _validated_config} -> :ok
          {:error, reason} -> {:error, {:header_config_invalid, reason}}
        end
    end
  end

  # Picks the upstream network: an explicit decision carried in field 63 wins,
  # otherwise networks are filtered/sorted and matched against routing rules.
  defp determine_upstream_network(%ISOMsg{} = iso_message, channel_context) do
    Logger.debug("Determining upstream network for routing with context: #{inspect(channel_context)}")

    upstream_networks = configured_networks()

    case extract_routing_decision_from_message(iso_message) do
      {:ok, requested_network_name} ->
        Logger.debug("RoutingProcessor already specified upstream network: #{requested_network_name}")

        case find_network_by_string_name(upstream_networks, requested_network_name) do
          {network_name, config} when is_map(config) ->
            Logger.info("Using RoutingProcessor's routing decision: #{requested_network_name} (priority: #{Map.get(config, :priority, 99)}, type: #{Map.get(config, :network_type, :network)})")
            {:ok, Map.put(config, :network_name, network_name)}

          _not_configured ->
            Logger.warning("RoutingProcessor specified network #{requested_network_name} but config not found, falling back to priority routing")
            fallback_network_selection(upstream_networks, iso_message, channel_context)
        end

      :not_found ->
        Logger.debug("No RoutingProcessor routing decision found, using priority-based routing")
        fallback_network_selection(upstream_networks, iso_message, channel_context)
    end
  end

  # Reads the "upstream_network" key from the JSON carried in field 63.
  # Anything that is not a JSON object with a string value yields `:not_found`.
  defp extract_routing_decision_from_message(%ISOMsg{} = iso_message) do
    with field_63_json when is_binary(field_63_json) <- ISOMsg.get(iso_message, 63),
         {:ok, %{"upstream_network" => network_name}} when is_binary(network_name) <-
           Jason.decode(field_63_json) do
      {:ok, network_name}
    else
      _ -> :not_found
    end
  end

  defp fallback_network_selection(upstream_networks, iso_message, channel_context) do
    network_type_filter = Map.get(channel_context || %{}, :network_type)

    candidate_networks =
      upstream_networks
      |> Enum.to_list()
      |> filter_by_network_type(network_type_filter)
      |> sort_by_priority()

    Logger.debug("Filtered and sorted networks: #{inspect(Enum.map(candidate_networks, fn {name, config} -> {name, Map.get(config, :priority, 99), Map.get(config, :network_type, :network)} end))}")

    matching_upstream =
      Enum.find(candidate_networks, fn {_name, config} ->
        # A network without :routing_rules can never match; skip it instead of
        # raising KeyError and aborting routing for every other network.
        case Map.fetch(config, :routing_rules) do
          {:ok, rules} -> RoutingRules.matches_message?(iso_message, rules)
          :error -> false
        end
      end)

    case matching_upstream do
      {network_name, config} ->
        Logger.info("Routing to upstream network: #{network_name} (priority: #{Map.get(config, :priority, 99)}, type: #{Map.get(config, :network_type, :network)})")
        {:ok, Map.put(config, :network_name, network_name)}

      nil ->
        Logger.error("No matching upstream network found for network_type: #{inspect(network_type_filter)}")
        {:error, :no_matching_upstream}
    end
  end

  # Filter networks by network_type if specified in context
  defp filter_by_network_type(networks, nil), do: networks

  defp filter_by_network_type(networks, network_type) do
    Logger.debug("Filtering networks by network_type: #{network_type}")

    filtered =
      Enum.filter(networks, fn {_name, config} ->
        Map.get(config, :network_type, :network) == network_type
      end)

    Logger.debug("Networks after type filter: #{inspect(Enum.map(filtered, fn {name, _} -> name end))}")
    filtered
  end

  # Sort networks by priority (ascending order - lower number = higher priority)
  defp sort_by_priority(networks) do
    sorted = Enum.sort_by(networks, fn {_name, config} -> Map.get(config, :priority, 99) end)

    Logger.debug("Networks after priority sort: #{inspect(Enum.map(sorted, fn {name, config} -> {name, Map.get(config, :priority, 99)} end))}")
    sorted
  end

  defp format_for_upstream(%ISOMsg{} = iso_message, upstream_config) do
    Logger.debug("Formatting message for upstream")

    try do
      # Set the upstream packager
      upstream_packager = Map.get(upstream_config, :packager)
      formatted_message = ISOMsg.set_packager(iso_message, upstream_packager)

      # Apply any upstream-specific transformations
      transformed_message = apply_upstream_transformations(formatted_message, upstream_config)

      Logger.debug("Message transformed_message with packager: #{inspect(upstream_packager)}")
      Logger.debug("Formatted message MTI: #{ISOMsg.get_mti(transformed_message)}")
      ISOMsg.dump_iso(transformed_message)

      {:ok, transformed_message}
    rescue
      exception ->
        Logger.error("Message formatting failed: #{inspect(exception)}")
        {:error, {:formatting_failed, exception}}
    end
  end

  defp send_to_upstream(%ISOMsg{} = iso_message, upstream_config) do
    network_name = Map.get(upstream_config, :network_name)
    Logger.debug("Sending message to upstream: #{network_name}")

    # Use cached health status from UpstreamHealthMonitor instead of blocking health check.
    # Only the monitor lookup is rescued: an exception raised while sending must
    # not trigger a second send of the same message.
    case cached_health_status(network_name) do
      {:ok, true} ->
        send_to_upstream_internal(iso_message, upstream_config)

      {:ok, false} ->
        Logger.warning("Upstream #{network_name} marked unhealthy by health monitor, attempting fallback")
        {:error, {:upstream_unhealthy, "health_monitor_status"}}

      {:unavailable, detail} ->
        # Handle case where UpstreamHealthMonitor is not available (graceful degradation)
        Logger.warning("UpstreamHealthMonitor not available, falling back to direct health check: #{inspect(detail)}")

        case perform_upstream_health_check(upstream_config) do
          :healthy ->
            send_to_upstream_internal(iso_message, upstream_config)

          {:unhealthy, reason} ->
            Logger.warning("Direct upstream health check failed: #{inspect(reason)}")
            {:error, {:upstream_unhealthy, reason}}
        end
    end
  end

  # Asks the health monitor for its cached verdict. Returns `{:ok, boolean}` or
  # `{:unavailable, detail}` when the monitor raised or answered unexpectedly.
  defp cached_health_status(network_name) do
    case UpstreamHealthMonitor.is_network_healthy?(network_name) do
      status when is_boolean(status) -> {:ok, status}
      other -> {:unavailable, {:unexpected_health_status, other}}
    end
  rescue
    exception -> {:unavailable, exception}
  end

  # Direct TCP probe: connect to host/port and close immediately.
  defp perform_upstream_health_check(upstream_config) do
    host = Map.get(upstream_config, :host, "localhost")
    port = Map.get(upstream_config, :port, 8080)
    timeout = Map.get(upstream_config, :health_check_timeout, 2000)

    Logger.debug("Health checking upstream #{host}:#{port}")

    case :gen_tcp.connect(String.to_charlist(host), port, [:binary], timeout) do
      {:ok, socket} ->
        :gen_tcp.close(socket)
        Logger.debug("Upstream health check passed")
        :healthy

      {:error, reason} ->
        Logger.warning("Upstream health check failed: #{inspect(reason)}")
        {:unhealthy, reason}
    end
  rescue
    exception ->
      Logger.error("Upstream health check exception: #{inspect(exception)}")
      {:unhealthy, {:exception, exception}}
  end

  defp send_to_upstream_internal(%ISOMsg{} = iso_message, upstream_config) do
    Logger.debug("Sending message to upstream internal: #{Map.get(upstream_config, :network_name)}")

    case Map.get(upstream_config, :network_type, :iso8583) do
      :gateway ->
        # Route to gateway processor using dedicated gateway router
        GatewayRouter.process_transaction(iso_message, upstream_config)

      _ ->
        # Traditional ISO8583 upstream processing
        max_retries = Map.get(upstream_config, :max_retries, 3)
        initial_timeout = Map.get(upstream_config, :initial_retry_timeout, 1000)
        send_with_retry(iso_message, upstream_config, 0, max_retries, initial_timeout)
    end
  end

  # Sends once and retries up to `max_retries` more times (so at most
  # `max_retries + 1` sends), sleeping with exponential back-off between tries.
  # The final failure is returned immediately, without a trailing sleep, and
  # keeps the underlying reason as `{:all_retries_failed, reason}`.
  defp send_with_retry(iso_message, upstream_config, attempt, max_retries, timeout)
       when attempt <= max_retries do
    Logger.debug("Attempting upstream send (attempt #{attempt + 1}/#{max_retries + 1})")

    case send_message_to_upstream(iso_message, upstream_config) do
      {:ok, response} ->
        Logger.debug("Upstream send successful on attempt #{attempt + 1}")
        {:ok, response}

      {:error, reason} when attempt < max_retries ->
        Logger.warning("Upstream send failed (attempt #{attempt + 1}): #{inspect(reason)}, retrying in #{timeout}ms")
        :timer.sleep(timeout)

        # Exponential backoff: double the timeout for next retry, capped at 30 seconds
        next_timeout = min(timeout * 2, 30_000)
        send_with_retry(iso_message, upstream_config, attempt + 1, max_retries, next_timeout)

      {:error, reason} ->
        Logger.error("All upstream send attempts failed: #{inspect(reason)}")
        {:error, {:all_retries_failed, reason}}
    end
  end

  # Only reachable when `max_retries` is configured below the starting attempt
  # (e.g. a negative value): nothing is sent.
  defp send_with_retry(_iso_message, _upstream_config, _attempt, _max_retries, _timeout) do
    {:error, :max_retries_exceeded}
  end

  # One full round trip: pack -> header -> frame -> network -> unframe -> strip header.
  defp send_message_to_upstream(%ISOMsg{} = iso_message, upstream_config) do
    try do
      with {:ok, packed_bytes} <- ISOMsg.pack(iso_message),
           {:ok, header_bytes} <- add_upstream_header(packed_bytes, upstream_config),
           {:ok, framed_bytes} <- apply_message_framing(header_bytes, upstream_config),
           {:ok, response_bytes} <- send_to_network(framed_bytes, upstream_config),
           {:ok, unframed_response} <- unframe_upstream_response(response_bytes, upstream_config),
           {:ok, clean_response} <- extract_upstream_header(unframed_response, upstream_config) do
        {:ok, clean_response}
      end
    rescue
      exception ->
        Logger.error("Upstream send failed: #{inspect(exception)}")
        {:error, {:send_failed, exception}}
    end
  end

  defp add_upstream_header(message_bytes, upstream_config) do
    header_config = Map.get(upstream_config, :header_config)

    case HeaderProcessor.process_message(message_bytes, header_config, :outbound) do
      {:ok, framed_message} ->
        Logger.debug("Added TPDU header to upstream message")
        {:ok, framed_message}

      {:error, reason} ->
        Logger.error("Failed to add TPDU header: #{inspect(reason)}")
        {:error, {:header_add_failed, reason}}
    end
  end

  defp extract_upstream_header(response_bytes, upstream_config) do
    header_config = Map.get(upstream_config, :header_config)

    case HeaderProcessor.process_message(response_bytes, header_config, :inbound) do
      {:ok, {clean_message, header_info}} ->
        Logger.debug("Extracted TPDU header from upstream response")

        if header_info do
          Logger.debug("Response header info: #{inspect(header_info)}")
        end

        {:ok, clean_message}

      {:error, reason} ->
        Logger.error("Failed to extract TPDU header: #{inspect(reason)}")
        {:error, {:header_extract_failed, reason}}
    end
  end

  defp send_to_network(message_bytes, upstream_config) do
    # Send via connection manager for better connection management
    network_name = Map.get(upstream_config, :network_name)

    case UpstreamConnectionManager.send_message(network_name, message_bytes) do
      {:ok, response} ->
        {:ok, response}

      {:error, manager_reason} ->
        # Fallback to direct connection if connection manager fails.
        # NOTE(review): if the manager can fail after the bytes were written,
        # this re-sends the message - confirm the manager's failure semantics.
        Logger.debug("Connection manager failed (#{inspect(manager_reason)}), using direct connection")

        case UpstreamConnection.send_message(message_bytes, upstream_config) do
          {:ok, response} -> {:ok, response}
          {:error, direct_reason} -> {:error, direct_reason}
        end
    end
  end

  defp unpack_upstream_response(response_bytes, upstream_config) when is_binary(response_bytes) do
    Logger.debug("Response Hex: #{Base.encode16(response_bytes)}")

    try do
      upstream_packager = Map.get(upstream_config, :packager) || ISO87BPackager

      case upstream_packager.unpack(response_bytes) do
        {:ok, response_message} ->
          Logger.debug("Successfully unpacked upstream response")
          {:ok, response_message}

        {:error, reason} ->
          Logger.error("Response unpacking failed: #{inspect(reason)}")
          {:error, {:unpacking_failed, reason}}
      end
    rescue
      exception ->
        Logger.error("Response unpacking exception: #{inspect(exception)}")
        {:error, {:unpacking_exception, exception}}
    end
  end

  # Apply upstream-specific field transformations
  defp apply_upstream_transformations(%ISOMsg{} = iso_message, upstream_config) do
    # Transform track 2 data for upstream compatibility
    transformed_message = transform_track2_data(iso_message)

    case Map.get(upstream_config, :network_name) do
      :visa -> apply_visa_transformations(transformed_message)
      :mastercard -> apply_mastercard_transformations(transformed_message)
      _ -> transformed_message
    end
  end

  # Transforms track 2 data (field 35) for upstream network compatibility.
  #
  # Every "=" separator is replaced with the character "D", the separator the
  # upstream packager expects for this field (per the original note, the field
  # is handled as a hex string by IFB_LLVAR).
  #
  # Parameters: `iso_message` - the ISOMsg that may carry track 2 data in field 35.
  # Returns: the ISOMsg with transformed track 2 data, or unchanged when the
  # field is absent or not a binary.
  #
  # Track 2 contents are sensitive cardholder data and are deliberately never
  # written to the log; only the field size is reported.
  defp transform_track2_data(%ISOMsg{} = iso_message) do
    case ISOMsg.get_field(iso_message, 35) do
      nil ->
        Logger.debug("No track 2 data (field 35) found, skipping transformation")
        iso_message

      track_data when is_binary(track_data) ->
        modified_track_data = String.replace(track_data, "=", "D")

        Logger.debug("Transformed track 2 data (field 35), #{byte_size(modified_track_data)} bytes")
        ISOMsg.set(iso_message, 35, modified_track_data)

      _unexpected ->
        Logger.warning("Unexpected track 2 data format (not a binary), skipping transformation")
        iso_message
    end
  end

  # Visa-specific transformations: default field 24 (Network International ID).
  defp apply_visa_transformations(%ISOMsg{} = iso_message) do
    ensure_field_present(iso_message, 24, "782")
  end

  # MasterCard-specific transformations: default field 24 (Network International ID).
  defp apply_mastercard_transformations(%ISOMsg{} = iso_message) do
    ensure_field_present(iso_message, 24, "002")
  end

  # Sets `field_number` to `default_value` only when the field is currently nil.
  defp ensure_field_present(%ISOMsg{} = iso_message, field_number, default_value) do
    case ISOMsg.get_field(iso_message, field_number) do
      nil -> ISOMsg.set(iso_message, field_number, default_value)
      _existing_value -> iso_message
    end
  end

  defp apply_message_framing(packed_bytes, upstream_config) do
    case Map.get(upstream_config, :message_framing) do
      nil ->
        # No framing configured - send raw message
        Logger.debug("No message framing configured, sending raw message")
        {:ok, packed_bytes}

      framing_config ->
        Logger.debug("Applying message framing: #{inspect(framing_config)}")
        MessageFraming.frame_message(packed_bytes, framing_config)
    end
  end

  defp unframe_upstream_response(response_bytes, upstream_config) do
    case Map.get(upstream_config, :message_framing) do
      nil ->
        # No framing configured - return raw response
        Logger.debug("No message framing configured, returning raw response")
        {:ok, response_bytes}

      framing_config ->
        Logger.debug("Removing message framing from response")

        case MessageFraming.unframe_message(response_bytes, framing_config) do
          {:ok, {unframed_bytes, _remaining}} ->
            {:ok, unframed_bytes}

          {:error, reason} ->
            Logger.error("Response unframing failed: #{inspect(reason)}")
            {:error, {:unframing_failed, reason}}
        end
    end
  end

  # Error response creation functions

  # Builds an ISO response for `original_message` carrying a response code
  # (field 39) derived from `error_reason`. Never raises: this runs from the
  # `rescue` clause of `handle_call/3`, where a raise would crash the router.
  # Returns `{:ok, response}` or `{:error, detail}`.
  defp create_upstream_error_response(%ISOMsg{} = original_message, error_reason) do
    Logger.info("Creating upstream error response for reason: #{inspect(error_reason)}")

    try do
      # Determine appropriate response code based on error reason
      response_code = map_upstream_error_to_response_code(error_reason)

      # Get original MTI and convert to response
      original_mti = ISOMsg.get_mti(original_message)
      response_mti = MTIConverter.to_response(original_mti)
      Logger.debug("Converting upstream error MTI #{original_mti} to response MTI #{response_mti}")

      # ISOMsg.new may return either a tagged tuple or the struct directly
      created =
        case ISOMsg.new(response_mti) do
          {:ok, msg} -> {:ok, msg}
          %ISOMsg{} = msg -> {:ok, msg}
          {:error, reason} -> {:error, reason}
        end

      case created do
        {:ok, error_response} ->
          # Use the same packager as the original message
          packager = original_message.packager || ISO87BPackager

          final_response =
            error_response
            |> ISOMsg.set_packager(packager)
            |> ISOMsg.set(39, response_code)
            |> copy_essential_fields_from_original(original_message)

          Logger.info("Created upstream error response with MTI #{response_mti} and response code #{response_code}")
          {:ok, final_response}

        {:error, reason} ->
          Logger.error("Failed to create ISOMsg with MTI #{response_mti}: #{inspect(reason)}")
          Logger.error("Failed to create upstream error response: #{inspect(reason)}")
          {:error, {:error_response_creation_failed, reason}}
      end
    rescue
      exception ->
        Logger.error("Exception creating upstream error response: #{inspect(exception)}")
        {:error, {:error_response_creation_exception, exception}}
    end
  end

  # Copies the fields a response must echo, when present in the original:
  # STAN (11), Date/Time (7), Terminal ID (41), Merchant ID (42), PAN (2),
  # Processing Code (3), Amount (4), Currency (49).
  defp copy_essential_fields_from_original(%ISOMsg{} = response_message, %ISOMsg{} = original_message) do
    Enum.reduce([11, 7, 41, 42, 2, 3, 4, 49], response_message, fn field, acc ->
      case ISOMsg.get_field(original_message, field) do
        nil -> acc
        value -> ISOMsg.set(acc, field, value)
      end
    end)
  end

  # Maps an internal error reason to the ISO8583 response code (field 39).
  defp map_upstream_error_to_response_code(error_reason) do
    case error_reason do
      # Retry wrapper: classify by the underlying failure of the last attempt
      {:all_retries_failed, reason} -> map_upstream_error_to_response_code(reason)
      {:receive_length_failed, _} -> "91"  # Issuer switch inoperative
      {:send_failed, _} -> "91"  # Issuer switch inoperative
      {:connection_failed, _} -> "91"  # Issuer switch inoperative
      {:upstream_unhealthy, _} -> "91"  # Issuer switch inoperative
      {:timeout, _} -> "68"  # Response received too late
      {:packing_failed, _} -> "30"  # Format error
      {:unpacking_failed, _} -> "30"  # Format error
      {:framing_failed, _} -> "30"  # Format error
      {:unframing_failed, _} -> "30"  # Format error
      {:formatting_failed, _} -> "30"  # Format error
      # Routing returns this reason as a bare atom
      :no_matching_upstream -> "12"  # Invalid transaction
      {:no_matching_upstream, _} -> "12"  # Invalid transaction
      {:upstream_not_found, _} -> "12"  # Invalid transaction
      {:routing_exception, _} -> "96"  # System error
      {:gateway_processing_failed, _} -> "91"  # Gateway system error
      {:gateway_exception, _} -> "96"  # Gateway exception
      _ -> "96"  # Default system error
    end
  end
end