Distributing Phoenix -- Part 2: Learn You a 𝛿-CRDT for Great Good

Distributing Phoenix – Part 1: Kubernetes and Elixir 1.9 walk into a bar...
Distributing Phoenix – Part 3: LiveView is Magic


Background

Previously, in Part 1, we covered setting up a distributed Phoenix application foundation using Kubernetes, Docker, and libcluster. The application didn't do much – anything, really – but it gave us a good base to build on without having to worry about distribution, and it lets us keep verifying the distributed state as we move on.

In this article, I cover creating the car park business logic and ensuring that its data is distributed across the cluster using the Horde and DeltaCrdt libraries.

Distributing Supervisors

One of the main challenges in distributed systems arises when you need a global supervisor working across the entirety of a cluster. Consider the following use case: maintaining a limited pool of connections to an external API.

Although we can easily pool with libraries such as poolboy, in a naive architecture each node running the application would spin up its own pool. And while we could run checks to ensure only one node spins up a pool, when that node goes down we would need to make sure another node takes over. We would also need to ensure that the processes are handed off with their state intact. At Homepolish, we wrote our own process monitoring library based on libcluster which did exactly this. In the end, though, we migrated to Swarm by Paul Schoenfelder, as its design was nearly identical to what we had come up with and met our needs perfectly – almost.

Some of Swarm's design choices caused us a lot of frustration, specifically the lack of graceful shutdown and the inability to handle supervisors in the standard way. Eventually, the community filled in some gaps, but then came Derek Kraan and his Horde library. This was the evolutionary change needed to really make working with distributed supervisors manageable.

How It Works

Horde maintains processes on each node in the cluster. These track the members and nodes participating in the distributed processes, handle the registry of those processes, and ensure that when nodes go down, other nodes pick their processes up.

Basic Horde Cluster

In the above figure, we can see that we have 3 processes distributed across the nodes Alice, Bob, and Charlie. The three nodes are aware of each other's supervisors, and can target any of the three processes using the registry. When Charlie goes down, the processes are restarted on Alice and Bob. If the shutdown is graceful we can even disconnect Charlie from the cluster first, handing off state and draining in-flight processes prior to shutdown.

Note: The actual process tree is a bit more complex due to the syncing and monitoring processes Horde uses.

Implementing Horde

One of Horde's biggest selling points is its nearly identical API to the standard DynamicSupervisor and Registry modules, which makes working with the library very straightforward.

We'll add the dependency and modify application.ex to wire up the cluster members. A few lines is all that's required to get Horde up and running.
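If you haven't added it yet, the dependency lives in mix.exs. The version constraint below is an assumption on my part (the module names used throughout this article match the 0.7 series), so check hex.pm for the release you want:

defp deps do
  [
    ...
    {:horde, "~> 0.7"}
  ]
end
mix.exs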

children = [
  ...
  ParkingWeb.Endpoint,
  # Add the global registry
  {Horde.Registry, keys: :unique, name: Parking.Registry},
  ...
  # Wire up Horde on start
  %{
    id: Parking.HordeConnector,
    restart: :transient,
    start: {
      Task,
      :start_link,
      [
        fn ->
          # Join nodes to distributed Registry
          Horde.Cluster.set_members(Parking.Registry, membership(Parking.Registry, nodes()))
        end
      ]
    }
  }
]
...

## PRIVATE FUNCTIONS

defp nodes, do: [Node.self()] ++ Node.list()

defp membership(horde, nodes), do: Enum.map(nodes, fn node -> {horde, node} end)
application.ex

We add the global registry to our list of supervised children, as well as an initialization task to run when the node starts. This latter part joins the members to the registry horde.
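For clarity, the membership/2 helper simply maps each node to a {horde, node} tuple – the shape Horde.Cluster.set_members/2 expects. For a three-node cluster it produces something like this (node names are illustrative):

iex> nodes = [:alice@panthro, :bob@panthro, :charlie@panthro]
iex> Enum.map(nodes, fn node -> {Parking.Registry, node} end)
[
  {Parking.Registry, :alice@panthro},
  {Parking.Registry, :bob@panthro},
  {Parking.Registry, :charlie@panthro}
]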

Adding Some Gates

Now that we can register processes globally, we'll need some processes worth registering. In describing our use case, we discussed having multiple gates for vehicles to enter and exit through. It's unimportant which node these processes live on, so we can distribute them; however, we want to ensure there is a fixed number of gates. We can accomplish this by setting a maximum number of gates and registering them using a simple numbering scheme, i.e. "gate-1", "gate-2", etc.

Without Horde, we'd get a new set of n gates whenever a new node spun up. Using our distributed global registry, though, we'll have exactly n gates spread across the cluster.

Gate GenServer

Before we jump into the registration, we'll first create the gate GenServer that will handle the interaction of entering and exiting the parking lot. It won't contain any real logic yet, but will allow us to visualize the transfer of processes between the nodes in the cluster.

defmodule Parking.Gate do
  use GenServer

  require Logger

  ## Client API

  def child_spec(number),
    do: %{id: id(number), start: {__MODULE__, :start_link, [number]}}

  def start_link(number) do
    state = %{number: number}

    # Register with gate number as unique id
    {:ok, _pid} = GenServer.start_link(__MODULE__, state, name: via_registry(number))
  end

  def init(%{number: number} = state) do
    Logger.warn("===> Gate No. #{number} running on #{inspect(Node.self())}")
    Logger.debug(inspect(state, pretty: true))
    {:ok, state}
  end

  def id(number), do: String.to_atom("gate-" <> to_string(number))

  # Vehicle enters the lot
  def enter(number, license) do
    GenServer.cast(via_registry(number), {:enter, license})
  end

  # Vehicle exits the lot
  def exit(number, license) do
    GenServer.cast(via_registry(number), {:exit, license})
  end

  ## PRIVATE FUNCTIONS

  defp via_registry(number) do
    {:via, Horde.Registry, {Parking.Registry, id(number)}}
  end

  ## Server Callbacks

  def handle_cast({:enter, license}, state) do
    # TODO: Add new license to state of lot
    {:noreply, state}
  end

  def handle_cast({:exit, license}, state) do
    # TODO: Remove license from state of lot
    {:noreply, state}
  end
end
gate.ex

The important thing to note about the above code block is that the process is named via the Horde registry. Horde will recognize and handle name collisions as new nodes come online. That is, when Bob says "create gate-1", Horde will determine if it should create a new process on Bob, or leave it on Alice.
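Because the gates are registered through a :via tuple, any node can resolve a gate to its pid with standard OTP functions, regardless of where it actually runs. For example (the pid shown is illustrative, and will be a remote pid if gate-1 lives on another node):

iex(alice@panthro)1> GenServer.whereis({:via, Horde.Registry, {Parking.Registry, Parking.Gate.id(1)}})
#PID<21531.432.0>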

Supervising the Gates

At this point we can register our gates with their own horde supervisor. In the same manner we set membership on the registry, we'll ensure all our nodes have a gate supervisor to distribute and track the status of our gates. To do so, we'll go back to application.ex and modify it some more.

# We're going to have 4 gates
@max_gates 4
...

children = [
  ...
  {Horde.Registry, keys: :unique, name: Parking.Registry},
  # Add the supervisor for the gates here
  Supervisor.child_spec(
    {Horde.Supervisor, strategy: :one_for_one, name: Parking.GateSupervisor},
    id: :gate_supervisor
  ),
  # Wire up Horde on start
  %{
    ...
      [
        fn ->
          ...

          # Setup supervisors
          Horde.Cluster.set_members(
            Parking.GateSupervisor,
            membership(Parking.GateSupervisor, nodes())
          )

          # Spin up gates
          1..@max_gates |> Enum.map(&init_gate/1)
        end
      ]
    }
  }
]
...

## PRIVATE FUNCTIONS

defp init_gate(number),
    do: Horde.Supervisor.start_child(Parking.GateSupervisor, {Parking.Gate, number})
application.ex

Verify

Let's open up three terminals and spin up a node on each one. Then we'll kill off one of them and watch the processes distribute accordingly. Run the following in each terminal respectively:

iex --sname alice --cookie monster -S mix
iex --sname bob --cookie monster -S mix
iex --sname charlie --cookie monster -S mix

Already we should see log statements indicating where each gate is running. But to get a more explicit view, we'll run :observer.start for each node.

Process Distribution on 3 Nodes

In my case, Bob keeps all 4 gate processes. These can be seen branching from Parking.GateSupervisor.ProcessSupervisor in the top right. Also note the registry node has links to the 4 processes.

Process Distribution after Killing Node

Now we run :init.stop on Bob, and the processes redistribute to both Alice and Charlie. Each registry node also maintains the links to the processes on that node.
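In the terminals, that looks something like the following – which gates land on which surviving node will vary from run to run:

iex(bob@panthro)1> :init.stop()
:ok

# Meanwhile, on the surviving nodes
[warn] ===> Gate No. 1 running on :alice@panthro
[warn] ===> Gate No. 4 running on :charlie@panthro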

Sharing State

We'll eventually get to making the gates do something, but we need something for the gates to interact with. The vehicles entering and exiting the lot need to be tracked in some form of state. We'll do this with a CRDT (conflict-free replicated data type) – more specifically, a delta-CRDT, which replicates the changes (deltas) made to the data rather than shipping the full state around. This is the same technology Horde uses to keep the distributed registry and supervisors in sync.

Note: CRDTs ensure eventual consistency of state, not immediate consistency. There are situations where race conditions will prevent all processes from having perfect information at the time of function execution. We'll actually encounter one of these situations later on in the article.
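To get a feel for DeltaCrdt before wiring it into the app, here's a minimal sketch of two AWLWWMap replicas syncing, adapted from the library's documented API (the sleep simply gives the sync interval time to fire):

# Two replicas of an add-wins, last-write-wins map
{:ok, crdt_a} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 50)
{:ok, crdt_b} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 50)

# Neighbours are one-directional; set both ways for two-way replication
DeltaCrdt.set_neighbours(crdt_a, [crdt_b])
DeltaCrdt.set_neighbours(crdt_b, [crdt_a])

DeltaCrdt.mutate(crdt_a, :add, ["key", "value"])
Process.sleep(100)
DeltaCrdt.read(crdt_b)
# => %{"key" => "value"}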

We'll first add a supervisor to monitor both the CRDT and the Lot module – which we'll use later to interact with the CRDT.

defmodule Parking.LotSupervisor do
  use Supervisor

  def start_link([args, opts]), do: Supervisor.start_link(__MODULE__, args, opts)

  def init(args) do
    children = [
      # Manages distributed state
      {
        DeltaCrdt,        
        sync_interval: 300,
        max_sync_size: :infinite,
        shutdown: 30_000,
        crdt: DeltaCrdt.AWLWWMap,
        name: Parking.Lot.Crdt
      },
      # Interface for tracking state of cars through gates
      {Parking.Lot, args}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end
end
lot_supervisor.ex

And now the Lot module...

defmodule Parking.Lot do
  use GenServer

  require Logger

  @crdt Parking.Lot.Crdt

  ## Client API

  def child_spec(config), do: %{id: __MODULE__, start: {__MODULE__, :start_link, [config]}}

  def start_link(config) do
    {:ok, _pid} = GenServer.start_link(__MODULE__, Map.new(config), name: __MODULE__)
  end

  def init(state) do
    Logger.warn("===> Parking.Lot interfaced for #{inspect(Node.self())}")

    crdt_state =
      DeltaCrdt.read(@crdt)
      |> Enum.group_by(fn {{k, _}, _} -> k end, fn {{_, kv}, v} -> {kv, v} end)

    # Set vehicles from CRDT if present
    vehicles = crdt_state |> Map.get(:vehicle, []) |> Map.new()
    state = Map.put(state, :vehicles, vehicles)

    {:ok, state}
  end

  def track_entry(license, gate_number),
    do: GenServer.cast(__MODULE__, {:track_entry, license, gate_number})

  def track_exit(license, gate_number),
    do: GenServer.cast(__MODULE__, {:track_exit, license, gate_number})

  def available_spaces(), do: GenServer.call(__MODULE__, :available_spaces)
  
  def vehicles(), do: GenServer.call(__MODULE__, :vehicles)

  ## Server Callbacks

  def handle_call(:available_spaces, _from, %{max_spaces: max_spaces, vehicles: vehicles} = state) do
    {:reply, max_spaces - map_size(vehicles), state}
  end
  
  def handle_call(:vehicles, _from, %{vehicles: vehicles} = state) do
    {:reply, vehicles, state}
  end

  # Track an entry of a vehicle, capturing the license and the time entered
  def handle_cast({:track_entry, license, gate_number}, state) do
    Logger.info("===> Vehicle '#{license}' entered though gate #{gate_number}")

    # Update CRDT and state
    time = System.monotonic_time()

    DeltaCrdt.mutate(@crdt, :add, [{:vehicle, license}, time], :infinity)
    vehicles = state |> Map.get(:vehicles) |> Map.put(license, time)

    {:noreply, %{state | vehicles: vehicles}}
  end

  # Track an exit of a vehicle
  def handle_cast({:track_exit, license, gate_number}, state) do
    Logger.info("===> Vehicle '#{license}' exited though gate #{gate_number}")

    # Update CRDT and state
    DeltaCrdt.mutate(@crdt, :remove, [{:vehicle, license}], :infinity)
    vehicles = state |> Map.get(:vehicles) |> Map.delete(license)

    {:noreply, %{state | vehicles: vehicles}}
  end
end
lot.ex

There's a lot going on in here, so we'll highlight the important parts.

The init/1 function reads from the CRDT to get the current state. In this case we use tuples for the keys so that we can track more than just vehicle state. If the CRDT has no vehicles in it yet, we default to an empty map.
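Concretely, the Enum.group_by/3 call reshapes the flat CRDT map into a map keyed by record type, so adding another key type later won't disturb the vehicle tracking. With some illustrative values:

%{{:vehicle, "abc 123"} => 1001, {:vehicle, "xyz 789"} => 1002}
|> Enum.group_by(fn {{k, _}, _} -> k end, fn {{_, kv}, v} -> {kv, v} end)
# => %{vehicle: [{"abc 123", 1001}, {"xyz 789", 1002}]}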

There are two functions for tracking entries and exits, imaginatively called track_entry/2 and track_exit/2. These functions are handled in the server callbacks as casts that update the CRDT accordingly – either adding to the vehicle map using the license number as a key, or removing from the map using the same. They also update the current state of the GenServer.

Note: We're also recording the time the vehicle entered. I have a thought to use this later for calculating the cost of parking for X amount of time, but it can be ignored for now.

Lastly, we have two functions for retrieving state, available_spaces/0 and vehicles/0, which return the current state of the lot as projected from the CRDT.

Before we can verify this works though, we actually need to hook it up. We need to modify application.ex again to add Parking.LotSupervisor as a child.

@max_spaces 100
...

children = [
  ...
  {Horde.Registry, keys: :unique, name: Parking.Registry},
  # Add the lot supervisor here
  {Parking.LotSupervisor, [[max_spaces: @max_spaces], [name: Parking.LotSupervisor]]},
  ...
]
application.ex

Verify

If we spin up our nodes again, we should be able to see entries made on Alice reflected on Bob, and vice versa.

iex(alice@panthro)7> Parking.Lot.track_entry("abc 123", 1)
:ok
[info] ===> Vehicle 'abc 123' entered through gate 1
iex(alice@panthro)8> Parking.Lot.available_spaces         
99
iex(alice@panthro)9> DeltaCrdt.read(Parking.Lot.Crdt)     
%{{:vehicle, "abc 123"} => -576460581433871794}

Now on Bob...

iex(bob@panthro)1> DeltaCrdt.read(Parking.Lot.Crdt)
%{}

Wait, what?

Much in the same way the registry and supervisors in Horde need to have the membership declared, so too does the CRDT. In fact, this is exactly what occurs when one invokes Horde.Cluster.set_members/2, albeit with much more sophisticated code than we're about to use.

Wire the CRDT

To simplify the management of members for the lot CRDT, we'll add some functionality to lot_supervisor.ex, and call it from the start function in application.ex.

def join_neighbourhood(nodes) do
  # Map all nodes to the CRDT process for that node
  crdts =
    Enum.map(nodes, fn node ->
      :rpc.call(node, Process, :whereis, [Parking.Lot.Crdt])
    end)

  # Creates combinations of all possible node sets in the neighbourhood
  # i.e. for a set [1, 2, 3] -> [{1, [2, 3]}, {2, [1, 3]}, {3, [1, 2]}]
  combos = for crdt <- crdts, do: {crdt, List.delete(crdts, crdt)}

  # Enumerate the list and wire up the neighbours
  Enum.each(combos, fn {crdt, crdts} ->
    :ok = DeltaCrdt.set_neighbours(crdt, crdts)
  end)
end
lot_supervisor.ex

We also need to add the following code after the Horde.Cluster.set_members/2 call in application.ex.

# Establish Parking Lot CRDT network
Parking.LotSupervisor.join_neighbourhood(nodes())

Now both nodes will be aware of the entries and exits; however, the lot state will still not be correct. We can see this by going back to iex.

iex(bob@panthro)1> DeltaCrdt.read(Parking.Lot.Crdt)                       
%{}
iex(alice@panthro)2> Parking.Lot.track_entry("abc 123", 1)
:ok
[info] ===> Vehicle 'abc 123' entered through gate 1
iex(alice@panthro)3> DeltaCrdt.read(Parking.Lot.Crdt)     
%{{:vehicle, "abc 123"} => -576460718763928164}
iex(bob@panthro)2> DeltaCrdt.read(Parking.Lot.Crdt)
%{{:vehicle, "abc 123"} => -576460718763928164}
iex(bob@panthro)3> Parking.Lot.available_spaces                          
100

Note the incorrect number of spaces on Bob. This is because, although the CRDT was updated, the state of the GenServer was not. We can fix this by adding some handling to the lot module and setting the handler on the supervisor.

def init(args) do
  children = [
    # Manages distributed state
    {
      DeltaCrdt,
      # Add this line to handle diff changes on the CRDT
      on_diffs: fn _diffs -> send(Parking.Lot, :update_state) end,
      ...
  ]
lot_supervisor.ex

def handle_info(:update_state, state) do
  # Read CRDT state and group by key type
  crdt_state =
    DeltaCrdt.read(@crdt)
    |> Enum.group_by(fn {{k, _}, _} -> k end, fn {{_, kv}, v} -> {kv, v} end)

  # Set vehicles from CRDT if present
  vehicles = crdt_state |> Map.get(:vehicle, []) |> Map.new()
  state = Map.put(state, :vehicles, vehicles)

  {:noreply, state}
end
lot.ex

Now all the nodes should be aware of the changes, and if we rerun the verifications above, everything remains in sync.
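For example, after an entry on Alice, Bob should now report the updated count (timestamps will differ on your machine):

iex(bob@panthro)4> Parking.Lot.available_spaces()
99
iex(bob@panthro)5> Parking.Lot.vehicles()
%{"abc 123" => -576460718763928164}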

Note: The on_diffs option isn't well documented, but digging around in Horde's code illustrates its purpose relatively well. Although we ignore it here, the diffs argument passed to the handler identifies the specific changes being applied, so one could use it for more targeted handling, as sketched below.
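For instance, a more granular handler might pattern match the diffs rather than re-reading the whole map. The tuple shapes assumed below ({:add, key, value} and {:remove, key}) and the messages sent are hypothetical – verify the diff format against the DeltaCrdt version you're using:

# Hypothetical targeted diff handling in lot_supervisor.ex
on_diffs: fn diffs ->
  Enum.each(diffs, fn
    {:add, {:vehicle, license}, time} -> send(Parking.Lot, {:vehicle_added, license, time})
    {:remove, {:vehicle, license}} -> send(Parking.Lot, {:vehicle_removed, license})
    _other -> :ok
  end)
end,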

Connecting the Gates to the Lot

We're finally ready to start tracking cars! Well, almost. We still need the gates to inform the lot on entry and exit. Let's do that now. We need to modify the server callbacks in gate.ex, replacing the casts for enter and exit with the code below.

def handle_cast({:enter, license}, %{number: number} = state) do
  if Parking.Lot.available_spaces() > 0 do
    Parking.Lot.track_entry(license, number)
  else
    Logger.warn("===> Gate No. #{number} reporting lot full")
  end

  {:noreply, state}
end

def handle_cast({:exit, license}, %{number: number} = state) do
  Parking.Lot.track_exit(license, number)
  {:noreply, state}
end
gate.ex

And for ease of testing, let's add a vehicle generator...

defmodule Parking.Vehicle do
  def generate(), do: :crypto.strong_rand_bytes(7) |> Base.encode32() |> binary_part(0, 7)
end
vehicle.ex

Verify

We can spin up a node now and run the following code to send cars through the gates:

iex(alice@panthro)1> for _i <- 1..105, do: Parking.Gate.enter(Enum.random(1..4), Parking.Vehicle.generate())

[info] ===> Vehicle 'A47542W' entered through gate 2
[info] ===> Vehicle 'YBL2YQI' entered through gate 3
[info] ===> Vehicle 'NWOBMQA' entered through gate 1
[info] ===> Vehicle 'ELAJ5PB' entered through gate 3
[info] ===> Vehicle 'DS7Y7DU' entered through gate 4
[info] ===> Vehicle 'F5SGUIM' entered through gate 4
[info] ===> Vehicle '4N7GI5I' entered through gate 2
[info] ===> Vehicle 'P2YXNVN' entered through gate 3
[info] ===> Vehicle 'HEMHOOL' entered through gate 1
[info] ===> Vehicle 'OV4JABN' entered through gate 2
[info] ===> Vehicle '7EOQS4K' entered through gate 3
...
[warn] ===> Gate No. 1 reporting lot full
[warn] ===> Gate No. 1 reporting lot full
[warn] ===> Gate No. 1 reporting lot full

It works! Well, with a caveat.

Depending on how fast your machine is, you may find that more cars entered than should have. That's because the GenServer state tracking the vehicles – and therefore the available spaces – may not have been updated by the time each entry was processed.

In a more realistic version of this example, a gate boom would take time to rise, a vehicle would pass through, and the boom would lower again. Vehicles arriving at the gate at the same time would queue up while this happens, entering or exiting serially at each gate. Transferring the analogy to the coding world, we could implement locks, queues, and back pressure to keep the data more accurate.
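As a small illustration of that idea (a sketch, not something we'll adopt here), entry could become a synchronous call so each gate admits vehicles one at a time and the caller learns whether the vehicle got in:

# Sketch only: serialize entries per gate by replacing the cast with a call
def enter(number, license) do
  GenServer.call(via_registry(number), {:enter, license})
end

def handle_call({:enter, license}, _from, %{number: number} = state) do
  if Parking.Lot.available_spaces() > 0 do
    Parking.Lot.track_entry(license, number)
    {:reply, :ok, state}
  else
    Logger.warn("===> Gate No. #{number} reporting lot full")
    {:reply, {:error, :lot_full}, state}
  end
end

Even then, gates on different nodes can still race on the shared count, which brings us to the next point.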

In a distributed system using CRDTs there is another, inherent issue: weaker guarantees. This is what we touched on in the earlier note when introducing the shared state concept. We can ensure that the state will converge, but we can't guarantee it is accurate at the moment it's read. There will always be latency between nodes, and race conditions will occur. The best approach appears to be simply designing with this in mind – that is, ensuring there are fail-safes and fallbacks for when these situations arise. Alternatively, one can use solutions that provide stronger guarantees.

Note: CRDTs aren't the only way to share state in a distributed system, nor are they always the best fit for the use case being solved. If strong consistency is required, Paxos or Raft implementations may be better suited.

Follow Up

This has been quite the journey, but we now have a distributed system that will dynamically set up gates on nodes as they join and leave the cluster, and share the state of the lot across all members.

In the next article we'll add a frontend to visualize the lot status using LiveView.
