diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index 12635762b9..f2a571294e 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -39,6 +39,7 @@ import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.ErrorHandler; import org.springframework.util.ObjectUtils; @@ -50,6 +51,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Edsuns * @since 2.2 */ class DefaultStreamMessageListenerContainer> implements StreamMessageListenerContainer { @@ -229,8 +231,18 @@ private Function> getReadFunction(StreamReadRequest : this.readOptions; Consumer consumer = consumerStreamRequest.getConsumer(); - return (offset) -> template.execute((RedisCallback>) connection -> connection.streamCommands() - .xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset))); + return (offset) -> { + List records = template.execute((RedisCallback>) connection -> connection.streamCommands() + .xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset))); + if (CollectionUtils.isEmpty(records) && !ReadOffset.lastConsumed().equals(offset)) { + // see https://redis.io/docs/latest/commands/xreadgroup/ + // if ID in XREADGROUP command is other than >, new messages won't be read + // so reads new messages here + return template.execute((RedisCallback>) connection -> connection.streamCommands() + .xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, ReadOffset.lastConsumed()))); + } + return records; + }; } return (offset) -> template.execute((RedisCallback>) connection -> connection.streamCommands()