Add StreamTimeoutStrategy, StreamTimeoutDecision, and Token-bucket-based timeout limiting strategy #6245
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
StreamMessage.timeout()
currently uses a fixed timeout mode. Differentuse-cases may require custom logic, so we introduce a
StreamTimeoutStrategy
interface. The existing
StreamTimeoutMode
will remain as the defaultstrategy.
To tolerate transient network jitter, a token-bucket strategy can delay
timeout until a user-defined number of consecutive delays is reached.
Modifications
StreamTimeoutDecision
timedOut
,hasNextSchedule
,nextScheduleTimeNanos
.TIMED_OUT
,NO_SCHEDULE
,scheduleAt(long)
.StreamTimeoutStrategy
interfaceinitialDecision(long subscribeTimeNanos)
– called once right aftersubscription.
evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos)
– calledeach time the scheduled timer fires.
StreamTimeoutMode
) and token-bucket strategy implementationsWill be added in a follow-up commit.
Result
StreamTimeoutStrategy
toStreamMessage.timeout(strategy)
to control per-stream timeout behaviour.StreamTimeoutMode
continues to work as the default.