Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions misk-redis/api/misk-redis.api
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
public abstract interface class misk/redis/DeferredRedis {
public abstract fun blmove (Ljava/lang/String;Ljava/lang/String;Lredis/clients/jedis/args/ListDirection;Lredis/clients/jedis/args/ListDirection;D)Ljava/util/function/Supplier;
public abstract fun blpop ([Ljava/lang/String;D)Ljava/util/function/Supplier;
public abstract fun brpoplpush (Ljava/lang/String;Ljava/lang/String;I)Ljava/util/function/Supplier;
public abstract fun close ()V
public abstract fun del (Ljava/lang/String;)Ljava/util/function/Supplier;
Expand Down Expand Up @@ -63,6 +64,7 @@ public final class misk/redis/FakeRedis : misk/redis/Redis {
public field random Lkotlin/random/Random;
public fun <init> ()V
public fun blmove (Ljava/lang/String;Ljava/lang/String;Lredis/clients/jedis/args/ListDirection;Lredis/clients/jedis/args/ListDirection;D)Lokio/ByteString;
public fun blpop ([Ljava/lang/String;D)Lkotlin/Pair;
public fun brpoplpush (Ljava/lang/String;Ljava/lang/String;I)Lokio/ByteString;
public fun close ()V
public fun del (Ljava/lang/String;)Z
Expand Down Expand Up @@ -136,6 +138,7 @@ public final class misk/redis/RealRedis : misk/redis/Redis {
public static final field Companion Lmisk/redis/RealRedis$Companion;
public fun <init> (Lredis/clients/jedis/UnifiedJedis;Lmisk/redis/RedisClientMetrics;)V
public fun blmove (Ljava/lang/String;Ljava/lang/String;Lredis/clients/jedis/args/ListDirection;Lredis/clients/jedis/args/ListDirection;D)Lokio/ByteString;
public fun blpop ([Ljava/lang/String;D)Lkotlin/Pair;
public fun brpoplpush (Ljava/lang/String;Ljava/lang/String;I)Lokio/ByteString;
public fun close ()V
public fun del (Ljava/lang/String;)Z
Expand Down Expand Up @@ -204,6 +207,7 @@ public final class misk/redis/RealRedis$Companion {

public abstract interface class misk/redis/Redis {
public abstract fun blmove (Ljava/lang/String;Ljava/lang/String;Lredis/clients/jedis/args/ListDirection;Lredis/clients/jedis/args/ListDirection;D)Lokio/ByteString;
public abstract fun blpop ([Ljava/lang/String;D)Lkotlin/Pair;
public abstract fun brpoplpush (Ljava/lang/String;Ljava/lang/String;I)Lokio/ByteString;
public abstract fun close ()V
public abstract fun del (Ljava/lang/String;)Z
Expand Down Expand Up @@ -559,6 +563,7 @@ public final class misk/redis/testing/DockerRedisKt {
public final class misk/redis/testing/FakeRedis : misk/redis/Redis, misk/testing/TestFixture {
public fun <init> (Ljava/time/Clock;Lkotlin/random/Random;)V
public fun blmove (Ljava/lang/String;Ljava/lang/String;Lredis/clients/jedis/args/ListDirection;Lredis/clients/jedis/args/ListDirection;D)Lokio/ByteString;
public fun blpop ([Ljava/lang/String;D)Lkotlin/Pair;
public fun brpoplpush (Ljava/lang/String;Ljava/lang/String;I)Lokio/ByteString;
public fun close ()V
public fun del (Ljava/lang/String;)Z
Expand Down Expand Up @@ -625,6 +630,7 @@ public final class misk/redis/testing/FakeRedis : misk/redis/Redis, misk/testing
public final class misk/redis/testing/FakeRedis$FakePipelinedRedis : misk/redis/DeferredRedis {
public fun <init> (Lmisk/redis/testing/FakeRedis;)V
public fun blmove (Ljava/lang/String;Ljava/lang/String;Lredis/clients/jedis/args/ListDirection;Lredis/clients/jedis/args/ListDirection;D)Ljava/util/function/Supplier;
public fun blpop ([Ljava/lang/String;D)Ljava/util/function/Supplier;
public fun brpoplpush (Ljava/lang/String;Ljava/lang/String;I)Ljava/util/function/Supplier;
public fun close ()V
public fun del (Ljava/lang/String;)Ljava/util/function/Supplier;
Expand Down
2 changes: 2 additions & 0 deletions misk-redis/src/main/kotlin/misk/redis/DeferredRedis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ interface DeferredRedis {

fun lpop(key: String): Supplier<ByteString?>

fun blpop(keys: Array<String>, timeoutSeconds: Double): Supplier<Pair<String, ByteString>?>

fun rpop(key: String, count: Int): Supplier<List<ByteString?>>

fun rpop(key: String): Supplier<ByteString?>
Expand Down
14 changes: 14 additions & 0 deletions misk-redis/src/main/kotlin/misk/redis/FakeRedis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,20 @@ class FakeRedis : Redis {
@Synchronized
override fun lpop(key: String): ByteString? = lpop(key, count = 1).firstOrNull()

@Synchronized
override fun blpop(keys: Array<String>, timeoutSeconds: Double): Pair<String, ByteString>? {
// For the fake implementation, we'll check each key in order and return the first non-empty list
for (key in keys) {
val element = lpop(key)
if (element != null) {
return Pair(key, element)
}
}
// In a real implementation, this would block until timeout or an element is available
// For the fake, we just return null immediately
return null
}

@Synchronized
override fun rpop(key: String, count: Int): List<ByteString?> {
val value = lKeyValueStore[key] ?: Value(emptyList(), clock.instant())
Expand Down
11 changes: 10 additions & 1 deletion misk-redis/src/main/kotlin/misk/redis/RealPipelinedRedis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import redis.clients.jedis.ClusterPipeline
import redis.clients.jedis.Pipeline
import redis.clients.jedis.Response
import redis.clients.jedis.args.ListDirection
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SetParams
import redis.clients.jedis.params.ZRangeParams
import redis.clients.jedis.resps.Tuple
Expand Down Expand Up @@ -302,6 +301,16 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
return Supplier { response.get()?.toByteString() }
}

override fun blpop(keys: Array<String>, timeoutSeconds: Double): Supplier<Pair<String, ByteString>?> {
val keysAsBytes = keys.map { it.toByteArray(charset) }.toTypedArray()
val response = pipeline.blpop(timeoutSeconds, *keysAsBytes)
return Supplier {
response.get()?.let {
Pair(it.key.toString(charset), it.value.toByteString())
}
}
}

override fun rpop(key: String, count: Int): Supplier<List<ByteString?>> {
val keyBytes = key.toByteArray(charset)
val response = pipeline.rpop(keyBytes, count)
Expand Down
8 changes: 8 additions & 0 deletions misk-redis/src/main/kotlin/misk/redis/RealRedis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,14 @@ class RealRedis(
return jedis { lpop(keyBytes) }?.toByteString()
}

override fun blpop(keys: Array<String>, timeoutSeconds: Double): Pair<String, ByteString>? {
val keysAsBytes = keys.map { it.toByteArray(charset) }.toTypedArray()
val result = jedis { blpop(timeoutSeconds, *keysAsBytes) }
return result?.let {
Pair(it.key.toString(charset), it.value.toByteString())
}
}

override fun rpop(key: String, count: Int): List<ByteString?> {
val keyBytes = key.toByteArray(charset)
return jedis { rpop(keyBytes, count) ?: emptyList() }
Expand Down
11 changes: 11 additions & 0 deletions misk-redis/src/main/kotlin/misk/redis/Redis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,17 @@ interface Redis {
*/
fun lpop(key: String): ByteString?

/**
* Blocking version of [lpop]. Pops an element from the first non-empty list in [keys],
* checking keys in the provided order. If all lists are empty, blocks the connection until
* an [lpush] or [rpush] operation occurs on one of the keys, or until [timeoutSeconds] expires.
*
* @param keys the keys to check for elements, in order
* @param timeoutSeconds the maximum number of seconds to block. 0 blocks indefinitely.
* @return a pair of the key name and the element that was popped, or null if timeout occurred
*/
fun blpop(keys: Array<String>, timeoutSeconds: Double): Pair<String, ByteString>?

/**
* Removes and returns the last [count] elements of the list stored at [key].
*
Expand Down
40 changes: 40 additions & 0 deletions misk-redis/src/test/kotlin/misk/redis/AbstractRedisTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,46 @@ abstract class AbstractRedisTest {
assertThat(redis.rpop("droids", 1)).isEmpty()
}

@Test fun blpopWithSingleKeyReturnsFirstElement() {
val key = "queue"
redis.rpush(key, "first".encodeUtf8(), "second".encodeUtf8(), "third".encodeUtf8())

val result = redis.blpop(arrayOf(key), 1.0)

assertThat(result).isNotNull()
assertThat(result!!.first).isEqualTo(key)
assertThat(result.second).isEqualTo("first".encodeUtf8())
assertThat(redis.lrange(key, 0, -1)).containsExactly(
"second".encodeUtf8(), "third".encodeUtf8()
)
}

@Test fun blpopWithMultipleKeysReturnsFromFirstNonEmptyList() {
// Hash tags ensure all keys map to the same Redis Cluster slot, required for multi-key operations
val key1 = "{same-slot-key}queue1"
val key2 = "{same-slot-key}queue2"
val key3 = "{same-slot-key}queue3"

redis.rpush(key2, "value2a".encodeUtf8(), "value2b".encodeUtf8())
redis.rpush(key3, "value3".encodeUtf8())

val result = redis.blpop(arrayOf(key1, key2, key3), 1.0)

assertThat(result).isNotNull()
assertThat(result!!.first).isEqualTo(key2)
assertThat(result.second).isEqualTo("value2a".encodeUtf8())
// Verify only the left element was popped from key2
assertThat(redis.lrange(key2, 0, -1)).containsExactly("value2b".encodeUtf8())
}

@Test fun blpopWithEmptyKeysReturnsNull() {
val key = "empty_queue"

val result = redis.blpop(arrayOf(key), 0.1)

assertThat(result).isNull()
}

@Test fun lpushAndRpushAreOrderedCorrectly() {
// This test is pulled directly from the Redis documentation for LPUSH and RPUSH.
val elements = listOf("a", "b", "c").map { it.encodeUtf8() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ internal class TestAlwaysPipelinedRedis @Inject constructor(

override fun lpop(key: String): ByteString? = runPipeline { lpop(key) }

override fun blpop(keys: Array<String>, timeoutSeconds: Double): Pair<String, ByteString>? =
runPipeline { blpop(keys, timeoutSeconds) }

override fun rpop(key: String, count: Int): List<ByteString?> =
runPipeline { rpop(key, count) }

Expand Down
18 changes: 18 additions & 0 deletions misk-redis/src/testFixtures/kotlin/misk/redis/testing/FakeRedis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,20 @@ class FakeRedis @Inject constructor(
@Synchronized
override fun lpop(key: String): ByteString? = lpop(key, count = 1).firstOrNull()

@Synchronized
override fun blpop(keys: Array<String>, timeoutSeconds: Double): Pair<String, ByteString>? {
// For the fake implementation, we'll check each key in order and return the first non-empty list
for (key in keys) {
val element = lpop(key)
if (element != null) {
return Pair(key, element)
}
}
// In a real implementation, this would block until timeout or an element is available
// For the fake, we just return null immediately
return null
}

@Synchronized
override fun rpop(key: String, count: Int): List<ByteString?> {
val value = lKeyValueStore[key] ?: Value(emptyList(), clock.instant())
Expand Down Expand Up @@ -668,6 +682,10 @@ class FakeRedis @Inject constructor(
[email protected](key)
}

override fun blpop(keys: Array<String>, timeoutSeconds: Double): Supplier<Pair<String, ByteString>?> = Supplier {
[email protected](keys, timeoutSeconds)
}

override fun rpop(key: String, count: Int): Supplier<List<ByteString?>> = Supplier {
[email protected](key, count)
}
Expand Down