Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/main/java/io/lettuce/core/ClaimedStreamMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.lettuce.core;

import java.time.Duration;
import java.util.Map;

/**
* Stream message returned by XREADGROUP when entries were claimed from the PEL using CLAIM min-idle-time. Contains additional
* metadata: milliseconds since last delivery and redelivery count.
*/
public class ClaimedStreamMessage<K, V> extends StreamMessage<K, V> {

private final long msSinceLastDelivery;

private final long redeliveryCount;

public ClaimedStreamMessage(K stream, String id, Map<K, V> body, long msSinceLastDelivery, long redeliveryCount) {
super(stream, id, body);
this.msSinceLastDelivery = msSinceLastDelivery;
this.redeliveryCount = redeliveryCount;
}

public long getMsSinceLastDelivery() {
return msSinceLastDelivery;
}

public Duration getSinceLastDelivery() {
return Duration.ofMillis(msSinceLastDelivery);
}

public long getRedeliveryCount() {
return redeliveryCount;
}

@Override
public boolean isClaimed() {
// "Really claimed" implies it was previously delivered at least once.
return redeliveryCount >= 1;
}

}
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/StreamMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public Map<K, V> getBody() {
return body;
}

/**
* Whether this message was reclaimed from the pending entries list (PEL) using XREADGROUP … CLAIM. Default: false.
*
* Note: When CLAIM is used, servers may attach delivery metadata to all entries in the reply (including fresh ones). Use
* this indicator to distinguish actually reclaimed entries (true) from normal entries (false).
*/
public boolean isClaimed() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o)
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/io/lettuce/core/XReadArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class XReadArgs implements CompositeArgument {

private boolean noack;

private Long claimMinIdleTime;

/**
* Builder entry points for {@link XReadArgs}.
*/
Expand Down Expand Up @@ -90,6 +92,21 @@ public static XReadArgs noack(boolean noack) {
return new XReadArgs().noack(noack);
}

/**
* Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP.
*/
public static XReadArgs claim(long milliseconds) {
return new XReadArgs().claim(milliseconds);
}

/**
* Create a new {@link XReadArgs} and set CLAIM min-idle-time. Only valid for XREADGROUP.
*/
public static XReadArgs claim(Duration timeout) {
LettuceAssert.notNull(timeout, "Claim timeout must not be null");
return claim(timeout.toMillis());
}

}

/**
Expand Down Expand Up @@ -141,6 +158,29 @@ public XReadArgs noack(boolean noack) {
return this;
}

/**
* Claim idle pending messages first with a minimum idle time (milliseconds). Only valid for XREADGROUP.
*
* @since 7.0
*/
public XReadArgs claim(long milliseconds) {

this.claimMinIdleTime = milliseconds;
return this;
}

/**
* Claim idle pending messages first with a minimum idle time. Only valid for XREADGROUP.
*
* @since 7.0
*/
public XReadArgs claim(Duration timeout) {

LettuceAssert.notNull(timeout, "Claim timeout must not be null");

return claim(timeout.toMillis());
}

public <K, V> void build(CommandArgs<K, V> args) {

if (block != null) {
Expand All @@ -154,6 +194,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
if (noack) {
args.add(CommandKeyword.NOACK);
}

if (claimMinIdleTime != null) {
args.add("CLAIM").add(claimMinIdleTime);
}
}

/**
Expand Down
64 changes: 61 additions & 3 deletions src/main/java/io/lettuce/core/output/StreamReadOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import io.lettuce.core.StreamMessage;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.ClaimedStreamMessage;

import io.lettuce.core.internal.LettuceAssert;

/**
Expand All @@ -31,6 +33,10 @@ public class StreamReadOutput<K, V> extends CommandOutput<K, V, List<StreamMessa

private Map<K, V> body;

private Long msSinceLastDelivery;

private Long redeliveryCount;

private boolean bodyReceived = false;

public StreamReadOutput(RedisCodec<K, V> codec) {
Expand All @@ -51,6 +57,20 @@ public void set(ByteBuffer bytes) {
return;
}

// Handle extra metadata for claimed entries that may arrive as bulk strings (RESP2/RESP3)
if (id != null && bodyReceived && key == null && bytes != null) {
// Use a duplicate so decoding doesn't advance the original buffer position.
String s = decodeString(bytes.duplicate());
if (msSinceLastDelivery == null && isDigits(s)) {
msSinceLastDelivery = Long.parseLong(s);
return;
}
if (redeliveryCount == null && isDigits(s)) {
redeliveryCount = Long.parseLong(s);
return;
}
}

if (id == null) {
id = decodeString(bytes);
return;
Expand All @@ -75,6 +95,23 @@ public void set(ByteBuffer bytes) {
key = null;
}

@Override
public void set(long integer) {

// Extra integers appear only for claimed entries (XREADGROUP with CLAIM)
if (id != null && bodyReceived) {
if (msSinceLastDelivery == null) {
msSinceLastDelivery = integer;
return;
}
if (redeliveryCount == null) {
redeliveryCount = integer;
return;
}
}
super.set(integer);
}

@Override
public void multi(int count) {

Expand All @@ -91,15 +128,25 @@ public void multi(int count) {
@Override
public void complete(int depth) {

if (depth == 3 && bodyReceived) {
subscriber.onNext(output, new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body));
// Emit the message when the entry array (id/body[/extras]) completes.
if (depth == 2 && bodyReceived) {
Map<K, V> map = body == null ? Collections.emptyMap() : body;
if (msSinceLastDelivery != null || redeliveryCount != null) {
subscriber.onNext(output,
new ClaimedStreamMessage<>(stream, id, map, msSinceLastDelivery == null ? 0L : msSinceLastDelivery,
redeliveryCount == null ? 0L : redeliveryCount));
} else {
subscriber.onNext(output, new StreamMessage<>(stream, id, map));
}
bodyReceived = false;
key = null;
body = null;
id = null;
msSinceLastDelivery = null;
redeliveryCount = null;
}

// RESP2/RESP3 compat
// RESP2/RESP3 compat for stream key reset upon finishing the outer array element
if (depth == 2 && skipStreamKeyReset) {
skipStreamKeyReset = false;
}
Expand All @@ -113,6 +160,17 @@ public void complete(int depth) {
}
}

private static boolean isDigits(String s) {
if (s == null || s.isEmpty())
return false;
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
if (c < '0' || c > '9')
return false;
}
return true;
}

@Override
public void setSubscriber(Subscriber<StreamMessage<K, V>> subscriber) {
LettuceAssert.notNull(subscriber, "Subscriber must not be null");
Expand Down
Loading