Skip to content

💄 Surface better error when a publication is in an invalid state #1844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/reference/databases.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,34 @@ For Postgres 12 and 13, we do not support backfills, which rely heavily on the `

If you require backfills, we recommend upgrading to Postgres 14 or later. If this is not an option, please [contact us](mailto:[email protected]) to discuss alternative approaches.

## Known issues

### Publication not recognized by replication slot

When a publication is created *after* the replication slot, Postgres may report that the publication does not exist. Sequin will surface this as an “Issue with publication” health check.

This is a [known Postgres issue](https://www.postgresql.org/message-id/18683-a98f79c0673be358%40postgresql.org).

**Fix**: drop and re-create the replication slot *after* the publication exists:

```sql
-- Drop the slot
SELECT pg_drop_replication_slot('sequin_slot');

-- Re-create the slot
SELECT pg_create_logical_replication_slot('sequin_slot', 'pgoutput');
```

**Prevention**: always create the publication *before* the replication slot. If you need to change a publication later, alter it instead of dropping and re-creating it:

```sql
-- Add or remove tables
ALTER PUBLICATION sequin_publication ADD TABLE new_table;
ALTER PUBLICATION sequin_publication DROP TABLE old_table;
```

See the [`ALTER PUBLICATION` documentation](https://www.postgresql.org/docs/current/sql-alterpublication.html) for the full syntax.

## Monitoring

Sequin will run health checks against your database connection automatically. You can view the status of the connection at any time in your database's "Overview" page.
2 changes: 1 addition & 1 deletion lib/sequin/databases/databases.ex
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ defmodule Sequin.Databases do
def verify_slot(%PostgresDatabase{} = database, %PostgresReplicationSlot{} = slot) do
with_uncached_connection(database, fn conn ->
with {:ok, _} <- Postgres.get_publication(conn, slot.publication_name),
{:ok, slot_info} <- Postgres.fetch_replication_slot(conn, slot.slot_name),
{:ok, slot_info} <- Postgres.get_replication_slot(conn, slot.slot_name),
:ok <- validate_slot(database, slot_info) do
Postgres.check_replication_permissions(conn)
end
Expand Down
12 changes: 11 additions & 1 deletion lib/sequin/health/health.ex
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,17 @@ defmodule Sequin.Health do
base_check

connected_event.status == :fail ->
put_check_timestamps(%{base_check | status: :error, error: connected_event.error}, [
error_slug = Map.get(connected_event.error, :code)

error_slug =
with true <- is_binary(error_slug),
{:ok, atom} <- Sequin.String.to_existing_atom_safe(error_slug) do
atom
else
_err -> error_slug
end

put_check_timestamps(%{base_check | status: :error, error: connected_event.error, error_slug: error_slug}, [
connected_event
])

Expand Down
4 changes: 2 additions & 2 deletions lib/sequin/postgres/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ defmodule Sequin.Postgres do
"active" => active
}
"""
@spec fetch_replication_slot(db_conn(), String.t()) :: {:ok, map()} | {:error, Error.t()}
def fetch_replication_slot(conn, slot_name) do
@spec get_replication_slot(db_conn(), String.t()) :: {:ok, map()} | {:error, Error.t()}
def get_replication_slot(conn, slot_name) do
query = """
select slot_name, active, database, slot_type
from pg_replication_slots
Expand Down
39 changes: 29 additions & 10 deletions lib/sequin/runtime/slot_processor_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -907,29 +907,48 @@ defmodule Sequin.Runtime.SlotProcessorServer do
defp on_connect_failure(%State{} = state, error) do
conn = get_cached_conn(state)

error_msg =
case Postgres.fetch_replication_slot(conn, state.slot_name) do
{:ok, %{"active" => false}} ->
if is_exception(error) do
error_or_error_msg =
with {:ok, %{"active" => false}} <- Postgres.get_replication_slot(conn, state.slot_name),
{:ok, _pub} <- Postgres.get_publication(conn, state.publication) do
cond do
match?(%Postgrex.Error{postgres: %{code: :undefined_object, routine: "get_publication_oid"}}, error) ->
# Related to this: https://www.postgresql.org/message-id/18683-a98f79c0673be358%40postgresql.org
# Helpful error message shown in front-end.
Error.service(
service: :replication,
code: :publication_not_recognized,
message:
"Publication '#{state.publication}' is in an invalid state. You must drop and re-create the slot to use this publication with this slot."
)

is_exception(error) ->
Exception.message(error)
else
inspect(error)
end

{:ok, %{"active" => true}} ->
true ->
inspect(error)
end
else
{:ok, %{"active" => true} = _slot} ->
"Replication slot '#{state.slot_name}' is currently in use by another connection"

{:error, %Error.NotFoundError{}} ->
{:error, %Error.NotFoundError{entity: :replication_slot}} ->
maybe_recreate_slot(state)
"Replication slot '#{state.slot_name}' does not exist"

{:error, error} ->
Exception.message(error)
end

error =
if is_binary(error_or_error_msg) do
Error.service(service: :replication, message: error_or_error_msg)
else
error_or_error_msg
end

Health.put_event(
state.replication_slot,
%Event{slug: :replication_connected, status: :fail, error: Error.service(service: :replication, message: error_msg)}
%Event{slug: :replication_connected, status: :fail, error: error}
)

:ok
Expand Down
8 changes: 8 additions & 0 deletions lib/sequin/string.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule Sequin.String do
@moduledoc false
alias Sequin.Error

@doc """
Obfuscates a secret based on its length.

Expand Down Expand Up @@ -119,4 +121,10 @@ defmodule Sequin.String do
def format_bytes(bytes) when bytes < 1024 * 1024, do: "#{Float.round(bytes / 1024, 2)} KB"
def format_bytes(bytes) when bytes < 1024 * 1024 * 1024, do: "#{Float.round(bytes / 1024 / 1024, 2)} MB"
def format_bytes(bytes) when bytes < 1024 * 1024 * 1024 * 1024, do: "#{Float.round(bytes / 1024 / 1024 / 1024, 2)} GB"

@doc """
Converts `string` to an atom only if that atom already exists.

Returns `{:ok, atom}` when the atom exists. Otherwise returns
`{:error, error}`, where `error` is built with `Error.invariant/1` and
carries the code `:not_existing_atom`. No new atom is ever created.
"""
@spec to_existing_atom_safe(String.t()) :: {:ok, atom()} | {:error, Error.t()}
def to_existing_atom_safe(string) do
  {:ok, String.to_existing_atom(string)}
rescue
  # `String.to_existing_atom/1` signals an unknown atom with ArgumentError.
  # Rescue only that, so unrelated failures propagate instead of being
  # reported as a missing atom.
  ArgumentError ->
    {:error, Error.invariant(message: "Not an existing atom: #{string}", code: :not_existing_atom)}
end
end
20 changes: 20 additions & 0 deletions lib/sequin_web/live/databases/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,26 @@ defmodule SequinWeb.DatabasesLive.Show do
)
end

# Decorates a health check whose `error_slug` is `:publication_not_recognized`
# with a title, a Markdown message naming the database's publication, and
# flags marking the alert as neither refreshable nor dismissable.
defp maybe_augment_alert(%{error_slug: :publication_not_recognized} = check, database) do
  slot = database.replication_slot

  alert = %{
    alertTitle: "Issue with publication",
    alertMessage: """
    There is an issue with publication `#{slot.publication_name}`. The publication exists, but the replication slot does not recognize it. This is likely related to a [known issue](https://www.postgresql.org/message-id/18683-a98f79c0673be358%40postgresql.org) with Postgres. This issue occurs when you create a publication *after* creating a replication slot.

    You'll need to [drop and re-create your replication slot](https://sequinstream.com/docs/reference/databases#publication-not-recognized-by-replication-slot) in order to use this publication with this slot.
    """,
    refreshable: false,
    dismissable: false
  }

  Map.merge(check, alert)
end

# Checks with any other shape are returned unchanged.
defp maybe_augment_alert(check, _database), do: check

defp preload_database(database) do
Expand Down
Loading