defmodule DaProductApp.Settlements.AlipayPlus.SftpFetcher do
  @moduledoc """
  Oban worker that fetches today's settlement files over SFTP and queues each
  one for processing via `SettlementFileProcessing.process_file_manually/3`.

  Runs on the `:settlements` queue with at most 3 attempts.

  ## Configuration (environment variables)

    * `SFTP_HOST` - server host, defaults to `"localhost"`
    * `SFTP_PORT` - server port, defaults to `22`
    * `SFTP_USER` - login user, **required**
    * `SFTP_PASS` - login password, **required**

  Credentials are intentionally never defaulted in code: when `SFTP_USER` or
  `SFTP_PASS` is missing the connection step returns
  `{:error, {:missing_env, name}}` instead of falling back to a baked-in secret.
  """

  use Oban.Worker,
    queue: :settlements,
    max_attempts: 3,
    tags: ["sftp", "settlements"]

  require Logger

  alias DaProductApp.Settlements.AlipayPlus.Jobs.SettlementFileProcessing
  alias DaProductApp.Settlements.AlipayPlus.SftpClient

  @default_host "localhost"
  @default_port "22"
  @connect_timeout 30_000

  @doc """
  Enqueues a fetch job with empty args (public entry point for manual triggering).

  Returns whatever `Oban.insert/1` returns.
  """
  @spec schedule_fetch() :: {:ok, Oban.Job.t()} | {:error, term()}
  def schedule_fetch do
    %{}
    |> new()
    |> Oban.insert()
  end

  @impl Oban.Worker
  def perform(_job) do
    result =
      with_connection(fn conn ->
        with {:ok, files} <- list_settlement_files(conn) do
          process_files(conn, files)
        end
      end)

    case result do
      :ok ->
        Logger.info("Successfully processed settlement files")
        :ok

      {:error, reason} ->
        Logger.error("Failed to process settlement files: #{inspect(reason)}")
        {:error, reason}
    end
  end

  @doc """
  Opens an SFTP connection with the configured options and closes it again.

  Returns `:ok` when the connection could be established, `{:error, reason}`
  otherwise.
  """
  @spec test_sftp_connection() :: :ok | {:error, term()}
  def test_sftp_connection do
    with_connection(fn _conn ->
      Logger.info("SFTP connection successful")
      :ok
    end)
  end

  # Connects, runs `fun` with the connection and always disconnects afterwards,
  # even when `fun` raises, so a failing download cannot leak the connection.
  defp with_connection(fun) do
    with {:ok, conn} <- connect_sftp() do
      try do
        fun.(conn)
      after
        SftpClient.disconnect(conn)
        Logger.info("SFTP disconnected")
      end
    end
  end

  # Builds the connection options from the environment and connects.
  defp connect_sftp do
    with {:ok, opts} <- sftp_options(),
         {:ok, conn} <- SftpClient.connect(opts) do
      Logger.info("SFTP connected")
      {:ok, conn}
    else
      {:error, reason} ->
        Logger.error("SFTP connection failed: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Reads the SFTP settings from the environment. Host and port have defaults;
  # user and password are required.
  defp sftp_options do
    with {:ok, username} <- fetch_required_env("SFTP_USER"),
         {:ok, password} <- fetch_required_env("SFTP_PASS"),
         {:ok, port} <- parse_port(System.get_env("SFTP_PORT") || @default_port) do
      {:ok,
       [
         host: System.get_env("SFTP_HOST") || @default_host,
         port: port,
         username: username,
         password: password,
         connect_timeout: @connect_timeout
       ]}
    end
  end

  # An unset or empty variable counts as missing.
  defp fetch_required_env(name) do
    case System.get_env(name) do
      value when is_binary(value) and value != "" -> {:ok, value}
      _missing -> {:error, {:missing_env, name}}
    end
  end

  # Returns an error tuple instead of raising on a malformed port value.
  defp parse_port(value) do
    case Integer.parse(value) do
      {port, ""} when port in 1..65_535 -> {:ok, port}
      _invalid -> {:error, {:invalid_env, "SFTP_PORT", value}}
    end
  end

  # Collects `{path, filename}` pairs for today's (UTC, YYYYMMDD) files of every
  # participant. A participant whose listing fails is logged and skipped so the
  # remaining participants are still processed.
  defp list_settlement_files(conn) do
    today = Date.to_iso8601(Date.utc_today(), :basic)

    files =
      Enum.flat_map(get_participant_ids(), fn participant_id ->
        case SftpClient.list_settlement_files(conn, participant_id, today) do
          {:ok, file_infos} ->
            Enum.map(file_infos, &{&1.path, &1.filename})

          {:error, reason} ->
            Logger.error(
              "Failed to list settlement files for participant #{participant_id}: " <>
                inspect(reason)
            )

            []
        end
      end)

    {:ok, files}
  end

  # Downloads every file and queues a processing job for it. Best effort: a
  # failing file is logged and does not stop the others, and the function still
  # returns `:ok` so a retry does not re-queue files that were already queued.
  defp process_files(conn, files) do
    failed =
      Enum.count(files, fn {path, filename} ->
        fetch_and_queue(conn, path, filename) != :ok
      end)

    if failed > 0 do
      Logger.error("#{failed} of #{length(files)} settlement files could not be queued")
    end

    :ok
  end

  # Downloads a single file and hands its content to the processing job.
  defp fetch_and_queue(conn, path, filename) do
    case SftpClient.download_file(conn, path) do
      {:ok, content} ->
        queue_processing(filename, content, path)

      {:error, reason} ->
        Logger.error("Failed to read file #{path}: #{inspect(reason)}")
        :error
    end
  end

  # Queues the processing job for an already downloaded file.
  defp queue_processing(filename, content, path) do
    case SettlementFileProcessing.process_file_manually(filename, content, %{file_path: path}) do
      {:ok, job} ->
        Logger.info("Queued settlement file processing for: #{filename} (Job ID: #{job.id})")
        :ok

      {:error, reason} ->
        Logger.error("Failed to queue processing job for #{filename}: #{inspect(reason)}")
        :error
    end
  end

  defp get_participant_ids do
    # TODO: Get this from your application config or environment
    # Replace with actual participant IDs
    ["1000012345"]
  end
end