Postgres & Phoenix Search Without Triggers
While integrating search in our Phoenix application, we ended up at a lovely blog post that explained how to implement it for all our entities, without having to think about it for every insert, update, or delete.
Our first implementation of search looked pretty much as described there: we had a materialized view per type that we wanted to search on. These materialized views were refreshed by database triggers on every insert, update, and delete of that entity. Alongside the materialized views, we also created indexes to speed up the searches.
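For context, such a view and its indexes might be created in a migration like this minimal sketch. This is an assumption for illustration: the courses table, its columns, and the index names are not from the original post, and the unaccent and pg_trgm extensions are assumed to be enabled.
defmodule MyApp.Repo.Migrations.CreateCourseSearchView do
  use Ecto.Migration

  def up do
    execute("""
    CREATE MATERIALIZED VIEW course_search AS
    SELECT courses.id AS id,
           courses.title AS title,
           to_tsvector(unaccent(courses.title || ' ' || coalesce(courses.description, ''))) AS document
    FROM courses;
    """)

    # a unique index is required before the view can be refreshed CONCURRENTLY
    execute("CREATE UNIQUE INDEX course_search_id_index ON course_search (id);")

    # a trigram index so similarity searches on the title can use an index
    execute("CREATE INDEX course_search_title_trgm_index ON course_search USING gin (title gin_trgm_ops);")
  end

  def down do
    execute("DROP MATERIALIZED VIEW course_search;")
  end
end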
Slow pages
Searches were fast, but some pages turned out to be slow. This was because the materialized views were refreshed by every trigger (insert, update, delete) on an entity, and rebuilding a materialized view could take up to 3 seconds. This happened even though the CONCURRENTLY option was set for refreshing the materialized views; as PostgreSQL documents, “only one REFRESH at a time may run against any one materialized view”.
Since the triggers, and thus the refreshes, happen in the same transaction as the statement, all insert/update/delete actions on one table had to wait for each other. This could become very slow, and batch operations also needed to ensure the triggers were disabled before running.
To improve performance we needed a way of asynchronously refreshing our views. This is a difficult task in PostgreSQL itself, but for Elixir it fits like a glove. We removed the database triggers and replaced them with a GenServer.
In MyApp.Repo we monkey patched all insert, update, and delete functions. These functions are normally defined by a macro in the module, which can be bypassed by passing the option read_only: true to use Ecto.Repo:
defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres,
    read_only: true

  ...

  # a monkey patch of `Repo.insert/2`
  def insert(struct, opts \\ []) do
    __MODULE__
    |> Ecto.Repo.Schema.insert(get_dynamic_repo(), struct, opts)
    |> perform_callback()
  end

  ...
end
By extending insert/2 and the similar Repo functions (update/2, delete/2, etc.), every action is first resolved as usual, and its result is then passed to perform_callback/1, which kicks off the asynchronous refresh.
defp perform_callback(result) do
  case result do
    # insert, update & delete
    {:ok, %module{}} ->
      Search.Interface.refresh_for(module)

    # insert_all & update_all
    {count, [%module{} | _]} when count > 0 ->
      Search.Interface.refresh_for(module)

    # insert!, update! and delete!
    %module{} ->
      Search.Interface.refresh_for(module)

    # do nothing
    _ ->
      nil
  end

  result
end
The function perform_callback/1 matches on the module of the struct that was changed in the database and calls the Search.Interface, which looks like this:
defmodule MyApp.Search.Interface do
  ...

  def refresh_for(module) do
    module
    |> module_to_table()
    |> search_views_for_table()
    |> Enum.each(&refresh_later/1)
  end

  defp module_to_table(module) do
    with %{__meta__: meta} <- struct(module), %{source: table} <- meta, do: table
  end

  defp search_views_for_table("courses"), do: ~w(course_search)
  ...

  defp refresh_later(view_name), do: GenServer.cast(RefresherServer, {:refresh, view_name})
end
This is an interface for the GenServer, created for clarity, with the functions that can be called from outside the GenServer. It looks at the module that comes in and selects the materialized views that should be refreshed. The list returned by search_views_for_table/1 can be extended so that additional views are refreshed, and the function clause can be copied and altered so views of other modules are updated as well, as in the sketch below.
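For example, a hypothetical clause for a Teacher schema backed by a "teachers" table (the table and view names are illustrative, not from the original post), plus a catch-all so unknown tables refresh nothing:
# hypothetical extra clause for another schema
defp search_views_for_table("teachers"), do: ~w(teacher_search)
defp search_views_for_table(_table), do: []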
Finally, refresh_later/1 casts to the GenServer with the view we want to refresh. Because it is a cast, the view is added asynchronously to the queue of the RefresherServer shown here.
defmodule MyApp.Search.RefresherServer do
  @minimum_time_in_queue 5_000
  @max_concurrency 5
  @valid_search_views ~w(course_search ...)

  defstruct queue: [], running: [], timer_ref: nil

  ...

  # Internal API
  defp cancel_running_timer_if_any(nil), do: nil
  defp cancel_running_timer_if_any(ref), do: Process.cancel_timer(ref)

  defp fill_progress_queue(%{queue: [view | remainder], running: progress_queue} = state)
       when length(progress_queue) < @max_concurrency do
    %Task{ref: task_ref} =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, __MODULE__, :refresh_materialized_view, [view])

    fill_progress_queue(%{state | queue: remainder, running: [task_ref | progress_queue]})
  end

  defp fill_progress_queue(state), do: state

  # the guard against @valid_search_views keeps arbitrary strings out of the interpolated SQL
  def refresh_materialized_view(view_name) when view_name in @valid_search_views,
    do: Repo.query("REFRESH MATERIALIZED VIEW CONCURRENTLY #{view_name};", [], timeout: :infinity)
end
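For completeness, the default GenServer setup elided inside the module might look like this minimal sketch (assumed; the original post does not show it):
use GenServer

# start the server under a supervisor, registered under the module name
def start_link(opts \\ []),
  do: GenServer.start_link(__MODULE__, %__MODULE__{}, Keyword.put_new(opts, :name, __MODULE__))

@impl true
def init(state), do: {:ok, state}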
Aside from a handle_cast/2 and the default GenServer setup, this is what our RefresherServer looks like. To hook this up, our handle_cast/2 takes the refresh message coming from refresh_later/1 in the Search.Interface. It takes the view that needs to be refreshed, checks whether that view is already in the queue, and adds it if not.
def handle_cast({:refresh, view}, %{queue: queue} = state) do
  new_queue = if view in queue, do: queue, else: [view | queue]
  {:noreply, %{state | queue: new_queue}, {:continue, :set_timer}}
end
The handle_cast/2 reply makes the GenServer continue by canceling the current timer and setting a new one. This debounces the work: we don't refresh the materialized views for every single update, and we give the database some time (@minimum_time_in_queue, 5_000 ms) to complete a refresh that is already running.
def handle_continue(:set_timer, %{queue: [_ | _], timer_ref: ref} = state) do
  cancel_running_timer_if_any(ref)
  new_timer = Process.send_after(self(), :stage_refresh, @minimum_time_in_queue)
  {:noreply, %{state | timer_ref: new_timer}}
end

# without this fallback clause the GenServer would crash on :set_timer with an empty queue
def handle_continue(:set_timer, state), do: {:noreply, state}
Finally, a handle_info/2 triggered by the timer calls fill_progress_queue/1, which starts the refreshes of our materialized views and returns the new state of the GenServer. For each view a task is spawned to run the refresh query; once that task returns {:ok, _}, its reference is removed from the running list.
def handle_info(:stage_refresh, state) do
  new_state = fill_progress_queue(state)
  {:noreply, %{new_state | timer_ref: nil}, {:continue, :set_timer}}
end

def handle_info({ref, {:ok, _}}, %{running: views} = state) do
  Process.demonitor(ref, [:flush])
  {:noreply, %{state | running: List.delete(views, ref)}}
end

# a task started with async_nolink that crashes sends a :DOWN message;
# without this clause a failed refresh would crash the GenServer
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{running: views} = state) do
  {:noreply, %{state | running: List.delete(views, ref)}}
end
With this implementation our application was freed of the slow pages caused by refreshing the views too often.
It's good to note that this solution still uses database connections: the refreshes are plain Repo calls, so if the number of concurrent refreshes (@max_concurrency) is larger than or equal to the number of database connections in your pool, it can still block other calls from within your application.
Tests
To use this GenServer for searches in your tests, you should either skip the callbacks during the tests or lower the debounce duration; without one of these two options, tests will become slow.
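One way to skip the callbacks is to guard perform_callback/1 with an application environment flag that is set to false in config/test.exs. This is a sketch under assumptions: the flag name :refresh_search_views? is hypothetical, and do_perform_callback/1 would wrap the case expression shown earlier.
# in config/test.exs (hypothetical flag):
#   config :my_app, refresh_search_views?: false
defp perform_callback(result) do
  if Application.get_env(:my_app, :refresh_search_views?, true) do
    do_perform_callback(result)
  end

  result
end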
When testing the search itself, be aware that the search views are refreshed asynchronously, so a test has to wait for the refresh before results can be found.
Searching with Typos
In the blog post mentioned before, a trigram index was added but never used. This meant that search queries containing typos wouldn't match, even though this is a great strength of trigram searches. It can be solved by using the similarity function of PostgreSQL. The % operator applies the similarity function in a way that uses the created index; it matches when the similarity exceeds pg_trgm.similarity_threshold, which is 0.3 by default. A better explanation of similarity searching can be found in Super Fuzzy Searching on PostgreSQL.
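As a quick illustration (a hypothetical IEx session, not from the original post), both the function and the threshold can be poked at directly:
# similarity/2 returns a score between 0 and 1
Repo.query!("SELECT similarity('phoenix', 'pheonix') AS score")

# `a % b` is true when similarity(a, b) exceeds the threshold
# (here set explicitly to its default, 0.3)
Repo.query!("SET pg_trgm.similarity_threshold = 0.3")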
With that update our search queries now look like this:
defmacro matching_courses_ids_and_ranks(search_string) do
  quote do
    fragment(
      """
      SELECT course_search.id AS id,
        ts_rank(
          course_search.document, plainto_tsquery(unaccent(?))
        ) AS rank
      FROM course_search
      WHERE course_search.document @@ plainto_tsquery(unaccent(?))
        OR course_search.title % ?
      """,
      ^unquote(search_string),
      ^unquote(search_string),
      ^unquote(search_string)
    )
  end
end
Did you mean …?
To search with suggestions, we save the searches that are performed. Whenever someone submits a search query with a typo in it, we can check whether correct searches have been done before. In the following database query, the keyword is used to look through the searches for the one that has been searched for the most and also matches our similarity threshold. The query also makes sure it does not suggest the user's current search keyword.
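The Searches schema itself is not shown in this post; a minimal sketch, assuming only the keyword column that the queries below rely on:
defmodule Searches do
  use Ecto.Schema

  # minimal sketch; only :keyword is assumed from the queries below
  schema "searches" do
    field :keyword, :string

    timestamps()
  end
end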
The resulting answer can be used to render a line below the input bar with:
Did you mean: <answer>
Searches
|> join(
  :left_lateral,
  [s],
  exact in fragment(
    """
    SELECT keyword, count(*) as total
    FROM searches AS s
    WHERE keyword = ?
    GROUP BY keyword
    """,
    ^keyword
  )
)
|> where(
  [s],
  fragment("? % ? and similarity(?, ?) > 0.5", s.keyword, ^keyword, s.keyword, ^keyword)
)
|> group_by([s, exact], [s.keyword, exact.total])
|> order_by([s], fragment("? <-> ?", s.keyword, ^keyword))
|> select([s, exact], %{keyword: s.keyword})
|> having(
  [s, exact],
  count(s.keyword) >= @suggestion_threshold and coalesce(exact.total, 0) < count(s.keyword)
)
|> limit(1)
|> Repo.one()
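The query returns either nil or a map with the suggested keyword; a hypothetical helper (not from the original post) could turn that into the UI hint:
# turns the query result into the "Did you mean" line, or nil if there is no suggestion
defp did_you_mean(nil), do: nil
defp did_you_mean(%{keyword: suggestion}), do: "Did you mean: #{suggestion}?"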
The same table can also be used for the typeahead. The following query gets suggestions based on the keyword the user is typing: it takes the keyword and checks whether it can be completed into anything that has been searched for before. It only keeps completions that have been searched for more often than the threshold and returns a limited number of suggestions while the user is typing.
Searches
|> where([s], ilike(s.keyword, ^"#{keyword}%"))
|> group_by([s], s.keyword)
|> order_by([s], desc: count(s.keyword))
|> having([s], count(s.keyword) >= @typeahead_suggestion_threshold)
|> limit(@typeahead_suggestions)
|> select([s], %{keyword: s.keyword, id: s.keyword})
|> Repo.all()