defmodule Sobelow do @moduledoc """ Sobelow is a static analysis tool for discovering vulnerabilities in Phoenix applications. """ @v Mix.Project.config()[:version] @home "~/.sobelow" @vsncheck "sobelow-vsn-check" @skips ".sobelow-skips" @submodules [ Sobelow.XSS, Sobelow.SQL, Sobelow.Traversal, Sobelow.RCE, Sobelow.Misc, Sobelow.Config, Sobelow.CI, Sobelow.DOS, Sobelow.Vuln ] alias Sobelow.Config alias Sobelow.Finding alias Sobelow.FindingLog alias Sobelow.Fingerprint alias Sobelow.IO, as: MixIO alias Sobelow.MetaLog alias Sobelow.Parse alias Sobelow.Utils alias Sobelow.Vuln def run do project_root = get_env(:root) <> "/" version_check() app_name = Utils.get_app_name(project_root <> "mix.exs") if !is_binary(app_name), do: file_error() # If web_root ends with the app_name, then it is the # more recent version of Phoenix. Meaning, all files are # in the lib directory, so we don't need to re-scan # lib_root separately. phx_post_1_2? = !File.dir?(project_root <> "web") lib_root = if phx_post_1_2? do project_root <> "lib" else project_root <> "web" end ignored = get_ignored() allowed = @submodules -- ignored # Pulling out function definitions before kicking # off the test pipeline to avoid dumping warning # messages into the findings output. root_meta_files = get_meta_files(lib_root) template_meta_files = get_meta_templates(lib_root) {libroot_meta_files, tmp_default_router} = if phx_post_1_2? do {[], ""} else libroot_meta_files = get_meta_files(project_root <> "lib") default_router = project_root <> "/web/router.ex" {libroot_meta_files, default_router} end default_router = get_router(tmp_default_router, phx_post_1_2?) {routers, endpoints} = get_phoenix_files(root_meta_files ++ libroot_meta_files, default_router) if Enum.empty?(routers), do: no_router() init_state(project_root, template_meta_files) if get_env(:clear_skip), do: clear_skip(project_root) # This is where the core testing-pipeline starts. # # - Print banner # - Check configuration # - Remove config check from "allowed" modules # - Scan funcs from the root # - Scan funcs from the libroot if format() not in ["quiet", "compact", "flycheck", "json"], do: IO.puts(:stderr, print_banner()) Application.put_env(:sobelow, :app_name, app_name) if Enum.member?(allowed, Config), do: Config.fetch(project_root, routers, endpoints) if Enum.member?(allowed, Vuln), do: Vuln.get_vulns(project_root) allowed = allowed -- [Config, Vuln] Enum.each(root_meta_files, fn meta_file -> meta_file.def_funs |> combine_skips() |> Enum.each(&get_fun_vulns(&1, meta_file, project_root, allowed)) end) Enum.each(libroot_meta_files, fn meta_file -> meta_file.def_funs |> combine_skips() |> Enum.each(&get_fun_vulns(&1, meta_file, "", allowed)) end) Enum.each(template_meta_files, fn {_, meta_file} -> if Sobelow.XSS in allowed, do: Sobelow.XSS.get_template_vulns(meta_file) end) # Enum.each(template_meta_files, fn {_, meta_file} -> # get_fun_vulns(meta_file.ast, meta_file, root, allowed) # end) if format() != "txt" do print_output() else IO.puts(:stderr, "... SCAN COMPLETE ...\n") end if get_env(:mark_skip_all), do: mark_skip_all(project_root) exit_with_status() end defp init_state(project_root, template_meta_files) do FindingLog.start_link() MetaLog.start_link() Fingerprint.start_link() load_ignored_fingerprints(project_root) MetaLog.add_templates(template_meta_files) end defp print_output do details = case output_format() do "json" -> FindingLog.json(@v) "quiet" -> FindingLog.quiet() "sarif" -> FindingLog.sarif(@v) _ -> nil end if !is_nil(details) do print_std_or_file(details) end end defp print_std_or_file(details) do case get_env(:out) do nil -> IO.puts(details) "" -> IO.puts(details) out -> File.write(out, details) end end defp exit_with_status do exit_on = get_env(:exit_on) finding_logs = FindingLog.log() high_count = length(finding_logs[:high]) medium_count = length(finding_logs[:medium]) low_count = length(finding_logs[:low]) status = case exit_on do :high -> if high_count > 0, do: 1 :medium -> if high_count + medium_count > 0, do: 1 :low -> if high_count + medium_count + low_count > 0, do: 1 _ -> 0 end if exit_on && !is_nil(status) do System.halt(status) end end def details do mod = get_env(:details) |> get_mod if is_nil(mod) do MixIO.error("A valid module was not selected.") else apply(mod, :details, []) |> IO.puts() end end def log_finding(%Finding{} = finding) do log_finding(finding.type, finding) end def log_finding(details, %Finding{} = finding) do if loggable?(finding.fingerprint, finding.confidence) do Fingerprint.put(finding.fingerprint) FindingLog.add({details, finding}, finding.confidence) end end def loggable?(fingerprint, severity) do !(get_env(:skip) && Fingerprint.member?(fingerprint)) && meets_threshold?(severity) end def all_details do @submodules |> Enum.map(&apply(&1, :details, [])) |> List.flatten() |> Enum.each(&IO.puts(&1)) end def rules do @submodules |> Enum.flat_map(&apply(&1, :rules, [])) end def finding_modules do @submodules |> Enum.flat_map(&apply(&1, :finding_modules, [])) end def save_config(conf_file) do conf = [ verbose: get_env(:verbose), private: get_env(:private), skip: get_env(:skip), router: get_env(:router), exit: get_env(:exit_on), format: get_env(:format), out: get_env(:out), threshold: get_env(:threshold), ignore: get_env(:ignored), ignore_files: get_env(:ignored_files), version: get_env(:version) ] yes? = if File.exists?(conf_file) do MixIO.yes?("The file .sobelow-conf already exists. Are you sure you want to overwrite?") else true end if yes? do File.write!(conf_file, inspect(conf)) MixIO.info("Updated .sobelow-conf") end end def meets_threshold?(severity) do threshold = case get_env(:threshold) do :high -> [:high] :medium -> [:high, :medium] _ -> [:high, :medium, :low] end severity in threshold end def format do case get_env(:format) do "sarif" -> "json" format -> format end end def output_format do get_env(:format) end def get_env(key) do Application.get_env(:sobelow, key) end defp print_banner do """ ############################################## # # # Running Sobelow - v#{@v} # # Created by Griffin Byatt - @griffinbyatt # # NCC Group - https://nccgroup.trust # # # ############################################## """ end defp get_router("", true) do case get_env(:router) do nil -> "" "" -> "" router -> Path.expand(router) end end defp get_router(tmp_default_router, _) do case get_env(:router) do nil -> tmp_default_router "" -> tmp_default_router router -> router end |> Path.expand() end defp get_phoenix_files(meta_files, router) do phoenix_files = Enum.reduce(meta_files, %{routers: [], endpoints: []}, fn meta_file, acc -> cond do meta_file.is_router? -> Map.update!(acc, :routers, &[meta_file.file_path | &1]) meta_file.is_endpoint? -> Map.update!(acc, :endpoints, &[meta_file.file_path | &1]) true -> acc end end) uniq_phoenix_files = if File.exists?(router) do Map.update!(phoenix_files, :routers, fn routers -> Enum.uniq(routers ++ [router]) end) else phoenix_files end {uniq_phoenix_files.routers, uniq_phoenix_files.endpoints} end defp get_meta_templates(root) do ignored_files = get_env(:ignored_files) Utils.template_files(root) |> Enum.reject(&is_ignored_file(&1, ignored_files)) |> Enum.map(&get_template_meta/1) |> Map.new() end defp get_template_meta(filename) do meta_funs = Parse.get_meta_template_funs(filename) raw = meta_funs.raw ast = meta_funs.ast filename = Utils.normalize_path(filename) { filename, %{ filename: filename, raw: raw, ast: [ast], is_controller?: false } } end defp get_meta_files(root) do ignored_files = get_env(:ignored_files) Utils.all_files(root) |> Enum.reject(&is_ignored_file(&1, ignored_files)) |> Enum.map(&get_file_meta/1) end defp get_file_meta(filename) do ast = Parse.ast(filename) meta_funs = Parse.get_meta_funs(ast) def_funs = meta_funs.def_funs use_funs = meta_funs.use_funs %{ filename: Utils.normalize_path(filename), file_path: Path.expand(filename), def_funs: def_funs, is_controller?: Utils.is_controller?(use_funs), is_router?: Utils.is_router?(use_funs), is_endpoint?: Utils.is_endpoint?(use_funs) } end defp get_fun_vulns({fun, skips}, meta_file, web_root, mods) do skip_mods = skips |> Enum.map(&get_mod/1) Enum.each(mods -- skip_mods, fn mod -> params = [fun, meta_file, web_root, skip_mods] apply(mod, :get_vulns, params) end) end defp get_fun_vulns(fun, meta_file, web_root, mods) do get_fun_vulns({fun, []}, meta_file, web_root, mods) end defp combine_skips([]), do: [] defp combine_skips([head | tail] = funs) do if get_env(:skip), do: combine_skips(head, tail), else: funs end defp combine_skips(prev, []), do: [prev] defp combine_skips(prev, [{:@, _, [{:sobelow_skip, _, [skips]}]} | []]), do: [{prev, skips}] defp combine_skips(prev, [{:@, _, [{:sobelow_skip, _, [skips]}]} | tail]) do [h | t] = tail [{prev, skips} | combine_skips(h, t)] end defp combine_skips(prev, rest) do [h | t] = rest [prev | combine_skips(h, t)] end defp no_router do message = """ WARNING: Sobelow cannot find the router. If this is a Phoenix application please use the `--router` flag to specify the router's location. """ IO.puts(:stderr, message) ignored = get_env(:ignored) Application.put_env( :sobelow, :ignored, ignored ++ ["Config.CSRF", "Config.CSRFRoute", "Config.Headers", "Config.CSP"] ) end defp file_error do message = """ This does not appear to be a Phoenix application. If this is an Umbrella application, each application should be scanned separately. """ MixIO.error(message) System.halt(0) end defp clear_skip(project_root) do cfile = project_root <> @skips if File.exists?(cfile) do File.rm!(cfile) end System.halt(0) end defp mark_skip_all(project_root) do cfile = project_root <> @skips case Fingerprint.new_skips() do [] -> nil fingerprints -> {:ok, iofile} = :file.open(cfile, [:append]) fingerprints = Enum.join(fingerprints, "\n") :file.write(iofile, ["\n", fingerprints]) :file.close(iofile) end end defp load_ignored_fingerprints(project_root) do cfile = project_root <> @skips if File.exists?(cfile) do {:ok, iofile} = :file.open(cfile, [:read]) :file.read_line(iofile) |> load_ignored_fingerprints(iofile) :file.close(iofile) end end defp load_ignored_fingerprints({:ok, fingerprint}, iofile) do to_string(fingerprint) |> String.trim() |> Fingerprint.put_ignore() :file.read_line(iofile) |> load_ignored_fingerprints(iofile) end defp load_ignored_fingerprints(:eof, _), do: nil defp load_ignored_fingerprints(_, _), do: nil defp version_check do config = System.get_env("SOBELOW_HOME") || @home |> Path.expand() |> Path.join(@vsncheck) home = Path.dirname(config) if File.exists?(home) do version_check(config) else File.mkdir_p!(home) version_check(config) end end defp version_check(config) do time = DateTime.utc_now() |> DateTime.to_unix() if File.exists?(config) do {:ok, iofile} = :file.open(config, [:read]) {timestamp, _} = case :file.read_line(iofile) do {:ok, ~c"sobelow-" ++ timestamp} -> to_string(timestamp) |> Integer.parse() _ -> file_error() end :file.close(iofile) if time - 12 * 60 * 60 > timestamp do maybe_prompt_update(time, config) end else maybe_prompt_update(time, config) end end defp get_sobelow_version do {:ok, _} = Application.ensure_all_started(:ssl) {:ok, _} = Application.ensure_all_started(:inets) {:ok, _} = :inets.start(:httpc, [{:profile, :sobelow}]) url = ~c"https://sobelow.io/version" http_options = [ ssl: [ verify: :verify_none # We cannot use exclusively use OTP 25+ yet, but when we can - uncomment the following few lines # verify: :verify_peer, # cacertfile: :public_key.cacerts_get() ], timeout: 10_000 ] IO.puts(:stderr, "Checking Sobelow version...\n") case :httpc.request(:get, {url, []}, http_options, []) do {:ok, {{_, 200, _}, _, vsn}} -> Version.parse!(String.trim(to_string(vsn))) _ -> MixIO.error("Error fetching version number.\n") @v end after :inets.stop(:httpc, :sobelow) end defp maybe_prompt_update(time, cfile) do installed_vsn = Version.parse!(@v) unless get_env(:private) do cmp = get_sobelow_version() |> Version.compare(installed_vsn) case cmp do :gt -> MixIO.error(""" A new version of Sobelow is available: mix archive.install hex sobelow """) _ -> nil end end timestamp = "sobelow-" <> to_string(time) case :file.open(cfile, [:write, :read]) do {:ok, iofile} -> :ok = :file.pwrite(iofile, 0, timestamp) :ok = :file.close(iofile) _ -> File.write(cfile, timestamp) end end def get_mod(mod_string) do case mod_string do "XSS" -> Sobelow.XSS "XSS.Raw" -> Sobelow.XSS.Raw "XSS.SendResp" -> Sobelow.XSS.SendResp "XSS.ContentType" -> Sobelow.XSS.ContentType "XSS.HTML" -> Sobelow.XSS.HTML "SQL" -> Sobelow.SQL "SQL.Query" -> Sobelow.SQL.Query "SQL.Stream" -> Sobelow.SQL.Stream "Misc" -> Sobelow.Misc "Misc.BinToTerm" -> Sobelow.Misc.BinToTerm "Misc.FilePath" -> Sobelow.Misc.FilePath "RCE" -> Sobelow.RCE "RCE.EEx" -> Sobelow.RCE.EEx "RCE.CodeModule" -> Sobelow.RCE.CodeModule "Config" -> Sobelow.Config "Config.CSRF" -> Sobelow.Config.CSRF "Config.CSRFRoute" -> Sobelow.Config.CSRFRoute "Config.Headers" -> Sobelow.Config.Headers "Config.CSP" -> Sobelow.Config.CSP "Config.Secrets" -> Sobelow.Config.Secrets "Config.HTTPS" -> Sobelow.Config.HTTPS "Config.HSTS" -> Sobelow.Config.HSTS "Config.CSWH" -> Sobelow.Config.CSWH "Vuln" -> Sobelow.Vuln "Vuln.CookieRCE" -> Sobelow.Vuln.CookieRCE "Vuln.HeaderInject" -> Sobelow.Vuln.HeaderInject "Vuln.PlugNull" -> Sobelow.Vuln.PlugNull "Vuln.Redirect" -> Sobelow.Vuln.Redirect "Vuln.Coherence" -> Sobelow.Vuln.Coherence "Vuln.Ecto" -> Sobelow.Vuln.Ecto "Traversal" -> Sobelow.Traversal "Traversal.SendFile" -> Sobelow.Traversal.SendFile "Traversal.FileModule" -> Sobelow.Traversal.FileModule "Traversal.SendDownload" -> Sobelow.Traversal.SendDownload "CI" -> Sobelow.CI "CI.System" -> Sobelow.CI.System "CI.OS" -> Sobelow.CI.OS "DOS" -> Sobelow.DOS "DOS.StringToAtom" -> Sobelow.DOS.StringToAtom "DOS.ListToAtom" -> Sobelow.DOS.ListToAtom "DOS.BinToAtom" -> Sobelow.DOS.BinToAtom _ -> nil end end def get_ignored do get_env(:ignored) |> Enum.map(&get_mod/1) end def is_vuln?({vars, _, _}) do if Enum.empty?(vars) do false else true end end defp is_ignored_file(filename, ignored_files) do Enum.any?(ignored_files, fn ignored_file -> String.ends_with?(ignored_file, filename) end) end def version do @v |> IO.puts() end end