
Conversation


@rrwright rrwright commented Sep 5, 2025

Problem

The current SSE (Server-Sent Events) client implementation fails its stream whenever a message arrives that exceeds the size limits. When a Pekko SSE client connects to a Pekko SSE server that serves a message larger than the size limit, the client fails the stream, tries to reconnect, resumes at the same (oversized) message, and fails again, falling into an infinite connection-retry loop. The oversized-message error is swallowed by the SSE connector (another PR for the pekko-connectors library will follow), so the user never learns what is happening. The observed behavior looks as if the stream is still active but stuck at the message that preceded the oversized one.

Solution

This PR introduces a new configuration value, pekko.http.sse.oversized-message-handling, which lets the user choose how the SSE client handles a message larger than the configured limit. Four options are available:

  1. fail-stream: Fails the stream when an oversized message is encountered (the previous behavior and the new default)
  2. log-and-skip: Logs a warning, skips the oversized message, and continues processing
  3. truncate: Truncates the oversized message to fit within the limit and continues
  4. dead-letter: Sends the oversized message to the dead letter queue and continues
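The intended per-strategy semantics can be shown as a simplified sketch (the object and method below are illustrative only; the real parser operates on byte chunks inside a stream stage, not on whole strings):

```scala
object OversizedLineHandling {
  sealed trait OversizedSseStrategy
  case object FailStream extends OversizedSseStrategy
  case object LogAndSkip extends OversizedSseStrategy
  case object Truncate extends OversizedSseStrategy
  case object DeadLetter extends OversizedSseStrategy

  // Returns the line to emit downstream, or None to drop it.
  def handleLine(line: String, maxLineSize: Int, strategy: OversizedSseStrategy): Option[String] =
    if (maxLineSize == 0 || line.length <= maxLineSize) Some(line) // 0 = unlimited
    else strategy match {
      case FailStream => throw new IllegalStateException(
        s"SSE line of ${line.length} chars exceeds max-line-size $maxLineSize")
      case LogAndSkip => None                         // log a warning, drop the line
      case Truncate   => Some(line.take(maxLineSize)) // keep only the prefix that fits
      case DeadLetter => None                         // publish the full line to dead letters
    }
}
```

For example, `handleLine("a" * 20, 10, Truncate)` yields the first 10 characters, while the same call with `LogAndSkip` yields `None`.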

Tests

LineParserOversizedSimpleSpec:
LineParser with oversized message handling
- should fail the stream with FailStream strategy (53 milliseconds)
- should skip oversized messages with LogAndSkip strategy (3 milliseconds)
- should truncate oversized messages with Truncate strategy (3 milliseconds)
- should send oversized messages to dead letters with DeadLetter strategy (2 milliseconds)
EventStreamParserOversizedSpec:
An EventStreamParser with oversized message handling
- should parse normal SSE messages correctly with all strategies (6 milliseconds)
- should fail the stream when using FailStream strategy with oversized SSE line (1 millisecond)
- should skip oversized SSE lines and continue processing with LogAndSkip strategy (0 milliseconds)
- should truncate oversized SSE lines and continue processing with Truncate strategy (1 millisecond)
- should send oversized SSE lines to dead letters and continue processing with DeadLetter strategy (1 millisecond)
- should handle multiple oversized lines in complex SSE events with LogAndSkip strategy (1 millisecond)
- should handle multiline data with some oversized lines using Truncate strategy (0 milliseconds)
- should handle streaming SSE data with oversized content across chunks (3 milliseconds)
- should handle event field oversizing with different strategies (7 milliseconds)
- should work with unlimited line sizes when maxLineSize is 0 (2 milliseconds)
LineParserEdgeCasesSpec:
LineParser edge cases
- should handle lines exactly at the size limit (1 millisecond)
- should handle lines one byte over the limit (1 millisecond)
- should handle oversized content spanning multiple chunks (1 millisecond)
- should handle mixed line endings with oversized content (2 milliseconds)
- should handle empty and whitespace lines with size limits (0 milliseconds)
- should handle chunk boundaries at line endings (0 milliseconds)
- should handle consecutive oversized messages (1 millisecond)
- should work with unlimited line size (maxLineSize = 0) (5 milliseconds)
- should handle line ending edge cases (0 milliseconds)
OversizedSseStrategySpec:
OversizedSseStrategy
- should parse valid string values correctly (2 milliseconds)
- should throw IllegalArgumentException for invalid string values (0 milliseconds)
- should convert strategy objects back to strings correctly (1 millisecond)
- should handle case-sensitive strings (0 milliseconds)
- should handle empty and null strings (0 milliseconds)
EventStreamUnmarshallingSimpleSpec:
EventStreamUnmarshalling with oversized message handling
- should fail the stream with FailStream strategy (44 milliseconds)
- should skip oversized content with LogAndSkip strategy (1 millisecond)
- should truncate oversized content with Truncate strategy (1 millisecond)
- should send oversized content to dead letters with DeadLetter strategy (1 millisecond)

Debatable Topics

Calling out specifically some changes which could very reasonably be otherwise:

Config values

Size limits on oversized messages are determined by two config values (one deprecated), max-line-size and max-event-size, which have their own complex relationship and very small default values. This PR increases the defaults to the smallest reasonable size for modern uses, estimated by looking at limits applied in browsers: I found no standard for SSE, but WebSocket limits seem to be in the same spirit, and some data on browser limits was available.

The previous limits (4k and 8k) are remarkably small for modern use cases. As an example, Ethereum blockchain events (a common enough SSE use case) can be hundreds of kilobytes. While it is good practice to keep Pekko messages small, these limits affect what the client is allowed to consume from the server over HTTP; they aren't primarily about Pekko messages. Larger defaults seem like a good idea, but they're not core to the primary change proposed in this PR.

Unlimited message size

This PR adds the option to disable line and event size limits entirely by setting those config settings to zero.
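Put together, an application.conf overriding these settings might look like the following. The key names come from this PR; the values shown are illustrative, not the shipped defaults:

```hocon
pekko.http.sse {
  # maximum size of a single SSE line; 0 disables the limit entirely
  max-line-size = 65536
  # must be larger than max-line-size; 0 disables the limit entirely
  max-event-size = 131072
  # one of: "fail-stream", "log-and-skip", "truncate", "dead-letter"
  oversized-message-handling = "truncate"
}
```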

@He-Pin
Member

He-Pin commented Sep 6, 2025

That's true; we at work configured it to 5MB.

Member

@He-Pin He-Pin left a comment

Thanks, but it seems the Java API is missing; it is needed if we want to build a client like Spring AI.

# "log-and-skip" - Log a warning and skip the oversized message
# "truncate" - Log a warning and truncate the message to max-line-size
# "dead-letter" - Log a warning, send oversized message to dead letters
oversized-message-handling = "fail-stream"
Member

Seems dead-letter is not needed, because we already have log-and-skip.

Author

My thought was that, while dead letters often just get logged, a Pekko user can subscribe to/intercept the events that go to dead letters and handle them if some special logic is needed. So the dead letter queue is potentially more than just logging: it is also a way to add last-ditch error handling. But maybe it should just send to dead letters without explicitly logging, and let logging happen naturally for unhandled dead letters. …changes pushed; I removed logging from the dead letter option.

Member

If the dead letters cannot be consumed fast enough, could that cause an OOM?

Author

It's just the standard Pekko dead letter queue, so it's a message sent inside the same JVM. I'm pretty sure the dead letter queue is event-driven (I think it's a system actor; I don't remember whether it's actually backed by an actor, but it does have an ActorRef). After it's handled, it'll be garbage collected when there's no held reference, like any other message. Consuming the message wouldn't require any special timing; it would be like any other Pekko message.
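For reference, intercepting those dead letters uses the standard Pekko event-stream subscription. A minimal sketch (the listener class and system name here are hypothetical, not part of this PR):

```scala
import org.apache.pekko.actor.{ Actor, ActorSystem, DeadLetter, Props }

// Hypothetical listener: receives everything published to dead letters,
// including SSE content dropped by the dead-letter strategy.
class SseDeadLetterListener extends Actor {
  def receive: Receive = {
    case DeadLetter(message, sender, recipient) =>
      // last-ditch handling goes here; unhandled dead letters are logged by default
      println(s"dead letter from $sender to $recipient: $message")
  }
}

val system = ActorSystem("sse-example")
val listener = system.actorOf(Props[SseDeadLetterListener]())
system.eventStream.subscribe(listener, classOf[DeadLetter])
```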

case object LogAndSkip extends OversizedSseStrategy
case object Truncate extends OversizedSseStrategy
case object DeadLetter extends OversizedSseStrategy

Member

The Java API is needed.

Author

👍 I just pushed changes that should now include the java api.

Notice that if you are looking for a resilient way to permanently subscribe to an event stream,
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)
connector which reconnects automatically with the id of the last seen event.
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)connector which reconnects automatically with the id of the
Contributor

can you add back a space or new line before the word 'connector' after the link?

Author

*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
Contributor

This is the wrong header for new files that are not derived from Akka files.
The right header is https://github.com/apache/pekko-http/blob/main/project/AddMetaInfLicenseFiles.scala#L1-L16

Contributor

can you check all the new files and their source headers?

Author

👍 I believe this is updated now for all the relevant files.

max-event-size = 8192
# The maximum size for parsing received server-sent events.
# This value must be larger than `max-line-size`. Set to 0 to disable limit entirely (unlimited).
max-event-size = 115713
Contributor

Why change the values? That will affect other users.

Author

@rrwright rrwright Sep 8, 2025

As noted in the PR description and in other reviewer comments, 8k (and 4k) is extremely small for any modern use case. I believe the argument for keeping these values small is because Pekko messages should be small (and instead convey a link to fetch larger payloads through an outside channel if needed), but these limits are not actually limiting Pekko messages, they are limiting what is read out of the HTTP connection. Small limits combined with the existing behavior of the SSE connector (swallows stream failure messages caused by this limit being triggered) cause silent failures and lost data.

Also noted in the PR description: this is another new, somewhat arbitrary value, very reasonably argued against, and not core to the changes proposed in this PR.

Contributor

Is it possible to look at the current state of the art for other HTTP clients when it comes to default max message size? Maybe we can just copy it from there.

Author

For sure. I had used browsers as the prototypical comparison. The proposed size, 115713, comes from Chrome's limit on WebSocket message size. Size limits for SSE lines and messages apparently don't exist for browsers (at least none that I could find). I'll try to collect some related examples from other clients and include them here to make the case.

/**
* How to handle messages that exceed max-line-size limit.
* Valid options: "fail-stream", "log-and-skip", "truncate", "dead-letter"
*/
Contributor

new methods in src/main should have @since 1.3.0 (next non patch release)

Author

@rrwright rrwright Sep 8, 2025

Added (in the latest commit). Looking through the code for precedents, it looks like it may also apply to enums, sealed traits, or other things. I'm not entirely sure where it's required, so please advise if some were missed or others are unnecessary.

@He-Pin
Member

He-Pin commented Sep 9, 2025

need sbt javafmtAll and scalafmt

@pjfanning
Contributor

Scala 2.12 build has

[error] /home/runner/work/pekko-http/pekko-http/http/src/main/scala/org/apache/pekko/http/javadsl/settings/ServerSentEventSettings.scala:65:56: value fromString is not a member of object org.apache.pekko.http.javadsl.settings.OversizedSseStrategy
[error]     self.copy(oversizedStrategy = OversizedSseStrategy.fromString(newValue))
[error]                                                        ^
Error: value fromString is not a member of object org.apache.pekko.http.javadsl.settings.OversizedSseStrategy
[warn] /home/runner/work/pekko-http/pekko-http/http/src/main/scala/org/apache/pekko/http/javadsl/settings/ServerSentEventSettings.scala:20:37: imported `OversizedSseStrategy` is permanently hidden by definition of Java enum OversizedSseStrategy in package settings
[warn] import pekko.http.scaladsl.settings.OversizedSseStrategy
[warn]                                     ^
Warning: imported `OversizedSseStrategy` is permanently hidden by definition of Java enum OversizedSseStrategy in package settings
Warning: Unused import

Can you fix the import and try to work out what causes the compile issue, which doesn't seem to happen in the Scala 2.13 build?

@mdedetrich mdedetrich modified the milestones: 1.3.0, 2.0.0 Sep 9, 2025
@mdedetrich
Contributor

This PR targets main, which is pointing to 2.0.0; that is fine, but once it gets merged it should be backported to 1.3.0.
