多topic注入kafka消费者
多topic注入kafka消费者
@KafkaListener() 两种方式加载多个topic
# application.properties
spring.kafkaListenerList = topic1,topic2
第一种
通过EL表达式进行注入
@KafkaListener(topics = "#{'${spring.kafkaListenerList}'.split(',')}")
第二种
通过解析数据生成对象进行注入
KafkaListenerReceiver
是自定义的消费对象
- KafkaListenerConfig.java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@Configuration
@Slf4j
public class KafkaListenerConfig implements BeanDefinitionRegistryPostProcessor, PriorityOrdered {
private List<String> listenerList;
KafkaListenerConfig() throws IOException {
Properties pro = new Properties();
File f;
f =new File( java.net.URLDecoder.decode(this.getClass().getResource("/application.properties").getPath(),"utf-8"));
FileInputStream in = new FileInputStream(f);
pro.load(in);
in.close();
String listString = pro.getProperty("spring.verify.kafkaListenerList");
if (StringUtils.isEmpty(listString)){
log.error("spring.verify.kafkaListenerList is empty or null");
System.exit(0);
}
log.info("spring.kafkaListenerList: "+listString);
listenerList = Arrays.asList(listString.split(","));
}
public List<String> getListenerList() {
return listenerList;
}
public void setListenerList(List<String> listenerList) {
this.listenerList = listenerList;
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
for (String listener : listenerList) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(KafkaListenerReceiver.class);
builder.addConstructorArgValue(listener);
beanDefinitionRegistry.registerBeanDefinition(listener, builder.getBeanDefinition());
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
- KafkaListenerReceiver.java
@KafkaListener(topics = "#{__listener.topic}")