Using Phoenix.PubSub to manage side effects

In the normal cycle of a request we do a couple of things before we show something to the user. A CRUD request is received, we do something on the database and return the data on the page that’s shown to the user. Because of this, it takes some time before the user sees what they requested.

Some requests have side effects. What I consider a side effect is everything done during the request that has no direct impact on what the user sees.

At Defacto we run an LMS built with Elixir and Phoenix, in which we check the compliance and qualification of the employees of organizations. We also provide ways to create e-learning content and events, and offer options to import and export results through APIs. In this example we have a Course, which is built from e-learning content as subjects.

When a new subject is added to the course, there are a couple of things we want to do.

  • Create an audit entry, to have a trail that this user added this.
  • Create an Activity that will be shown elsewhere.
  • Send emails to all the users that are enrolled so they know there’s something new to learn.
  • Send API calls to Analytics (when configured, we can send data to Mixpanel).

All of these things may happen, but the only thing the user wants to see is whether they entered the correct data to create the course. So all these actions can be done in the background, outside of the initial request.

Phoenix Pubsub in our LMS

Implementation

EventCenter

In this application the EventCenter will do the basic setup of the PubSub. It is a GenServer and I will leave out basic GenServer code.

# lib/app_name/event_center.ex
defmodule AppName.EventCenter do
  use GenServer
  alias Phoenix.PubSub

  @pubsub AppName.EventCenter.PubSub

  ...

  def broadcast(action, object, actor)
      when action in [:created, :deleted] do
    PubSub.broadcast(@pubsub, topic(), {action, object, actor})
  end

  def broadcast(:updated = action, previous, current, actor) do
    PubSub.broadcast(@pubsub, topic(), {action, previous, current, actor})
  end

  def subscribe do
    PubSub.subscribe(@pubsub, topic())
  end

  def topic do
    Atom.to_string(__MODULE__)
  end

  ...
end

In here we define a couple of functions which I will explain:

def subscribe do
  PubSub.subscribe(@pubsub, topic())
end

Subscribers call this function to subscribe to the topic.

In our app we only have one topic, which is the module name of the EventCenter. As you can see, every broadcast posts to the same topic with:

def broadcast(action, object, actor)
    when action in [:created, :deleted] do
  PubSub.broadcast(@pubsub, topic(), {action, object, actor})
end

def broadcast(:updated = action, previous, current, actor) do
  PubSub.broadcast(@pubsub, topic(), {action, previous, current, actor})
end

We only allow three actions: :created, :deleted and :updated. These correspond to the actions we perform on the database. Next to that we send the object that was created, deleted or updated; for updates we send both the old state and the new state of that object. Finally, we add the user who triggered the action to the message.

These broadcasts are sent to the pubsub and distributed to our subscribers.
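
To make the two message shapes concrete, here is a minimal standalone sketch of one subscribe and broadcast round trip. It assumes Phoenix.PubSub 2.x (pulled in with Mix.install/1, so it needs Elixir 1.12+ and network access); the pubsub name :event_center_sketch and the plain maps standing in for the course and the actor are made up for this example and are not part of our app.

```elixir
# Assumption: Phoenix.PubSub 2.x is available; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

# Hypothetical names, used only in this sketch.
pubsub = :event_center_sketch
topic = Atom.to_string(AppName.EventCenter)

# Start a PubSub server under a throwaway supervisor.
{:ok, _supervisor} =
  Supervisor.start_link([{Phoenix.PubSub, name: pubsub}], strategy: :one_for_one)

# The current process subscribes, as a subscriber would via EventCenter.subscribe/0.
:ok = Phoenix.PubSub.subscribe(pubsub, topic)

# Plain maps stand in for the Course and User structs.
course = %{id: 1, title: "Onboarding"}
actor = %{id: 42}

# :created and :deleted travel as three-element tuples ...
:ok = Phoenix.PubSub.broadcast(pubsub, topic, {:created, course, actor})

# ... while :updated carries the previous and the current state.
:ok =
  Phoenix.PubSub.broadcast(
    pubsub,
    topic,
    {:updated, course, %{course | title: "Onboarding 2.0"}, actor}
  )

# Take both messages out of the mailbox, in arrival order.
received =
  for _ <- 1..2 do
    receive do
      message -> message
    after
      1_000 -> :timeout
    end
  end

IO.inspect(received, label: "received")
```

The subscriber can tell the two shapes apart purely by tuple size, which is what the handle_info/2 clauses further down rely on.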

Broadcasting

In a function where (for example) a course gets created, we send our info to the EventCenter. We only do this when we know that the course was actually created.

def create_course(%{} = attrs, %User{} = actor) do
  result =
    %Course{created_by_id: actor.id}
    |> Course.changeset(attrs, actor)
    |> Repo.insert()

  # Only broadcast when the insert succeeded.
  with {:ok, course} <- result do
    EventCenter.broadcast(:created, course, actor)
  end

  result
end

When a transaction is involved, we have to make sure to only broadcast after the transaction has completed successfully.

def create_event_as_subject(%Course{} = course, %{} = attrs, %User{} = user) do
  result =
    Repo.transaction(fn ->
      with {:ok, event} <- Events.do_create_event(attrs, user),
           {:ok, subject} <- do_create_subject(%{}, course, event) do
        {subject, event}
      else
        {:error, error} -> Repo.rollback(error)
      end
    end)

  with {:ok, {subject, event}} <- result do
    EventCenter.broadcast(:created, event, user)
    EventCenter.broadcast(:created, subject, user)
    {:ok, subject}
  end
end
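
Both functions follow the same rule: run the side effect only for an {:ok, _} result and hand the result back untouched. That rule could be captured in a small helper. The sketch below is only an illustration; BroadcastOnSuccess.tap_ok/2 is a hypothetical name that does not exist in our code base, and a send/2 to the current process stands in for EventCenter.broadcast/3 so the snippet runs on its own.

```elixir
defmodule BroadcastOnSuccess do
  @moduledoc """
  Hypothetical helper (not part of the original code): runs a side effect
  only for `{:ok, value}` results and always returns the result unchanged.
  """

  def tap_ok({:ok, value} = result, fun) when is_function(fun, 1) do
    fun.(value)
    result
  end

  # Anything that is not {:ok, value} passes through without a side effect.
  def tap_ok(result, fun) when is_function(fun, 1), do: result
end

# Stand-in for EventCenter.broadcast/3: sends the message to the current process.
parent = self()
broadcast = fn object -> send(parent, {:created, object, :some_actor}) end

ok_result = BroadcastOnSuccess.tap_ok({:ok, %{id: 1}}, broadcast)
error_result = BroadcastOnSuccess.tap_ok({:error, :invalid}, broadcast)

IO.inspect(ok_result, label: "ok result")
IO.inspect(error_result, label: "error result")
```

Only the first call puts a message in the mailbox; the error tuple is returned as it came in, so the caller can still show validation errors to the user.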

Subscribers

The (Event)Subscribers subscribe to the EventCenter that’s shown above. Since every event in our app is posted there, all our subscribers get notified when an event is broadcast.

The subscribers in our application are set up within each Phoenix context that we have, and look something like this.

defmodule AppName.Audits.EventSubscriber do
  @moduledoc """
  An event subscriber that listens to the `EventCenter` pubsub for messages and
  processes received messages to store audits.
  """
  use GenServer

  import ScoutApm.Tracing

  alias AppName.{Audits, EventCenter}

  ...

  def init(:ok) do
    EventCenter.subscribe()

    {:ok, []}
  end

  def handle_info({action, object, actor}, _) do
    Audits.create_audit(object, Atom.to_string(action), actor)

    {:noreply, []}
  end

  def handle_info({action, object, new_object, actor}, _) do
    Audits.create_audit(object, new_object, Atom.to_string(action), actor)

    {:noreply, []}
  end

  def handle_info(_, state), do: {:noreply, state}
end

Subscribing to the EventCenter (pubsub) is the first thing the EventSubscriber does, in its init/1 function. It calls EventCenter.subscribe/0, which runs PubSub.subscribe(@pubsub, topic()) in the EventCenter and makes sure the subscriber is notified of every message that is broadcast on the EventCenter topic.

The handle_info/2 function receives messages that would normally look something like:

{:created, %AppName.Courses.Course{...}, %AppName.Accounts.User{...}}

Here :created is the action, the struct %AppName.Courses.Course{} is the object that has been created and %AppName.Accounts.User{} is the struct for the user who created the course. For updates the message tuple has four items instead of three: it contains both the object as it was before the update and the updated object, which is called new_object.

When a message arrives and matches the handle_info/2 arguments, Audits.create_audit/3 (or Audits.create_audit/4 for updates) is called in a process that runs asynchronously from the process in which the course itself was created. In the background an Audit and an Activity were created, an email was sent to inform users and an API call was made to Mixpanel. All of that happened while the user was already looking at the new Course, without any delay in receiving it.

Each task was handled by its own EventSubscriber. The EventSubscriber that handles external API calls knows which events it has to send and which it shouldn’t. The same goes for the emails: inside its own EventSubscriber we can check whether the user wants to be notified, get all users with the right data from the database and send the email.
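
To illustrate how a subscriber can decide for itself which events it cares about, here is a rough standalone sketch. All Sketch.* modules are hypothetical stand-ins, not our real code: the subscriber only reacts to created courses, keeps what it handled in its state so we can inspect it, and send/2 stands in for delivery through the PubSub.

```elixir
defmodule Sketch.Course do
  defstruct [:id]
end

defmodule Sketch.Subject do
  defstruct [:id]
end

defmodule Sketch.AnalyticsSubscriber do
  @moduledoc """
  Hypothetical subscriber: only reacts to created courses and ignores all
  other events. Handled events are kept in state so the sketch can show them.
  """
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns the events handled so far, oldest first.
  def handled(server), do: GenServer.call(server, :handled)

  @impl true
  def init(:ok) do
    # In the application this is where EventCenter.subscribe() would go.
    {:ok, []}
  end

  @impl true
  def handle_call(:handled, _from, state), do: {:reply, Enum.reverse(state), state}

  @impl true
  def handle_info({:created, %Sketch.Course{} = course, actor}, state) do
    # A real subscriber would call the analytics API here.
    {:noreply, [{:course_created, course.id, actor} | state]}
  end

  # Everything else on the topic is not this subscriber's concern.
  def handle_info(_message, state), do: {:noreply, state}
end

{:ok, subscriber} = Sketch.AnalyticsSubscriber.start_link()

# send/2 stands in for delivery through the PubSub.
send(subscriber, {:created, %Sketch.Course{id: 1}, :alice})
send(subscriber, {:created, %Sketch.Subject{id: 7}, :alice})
send(subscriber, {:updated, %Sketch.Course{id: 1}, %Sketch.Course{id: 1}, :alice})

handled = Sketch.AnalyticsSubscriber.handled(subscriber)
IO.inspect(handled, label: "handled")
```

The catch-all clause matters: because there is a single topic, every subscriber receives every event and has to drop the ones it does not handle.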

Tests

Now that everything sends its messages, we also want to test that the messages the handle_info/2 functions depend on are actually broadcast. To do this we can use ExUnit’s assert_received/2. With that we can test whether EventCenter.broadcast(:created, course, actor) is called with the following code:

# insert/1 and params_for/1 are done with ExMachina
test "handle_info of event subscribers is called with :created, the course and the actor" do
  EventCenter.subscribe()

  user = insert(:user, roles: ["roles", "that", "are", "needed"])

  {:ok, %Course{} = course} = Courses.create_course(params_for(:course), user)

  assert_received({:created, ^course, ^user})
end

  • EventCenter.subscribe() subscribes the test process to the EventCenter topic, so broadcasts are passed through the PubSub to the test in the same way they reach an EventSubscriber in the application.
  • Courses.create_course(params_for(:course), user) will trigger the broadcast.
  • assert_received({:created, ^course, ^user}) asserts that a message matching the pattern was received and is in the current process’ mailbox.
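
One thing to keep in mind is that assert_received/2 checks the mailbox only once, while assert_receive/3 waits up to a timeout for the message to arrive. The sketch below shows the difference with plain send/2 and no PubSub involved; the test module name and the atoms in the message are made up for the example.

```elixir
ExUnit.start()

defmodule MailboxAssertionsSketchTest do
  use ExUnit.Case, async: true

  test "assert_received passes when the message is already in the mailbox" do
    send(self(), {:created, :course, :actor})

    # Checks the mailbox once, without waiting.
    assert_received {:created, :course, :actor}
  end

  test "assert_receive waits for a message that arrives later" do
    test_pid = self()

    spawn(fn ->
      Process.sleep(20)
      send(test_pid, {:created, :course, :actor})
    end)

    # Waits up to 500 ms instead of checking only once.
    assert_receive {:created, :course, :actor}, 500
  end
end
```

If a test with assert_received/2 ever turns out to be flaky, switching to assert_receive/3 with a timeout is the first thing I would try.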

We also test that calls to the EventSubscriber actually do what they are supposed to do.

test "create an audit after broadcasting it" do
  start_supervised(AppName.Audits.EventSubscriber)
  course = insert(:course, status: "draft")
  actor = insert(:user)

  EventCenter.broadcast(:updated, course, %{course | status: "published"}, actor)

  assert_eventually(Repo.exists?(Audit))
  stop_supervised(AppName.Audits.EventSubscriber)
end

  • First the EventSubscriber is started.
  • A course and a user are created.
  • We broadcast :updated with the old course, the updated course and the actor.
  • assert_eventually is an assert in a loop with a :timer.sleep/1 that checks a few times whether the assertion passes.
  • Repo.exists?() returns true once the Audit has been created.
  • Finally the EventSubscriber is stopped.
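
Our own assert_eventually is not shown here, but a helper along those lines could look like the sketch below. Treat it as an assumption about the general shape rather than our actual implementation: the module name Sketch.AssertEventually, the default of 10 attempts and the 50 ms interval are all invented for this example. It is a macro so that the expression is re-evaluated on every attempt.

```elixir
defmodule Sketch.AssertEventually do
  @moduledoc """
  Hypothetical polling assertion; the original helper is not shown in the text.
  """

  # Wraps the expression in a function so it can be evaluated again each attempt.
  defmacro assert_eventually(expression, attempts \\ 10, interval_ms \\ 50) do
    quote do
      Sketch.AssertEventually.poll(
        fn -> unquote(expression) end,
        unquote(attempts),
        unquote(interval_ms)
      )
    end
  end

  # Returns true as soon as `fun` is truthy, raises after the last attempt.
  def poll(fun, attempts, interval_ms) when attempts > 0 do
    cond do
      fun.() ->
        true

      attempts == 1 ->
        raise ExUnit.AssertionError, message: "expression never became truthy"

      true ->
        :timer.sleep(interval_ms)
        poll(fun, attempts - 1, interval_ms)
    end
  end
end

# Usage: the condition only becomes true on the third check.
require Sketch.AssertEventually
{:ok, counter} = Agent.start_link(fn -> 0 end)

result =
  Sketch.AssertEventually.assert_eventually(
    Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end) >= 3,
    5,
    10
  )

IO.inspect(result, label: "result")
```

Polling like this keeps the test independent of how long the subscriber takes, at the cost of a slower failure when the audit is never written.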

Rest of the setup

The EventCenter and EventSubscribers have to be started in the application.

defmodule AppName.Application do
  ...
  def start(_type, _args) do
    children = [
      ...
      AppName.EventCenter,
      AppName.Audits.EventSubscriber,
      ...
    ]
    ...
    Supervisor.start_link(children, opts)
  end
end