Skip to content

Commit 041e823

Browse files
authored
LIBS-808 - Add support for prefetch count configuration (#16)
* LIBS-808 - Add support for prefetch count configuration * LIBS-808 - improvements * LIBS-808 - fix * LIBS-808 - improve test method naming * LIBS-808 - fix
1 parent a161bd3 commit 041e823

16 files changed

+261
-156
lines changed

README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<dependency>
1313
<groupId>com.avides.spring</groupId>
1414
<artifactId>spring-rabbit</artifactId>
15-
<version>2.5.0</version>
15+
<version>2.6.0</version>
1616
</dependency>
1717
```
1818

@@ -143,6 +143,7 @@ spring.rabbitmq.queues[0].exchange.name=com.example.exchange.zero
143143
spring.rabbitmq.queues[0].exchange.type=DIRECT
144144
spring.rabbitmq.queues[0].limit=500000
145145
spring.rabbitmq.queues[0].listener.bean-name=myListenerZero
146+
spring.rabbitmq.queues[0].listener.prefetch-count=55
146147
spring.rabbitmq.queues[0].listener.max-concurrent-consumers=2
147148
spring.rabbitmq.queues[0].rabbit-admin.bean-name=myRabbitAdminZero
148149
spring.rabbitmq.queues[0].rabbit-admin.connection-factory.bean-name=myConnectionFactoryZero
@@ -203,6 +204,17 @@ See [ExchangeProperties](#exchangeproperties)
203204

204205
See [MessageConverterProperties](#messageconverterproperties)
205206

207+
### spring.rabbitmq.prefetch-count
208+
209+
@Min(1)
210+
211+
Default 500. Could be overridden by each listener.
212+
213+
``` ini
214+
spring.rabbitmq.prefetch-count=50
215+
```
216+
217+
206218
### spring.rabbitmq.max-concurrent-consumers
207219

208220
@Min(1)
@@ -453,6 +465,15 @@ spring.rabbitmq.queues[0].listener.bean-name=myListener
453465

454466
See [MessageConverterProperties](#messageconverterproperties)
455467

468+
#### .prefetch-count
469+
470+
Could be null or @Min(1)
471+
472+
473+
``` ini
474+
spring.rabbitmq.queues[0].listener.prefetch-count=30
475+
```
476+
456477
#### .max-concurrent-consumers
457478

458479
Could be null or @Min(1)

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>com.avides.spring</groupId>
55
<artifactId>spring-rabbit</artifactId>
6-
<version>2.5.0</version>
6+
<version>2.6.0</version>
77

88
<name>spring-rabbit</name>
99
<description>Makes configuring RabbitMQ for Spring Boot applications more comfortable</description>

src/main/java/com/avides/spring/rabbit/configuration/SpringRabbitAutoConfiguration.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.avides.spring.rabbit.configuration.domain.ListenerProperties;
4747
import com.avides.spring.rabbit.configuration.domain.MessageConverterProperties;
4848
import com.avides.spring.rabbit.configuration.domain.QueueProperties;
49-
import com.avides.spring.rabbit.configuration.domain.RabbitAdminProperties;
5049
import com.avides.spring.rabbit.configuration.domain.RabbitTemplateProperties;
5150
import com.avides.spring.rabbit.configuration.provider.ConnectionFactoryProvider;
5251
import com.avides.spring.rabbit.configuration.util.DefaultValueResolver;
@@ -119,6 +118,10 @@ public class SpringRabbitAutoConfiguration implements InitializingBean
119118
@NestedConfigurationProperty
120119
private MessageConverterProperties messageConverter;
121120

121+
@NotNull
122+
@Min(1)
123+
private Integer prefetchCount = Integer.valueOf(500);
124+
122125
@NotNull
123126
@Min(1)
124127
private Integer maxConcurrentConsumers = Integer.valueOf(1);
@@ -227,45 +230,46 @@ private void configureQueues()
227230
{
228231
if (queueProperties.isCreationEnabled())
229232
{
230-
String queueName = queueProperties.getName();
231-
RabbitAdminProperties rabbitAdminProperties = queueProperties.getRabbitAdmin();
232-
String rabbitAdminBeanName = rabbitAdminProperties.getBeanName();
233-
BeanReferenceConnectionFactoryProperties customConnectionFactoryProperties = rabbitAdminProperties.getConnectionFactory();
233+
var queueName = queueProperties.getName();
234+
var rabbitAdminProperties = queueProperties.getRabbitAdmin();
235+
var rabbitAdminBeanName = rabbitAdminProperties.getBeanName();
236+
var customConnectionFactoryProperties = rabbitAdminProperties.getConnectionFactory();
234237

235-
ConnectionFactory resolvedConnectionFactory = DefaultValueResolver
238+
var resolvedConnectionFactory = DefaultValueResolver
236239
.resolveConnectionFactory(customConnectionFactoryProperties, CONNECTION_FACTORY_BEAN_NAME, applicationContext);
237240

238-
RabbitAdmin rabbitAdmin = new RabbitAdminCreator(resolvedConnectionFactory).createInstance();
241+
var rabbitAdmin = new RabbitAdminCreator(resolvedConnectionFactory).createInstance();
239242
if (!applicationContext.containsBean(rabbitAdminBeanName))
240243
{
241244
applicationContext.registerBean(rabbitAdminBeanName, RabbitAdmin.class, () -> rabbitAdmin, beanDefinition -> beanDefinition
242245
.setScope(BeanDefinition.SCOPE_SINGLETON));
243246
log.info("RabbitAdmin build with bean-name '{}'", rabbitAdminBeanName);
244247
}
245248

246-
Creator<Queue> queueCreator = new QueueCreator(queueProperties, rabbitAdmin, DefaultValueResolver
249+
var queueCreator = new QueueCreator(queueProperties, rabbitAdmin, DefaultValueResolver
247250
.resolveExchange(queueProperties.getExchange(), exchange));
248251
addToContext(queueName, resolvedConnectionFactory, queueCreator.createInstance());
249252

250-
Creator<Queue> dlxQueueCreator = new DlxQueueCreator(rabbitAdmin, queueProperties);
253+
var dlxQueueCreator = new DlxQueueCreator(rabbitAdmin, queueProperties);
251254
addToContext(queueName + ".dlx", resolvedConnectionFactory, dlxQueueCreator.createInstance());
252255

253256
if (queueProperties.getListener() != null)
254257
{
255-
ListenerProperties listenerProperties = queueProperties.getListener();
258+
var listenerProperties = queueProperties.getListener();
256259

257260
if (listenerProperties.isCreationEnabled())
258261
{
259-
String listenerBeanName = listenerProperties.getBeanName();
260-
int resolvedMaxConcurrentConsumers = DefaultValueResolver
261-
.resolveMaxConcurrentConsumers(listenerProperties.getMaxConcurrentConsumers(), maxConcurrentConsumers);
262-
MessageConverter resolvedMessageConverter = DefaultValueResolver.resolveMessageConverter(listenerProperties
262+
var listenerBeanName = listenerProperties.getBeanName();
263+
var resolvedPrefetchCount = DefaultValueResolver.resolveValue(listenerProperties.getPrefetchCount(), prefetchCount);
264+
var resolvedMaxConcurrentConsumers = DefaultValueResolver
265+
.resolveValue(listenerProperties.getMaxConcurrentConsumers(), maxConcurrentConsumers);
266+
var resolvedMessageConverter = DefaultValueResolver.resolveMessageConverter(listenerProperties
263267
.getMessageConverter(), messageConverter, applicationContext, existingMessageConverterList);
264-
Object listener = applicationContext.getBean(listenerBeanName);
268+
var listener = applicationContext.getBean(listenerBeanName);
265269

266-
Creator<DefaultMessageListenerContainer<Object>> listenerCreator = new ListenerCreator(resolvedConnectionFactory, queueName, resolvedMaxConcurrentConsumers, resolvedMessageConverter, listener);
270+
var listenerCreator = new ListenerCreator(resolvedConnectionFactory, queueName, resolvedPrefetchCount, resolvedMaxConcurrentConsumers, resolvedMessageConverter, listener);
267271

268-
String beanName = createListenerContainerBeanName(listenerProperties, queueName, customConnectionFactoryProperties);
272+
var beanName = createListenerContainerBeanName(listenerProperties, queueName, customConnectionFactoryProperties);
269273

270274
applicationContext
271275
.registerBean(beanName, DefaultMessageListenerContainer.class, listenerCreator::createInstance, beanDefinition -> beanDefinition

src/main/java/com/avides/spring/rabbit/configuration/creator/ListenerCreator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public class ListenerCreator implements Creator<DefaultMessageListenerContainer<
1616

1717
private final String queueName;
1818

19+
private final int prefetchCount;
20+
1921
private final int maxConcurrentConsumers;
2022

2123
private final MessageConverter messageConverter;
@@ -27,6 +29,7 @@ public DefaultMessageListenerContainer<Object> createInstance()
2729
{
2830
DefaultMessageListenerContainer<Object> container = new DefaultMessageListenerContainer<>(connectionFactory);
2931
container.setQueueNames(queueName);
32+
container.setPrefetchCount(prefetchCount);
3033
container.setMaxConcurrentConsumers(maxConcurrentConsumers);
3134
apppendListenerToContainer(container, messageConverter);
3235
return container;

src/main/java/com/avides/spring/rabbit/configuration/domain/ListenerProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ public class ListenerProperties
2727
@Valid
2828
private MessageConverterProperties messageConverter;
2929

30+
@Min(1)
31+
private Integer prefetchCount;
32+
3033
@Min(1)
3134
private Integer maxConcurrentConsumers;
3235
}

src/main/java/com/avides/spring/rabbit/configuration/util/DefaultValueResolver.java

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ public class DefaultValueResolver
2323
/**
2424
* Resolves and creates a {@link Exchange} for a custom and a default exchange configuration considering the override of a possible custom configuration.
2525
*
26-
* @param customProperties
27-
* custom exchange configuration that overrides the default
28-
* @param defaultProperties
29-
* default exchange configuration that is used if no custom configuration exists
26+
* @param customProperties custom exchange configuration that overrides the default
27+
* @param defaultProperties default exchange configuration that is used if no custom configuration exists
3028
* @return resolved {@link Exchange}
3129
*/
3230
public Exchange resolveExchange(ExchangeProperties customProperties, ExchangeProperties defaultProperties)
@@ -46,12 +44,10 @@ else if (defaultProperties != null)
4644
/**
4745
* Resolves a {@link ConnectionFactory} for multiple connection factories considering the override of a possible custom connection factory.
4846
*
49-
* @param customProperties
50-
* {@link BeanReferenceConnectionFactoryProperties} containing the configuration for a custom connection factory that overrides the default
51-
* @param defaultBeanName
52-
* bean name for the default connection factory (created by spring and renamed by the configuration)
53-
* @param applicationContext
54-
* context containing all spring beans
47+
* @param customProperties {@link BeanReferenceConnectionFactoryProperties} containing the configuration for a custom connection factory that overrides the
48+
* default
49+
* @param defaultBeanName bean name for the default connection factory (created by spring and renamed by the configuration)
50+
* @param applicationContext context containing all spring beans
5551
* @return resolved {@link ConnectionFactory}
5652
*/
5753
public ConnectionFactory resolveConnectionFactory(BeanReferenceConnectionFactoryProperties customProperties, String defaultBeanName,
@@ -77,39 +73,40 @@ public ConnectionFactory resolveConnectionFactory(BeanReferenceConnectionFactory
7773
/**
7874
* Resolves a bean name for multiple connection factories considering the override of a possible custom connection factory.
7975
*
80-
* @param customProperties
81-
* {@link BeanReferenceConnectionFactoryProperties} containing the bean name for a custom connection factory that overrides the default
82-
* @param defaultBeanName
83-
* bean name for the default connection factory (created by spring and renamed by the configuration) that is used if no custom
76+
* @param customProperties {@link BeanReferenceConnectionFactoryProperties} containing the bean name for a custom connection factory that overrides the
77+
* default
78+
* @param defaultBeanName bean name for the default connection factory (created by spring and renamed by the configuration) that is used if no custom
8479
* @return beanName for the resolved connectionFactory
8580
*/
8681
public String resolveConnectionFactoryBeanName(BeanReferenceConnectionFactoryProperties customProperties, String defaultBeanName)
8782
{
88-
if (customProperties != null)
89-
{
90-
return customProperties.getBeanName();
91-
}
92-
93-
return defaultBeanName;
83+
return customProperties != null ? customProperties.getBeanName() : defaultBeanName;
9484
}
9585

9686
/**
9787
* Resolves the max concurrent consumers for a {@link DefaultMessageListenerContainer} considering the override of a possible custom configuration.
9888
*
99-
* @param customMax
100-
* custom quantity to override the default
101-
* @param defaultMax
102-
* quantity which is used if no custom configuration exists
89+
* @deprecated Please use {@link #resolveValue(Integer, Integer)} instead
90+
* @param customMax custom quantity to override the default
91+
* @param defaultMax quantity which is used if no custom configuration exists
10392
* @return resolved max concurrent consumers for a {@link DefaultMessageListenerContainer}
10493
*/
94+
@Deprecated(forRemoval = true, since = "2.6.0")
10595
public int resolveMaxConcurrentConsumers(Integer customMax, Integer defaultMax)
10696
{
107-
if (customMax != null)
108-
{
109-
return customMax.intValue();
110-
}
97+
return resolveValue(customMax, defaultMax);
98+
}
11199

112-
return defaultMax.intValue();
100+
/**
101+
* Resolves the value between a custom and a default value
102+
*
103+
* @param customValue custom value to override the default
104+
* @param defaultValue value which is used if no custom configuration exists
105+
* @return resolved max
106+
*/
107+
public int resolveValue(Integer customValue, Integer defaultValue)
108+
{
109+
return customValue != null ? customValue.intValue() : defaultValue.intValue();
113110
}
114111

115112
/**
@@ -122,14 +119,10 @@ public int resolveMaxConcurrentConsumers(Integer customMax, Integer defaultMax)
122119
* <li>existing if only one {@link MessageConverter} exists</li>
123120
* </ol>
124121
*
125-
* @param customProperties
126-
* custom configuration to override the default
127-
* @param defaultProperties
128-
* default configuration which is used if no custom configuration exists
129-
* @param applicationContext
130-
* context containing all spring beans
131-
* @param existingMessageConverters
132-
* list of existing {@link MessageConverter}
122+
* @param customProperties custom configuration to override the default
123+
* @param defaultProperties default configuration which is used if no custom configuration exists
124+
* @param applicationContext context containing all spring beans
125+
* @param existingMessageConverters list of existing {@link MessageConverter}
133126
* @return resolved {@link MessageConverter}
134127
*/
135128
public MessageConverter resolveMessageConverter(MessageConverterProperties customProperties, MessageConverterProperties defaultProperties,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.avides.spring.rabbit.configuration;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.List;
6+
7+
import org.junit.Test;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.beans.factory.annotation.Qualifier;
10+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
11+
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
12+
import org.springframework.boot.test.context.SpringBootTest;
13+
import org.springframework.context.annotation.Configuration;
14+
import org.springframework.test.context.ActiveProfiles;
15+
16+
import com.avides.spring.rabbit.listener.container.DefaultMessageListenerContainer;
17+
import com.avides.spring.rabbit.test.support.AbstractIT;
18+
import com.avides.spring.rabbit.test.support.DummyListenerOne;
19+
import com.avides.spring.rabbit.test.support.DummyListenerZero;
20+
21+
@ActiveProfiles({ "prefetchCount" })
22+
@SpringBootTest(classes = { SpringRabbitAutoConfigurationForDifferentPrefetchCountIT.TestConfiguration.class, DummyListenerZero.class, DummyListenerOne.class })
23+
public class SpringRabbitAutoConfigurationForDifferentPrefetchCountIT extends AbstractIT
24+
{
25+
@Autowired
26+
private List<DefaultMessageListenerContainer<Object>> listenerContainer;
27+
28+
@Qualifier("[email protected]@firstConnectionFactory")
29+
@Autowired
30+
private DefaultMessageListenerContainer<Object> dummyListenerZeroContainer;
31+
32+
@Qualifier("[email protected]@secondConnectionFactory")
33+
@Autowired
34+
private DefaultMessageListenerContainer<Object> dummyListenerOneContainer;
35+
36+
@Test
37+
public void testPrefetchCount()
38+
{
39+
assertEquals(2, listenerContainer.size());
40+
assertEquals(20, getPrefetchCount(dummyListenerZeroContainer));
41+
assertEquals(4, getPrefetchCount(dummyListenerOneContainer));
42+
}
43+
44+
@EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class)
45+
@Configuration
46+
static class TestConfiguration extends AbstractIT.TestConfiguration
47+
{
48+
// nothing, just to exclude RabbitAutoConfiguration
49+
}
50+
}

src/test/java/com/avides/spring/rabbit/configuration/SpringRabbitAutoConfigurationForMultipleConnectionFactoriesIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ private void checkListenerContainer()
9292
assertEquals("guest", dummyListenerZeroContainer.getConnectionFactory().getUsername());
9393
assertEquals("/", dummyListenerZeroContainer.getConnectionFactory().getVirtualHost());
9494
assertEquals(host, dummyListenerZeroContainer.getConnectionFactory().getHost());
95-
assertTrue(dummyListenerOneContainer.isActive());
95+
assertTrue(dummyListenerZeroContainer.isActive());
96+
assertEquals(500, getPrefetchCount(dummyListenerZeroContainer));
9697

9798
assertEquals(AcknowledgeMode.AUTO, dummyListenerOneContainer.getAcknowledgeMode());
9899
assertEquals(1, dummyListenerOneContainer.getQueueNames().length);
@@ -101,6 +102,7 @@ private void checkListenerContainer()
101102
assertEquals("/", dummyListenerOneContainer.getConnectionFactory().getVirtualHost());
102103
assertEquals(host, dummyListenerOneContainer.getConnectionFactory().getHost());
103104
assertTrue(dummyListenerOneContainer.isActive());
105+
assertEquals(500, getPrefetchCount(dummyListenerOneContainer));
104106
}
105107

106108
private void checkQueues()

0 commit comments

Comments
 (0)