defmodule Oban.Repo do
  @moduledoc """
  Wrappers around `Ecto.Repo` and `Ecto.Adapters.SQL` callbacks.

  Each function resolves the correct repo instance and sets options such as `prefix` and `log`
  according to `Oban.Config`.

  > #### Meant for Extending Oban {: .warning}
  >
  > These functions should only be used when working with a repo inside engines, plugins, or other
  > extensions for Oban. Favor using your application's repo directly when querying `Oban.Job`
  > from your workers.

  ## Examples

  The first argument for every function must be an `Oban.Config` struct. Many functions pass
  configuration around as a `conf` key, and it can always be fetched with `Oban.config/1`. This
  demonstrates fetching the default instance config and querying all jobs:

      Oban
      |> Oban.config()
      |> Oban.Repo.all(Oban.Job)
  """

  @moduledoc since: "2.2.0"

  alias Oban.{Backoff, Config}

  # Repo callbacks that take no trailing `opts`; the arity is that of the wrapped callback.
  @callbacks_without_opts [
    config: 0,
    default_options: 1,
    get_dynamic_repo: 0,
    in_transaction?: 0,
    load: 2,
    put_dynamic_repo: 1,
    rollback: 1
  ]

  # Repo callbacks whose last argument is `opts`; the arity includes that `opts` argument.
  @callbacks_with_opts [
    aggregate: 3,
    all: 2,
    checkout: 2,
    delete!: 2,
    delete: 2,
    delete_all: 2,
    exists?: 2,
    get!: 3,
    get: 3,
    get_by!: 3,
    get_by: 3,
    insert!: 2,
    insert: 2,
    insert_all: 3,
    insert_or_update!: 2,
    insert_or_update: 2,
    one!: 2,
    one: 2,
    preload: 3,
    reload!: 2,
    reload: 2,
    stream: 2,
    update!: 2,
    update: 2,
    update_all: 3
  ]

  # Defaults for the backoff options accepted by `transaction/3`.
  @retry_opts delay: 500, retry: 5, expected_delay: 10, expected_retry: 20

  for {fun, arity} <- @callbacks_without_opts do
    args = [Macro.var(:conf, __MODULE__) | Macro.generate_arguments(arity, __MODULE__)]

    @doc """
    Wraps `c:Ecto.Repo.#{fun}/#{arity}` with an additional `Oban.Config` argument.
    """
    def unquote(fun)(unquote_splicing(args)) do
      __dispatch__(unquote(fun), unquote(args))
    end
  end

  for {fun, arity} <- @callbacks_with_opts do
    # `opts` is declared explicitly below, so only `arity - 1` positional arguments are generated.
    args = [Macro.var(:conf, __MODULE__) | Macro.generate_arguments(arity - 1, __MODULE__)]

    @doc """
    Wraps `c:Ecto.Repo.#{fun}/#{arity}` with an additional `Oban.Config` argument.
    """
    def unquote(fun)(unquote_splicing(args), opts \\ []) do
      __dispatch__(unquote(fun), unquote(args), opts)
    end
  end

  # Manually Defined

  @doc """
  The default values extracted from `Oban.Config` for use in all queries with options.

  The `:prefix` option is only included when `conf.prefix` is truthy.
  """
  @doc since: "2.14.0"
  def default_options(conf) do
    base = [log: conf.log, oban: true, telemetry_options: [oban_conf: conf]]

    if conf.prefix do
      [prefix: conf.prefix] ++ base
    else
      base
    end
  end

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.query/4` with an added `Oban.Config` argument.
  """
  @doc since: "2.2.0"
  def query(conf, statement, params \\ [], opts \\ []) do
    __dispatch__(:query, [conf, statement, params], opts)
  end

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.query!/4` with an added `Oban.Config` argument.
  """
  @doc since: "2.17.0"
  def query!(conf, statement, params \\ [], opts \\ []) do
    __dispatch__(:query!, [conf, statement, params], opts)
  end

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.to_sql/2` with an added `Oban.Config` argument.

  The queryable is converted to a query and its prefix is replaced with the configured prefix
  before the SQL is generated. Like the other wrappers, the call honors the `get_dynamic_repo`
  setting from the config when one is present.
  """
  @doc since: "2.2.0"
  def to_sql(conf, kind, queryable) do
    # `default_options/1` treats a falsy prefix as "no prefix". Mirror that here by storing `nil`
    # instead of `false`, so a disabled prefix is never written into the query as a value.
    query = %{Ecto.Queryable.to_query(queryable) | prefix: conf.prefix || nil}

    # Go through the dynamic repo resolution instead of calling `conf.repo` directly, so the SQL
    # is generated against the same repo instance every other wrapper in this module uses.
    with_dynamic_repo(conf, :to_sql, [kind, query])
  end

  @doc """
  Wraps `c:Ecto.Repo.transaction/2` with an additional `Oban.Config` argument and automatic
  retries with backoff.

  Only `DBConnection.ConnectionError`, `Postgrex.Error`, and `MyXQL.Error` exceptions are
  retried. Once the allowed attempts are used up the last error is re-raised with its original
  stacktrace.

  ## Options

  Backoff helpers, in addition to the standard transaction options:

  * `delay` — the time to sleep between retries, defaults to `500ms`
  * `retry` — the number of retries for unexpected errors, defaults to `5`
  * `expected_delay` — the time to sleep between expected errors, e.g. `serialization` or
    `lock_not_available`, defaults to `10ms`
  * `expected_retry` — the number of retries for expected errors, defaults to `20`
  """
  @doc since: "2.18.1"
  def transaction(conf, fun_or_multi, opts \\ []) do
    transaction(conf, fun_or_multi, opts, 1)
  end

  # Runs the transaction and retries on database errors. `attempt` is 1-based and is compared
  # with `<` against the limits, so each limit is effectively the maximum number of attempts.
  defp transaction(conf, fun_or_multi, opts, attempt) do
    __dispatch__(:transaction, [conf, fun_or_multi], opts)
  rescue
    error in [DBConnection.ConnectionError, Postgrex.Error, MyXQL.Error] ->
      # Caller supplied values win over the defaults.
      opts = Keyword.merge(@retry_opts, opts)

      cond do
        # Expected errors use their own limit and a constant delay.
        expected_error?(error) and attempt < opts[:expected_retry] ->
          jittery_sleep(opts[:expected_delay])

        # Anything else backs off linearly with the attempt number.
        attempt < opts[:retry] ->
          jittery_sleep(attempt * opts[:delay])

        true ->
          reraise error, __STACKTRACE__
      end

      transaction(conf, fun_or_multi, opts, attempt + 1)
  end

  # An error is "expected" when it carries a `postgres` map with one of these codes.
  defp expected_error?(%_{postgres: %{code: :lock_not_available}}), do: true
  defp expected_error?(%_{postgres: %{code: :serialization_failure}}), do: true
  defp expected_error?(_error), do: false

  # Sleeps for `delay` after passing it through `Backoff.jitter/1`.
  defp jittery_sleep(delay), do: delay |> Backoff.jitter() |> Process.sleep()

  # Dispatch for callbacks without options: drops `conf` from the arguments and calls the repo.
  defp __dispatch__(name, [%Config{} = conf | args]) do
    with_dynamic_repo(conf, name, args)
  end

  # Dispatch for callbacks with options: the defaults from `default_options/1` are merged with the
  # caller's options (the caller wins) and appended as the final argument.
  defp __dispatch__(name, [%Config{} = conf | args], opts) when is_list(opts) do
    opts =
      conf
      |> default_options()
      |> Keyword.merge(opts)

    with_dynamic_repo(conf, name, args ++ [opts])
  end

  # Calls `name` on the configured repo. When the config carries a `get_dynamic_repo` function or
  # `{module, function, args}` tuple, the dynamic repo it returns is put in place for the duration
  # of the call and the previous instance is restored afterwards.
  defp with_dynamic_repo(%{get_dynamic_repo: fun} = conf, name, args)
       when is_function(fun, 0) or is_tuple(fun) do
    prev_instance = conf.repo.get_dynamic_repo()

    dynamic_repo =
      case fun do
        {module, func, mfa_args} -> apply(module, func, mfa_args)
        fun -> fun.()
      end

    try do
      # Keep the current instance while it reports an open transaction.
      if not in_transaction?(conf, prev_instance) do
        conf.repo.put_dynamic_repo(dynamic_repo)
      end

      apply(conf.repo, name, args)
    after
      conf.repo.put_dynamic_repo(prev_instance)
    end
  end

  defp with_dynamic_repo(conf, name, args) do
    apply(conf.repo, name, args)
  end

  # Reports whether the repo is in a transaction for the given instance. A registered name is
  # resolved to a pid first; a name that doesn't resolve, or any other term, counts as `false`.
  defp in_transaction?(conf, instance) when is_pid(instance), do: conf.repo.in_transaction?()

  defp in_transaction?(conf, instance) when is_atom(instance) do
    case GenServer.whereis(instance) do
      pid when is_pid(pid) -> in_transaction?(conf, pid)
      _ -> false
    end
  end

  defp in_transaction?(_, _), do: false
end