Skip to content

Commit 06d45c3

Browse files
zenfenanbbende
authored andcommitted
NIFI-4987: Added TTL to RedisDistributedMapCacheClientService
NIFI-4987: PR Review Fixes - Reverted getAndPutIfAbsent and added TTL setting with a different approach NIFI-4987: PR Review Fixes - Added TTL to putIfAbsent() This closes #2726. Signed-off-by: Bryan Bende <[email protected]>
1 parent d75ba16 commit 06d45c3

File tree

1 file changed

+32
-3
lines changed

1 file changed

+32
-3
lines changed

nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
3030
import org.apache.nifi.distributed.cache.client.Deserializer;
3131
import org.apache.nifi.distributed.cache.client.Serializer;
32+
import org.apache.nifi.processor.util.StandardValidators;
3233
import org.apache.nifi.redis.RedisConnectionPool;
3334
import org.apache.nifi.redis.RedisType;
3435
import org.apache.nifi.redis.util.RedisAction;
3536
import org.apache.nifi.util.Tuple;
3637
import org.springframework.data.redis.connection.RedisConnection;
3738
import org.springframework.data.redis.core.Cursor;
3839
import org.springframework.data.redis.core.ScanOptions;
40+
import org.springframework.data.redis.core.types.Expiration;
3941

4042
import java.io.ByteArrayOutputStream;
4143
import java.io.IOException;
@@ -44,6 +46,7 @@
4446
import java.util.Collection;
4547
import java.util.Collections;
4648
import java.util.List;
49+
import java.util.concurrent.TimeUnit;
4750

4851
@Tags({ "redis", "distributed", "cache", "map" })
4952
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
@@ -59,14 +62,25 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
5962
.required(true)
6063
.build();
6164

65+
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
66+
.name("redis-cache-ttl")
67+
.displayName("TTL")
68+
.description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
69+
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
70+
.required(true)
71+
.defaultValue("0 secs")
72+
.build();
73+
6274
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
6375
static {
6476
final List<PropertyDescriptor> props = new ArrayList<>();
6577
props.add(REDIS_CONNECTION_POOL);
78+
props.add(TTL);
6679
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
6780
}
6881

6982
private volatile RedisConnectionPool redisConnectionPool;
83+
private Long ttl;
7084

7185
@Override
7286
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -96,6 +110,11 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
96110
@OnEnabled
97111
public void onEnabled(final ConfigurationContext context) {
98112
this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
113+
this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
114+
115+
if (ttl == 0) {
116+
this.ttl = -1L;
117+
}
99118
}
100119

101120
@OnDisabled
@@ -107,7 +126,13 @@ public void onDisabled() {
107126
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
108127
return withConnection(redisConnection -> {
109128
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
110-
return redisConnection.setNX(kv.getKey(), kv.getValue());
129+
boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
130+
131+
if (ttl != -1L && set) {
132+
redisConnection.expire(kv.getKey(), ttl);
133+
}
134+
135+
return set;
111136
});
112137
}
113138

@@ -124,6 +149,11 @@ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K
124149
redisConnection.multi();
125150
redisConnection.setNX(kv.getKey(), kv.getValue());
126151

152+
// Set the TTL only if the key doesn't exist already
153+
if (ttl != -1L && existingValue == null) {
154+
redisConnection.expire(kv.getKey(), ttl);
155+
}
156+
127157
// execute the transaction
128158
final List<Object> results = redisConnection.exec();
129159

@@ -146,7 +176,6 @@ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K
146176
});
147177
}
148178

149-
150179
@Override
151180
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
152181
return withConnection(redisConnection -> {
@@ -159,7 +188,7 @@ public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) t
159188
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
160189
withConnection(redisConnection -> {
161190
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
162-
redisConnection.set(kv.getKey(), kv.getValue());
191+
redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), null);
163192
return null;
164193
});
165194
}

0 commit comments

Comments
 (0)