网站首页 > 博客文章 正文
背景
工作中Java开发大部分项目可能都是使用spring/springboot,好处就是可以很容易的集成其他技术或中间件。本文通过源码讲解了springboot集成kafka时如何消费的。
实例
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: cosnumer-group
enable-auto-commit: true
auto-commit-interval: 100ms
properties:
session.timeout.ms: 15000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
producer:
retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类
KafkaConsumerListener.java
/**
* 自定义bean
**/
@Component
public class KafkaConsumerListener {
// 加个注解即可实现监听消费
@KafkaListener(topics = "app-test")
public void receive(ConsumerRecord<?, ?> record) {
// handle
}
}
可以看出通过非常简单的代码就可以实现KafkaConsumer的功能。
原理
Springboot项目的主函数(Main)一般都是SpringApplication.run(xx.class, args)。看过源码的应该都知道,springboot项目启动过程其实核心是在ApplicationContext中完成的,主要流程如下:
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// Check for listener beans and register them.
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
finishRefresh();
}
}
}
其中bean的创建和初始化都是在
finishBeanFactoryInitialization这一步完成的,spring bean初始化前后都会有相应的处理(类似于拦截器),见源码:
/**
* Initialize the given bean instance, applying factory callbacks
* as well as init methods and bean post processors.
* <p>Called from {@link #createBean} for traditionally defined beans,
* and from {@link #initializeBean} for existing bean instances.
* @see #applyBeanPostProcessorsBeforeInitialization
* @see #invokeInitMethods
* @see #applyBeanPostProcessorsAfterInitialization
*/
protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {
...
// 初始化前置处理
wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
// 初始化
invokeInitMethods(beanName, wrappedBean, mbd);
// 初始化后置处理
wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
return wrappedBean;
}
如果bean的方法中有使用注解@KafkaListener,则会在
KafkaListenerAnnotationBeanPostProcessor#
postProcessAfterInitialization 方法中做相应的处理
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
...
// 找出使用KafkaListener注解的方法
Map<Method, Set<KafkaListener>> annotatedMethods = xxx;
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
// 做相应的处理
processKafkaListener(listener, method, bean, beanName);
}
}
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
...
// 注册 KafkaEndpoint(记录consumer信息)
this.registrar.registerEndpoint(endpoint, factory);
}
上面的操作已经记录了所有Kafka Consumer的信息(入口是解析@KafkaListener)。
在所有bean创建并初始化之后会调用bean的afterPropertiesSet方法:
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
这一步会逐一创建对应的
KafkaMessageListenerContainer(
KafkaListenerEndpointRegistry
#registerListenerContainer),记录所有的kafkaListenerContainer
/**
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
*/
@SuppressWarnings("unchecked")
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
// KafkaListenerContainerFactory#createListenerContainer(endpoint)
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
}
在完成bean初始化之后(包括后置处理),会在finishRefresh这一步中启动一些继承了Lifecycle的bean
/**
* Start the specified bean as part of the given set of Lifecycle beans,
* making sure that any beans that it depends on are started first.
*/
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
Lifecycle bean = lifecycleBeans.remove(beanName);
bean.start();
}
KafkaListenerEndpointRegistry就继承了Lifecycle,所以相应的start方法就会调用
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
this.running = true;
}
从源码中可以看到,依次会启动
KafkaMessageListenerContainer
// 启动KafkaConsumer
@Override
protected void doStart() {
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
// 设置为可执行的状态,拉取数据的时候会用到
setRunning(true);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
}
KafkaConsumer线程执行,就可以不断消费到数据了
@Override
public void run() {
// running = true 开启拉取数据
while (isRunning()) {
// 拉取数据
pollAndInvoke();
}
}
总结
springboot很容易的集成了kafka组件,kafka producer和consumer都帮我们封装好了,我们使用的时候只需要简单的配置和修改就可以运行了。但是其中的一些原理还得我们通过分析源码才能看出底层做了什么手脚,本文就通过源码介绍了项目启动过程中Kafka consumer是如何启动的。
猜你喜欢
- 2025-07-23 Spring IoC Container 原理解析(spring中ioc的作用与原理)
- 2025-07-23 Spring之底层架构核心概念解析(spring底层设计模式)
- 2025-07-23 深入理解 JSR 303:数据校验在 Spring Boot 中的应用
- 2025-07-23 Spring如何加载「IOC容器」以及「装载Bean」源码解读
- 2025-07-23 Spring Security 自动踢掉前一个登录用户,一个配置搞定
- 2025-07-23 Spring Boot 控制反转(IoC)全面解析:从基础到高级实践
- 2025-07-23 spring cloud Alibaba参考的中文文档
- 2025-07-23 Spring Boot执行过程(执行springboot的jar)
- 2025-07-23 SpringBoot中6种拦截器使用场景(springboot拦截器放行)
- 2025-07-23 Java开发200+个学习知识路线-史上最全(框架篇)
你 发表评论:
欢迎- 07-23Spring IoC Container 原理解析(spring中ioc的作用与原理)
- 07-23Spring之底层架构核心概念解析(spring底层设计模式)
- 07-23深入理解 JSR 303:数据校验在 Spring Boot 中的应用
- 07-23Springboot集成Kafka原理(kafka结合springboot)
- 07-23Spring如何加载「IOC容器」以及「装载Bean」源码解读
- 07-23Spring Security 自动踢掉前一个登录用户,一个配置搞定
- 07-23Spring Boot 控制反转(IoC)全面解析:从基础到高级实践
- 07-23spring cloud Alibaba参考的中文文档
- 最近发表
-
- Spring IoC Container 原理解析(spring中ioc的作用与原理)
- Spring之底层架构核心概念解析(spring底层设计模式)
- 深入理解 JSR 303:数据校验在 Spring Boot 中的应用
- Springboot集成Kafka原理(kafka结合springboot)
- Spring如何加载「IOC容器」以及「装载Bean」源码解读
- Spring Security 自动踢掉前一个登录用户,一个配置搞定
- Spring Boot 控制反转(IoC)全面解析:从基础到高级实践
- spring cloud Alibaba参考的中文文档
- Spring Boot执行过程(执行springboot的jar)
- SpringBoot中6种拦截器使用场景(springboot拦截器放行)
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- flutterrun (59)
- 系统设计图 (58)
- powershellfor (73)
- messagesource (71)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- qcombobox样式表 (68)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)