Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion docs/src/main/paradox/common/sse-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,49 @@ Scala
Java
: @@snip [EventStreamMarshallingTest.java](/http-tests/src/test/java/org/apache/pekko/http/javadsl/unmarshalling/sse/EventStreamUnmarshallingTest.java) { #event-stream-unmarshalling-example }

## Configuration

Apache Pekko HTTP provides several configuration options for Server-Sent Events handling:

### Message Size Limits

The SSE client parser has configurable limits to handle various message sizes:

```hocon
pekko.http.sse {
# The maximum size for parsing received server-sent events.
# This value must be larger than `max-line-size`. Set to 0 to disable limit entirely (unlimited).
max-event-size = 115713

# The maximum size for parsing received lines of a server-sent event. Set to 0 to disable limit entirely (unlimited).
max-line-size = 115712
}
```

### Oversized Message Handling

When SSE messages exceed the configured `max-line-size`, Apache Pekko HTTP provides four handling strategies:

- **fail-stream** (default): Fails the stream with a clear error message, maintaining backward compatibility
- **log-and-skip**: Logs a warning and skips the oversized message, continuing stream processing
- **truncate**: Logs a warning and truncates the message to the configured limit, continuing processing
- **dead-letter**: Logs a warning and sends the oversized message to the dead letter queue, continuing processing

```hocon
pekko.http.sse {
# How to handle messages that exceed max-line-size limit
# Options:
# "fail-stream" - Fail the stream with a clear error message (default)
# "log-and-skip" - Log a warning and skip the oversized message
# "truncate" - Log a warning and truncate the message to max-line-size
# "dead-letter" - Log a warning, send oversized message to dead letters
oversized-message-handling = "fail-stream"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems dead later is not needed, because we already has a log and skip

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought was that, while dead letters often just get logged, a pekko user can subscribe/intercept the events that go to dead letters and then handle them if some special logic is needed. So the dead letter queue is potentially more than just logging but also a way to add last-ditch error handling. But maybe it should just send to dead letters without explicitly logging; let logging happen naturally for unhandled dead letters. …changes pushed; I removed logging from the dead letter option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the dead leatter can not be consumed fast , may be an oom?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just the standard pekko dead letter queue. So it's a message sent inside the same JVM. I'm pretty sure the dead letter queue is event-driven (I think it's a system actor, but I don't remember if it's actually backed by an actor or not, but it does have an ActorRef). After it's handled, it'll be garbage collected when there's no held reference like any other message. The timing to consume the message wouldn't require any special handling. It would be like any other pekko message.

}
```

For applications that need to handle very large messages (like blockchain data or detailed JSON payloads), consider
setting `max-line-size = 0` to disable limits entirely, or use one of the non-failing handling modes.

Notice that if you are looking for a resilient way to permanently subscribe to an event stream,
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)
connector which reconnects automatically with the id of the last seen event.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.javadsl.settings;

import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import static org.junit.Assert.*;

public class OversizedSseStrategySimpleTest extends JUnitSuite {

@Test
public void testEnumValues() {
// Simple test that the enum values exist and can be converted
OversizedSseStrategy failStream = OversizedSseStrategy.FailStream;
OversizedSseStrategy logAndSkip = OversizedSseStrategy.LogAndSkip;
OversizedSseStrategy truncate = OversizedSseStrategy.Truncate;
OversizedSseStrategy deadLetter = OversizedSseStrategy.DeadLetter;

assertNotNull("FailStream should not be null", failStream);
assertNotNull("LogAndSkip should not be null", logAndSkip);
assertNotNull("Truncate should not be null", truncate);
assertNotNull("DeadLetter should not be null", deadLetter);
}

@Test
public void testFromScala() {
// Test that fromScala method works
OversizedSseStrategy strategy = OversizedSseStrategy.fromScala(
org.apache.pekko.http.scaladsl.settings.OversizedSseStrategy.FailStream$.MODULE$
);
assertEquals("Should convert from Scala FailStream", OversizedSseStrategy.FailStream, strategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.javadsl.unmarshalling.sse;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.settings.OversizedSseStrategy;
import org.apache.pekko.http.javadsl.settings.ServerSentEventSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

public class EventStreamUnmarshallingOversizedTest extends JUnitSuite {

private static ActorSystem system;

@BeforeClass
public static void setup() {
system = ActorSystem.create();
}

@AfterClass
public static void teardown() throws Exception {
system.terminate();
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
}

@Test
public void testOversizedStrategyEnum() {
// Test that the Java enum can be used with the settings
ServerSentEventSettings settings =
ServerSentEventSettings.create(system)
.withLineLength(50)
.withOversizedStrategy(OversizedSseStrategy.FailStream);

assertEquals("Should have correct line length", 50, settings.maxLineSize());
assertEquals("Should have FailStream strategy",
OversizedSseStrategy.FailStream, settings.getOversizedStrategyEnum());

// Test all strategies can be set
settings = settings.withOversizedStrategy(OversizedSseStrategy.LogAndSkip);
assertEquals("Should have LogAndSkip strategy",
OversizedSseStrategy.LogAndSkip, settings.getOversizedStrategyEnum());

settings = settings.withOversizedStrategy(OversizedSseStrategy.Truncate);
assertEquals("Should have Truncate strategy",
OversizedSseStrategy.Truncate, settings.getOversizedStrategyEnum());

settings = settings.withOversizedStrategy(OversizedSseStrategy.DeadLetter);
assertEquals("Should have DeadLetter strategy",
OversizedSseStrategy.DeadLetter, settings.getOversizedStrategyEnum());
}

@Test
public void testOversizedStrategyStringCompatibility() {
// Test that the string-based API still works
ServerSentEventSettings settings =
ServerSentEventSettings.create(system)
.withOversizedStrategy("log-and-skip");

assertEquals("Should have log-and-skip strategy string",
"log-and-skip", settings.getOversizedStrategy());
assertEquals("Should have LogAndSkip strategy enum",
OversizedSseStrategy.LogAndSkip, settings.getOversizedStrategyEnum());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.http
package scaladsl
package settings

import org.apache.pekko.http.scaladsl.settings.OversizedSseStrategy
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

final class OversizedSseStrategySpec extends AnyWordSpec with Matchers {

"OversizedSseStrategy" should {
"parse valid string values correctly" in {
OversizedSseStrategy.fromString("fail-stream") shouldBe OversizedSseStrategy.FailStream
OversizedSseStrategy.fromString("log-and-skip") shouldBe OversizedSseStrategy.LogAndSkip
OversizedSseStrategy.fromString("truncate") shouldBe OversizedSseStrategy.Truncate
OversizedSseStrategy.fromString("dead-letter") shouldBe OversizedSseStrategy.DeadLetter
}

"throw IllegalArgumentException for invalid string values" in {
val exception = intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("invalid-strategy")
}
exception.getMessage should include("Invalid oversized-message-handling: 'invalid-strategy'")
exception.getMessage should include("Valid options are: fail-stream, log-and-skip, truncate, dead-letter")
}

"convert strategy objects back to strings correctly" in {
OversizedSseStrategy.toString(OversizedSseStrategy.FailStream) shouldBe "fail-stream"
OversizedSseStrategy.toString(OversizedSseStrategy.LogAndSkip) shouldBe "log-and-skip"
OversizedSseStrategy.toString(OversizedSseStrategy.Truncate) shouldBe "truncate"
OversizedSseStrategy.toString(OversizedSseStrategy.DeadLetter) shouldBe "dead-letter"
}

"handle case-sensitive strings" in {
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("FAIL-STREAM")
}
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("Fail-Stream")
}
}

"handle empty and null strings" in {
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("")
}
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString(null)
}
}
}
}
Loading
Loading