Skip to content

Commit 57f6055

Browse files
committed
💄 Surface better error when a publication is in an invalid state
1 parent 526894e commit 57f6055

File tree

6 files changed

+84
-14
lines changed

6 files changed

+84
-14
lines changed

docs/reference/databases.mdx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,34 @@ For Postgres 12 and 13, we do not support backfills, which rely heavily on the `
269269

270270
If you require backfills, we recommend upgrading to Postgres 14 or later. If this is not an option, please [contact us](mailto:[email protected]) to discuss alternative approaches.
271271

272+
## Known issues
273+
274+
### Publication not recognized by replication slot
275+
276+
When a publication is created *after* the replication slot, Postgres may report that the publication does not exist, even though it does. Sequin will surface this as an “Issue with publication” health check.
277+
278+
This is a [known Postgres issue](https://www.postgresql.org/message-id/18683-a98f79c0673be358%40postgresql.org).
279+
280+
**Fix**: drop and re-create the replication slot *after* the publication exists:
281+
282+
```sql
283+
-- Drop the slot
284+
SELECT pg_drop_replication_slot('sequin_slot');
285+
286+
-- Re-create the slot
287+
SELECT pg_create_logical_replication_slot('sequin_slot', 'pgoutput');
288+
```
289+
290+
**Prevention**: always create the publication *before* creating the replication slot. If you need to change a publication later, alter it instead of dropping and re-creating it:
291+
292+
```sql
293+
-- Add or remove tables
294+
ALTER PUBLICATION sequin_publication ADD TABLE new_table;
295+
ALTER PUBLICATION sequin_publication DROP TABLE old_table;
296+
```
297+
298+
See the [`ALTER PUBLICATION` documentation](https://www.postgresql.org/docs/current/sql-alterpublication.html) for the full syntax.
299+
272300
## Monitoring
273301

274302
Sequin will run health checks against your database connection automatically. You can view the status of the connection at any time in your database's "Overview" page.

lib/sequin/databases/databases.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ defmodule Sequin.Databases do
388388
def verify_slot(%PostgresDatabase{} = database, %PostgresReplicationSlot{} = slot) do
389389
with_uncached_connection(database, fn conn ->
390390
with {:ok, _} <- Postgres.get_publication(conn, slot.publication_name),
391-
{:ok, slot_info} <- Postgres.fetch_replication_slot(conn, slot.slot_name),
391+
{:ok, slot_info} <- Postgres.get_replication_slot(conn, slot.slot_name),
392392
:ok <- validate_slot(database, slot_info) do
393393
Postgres.check_replication_permissions(conn)
394394
end

lib/sequin/health/health.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,10 @@ defmodule Sequin.Health do
545545
base_check
546546

547547
connected_event.status == :fail ->
548-
put_check_timestamps(%{base_check | status: :error, error: connected_event.error}, [
548+
error_slug = Map.get(connected_event.error, :code)
549+
error_slug = if is_binary(error_slug), do: String.to_existing_atom(error_slug), else: error_slug
550+
551+
put_check_timestamps(%{base_check | status: :error, error: connected_event.error, error_slug: error_slug}, [
549552
connected_event
550553
])
551554

lib/sequin/postgres/postgres.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,8 @@ defmodule Sequin.Postgres do
459459
"active" => active
460460
}
461461
"""
462-
@spec fetch_replication_slot(db_conn(), String.t()) :: {:ok, map()} | {:error, Error.t()}
463-
def fetch_replication_slot(conn, slot_name) do
462+
@spec get_replication_slot(db_conn(), String.t()) :: {:ok, map()} | {:error, Error.t()}
463+
def get_replication_slot(conn, slot_name) do
464464
query = """
465465
select slot_name, active, database, slot_type
466466
from pg_replication_slots

lib/sequin/runtime/slot_processor_server.ex

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -907,29 +907,48 @@ defmodule Sequin.Runtime.SlotProcessorServer do
907907
defp on_connect_failure(%State{} = state, error) do
908908
conn = get_cached_conn(state)
909909

910-
error_msg =
911-
case Postgres.fetch_replication_slot(conn, state.slot_name) do
912-
{:ok, %{"active" => false}} ->
913-
if is_exception(error) do
910+
error_or_error_msg =
911+
with {:ok, %{"active" => false}} <- Postgres.get_replication_slot(conn, state.slot_name),
912+
{:ok, _pub} <- Postgres.get_publication(conn, state.publication) do
913+
cond do
914+
match?(%Postgrex.Error{postgres: %{code: :undefined_object, routine: "get_publication_oid"}}, error) ->
915+
# Related to this: https://www.postgresql.org/message-id/18683-a98f79c0673be358%40postgresql.org
916+
# Helpful error message shown in front-end.
917+
Error.service(
918+
service: :replication,
919+
code: :publication_not_recognized,
920+
message:
921+
"Publication '#{state.publication}' is in an invalid state. You must drop and re-create the slot to use this publication with this slot."
922+
)
923+
924+
is_exception(error) ->
914925
Exception.message(error)
915-
else
916-
inspect(error)
917-
end
918926

919-
{:ok, %{"active" => true}} ->
927+
true ->
928+
inspect(error)
929+
end
930+
else
931+
{:ok, %{"active" => true} = _slot} ->
920932
"Replication slot '#{state.slot_name}' is currently in use by another connection"
921933

922-
{:error, %Error.NotFoundError{}} ->
934+
{:error, %Error.NotFoundError{entity: :replication_slot}} ->
923935
maybe_recreate_slot(state)
924936
"Replication slot '#{state.slot_name}' does not exist"
925937

926938
{:error, error} ->
927939
Exception.message(error)
928940
end
929941

942+
error =
943+
if is_binary(error_or_error_msg) do
944+
Error.service(service: :replication, message: error_or_error_msg)
945+
else
946+
error_or_error_msg
947+
end
948+
930949
Health.put_event(
931950
state.replication_slot,
932-
%Event{slug: :replication_connected, status: :fail, error: Error.service(service: :replication, message: error_msg)}
951+
%Event{slug: :replication_connected, status: :fail, error: error}
933952
)
934953

935954
:ok

lib/sequin_web/live/databases/show.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,26 @@ defmodule SequinWeb.DatabasesLive.Show do
413413
)
414414
end
415415

416+
defp maybe_augment_alert(%{error_slug: :publication_not_recognized} = check, database) do
417+
publication_name = database.replication_slot.publication_name
418+
419+
# ""
420+
421+
Map.merge(
422+
check,
423+
%{
424+
alertTitle: "Issue with publication",
425+
alertMessage: """
426+
There is an issue with publication `#{publication_name}`. The publication exists, but the replication slot does not recognize it. This is likely related to a [known issue](https://www.postgresql.org/message-id/18683-a98f79c0673be358%40postgresql.org) with Postgres. This issue occurs when you create a publication *after* creating a replication slot.
427+
428+
You'll need to drop and re-create your replication slot in order to use this publication with this slot.
429+
""",
430+
refreshable: false,
431+
dismissable: false
432+
}
433+
)
434+
end
435+
416436
defp maybe_augment_alert(check, _database), do: check
417437

418438
defp preload_database(database) do

0 commit comments

Comments
 (0)