Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
package org.apache.rocketmq.spring.autoconfigure;

import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
Expand All @@ -53,7 +53,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
private RocketMQMessageConverter rocketMQMessageConverter;

public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
Expand All @@ -66,11 +66,11 @@ public void setApplicationContext(ApplicationContext applicationContext) throws

@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class);
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Objects.nonNull(beans)) {
beans.forEach(this::registerTemplate);
}
beans.forEach(this::registerTemplate);
}

private void registerTemplate(String beanName, Object bean) {
Expand Down Expand Up @@ -131,7 +131,7 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
}

private void validate(ExtRocketMQTemplateConfiguration annotation,
GenericApplicationContext genericApplicationContext) {
GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQTemplateConfiguration",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.rocketmq.spring.autoconfigure;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
Expand All @@ -25,10 +29,10 @@
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
Expand All @@ -40,11 +44,6 @@
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
Expand All @@ -60,7 +59,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
private RocketMQMessageConverter rocketMQMessageConverter;

public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
Expand All @@ -73,12 +72,11 @@ public void setApplicationContext(ApplicationContext applicationContext) throws

@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener.
class);
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Objects.nonNull(beans)) {
beans.forEach(this::registerContainer);
}
beans.forEach(this::registerContainer);
}

private void registerContainer(String beanName, Object bean) {
Expand Down Expand Up @@ -130,7 +128,7 @@ private void registerContainer(String beanName, Object bean) {
}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

container.setRocketMQMessageListener(annotation);
Expand All @@ -148,13 +146,11 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));

if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
}

container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@
package org.apache.rocketmq.spring.autoconfigure;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import java.util.stream.Collectors;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
Expand All @@ -46,18 +45,16 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware

private ConfigurableApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}

@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQTransactionListener.class);
@Override public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Objects.nonNull(beans)) {
beans.forEach(this::registerTransactionListener);
}
beans.forEach(this::registerTransactionListener);
}

private void registerTransactionListener(String beanName, Object bean) {
Expand Down

This file was deleted.