#!/usr/bin/env elixir

# Simple test script to verify processor pipeline implementation
# Run with: elixir test_processor_pipeline.exs

Mix.install([
  {:jason, "~> 1.4"}
])

defmodule ProcessorPipelineTest do
  @moduledoc """
  Simple test to verify the processor pipeline implementation.

  This test creates a sample ISO8583 message and processes it through
  a mock processor pipeline to verify the architecture works correctly.
  """

  # Mock ISOMsg structure for testing
  defmodule MockISOMsg do
    @moduledoc """
    Minimal in-memory message: an MTI plus a map of field number => value.
    """

    defstruct mti: nil, fields: %{}

    @doc "Builds a message with the given MTI and no fields."
    def new(mti), do: %__MODULE__{mti: mti}

    @doc "Returns the MTI of the message."
    def get_mti(%__MODULE__{mti: mti}), do: mti

    @doc "Returns a copy of the message with the MTI replaced."
    def set_mti(%__MODULE__{} = msg, mti), do: %{msg | mti: mti}

    @doc "Returns the value stored under `field_num`, or `nil` when the field is absent."
    def get_field(%__MODULE__{fields: fields}, field_num), do: Map.get(fields, field_num)

    @doc "Returns a copy of the message with `field_num` set to `value`."
    def set_field(%__MODULE__{fields: fields} = msg, field_num, value) do
      %{msg | fields: Map.put(fields, field_num, value)}
    end

    @doc "Renders the message as a single human-readable line."
    def dump_iso(%__MODULE__{mti: mti, fields: fields}) do
      "MTI: #{mti}, Fields: #{inspect(fields)}"
    end
  end

  # Mock MessageProcessor behavior
  defmodule MockProcessor do
    @moduledoc """
    Processor used by the tests: it tags the message in field 126 and either
    continues with metadata or halts, depending on the `:should_halt` config key.
    """

    # The behaviour module must be defined BEFORE `@behaviour` references it.
    # Otherwise the alias is not yet in scope and the attribute points at a
    # non-existent top-level `ProcessorBehavior`, so callbacks are never checked.
    defmodule ProcessorBehavior do
      @moduledoc "Callback contract every pipeline processor implements."

      @callback process(any(), map(), map()) ::
                  {:ok, any()} | {:ok, any(), map()} | {:halt, any()} | {:error, term()}
      @callback init(map()) :: {:ok, map()} | {:error, term()}
      @callback metadata() :: map()
      @callback terminate(map()) :: :ok
    end

    @behaviour ProcessorBehavior

    @doc """
    Marks the message as processed (field 126) and returns
    `{:halt, message}` when `config[:should_halt]` is truthy, otherwise
    `{:ok, message, %{processor: name}}`.
    """
    @impl true
    def process(iso_message, _channel_context, config) do
      processor_name = Map.get(config, :name, "unknown")

      IO.puts(
        "MockProcessor (#{processor_name}): Processing message #{MockISOMsg.get_mti(iso_message)}"
      )

      # Simulate processing by adding a field
      processed_message =
        MockISOMsg.set_field(iso_message, 126, "processed_by_#{processor_name}")

      # `if` instead of `case true/false` so a non-boolean flag cannot crash the mock.
      if Map.get(config, :should_halt, false) do
        {:halt, processed_message}
      else
        {:ok, processed_message, %{processor: processor_name}}
      end
    end

    @impl true
    def init(config), do: {:ok, config}

    @impl true
    def metadata, do: %{name: "Mock Processor", version: "1.0.0"}

    @impl true
    def terminate(_config), do: :ok
  end

  @doc """
  Exercises processor registration, a full pipeline run and a pipeline that
  halts midway, printing the outcome of each scenario.
  """
  def test_processor_registry do
    IO.puts("\n=== Testing Processor Registry Architecture ===")

    # Test 1: Processor registration
    IO.puts("\n1. Testing processor registration...")

    processors = [
      {MockProcessor, %{name: "validation", strict: true}},
      {MockProcessor, %{name: "enrichment", add_timestamps: true}},
      {MockProcessor, %{name: "routing", method: "bin_routing"}}
    ]

    IO.puts("✓ Mock processors created: #{length(processors)}")

    # Test 2: Pipeline processing
    IO.puts("\n2. Testing pipeline processing...")

    iso_message =
      MockISOMsg.new("0200")
      # Test PAN
      |> MockISOMsg.set_field(2, "4111111111111111")
      # Amount $1000.00
      |> MockISOMsg.set_field(4, "000000100000")

    IO.puts("Initial message: #{MockISOMsg.dump_iso(iso_message)}")

    # Simulate pipeline processing
    {final_message, metadata_list} = process_pipeline(iso_message, processors, %{port: 8583})

    IO.puts("Final message: #{MockISOMsg.dump_iso(final_message)}")
    IO.puts("Processing metadata: #{inspect(metadata_list)}")

    # Test 3: Pipeline halt scenario
    IO.puts("\n3. Testing pipeline halt scenario...")

    halt_processors = [
      {MockProcessor, %{name: "validation", strict: true}},
      # This should halt
      {MockProcessor, %{name: "security", should_halt: true}},
      # Should not execute
      {MockProcessor, %{name: "routing", method: "bin_routing"}}
    ]

    case process_pipeline(iso_message, halt_processors, %{port: 8583}) do
      {:halt, halted_message, halt_metadata} ->
        IO.puts("✓ Pipeline correctly halted")
        IO.puts("Halt message: #{MockISOMsg.dump_iso(halted_message)}")
        IO.puts("Halt metadata: #{inspect(halt_metadata)}")

      other ->
        IO.puts("✗ Expected halt, got: #{inspect(other)}")
    end

    IO.puts("\n✓ All processor registry tests completed successfully!")
  end

  # Runs `iso_message` through each `{module, config}` processor in order.
  #
  # Returns `{message, metadata}` when every processor continues,
  # `{:halt, message, metadata}` when one halts, or
  # `{:error, reason, metadata}` when one fails. Metadata is accumulated by
  # prepending, so it is reversed once here to be chronological in all outcomes.
  defp process_pipeline(iso_message, processors, channel_context) do
    result =
      Enum.reduce_while(processors, {iso_message, []}, fn {processor_module, config},
                                                          {current_message, acc_metadata} ->
        case processor_module.process(current_message, channel_context, config) do
          {:ok, processed_message} ->
            {:cont, {processed_message, acc_metadata}}

          {:ok, processed_message, metadata} ->
            {:cont, {processed_message, [metadata | acc_metadata]}}

          {:halt, response_message} ->
            {:halt, {:halt, response_message, acc_metadata}}

          {:error, reason} ->
            {:halt, {:error, reason, acc_metadata}}
        end
      end)

    case result do
      {:halt, halted_message, halt_metadata} ->
        {:halt, halted_message, Enum.reverse(halt_metadata)}

      {:error, reason, error_metadata} ->
        {:error, reason, Enum.reverse(error_metadata)}

      {message, metadata} ->
        {message, Enum.reverse(metadata)}
    end
  end

  @doc """
  Simulates the step-by-step flow of an incoming message (validation,
  pipeline, enrichment, business logic, routing) and prints the result.
  """
  def test_incoming_processor_integration do
    IO.puts("\n=== Testing IncomingMessageProcessor Integration ===")

    IO.puts("\n1. Simulating enhanced message processing flow...")

    # Create test message
    iso_message =
      MockISOMsg.new("0200")
      # MasterCard test PAN
      |> MockISOMsg.set_field(2, "5555555555554444")
      # Processing code
      |> MockISOMsg.set_field(3, "000000")
      # Amount $500.00
      |> MockISOMsg.set_field(4, "000000050000")
      # STAN
      |> MockISOMsg.set_field(11, "123456")
      # Terminal ID
      |> MockISOMsg.set_field(41, "TERM001")
      # Merchant ID
      |> MockISOMsg.set_field(42, "MERCHANT001")

    channel_context = %{
      port: 8583,
      packager: :iso8583,
      listener_type: :enhanced_protocol
    }

    IO.puts("Processing message through enhanced flow:")
    IO.puts(" Input: #{MockISOMsg.dump_iso(iso_message)}")
    IO.puts(" Channel: #{inspect(channel_context)}")

    # Simulate the processing steps that would happen in IncomingMessageProcessor
    steps = [
      {"Validation", &validate_step/2},
      {"Pipeline Processing", &pipeline_step/2},
      {"Enrichment", &enrichment_step/2},
      {"Business Logic", &business_logic_step/2},
      {"Upstream Routing", &routing_step/2}
    ]

    # The accumulator is always a tagged tuple, so the error case can be told
    # apart from a message without destructuring `{:error, reason}` by position.
    result =
      Enum.reduce_while(steps, {:ok, iso_message}, fn {step_name, step_func},
                                                      {:ok, current_message} ->
        IO.puts(" → #{step_name}...")

        case step_func.(current_message, channel_context) do
          {:ok, processed_message} ->
            {:cont, {:ok, processed_message}}

          {:halt, response_message} ->
            IO.puts(" ⚠ Processing halted at #{step_name}")
            {:halt, {:ok, response_message}}

          {:error, reason} ->
            IO.puts(" ✗ Error at #{step_name}: #{inspect(reason)}")
            {:halt, {:error, reason}}
        end
      end)

    case result do
      {:error, reason} ->
        IO.puts(" ✗ Processing failed: #{inspect(reason)}")

      {:ok, response_message} ->
        IO.puts(" ✓ Processing completed successfully")
        IO.puts(" Output: #{MockISOMsg.dump_iso(response_message)}")
    end

    IO.puts("\n✓ IncomingMessageProcessor integration test completed!")
  end

  # Mock processing steps

  # Basic validation: the message must carry field 2.
  defp validate_step(iso_message, _context) do
    case MockISOMsg.get_field(iso_message, 2) do
      nil -> {:error, :missing_pan}
      _pan -> {:ok, iso_message}
    end
  end

  # Simulate processor pipeline execution, propagating halt/error outcomes
  # instead of crashing on a three-element result.
  defp pipeline_step(iso_message, context) do
    processors = [
      {MockProcessor, %{name: "validation_processor"}},
      {MockProcessor, %{name: "enrichment_processor"}},
      {MockProcessor, %{name: "routing_processor"}}
    ]

    case process_pipeline(iso_message, processors, context) do
      {:halt, response_message, _metadata} -> {:halt, response_message}
      {:error, reason, _metadata} -> {:error, reason}
      {processed_message, _metadata} -> {:ok, processed_message}
    end
  end

  # Add processing timestamp (fixed sample value) in field 7.
  defp enrichment_step(iso_message, _context) do
    enriched = MockISOMsg.set_field(iso_message, 7, "0924143022")
    {:ok, enriched}
  end

  # Apply business rules (no-op in this mock).
  defp business_logic_step(iso_message, _context) do
    {:ok, iso_message}
  end

  # Route to upstream - simulate successful routing
  defp routing_step(iso_message, _context) do
    response =
      iso_message
      # Response MTI
      |> MockISOMsg.set_mti("0210")
      # Approval code
      |> MockISOMsg.set_field(39, "00")
      # Authorization code
      |> MockISOMsg.set_field(38, "123456")

    {:ok, response}
  end

  @doc "Main test runner: executes every scenario and prints a summary."
  def run_tests do
    # Built once with an explicit call: `"\n" <> "=" |> String.duplicate(50)`
    # would duplicate "\n=" because `<>` binds tighter than `|>`.
    separator = String.duplicate("=", 50)

    IO.puts("🚀 Starting Processor Pipeline Architecture Tests")
    IO.puts(separator)

    test_processor_registry()
    test_incoming_processor_integration()

    IO.puts("\n" <> separator)
    IO.puts("✅ All tests completed successfully!")
    IO.puts("\nProcessor Pipeline Architecture Summary:")
    IO.puts(" • MessageProcessor behavior defines pluggable interface")
    IO.puts(" • ProcessorRegistry manages processor lifecycles and pipelines")
    IO.puts(" • IncomingMessageProcessor integrates pipeline processing")
    IO.puts(" • Sample processors demonstrate validation, enrichment, routing")
    IO.puts(" • Configuration-driven pipeline setup per channel")
    IO.puts(" • jPOS-like modularity with Elixir/OTP architecture benefits")
  end
end

# Run the tests
ProcessorPipelineTest.run_tests()