Skip to content

fix: shutdown connect on relevant settings changes #1413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 49 additions & 15 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ defmodule Realtime.Api do
alias Realtime.RateCounter
alias Realtime.GenCounter
alias Realtime.Tenants
alias Realtime.Tenants.Connect

defguard requires_disconnect(changeset)
when changeset.valid? == true and
(is_map_key(changeset.changes, :jwt_secret) or
is_map_key(changeset.changes, :jwt_jwks))

defguard requires_restarting_db_connection(changeset)
when changeset.valid? == true and
(is_map_key(changeset.changes, :extensions) or
is_map_key(changeset.changes, :jwt_secret) or
is_map_key(changeset.changes, :jwt_jwks))

@doc """
Returns the list of tenants.
Expand Down Expand Up @@ -112,23 +124,23 @@ defmodule Realtime.Api do

"""
def update_tenant(%Tenant{} = tenant, attrs) do
tenant
|> Tenant.changeset(attrs)
|> tap(&maybe_trigger_disconnect/1)
|> Repo.update()
end
changeset = Tenant.changeset(tenant, attrs)
updated = Repo.update(changeset)

case updated do
{:ok, tenant} ->
maybe_invalidate_cache(changeset)
maybe_trigger_disconnect(changeset)
maybe_restart_db_connection(changeset)
Logger.debug("Tenant updated: #{inspect(tenant, pretty: true)}")

{:error, error} ->
Logger.error("Failed to update tenant: #{inspect(error, pretty: true)}")
end

defp maybe_trigger_disconnect(%Ecto.Changeset{
changes: changes,
valid?: true,
data: %{external_id: external_id}
})
when is_map_key(changes, :jwt_jwks) or is_map_key(changes, :jwt_secret) do
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect)
updated
end

defp maybe_trigger_disconnect(_), do: nil

@doc """
Deletes a tenant.

Expand Down Expand Up @@ -198,7 +210,8 @@ defmodule Realtime.Api do
{value, settings} = Map.pop(extension.settings, from)
new_settings = Map.put(settings, to, value)

Ecto.Changeset.cast(extension, %{settings: new_settings}, [:settings])
extension
|> Ecto.Changeset.cast(%{settings: new_settings}, [:settings])
|> Repo.update!()
end
end
Expand All @@ -225,4 +238,25 @@ defmodule Realtime.Api do
|> Map.put(:events_per_second_rolling, avg)
|> Map.put(:events_per_second_now, current)
end

defp maybe_invalidate_cache(%Ecto.Changeset{changes: changes, valid?: true, data: %{external_id: external_id}})
when changes != %{} do
Tenants.Cache.distributed_invalidate_tenant_cache(external_id)
end

defp maybe_invalidate_cache(_changeset), do: nil

defp maybe_trigger_disconnect(%Ecto.Changeset{data: %{external_id: external_id}} = changeset)
when requires_disconnect(changeset) do
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect)
end

defp maybe_trigger_disconnect(_changeset), do: nil

defp maybe_restart_db_connection(%Ecto.Changeset{data: %{external_id: external_id}} = changeset)
when requires_restarting_db_connection(changeset) do
Connect.shutdown(external_id)
end

defp maybe_restart_db_connection(_changeset), do: nil
end
7 changes: 1 addition & 6 deletions lib/realtime/api/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@ defmodule Realtime.Api.Tenant do
@doc false
def changeset(tenant, attrs) do
# TODO: remove after infra update
extension_key =
if attrs[:extensions] do
:extensions
else
"extensions"
end
extension_key = if attrs[:extensions], do: :extensions, else: "extensions"

attrs =
if attrs[extension_key] do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.56.12",
version: "2.56.13",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
159 changes: 142 additions & 17 deletions test/realtime/api_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Realtime.ApiTest do
use Realtime.DataCase, async: true
use Realtime.DataCase, async: false

use Mimic

Expand All @@ -9,13 +9,17 @@ defmodule Realtime.ApiTest do
alias Realtime.Crypto
alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Tenants.Connect

@db_conf Application.compile_env(:realtime, Realtime.Repo)

setup do
tenant_fixture(%{max_concurrent_users: 10_000_000})
tenant_fixture(%{max_concurrent_users: 25_000_000})
tenants = Api.list_tenants()
tenant1 = Containers.checkout_tenant(run_migrations: true)
tenant2 = Containers.checkout_tenant(run_migrations: true)
Api.update_tenant(tenant1, %{max_concurrent_users: 10_000_000})
Api.update_tenant(tenant2, %{max_concurrent_users: 20_000_000})

tenants = Api.list_tenants()
%{tenants: tenants}
end

Expand Down Expand Up @@ -97,9 +101,7 @@ defmodule Realtime.ApiTest do
end

describe "update_tenant/2" do
test "valid data updates the tenant" do
tenant = tenant_fixture()

test "valid data updates the tenant", %{tenants: [tenant | _]} do
update_attrs = %{
external_id: tenant.external_id,
jwt_secret: "some updated jwt_secret",
Expand All @@ -117,30 +119,85 @@ defmodule Realtime.ApiTest do
assert {:error, %Ecto.Changeset{}} = Api.update_tenant(tenant, %{external_id: nil, jwt_secret: nil, name: nil})
end

test "valid data and jwks change will send disconnect event" do
tenant = tenant_fixture()
test "valid data and jwks change will send disconnect event", %{tenants: [tenant | _]} do
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id)

assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["test"]}})
assert_receive :disconnect, 500
end

test "valid data and jwt_secret change will send disconnect event" do
tenant = tenant_fixture()
test "valid data and jwt_secret change will send disconnect event", %{tenants: [tenant | _]} do
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id)

assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_secret: "potato"})

assert_receive :disconnect, 500
end

test "valid data but not updating jwt_secret or jwt_jwks won't send event" do
tenant = tenant_fixture()
test "valid data but not updating jwt_secret or jwt_jwks won't send event", %{tenants: [tenant | _]} do
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id)

assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{max_events_per_second: 100})
refute_receive :disconnect, 500
end

test "valid data and jwt_secret change will restart the database connection", %{tenants: [tenant | _]} do
{:ok, old_pid} = Connect.lookup_or_start_connection(tenant.external_id)
Process.monitor(old_pid)
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_secret: "potato"})
assert_receive {:DOWN, _, :process, ^old_pid, :shutdown}, 500
refute Process.alive?(old_pid)
Process.sleep(100)
assert {:ok, new_pid} = Connect.lookup_or_start_connection(tenant.external_id)
assert %Postgrex.Result{} = Postgrex.query!(new_pid, "SELECT 1", [])
end

test "valid data and tenant data change will not restart the database connection", %{tenants: [tenant | _]} do
{:ok, old_pid} = Connect.lookup_or_start_connection(tenant.external_id)
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{max_concurrent_users: 100})
refute_receive {:DOWN, _, :process, ^old_pid, :shutdown}, 500
assert Process.alive?(old_pid)
assert {:ok, new_pid} = Connect.lookup_or_start_connection(tenant.external_id)
assert old_pid == new_pid
end

test "valid data and extensions data change will restart the database connection", %{tenants: [tenant | _]} do
config = Realtime.Database.from_tenant(tenant, "realtime_test", :stop)

extensions = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "#{config.port}",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"publication" => "supabase_realtime_test",
"ssl_enforced" => false
}
}
]

{:ok, old_pid} = Connect.lookup_or_start_connection(tenant.external_id)
Process.monitor(old_pid)
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{extensions: extensions})
assert_receive {:DOWN, _, :process, ^old_pid, :shutdown}, 500
refute Process.alive?(old_pid)
Process.sleep(100)
assert {:ok, new_pid} = Connect.lookup_or_start_connection(tenant.external_id)
assert %Postgrex.Result{} = Postgrex.query!(new_pid, "SELECT 1", [])
end

test "valid data and change to tenant data will refresh cache", %{tenants: [tenant | _]} do
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{name: "new_name"})
assert %Tenant{name: "new_name"} = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant.external_id)
end

test "valid data and no changes to tenant will not refresh cache", %{tenants: [tenant | _]} do
reject(&Realtime.Tenants.Cache.get_tenant_by_external_id/1)
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{name: tenant.name})
end
end

describe "delete_tenant/1" do
Expand Down Expand Up @@ -192,4 +249,72 @@ defmodule Realtime.ApiTest do
assert %{extensions: [%{settings: %{"poll_interval" => _}}]} = tenant
end
end

describe "requires_disconnect/1" do
defmodule TestRequiresDisconnect do
import Api

def check(changeset) when requires_disconnect(changeset), do: true
def check(_changeset), do: false
end

test "returns true if jwt_secret is changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{jwt_secret: "new_secret"}}
assert TestRequiresDisconnect.check(changeset)
end

test "returns true if jwt_jwks is changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{jwt_jwks: %{keys: ["test"]}}}
assert TestRequiresDisconnect.check(changeset)
end

test "returns false if jwt_secret and jwt_jwks are not changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{max_concurrent_users: 10}}
refute TestRequiresDisconnect.check(changeset)
end

test "returns false if valid? is false" do
changeset = %Ecto.Changeset{valid?: false, changes: %{jwt_secret: "new_secret"}}
refute TestRequiresDisconnect.check(changeset)
end
end

describe "requires_restarting_db_connection/1" do
defmodule TestRequiresRestartingDbConnection do
import Api

def check(changeset) when requires_restarting_db_connection(changeset), do: true
def check(_changeset), do: false
end

test "returns true if extensions is changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{extensions: []}}
assert TestRequiresRestartingDbConnection.check(changeset)
end

test "returns true if jwt_secret and max_events_per_second are changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{jwt_secret: "new_secret"}}
assert TestRequiresRestartingDbConnection.check(changeset)
end

test "returns true if jwt_jwks and max_concurrent_users are changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{jwt_jwks: %{keys: ["test"]}}}
assert TestRequiresRestartingDbConnection.check(changeset)
end

test "returns true if multiple relevant fields are changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{jwt_secret: "new_secret", jwt_jwks: %{keys: ["test"]}}}
assert TestRequiresRestartingDbConnection.check(changeset)
end

test "returns false if no relevant fields are changed" do
changeset = %Ecto.Changeset{valid?: true, changes: %{postgres_cdc_default: "potato"}}
refute TestRequiresRestartingDbConnection.check(changeset)
end

test "returns false if valid? is false" do
changeset = %Ecto.Changeset{valid?: false, changes: %{jwt_secret: "new_secret"}}
refute TestRequiresRestartingDbConnection.check(changeset)
end
end
end
1 change: 1 addition & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Mimic.copy(Realtime.Nodes)
Mimic.copy(Realtime.RateCounter)
Mimic.copy(Realtime.Tenants.Authorization)
Mimic.copy(Realtime.Tenants.Migrations)
Mimic.copy(Realtime.Tenants.Cache)
Mimic.copy(RealtimeWeb.ChannelsAuthorization)
Mimic.copy(RealtimeWeb.Endpoint)
Mimic.copy(RealtimeWeb.JwtVerification)
Loading