defmodule Oban.Peer do @moduledoc """ The `Peer` module maintains leadership for a particular Oban instance within a cluster. Leadership is used by plugins, primarily, to prevent duplicate work across nodes. For example, only the leader's `Cron` plugin will try inserting new jobs. You can use peer leadership to extend Oban with custom plugins, or even within your own application. Note a few important details about how peer leadership operates: * Each peer checks for leadership at a 30 second interval. When the leader exits it broadcasts a message to all other peers to encourage another one to assume leadership. * Each Oban instance supervises a distinct `Oban.Peer` instance. That means that with multiple Oban instances on the same node one instance may be the leader, while the others aren't. * Without leadership, global plugins (Cron, Lifeline, Stager, etc.), will not run on any node. ## Available Peer Implementations There are two built-in peering modules: * `Oban.Peers.Database` — uses table-based leadership through the `oban_peers` table and works in any environment, with or without clustering. Only one node (per instance name) will have a row in the peers table, that node is the leader. This is the default. * `Oban.Peers.Global` — coordinates global locks through distributed Erlang, requires distributed Erlang. You can specify the peering module to use in your Oban configuration: config :my_app, Oban, peer: Oban.Peers.Database, # default value ... If in doubt, you can call `Oban.config()` to see which module is being used. ## Examples Check leadership for the default Oban instance: Oban.Peer.leader?() # => true That is identical to using the name `Oban`: Oban.Peer.leader?(Oban) # => true Check leadership for a couple of instances: Oban.Peer.leader?(Oban.A) # => true Oban.Peer.leader?(Oban.B) # => false """ alias Oban.{Config, Registry} require Logger @type option :: [name: module(), conf: Config.t(), interval: timeout()] @type conf_or_name :: Config.t() | GenServer.name() @doc """ Starts a peer instance. """ @callback start_link([option()]) :: GenServer.on_start() @doc """ Check whether the current peer instance leads the cluster. """ @callback leader?(GenServer.server()) :: boolean() @doc """ Check which node's peer instance currently leads the cluster. """ @callback get_leader(GenServer.server()) :: nil | String.t() @doc false @spec child_spec(Keyword.t()) :: Supervisor.child_spec() def child_spec(opts) do %{peer: {peer, peer_opts}} = Keyword.fetch!(opts, :conf) opts = opts |> Keyword.merge(peer_opts) |> Keyword.put_new(:name, peer) %{id: opts[:name], start: {peer, :start_link, [opts]}} end @doc """ Get the name and node of the instance that currently leads the cluster. ## Example Get the leader node for the default Oban instance: Oban.Peer.get_leader() "web.1" Get the leader node for an alternate instance named `Oban.Private` Oban.Peer.get_leader(Oban.Private) "web.1" """ @spec get_leader(conf_or_name, timeout()) :: nil | String.t() def get_leader(conf_or_name \\ Oban, timeout \\ 5_000) do safe_call(conf_or_name, :get_leader, timeout, nil) end @doc """ Check whether the current instance leads the cluster. ## Example Check leadership for the default Oban instance: Oban.Peer.leader?() # => true Check leadership for an alternate instance named `Oban.Private`: Oban.Peer.leader?(Oban.Private) # => true """ @spec leader?(conf_or_name(), timeout()) :: boolean() def leader?(conf_or_name \\ Oban, timeout \\ 5_000) do safe_call(conf_or_name, :leader?, timeout, false) end defp safe_call(%Config{name: name, peer: {peer, _}}, fun, timeout, default) do case Registry.whereis(name, Oban.Peer) do pid when is_pid(pid) -> apply(peer, fun, [pid, timeout]) _ -> default end catch :exit, {:timeout, _} = reason -> Logger.warning( message: "Oban.Peer.#{fun}/2 check failed due to #{inspect(reason)}", source: :oban, module: __MODULE__ ) default end defp safe_call(name, fun, timeout, default) do name |> Oban.config() |> safe_call(fun, timeout, default) end end