Skip to content

Commit c88d6b1

Browse files
bandalgomsuonobc
authored andcommitted
Allow configuring lifecycle in listener containers.
This commit allows configuration of the `phase` and `autoStartup` lifecycle attributes on the `RedisMessageListenerContainer` and `DefaultStreamMessageListenerContainer`. Original Pull Request: #3224 Resolves: #3208 Signed-off-by: Su Ko <[email protected]>
1 parent 396711e commit c88d6b1

File tree

5 files changed

+184
-4
lines changed

5 files changed

+184
-4
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
* @author Mark Paluch
104104
* @author John Blum
105105
* @author Seongjun Lee
106+
* @author Su Ko
106107
* @see MessageListener
107108
* @see SubscriptionListener
108109
*/
@@ -168,6 +169,9 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
168169

169170
private @Nullable Subscriber subscriber;
170171

172+
private int phase = Integer.MAX_VALUE;
173+
private boolean autoStartup = true;
174+
171175
/**
172176
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default,
173177
* there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
@@ -618,6 +622,40 @@ public void removeMessageListener(MessageListener listener) {
618622
removeMessageListener(listener, Collections.emptySet());
619623
}
620624

625+
@Override
626+
public int getPhase() {
627+
return this.phase;
628+
}
629+
630+
/**
631+
* Specify the lifecycle phase for this container.
632+
* Lower values start earlier and stop later.
633+
* The default is {@code Integer.MAX_VALUE}.
634+
*
635+
* @see SmartLifecycle#getPhase()
636+
* @since 4.0
637+
*/
638+
public void setPhase(int phase) {
639+
this.phase = phase;
640+
}
641+
642+
@Override
643+
public boolean isAutoStartup() {
644+
return this.autoStartup;
645+
}
646+
647+
/**
648+
* Configure if this Lifecycle connection factory should get started automatically by the container at the time that
649+
* the containing ApplicationContext gets refreshed.
650+
* The default is {@code true}.
651+
*
652+
* @see SmartLifecycle#isAutoStartup()
653+
* @since 4.0
654+
*/
655+
public void setAutoStartup(boolean autoStartup) {
656+
this.autoStartup = autoStartup;
657+
}
658+
621659
private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
622660

623661
// stop the listener if currently running

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
53+
* @author Su Ko
5354
* @since 2.2
5455
*/
5556
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
@@ -67,6 +68,9 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
6768

6869
private boolean running = false;
6970

71+
private int phase = Integer.MAX_VALUE;
72+
private boolean autoStartup = false;
73+
7074
/**
7175
* Create a new {@link DefaultStreamMessageListenerContainer}.
7276
*
@@ -90,6 +94,14 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
9094
} else {
9195
this.streamOperations = this.template.opsForStream();
9296
}
97+
98+
if(containerOptions.isAutoStartup().isPresent()){
99+
this.autoStartup = containerOptions.isAutoStartup().get();
100+
}
101+
102+
if(containerOptions.getPhase().isPresent()){
103+
this.phase = containerOptions.getPhase().getAsInt();
104+
}
93105
}
94106

95107
private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainerOptions<?, ?> options) {
@@ -123,9 +135,21 @@ private RedisTemplate<K, V> createRedisTemplate(RedisConnectionFactory connectio
123135

124136
@Override
125137
public boolean isAutoStartup() {
126-
return false;
138+
return this.autoStartup;
127139
}
128140

141+
/**
142+
* Configure if this Lifecycle connection factory should get started automatically by the container at the time that
143+
* the containing ApplicationContext gets refreshed.
144+
* The default is {@code false}.
145+
*
146+
* @see org.springframework.context.SmartLifecycle#isAutoStartup()
147+
* @since 4.0
148+
*/
149+
public void setAutoStartup(boolean autoStartup) {
150+
this.autoStartup = autoStartup;
151+
}
152+
129153
@Override
130154
public void stop(Runnable callback) {
131155

@@ -177,9 +201,21 @@ public boolean isRunning() {
177201

178202
@Override
179203
public int getPhase() {
180-
return Integer.MAX_VALUE;
204+
return this.phase;
181205
}
182206

207+
/**
208+
* Specify the lifecycle phase for this container.
209+
* Lower values start earlier and stop later.
210+
* The default is {@code Integer.MAX_VALUE}.
211+
*
212+
* @see org.springframework.context.SmartLifecycle#getPhase()
213+
* @since 4.0
214+
*/
215+
public void setPhase(int phase) {
216+
this.phase = phase;
217+
}
218+
183219
@Override
184220
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
185221

src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.stream;
1717

1818
import java.time.Duration;
19+
import java.util.Optional;
1920
import java.util.OptionalInt;
2021
import java.util.concurrent.Executor;
2122
import java.util.function.Predicate;
@@ -107,6 +108,7 @@
107108
* @author Christoph Strobl
108109
* @author Christian Rest
109110
* @author DongCheol Kim
111+
* @author Su Ko
110112
* @param <K> Stream key and Stream field type.
111113
* @param <V> Stream value type.
112114
* @since 2.2
@@ -503,12 +505,14 @@ class StreamMessageListenerContainerOptions<K, V extends Record<K, ?>> {
503505
private final @Nullable HashMapper<Object, Object, Object> hashMapper;
504506
private final ErrorHandler errorHandler;
505507
private final Executor executor;
508+
private final @Nullable Integer phase;
509+
private final @Nullable Boolean autoStartup;
506510

507511
@SuppressWarnings({ "unchecked", "rawtypes" })
508512
private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable Integer batchSize,
509513
RedisSerializer<K> keySerializer, RedisSerializer<Object> hashKeySerializer,
510514
RedisSerializer<Object> hashValueSerializer, @Nullable Class<?> targetType,
511-
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor) {
515+
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor,@Nullable Integer phase, @Nullable Boolean autoStartup) {
512516
this.pollTimeout = pollTimeout;
513517
this.batchSize = batchSize;
514518
this.keySerializer = keySerializer;
@@ -518,6 +522,8 @@ private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable In
518522
this.hashMapper = (HashMapper) hashMapper;
519523
this.errorHandler = errorHandler;
520524
this.executor = executor;
525+
this.phase = phase;
526+
this.autoStartup = autoStartup;
521527
}
522528

523529
/**
@@ -598,6 +604,21 @@ public Executor getExecutor() {
598604
return executor;
599605
}
600606

607+
/**
608+
* @return the phase.
609+
* @since 4.0
610+
*/
611+
public OptionalInt getPhase() {
612+
return phase != null ? OptionalInt.of(phase) : OptionalInt.empty();
613+
}
614+
615+
/**
616+
* @return the autoStartup.
617+
* @since 4.0
618+
*/
619+
public Optional<Boolean> isAutoStartup() {
620+
return autoStartup != null ? Optional.of(autoStartup) : Optional.empty();
621+
}
601622
}
602623

603624
/**
@@ -618,6 +639,8 @@ class StreamMessageListenerContainerOptionsBuilder<K, V extends Record<K, ?>> {
618639
private @Nullable Class<?> targetType;
619640
private ErrorHandler errorHandler = LoggingErrorHandler.INSTANCE;
620641
private Executor executor = new SimpleAsyncTaskExecutor();
642+
private @Nullable Integer phase;
643+
private @Nullable Boolean autoStartup;
621644

622645
@SuppressWarnings("NullAway")
623646
private StreamMessageListenerContainerOptionsBuilder() {}
@@ -679,6 +702,28 @@ public StreamMessageListenerContainerOptionsBuilder<K, V> errorHandler(ErrorHand
679702
return this;
680703
}
681704

705+
/**
706+
* Configure a phase for the {@link SmartLifecycle}
707+
*
708+
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
709+
* @since 4.0
710+
*/
711+
public StreamMessageListenerContainerOptionsBuilder<K, V> phase(int phase) {
712+
this.phase = phase;
713+
return this;
714+
}
715+
716+
/**
717+
* Configure a autoStartup for the {@link SmartLifecycle}
718+
*
719+
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
720+
* @since 4.0
721+
*/
722+
public StreamMessageListenerContainerOptionsBuilder<K, V> autoStartup(boolean autoStartup) {
723+
this.autoStartup = autoStartup;
724+
return this;
725+
}
726+
682727
/**
683728
* Configure a key, hash key and hash value serializer.
684729
*
@@ -796,7 +841,7 @@ public StreamMessageListenerContainerOptions<K, V> build() {
796841
Assert.notNull(hashValueSerializer, "Hash Value Serializer must not be null");
797842

798843
return new StreamMessageListenerContainerOptions<>(pollTimeout, batchSize, keySerializer, hashKeySerializer,
799-
hashValueSerializer, targetType, hashMapper, errorHandler, executor);
844+
hashValueSerializer, targetType, hashMapper, errorHandler, executor,phase,autoStartup);
800845
}
801846

802847
}

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,27 @@ void shouldRemoveAllListenersWhenListenerIsNull() {
239239

240240
assertThatNoException().isThrownBy(() -> container.removeMessageListener(null, Collections.singletonList(topic)));
241241
}
242+
243+
@Test // GH-3208
244+
void defaultPhaseShouldBeMaxValue() {
245+
assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
246+
}
247+
248+
@Test // GH-3208
249+
void shouldApplyConfiguredPhase() {
250+
container.setPhase(3208);
251+
assertThat(container.getPhase()).isEqualTo(3208);
252+
}
253+
254+
@Test // GH-3208
255+
void defaultAutoStartupShouldBeTrue() {
256+
assertThat(container.isAutoStartup()).isEqualTo(true);
257+
}
258+
259+
@Test // GH-3208
260+
void shouldApplyConfiguredAutoStartup() {
261+
container.setAutoStartup(false);
262+
assertThat(container.isAutoStartup()).isEqualTo(false);
263+
}
264+
242265
}

src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,44 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
384384
cancelAwait(subscription);
385385
}
386386

387+
@Test // GH-3208
388+
void defaultPhaseShouldBeMaxValue() {
389+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
390+
.create(connectionFactory, containerOptions);
391+
392+
assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
393+
}
394+
395+
@Test // GH-3208
396+
void shouldApplyConfiguredPhase() {
397+
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
398+
.phase(3208)
399+
.build();
400+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
401+
.create(connectionFactory, options);
402+
403+
assertThat(container.getPhase()).isEqualTo(3208);
404+
}
405+
406+
@Test // GH-3208
407+
void defaultAutoStartupShouldBeFalse() {
408+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
409+
.create(connectionFactory, containerOptions);
410+
411+
assertThat(container.isAutoStartup()).isEqualTo(false);
412+
}
413+
414+
@Test // GH-3208
415+
void shouldApplyConfiguredAutoStartup() {
416+
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
417+
.autoStartup(true)
418+
.build();
419+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
420+
.create(connectionFactory, options);
421+
422+
assertThat(container.isAutoStartup()).isEqualTo(true);
423+
}
424+
387425
private static void cancelAwait(Subscription subscription) {
388426

389427
subscription.cancel();

0 commit comments

Comments
 (0)