Skip to content

Commit 572a935

Browse files
authored
feat: support for Server-Sent Events (SSE) (#2776)
This is a follow up PR on #2546 and #2544. It solves a bug related with 409s (must refetch) in SSE mode and it replaces the EventSource browser API by the [fetch-event-source](https://github.com/Azure/fetch-event-source) library. I refactored the `ShapeStream.#start` method which was becoming very big and complex. To this end, i split the logic into helper methods that handle the different parts that need to happen (building the shape URL, making the request, parsing the response headers, handling the response body, etc.). I had to patch the [fetch-event-source](https://github.com/Azure/fetch-event-source) library because it relies on browser-specific features such as `document` and `window` (cf. Azure/fetch-event-source#41). But we want our client to also work in server-side JS environments. I also had to patch the `fetch-event-source` library because it does not abort the fetch when you pass an already aborted signal. A complete description of the bug and the fix can be found here: Azure/fetch-event-source#98.
1 parent 50d0f29 commit 572a935

File tree

13 files changed

+1958
-1621
lines changed

13 files changed

+1958
-1621
lines changed

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,10 @@
2020
},
2121
"devDependencies": {
2222
"glob": "^10.3.10"
23+
},
24+
"pnpm": {
25+
"patchedDependencies": {
26+
"@microsoft/fetch-event-source": "patches/@microsoft__fetch-event-source.patch"
27+
}
2328
}
2429
}

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,18 @@ defmodule Electric.Shapes.Api do
317317

318318
# TODO: discuss returning a 307 redirect rather than a 409, the client
319319
# will have to detect this and throw out old data
320+
321+
# In SSE mode we send the must refetch object as an event
322+
# instead of a singleton array containing that object
323+
must_refetch =
324+
if request.params.experimental_live_sse do
325+
hd(@must_refetch)
326+
else
327+
@must_refetch
328+
end
329+
320330
{:error,
321-
Response.error(request, @must_refetch,
331+
Response.error(request, must_refetch,
322332
handle: active_shape_handle,
323333
status: 409
324334
)}
@@ -610,10 +620,9 @@ defmodule Electric.Shapes.Api do
610620
last_message_time: last_message_time,
611621
request:
612622
%{
613-
api:
614-
%{
615-
keepalive_interval: keepalive_interval
616-
} = api,
623+
api: %{
624+
keepalive_interval: keepalive_interval
625+
},
617626
handle: shape_handle,
618627
new_changes_ref: ref
619628
} = request,
@@ -664,7 +673,7 @@ defmodule Electric.Shapes.Api do
664673

665674
{^ref, :shape_rotation} ->
666675
must_refetch = %{headers: %{control: "must-refetch"}}
667-
message = encode_message(api, must_refetch)
676+
message = encode_message(request, must_refetch)
668677

669678
{message, %{state | mode: :done}}
670679

@@ -793,11 +802,19 @@ defmodule Electric.Shapes.Api do
793802
encode(api, :log, stream)
794803
end
795804

796-
@spec encode_message(Api.t() | Request.t(), term()) :: Enum.t()
797-
def encode_message(%Api{} = api, message) do
805+
# Error messages are encoded normally, even when using SSE
806+
# because they are returned on the original fetch request
807+
# with a status code that is not 2xx.
808+
@spec encode_error_message(Api.t() | Request.t(), term()) :: Enum.t()
809+
def encode_error_message(%Api{} = api, message) do
810+
encode(api, :message, message)
811+
end
812+
813+
def encode_error_message(%Request{api: api}, message) do
798814
encode(api, :message, message)
799815
end
800816

817+
@spec encode_message(Request.t(), term()) :: Enum.t()
801818
def encode_message(
802819
%Request{api: api, params: %{live: true, experimental_live_sse: true}},
803820
message

packages/sync-service/lib/electric/shapes/api/response.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ defmodule Electric.Shapes.Api.Response do
8686
message
8787
end
8888

89-
Api.encode_message(api_or_request, body)
89+
Api.encode_error_message(api_or_request, body)
9090
end
9191

9292
@spec send(Plug.Conn.t(), t()) :: Plug.Conn.t()

packages/typescript-client/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
"bugs": {
77
"url": "https://github.com/electric-sql/electric/issues"
88
},
9-
"dependencies": {},
9+
"dependencies": {
10+
"@microsoft/fetch-event-source": "^2.0.1"
11+
},
1012
"devDependencies": {
1113
"@types/pg": "^8.11.6",
1214
"@types/uuid": "^10.0.0",

0 commit comments

Comments
 (0)