Alex Martsinovich

Software Engineer
Home Posts Github LinkedIn

What Does NimbleOwnership Do Anyway?

If you've worked with Elixir for a while, you know that there are two kinds of tests: the good ones and the ones that need to be justified. The former being, of course, asynchronous tests – fast and joyful, and the latter being synchronous tests, the ones that get a side eye in code reviews.

Anyone who tries to write asynchronous tests sooner or later learns about the concept of ownership. This concept exists in Ecto, Mox, Req, and other libraries. In fact, it became so common that ownership management was extracted into a separate library called NimbleOwnership.

In this post, we'll see how the idea of ownership may come up naturally and what part of it can be handled by NimbleOwnership.

Hell Is Other Tests

When we say "ownership" we mean "resource ownership". You see, asynchronous tests exist in a highly concurrent environment, where anything can happen. Other tests might write to the same database (Ecto) or call the same mocks (Mox, Req) as your target test. When this happens, it may really mess up your test expectations.

To demonstrate this, let's look at another type of global resource: handlers - specifically logger handlers. Let's say we have a logger handler that simply counts the number of logs with an Agent process:

ExUnit.start()

defmodule OtherTests do
  use ExUnit.Case, async: true
  require Logger

  for i <- 1..10 do
    test "test #{i}" do
      Process.sleep(100)
      Logger.info("Other tests")
    end
  end
end

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  @behaviour :logger_handler
  @impl :logger_handler
  def log(_event, %{config: agent}) do
    Agent.update(agent, fn counter -> counter + 1 end)
  end

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    :logger.add_handler(:handler_id, __MODULE__, %{config: agent})
    Process.sleep(100)
    Logger.info("Foo")
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

With the power of Process.sleep/1, we can carefully craft this concurrency train wreck. The test in the MyTest module attaches a logger handler that is called whenever we log something. It then logs a single message and asserts that the counter was, in fact, incremented. Unfortunately, there are other asynchronous tests that aren't shy about logging messages themselves, and those messages also call the same logger handler, which interferes with our test:

1) test test log (MyTest)
   ex_unit.exs:24
   Assertion with == failed
   code:  assert Agent.get(agent, fn counter -> counter end) == 1
   left:  2
   right: 1
   stacktrace:
     ex_unit.exs:29: (test)

We can't let this happen, can we?

Can I See Your PID?

Ok, all we need to do is not let other tests call our logger handler. Luckily for us, handlers are executed in the same process, so we can just add the test PID to the handler config and check it:

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger
  
  @behaviour :logger_handler
  @impl :logger_handler
  def log(event, %{config: {agent, test_pid}}) do
    with true <- test_pid == self() do
      Agent.update(agent, fn counter -> counter + 1 end)
    end
  end
  
  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    :logger.add_handler(:handler_id, __MODULE__, %{
      config: {agent, self()}
    })
    Process.sleep(100)
    Logger.info("Foo")
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

Problem solved! No, seriously, problem solved. OK, OK, I hear what you're saying - in real life, logger handler code is in your application and adding a PID check to it is silly. That's fair. But we can always create a wrapper!

defmodule MyApp.LogHandler do
  @behaviour :logger_handler

  @impl :logger_handler
  def log(_event, %{config: agent}) do
    Agent.update(agent, fn counter -> counter + 1 end)
  end
end

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  @behaviour :logger_handler
  @impl :logger_handler
  def log(event, %{config: {{real_handler, og_cfg}, test_pid}} = cfg) do
    with true <- test_pid == self() do
      real_handler.log(event, Map.put(cfg, :config, og_cfg))
    end
  end

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    :logger.add_handler(:handler_id, __MODULE__, %{
      config: {{MyApp.LogHandler, agent}, self()}
    })
    Process.sleep(100)
    Logger.info("Foo")
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

or even better, a logger filter:

defmodule MyApp.LogHandler do
  @behaviour :logger_handler

  @impl :logger_handler
  def log(_event, %{config: agent}) do
    Agent.update(agent, fn counter -> counter + 1 end)
  end
end

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)

    :logger.add_handler(:handler_id, MyApp.LogHandler, %{
      config: agent,
      filters: [
        ownership_filter: {
          fn event, test_pid ->
            if test_pid == self(), do: event, else: :stop
          end,
          self()
        }
      ]
    })

    Process.sleep(100)
    Logger.info("Foo")
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

It works, but at this point you probably see the limitation. Not all processes are our enemies. Some of them are alright, in fact. Really chill processes. But our solution is relentless:

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    test_pid = self()

    :logger.add_handler(:handler_id, MyApp.LogHandler, %{
      config: agent,
      filters: [
        ownership_filter: {
          fn event, test_pid ->
            if test_pid == self(), do: event, else: :stop
          end,
          test_pid
        }
      ]
    })

    Process.sleep(100)
    task = Task.async(fn -> Logger.info("Foo") end)
    Task.await(task)
    
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end
1) test test log (MyTest)
   ex_unit_spawn.exs:28
   Assertion with == failed
   code:  assert Agent.get(agent, fn counter -> counter end) == 1
   left:  0
   right: 1
   stacktrace:
     ex_unit_spawn.exs:53: (test)
╭─────────────────────╮
│ Don't talk to me or │
│ my son ever again   │
╰─────────────────────╯
                   │  ╭────────────────╮
                   ╰─ │                │
                      │ Logger Filter  │╭────────────────╮
                      │                ││ Logger Handler │
                      ╰────────────────╯╰────────────────╯
POV: You're trying to call the handler from a different process

Sharing Is Caring

How do we allow other processes to access our beloved resources? Let's think about it – we first create a filter that knows to only proceed if it is called in a process with a specific pre-defined PID. So, if we want to allow another PID, we need to expand this from a single PID to a list of allowed PIDs. The problem is that spawned processes' PIDs aren't known at the time when we define the filter. Well, in that case, we can tell the filter to check not the PID itself, but some sort of secret. A token of process friendship that we'll put into any spawned process's dictionary.

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    test_pid = self()

    :logger.add_handler(:handler_id, MyApp.LogHandler, %{
      config: agent,
      filters: [
        ownership_filter: {
          fn event, _config ->
            if Process.get(:friendship_token), do: event, else: :stop
          end,
          nil
        }
      ]
    })

    Process.sleep(100)

    task =
      Task.async(fn ->
        Process.put(:friendship_token, true)
        Logger.info("Foo")
      end)

    Task.await(task)
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

See, it works! All we need to do is hand out this token to all processes that are relevant to the test and make sure the filter knows where to look for it.

Do You Know Who My Dad Is?

Not going to lie, this does sound like a chore. But have no fear, as Elixir has us covered! There is a convention that processes that are part of other processes' hierarchy in a meaningful way keep track of their ancestor PIDs in their dictionary. This list can be found under a magical key $callers. This is called caller tracking, and a lot of modules do this automatically. For example, the Task module or start_supervised/2 ExUnit helper. We can use this to our advantage: instead of having a custom token, we'll ask our filter to look for the test PID in the $callers list.

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    test_pid = self()

    :logger.add_handler(:handler_id, MyApp.LogHandler, %{
      config: agent,
      filters: [
        ownership_filter: {
          fn event, test_pid ->
            callers = Process.get(:"$callers") || []
            if test_pid in [self() | callers], do: event, else: :stop
          end,
          test_pid
        }
      ]
    })

    Process.sleep(100)
    task = Task.async(fn -> Logger.info("Foo") end)
    Task.await(task)
    
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

Since we know that Task.async/1 will add the test PID to the spawned process callers list, we can look for it in the filter. $callers is basically a name-dropping list. If you want to be at the party, you've got to tell them that you know people.

Facebook For Processes

So what does NimbleOwnership do anyway? Oh, it's just a Facebook for processes! It keeps track of friends lists in a large book. Here's a quick primer on what NimbleOwnership lets us do:

  1. We can start a server, which is like a big book.
  2. Processes can register arbitrary keys that symbolize resources. They are like different lists in the book, with every list being tied to a process that registered it.
  3. Owner processes can allow other processes to access those keys. It's like adding a name to a list of invited guests. But don't be fooled, "allowance" here is just a name on the list. The server does not enforce this access in any way. It is a bookkeeper, not a bouncer.
  4. Anyone can check if a process is on any list for a resource. This part is important and a little counterintuitive; we'll take a closer look at it after an example.

So, let's use NimbleOwnership for our test:

Mix.install([
  {:nimble_ownership, "~> 1.0"}
])

{:ok, _} = NimbleOwnership.start_link(name: OwnershipServer)
ExUnit.start()

defmodule OtherTests do
  use ExUnit.Case, async: true
  require Logger

  for i <- 1..10 do
    test "test #{i}" do
      Process.sleep(100)
      Logger.info("Other tests")
    end
  end
end

defmodule MyApp.LogHandler do
  @behaviour :logger_handler

  @impl :logger_handler
  def log(_event, %{config: agent}) do
    Agent.update(agent, fn counter -> counter + 1 end)
  end
end

defmodule MyTest do
  use ExUnit.Case, async: true
  require Logger

  test "test log" do
    {:ok, agent} = Agent.start_link(fn -> 0 end)
    
    NimbleOwnership.get_and_update(
      OwnershipServer, self(), :handler_id, fn nil ->
      {nil, %{}}
    end)

    :logger.add_handler(:handler_id, MyApp.LogHandler, %{
      config: agent,
      filters: [
        ownership_filter: {
          fn event, handler_id ->
            callers = Process.get(:"$callers") || []
            
            OwnershipServer
            |> NimbleOwnership.fetch_owner([self() | callers], handler_id)
            |> case do
              {:ok, _owner_id} -> event
              _ -> :stop
            end
          end,
          :handler_id
        }
      ]
    })

    Process.sleep(100)    
    task = Task.async(fn -> Logger.info("Foo") end)
    Task.await(task)
    
    assert Agent.get(agent, fn counter -> counter end) == 1
  end
end

There aren't many changes. Let's break them down.

First, at the very beginning, we start the OwnershipServer process - just a big book.

Then, at the beginning of the test, before we do anything of interest, we create a key :handler_id in the server. This key is the resource we're talking about. A list in the book with just one name on it – the test PID, which created the list.

Our logger filter is both the bouncer and a partygoer. It finds itself inside a task and all it knows is that it wants to knock on the door with a :handler_id label on it, but only if it is invited. So it goes to the OwnershipServer (everybody knows OwnershipServer, a nice chap and very popular) and asks a question you won't normally hear at a party: "Hey, has anyone allowed me in? I know $callers by the way." And the answer is "Yes," because, even though our test did not allow the task explicitly, it allowed itself. Or rather, it owns the place. So yeah, if you can name-drop the owner of the house, go ahead.

Note that, in the general case, processes can create lists for the same key. So, when we ask the server if anyone allowed us in, we don't really care who allowed us. All we care about is that somebody, somewhere is expecting us. 🥰🥰🥰

For this reason, keys are often unique to the test, and that's what we rely on in our example. If some other test creates a :handler_id resource, our filter will allow it too. In some cases, this is useful, but for our handler example, we wouldn't want that to happen.

Conclusion

Most of the time, you won't have to deal with low-level ownership machinery. But understanding what caller tracking is and how it propagates is vital to writing asynchronous tests. That said, if you ever find yourself testing handlers or other global resources, remember that it's very doable to limit access to them even in a highly concurrent environment. You'll just need a good bouncer.