Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2025 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/

package com.linecorp.armeria.common.stream;

import static java.util.Objects.requireNonNull;

import java.time.Duration;

import com.linecorp.armeria.common.StreamTimeoutException;

/**
* The default {@link StreamTimeoutStrategy} implementation used by
* {@link StreamMessage#timeout(Duration, StreamTimeoutMode)}.
*
* <p>This strategy applies the behavior defined by the given {@link StreamTimeoutMode},
* evaluating whether the stream has timed out based on elapsed time since the last event.
*
*/
final class DefaultStreamTimeoutStrategy implements StreamTimeoutStrategy {

private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)";

static DefaultStreamTimeoutStrategy of(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
requireNonNull(timeoutDuration, "timeoutDuration");
requireNonNull(timeoutMode, "timeoutMode");
return new DefaultStreamTimeoutStrategy(timeoutDuration, timeoutMode);
}

private final StreamTimeoutMode timeoutMode;

private final Duration timeoutDuration;

private final long timeoutNanos;

private DefaultStreamTimeoutStrategy(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
this.timeoutMode = timeoutMode;
this.timeoutDuration = timeoutDuration;
timeoutNanos = timeoutDuration.toNanos();
}

@Override
public StreamTimeoutDecision initialDecision() {
return StreamTimeoutDecision.scheduleAfter(timeoutNanos);
}

@Override
public StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos) {
if (timeoutMode == StreamTimeoutMode.UNTIL_EOS) {
return StreamTimeoutDecision.TIMED_OUT;
}

final long elapsedNanos = currentTimeNanos - lastEventTimeNanos;
if (timeoutNanos <= elapsedNanos) {
return StreamTimeoutDecision.TIMED_OUT;
}

if (timeoutMode == StreamTimeoutMode.UNTIL_FIRST) {
return StreamTimeoutDecision.NO_SCHEDULE;
}

final long delayNanos = timeoutNanos - elapsedNanos;
return StreamTimeoutDecision.scheduleAfter(delayNanos);
}

@Override
public StreamTimeoutException newTimeoutException() {
return new StreamTimeoutException(
String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,6 @@ default StreamMessage<T> timeout(Duration timeoutDuration) {
default StreamMessage<T> timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
requireNonNull(timeoutDuration, "timeoutDuration");
requireNonNull(timeoutMode, "timeoutMode");
return new TimeoutStreamMessage<>(this, timeoutDuration, timeoutMode);
return new TimeoutStreamMessage<>(this, DefaultStreamTimeoutStrategy.of(timeoutDuration, timeoutMode));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2025 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/

package com.linecorp.armeria.common.stream;

import static com.google.common.base.Preconditions.checkArgument;

/**
* Result object returned by both {@link StreamTimeoutStrategy#initialDecision()}
* and
* {@link StreamTimeoutStrategy#evaluateTimeout(long, long)}.
*
* <p>A decision has exactly <strong>three</strong> possible meanings:</p>
* <ul>
* <li>{@link #TIMED_OUT} – the stream is already timed-out.</li>
* <li>{@link #NO_SCHEDULE} – keep the stream alive with <em>no</em> further timeout checks.</li>
* <li>{@link #scheduleAfter(long)} – keep the stream alive and re-evaluate after the given
* <em>delay (nanoseconds)</em>.</li>
* </ul>
*/
public final class StreamTimeoutDecision {

/**
* Indicates that the stream has timed out and must be closed.
*/
public static final StreamTimeoutDecision TIMED_OUT =
new StreamTimeoutDecision(true, 0);

/**
* Indicates that no further timeout checks are necessary.
*/
public static final StreamTimeoutDecision NO_SCHEDULE =
new StreamTimeoutDecision(false, 0);

/**
* Creates a decision instructing the caller to run the next timeout check after
* the specified <em>positive</em> delay.
*
* @param nextDelayNanos delay (in nanoseconds) before the next evaluation; must be {@code > 0}
* @return a new {@link StreamTimeoutDecision}
*
* @throws IllegalArgumentException if {@code nextDelayNanos&nbsp;&le;&nbsp;0}
*/
public static StreamTimeoutDecision scheduleAfter(long nextDelayNanos) {
checkArgument(nextDelayNanos > 0,
"delay must be positive: %s", nextDelayNanos);
return new StreamTimeoutDecision(false, nextDelayNanos);
}

private final boolean timedOut;

/**
* Delay until the next check (ns); {@code 0} ⇒ no schedule.
*/
private final long nextDelayNanos;

/**
* Creates a new {@link StreamTimeoutDecision}.
*
* @param timedOut whether the stream should be closed immediately
* @param nextDelayNanos delay in nanoseconds until the next evaluation (0 means no further check)
*/
private StreamTimeoutDecision(boolean timedOut, long nextDelayNanos) {
this.timedOut = timedOut;
this.nextDelayNanos = nextDelayNanos;
}

/**
* Returns whether the stream should be closed immediately due to a timeout.
*
* @return {@code true} if the stream must be closed immediately
*/
boolean timedOut() {
return timedOut;
}

/**
* Returns the delay in nanoseconds until the next timeout evaluation.
*
* @return a positive value when another check is required,
* or {@code 0} when no further scheduling is needed
* (i.e.&nbsp;this decision is {@link #NO_SCHEDULE} or {@link #TIMED_OUT})
*/
long nextDelayNanos() {
return nextDelayNanos;
}

@Override
public String toString() {
if (timedOut) {
return "TIMED_OUT";
}
return (nextDelayNanos == 0) ? "NO_SCHEDULE"
: "scheduleAfter(" + nextDelayNanos + "ns)";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2025 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/

package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.StreamTimeoutException;

/**
* A strategy that decides whether a {@link StreamMessage} has timed out and,
* if not, how long to wait before the next timeout check.
*
* <p>Two entry points:</p>
* <ul>
* <li>{@link #initialDecision()} – called once right after subscription.</li>
* <li>{@link #evaluateTimeout(long, long)} – called when the timer fires
* to re-evaluate and, if necessary, reschedule.</li>
* </ul>
*
* <p>All timestamps use the same monotonic time source as
* {@link System#nanoTime()}.</p>
*/
public interface StreamTimeoutStrategy {
/**
* Called immediately after subscription.
*
* @return the first {@link StreamTimeoutDecision}
*/
StreamTimeoutDecision initialDecision();

/**
* Re-evaluates timeout status when the scheduled timer fires.
*
* @param currentTimeNanos current {@code System.nanoTime()} value
* @param lastEventTimeNanos {@code System.nanoTime()} of the last data event
* @return a {@link StreamTimeoutDecision} describing whether the stream
* timed out and, if not, the delay before the next check
*/
StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);

/**
* Builds an exception that will be propagated via
* {@code Subscriber#onError(Throwable)} when this strategy determines
* the stream has timed out.
*
* <p>This method is invoked <em>only</em> when the most recent
* {@link StreamTimeoutDecision} returned by this strategy reports
* {@link StreamTimeoutDecision#timedOut() timed-out}{@code == true}.
* </p>
*
* @return the {@link StreamTimeoutException} that should be emitted for
* this timeout event
*/
default StreamTimeoutException newTimeoutException() {
return new StreamTimeoutException("stream timed out");
}
}
Loading
Loading