网站首页 > 博客文章 正文
生产者生产消息的速度大于消费者的消费速度如何解决?kafka有消息保留机制,有些消息在消息被消费之前就有可能被清理掉了,从而导致消息的丢失。
当生产者发送消息的速度大于消费者消费消息的速度时,可以采用多线程的方式实现消息的消费,提高整体的消费能力。多线程的实现方式有多种:
第一种也是最常见的方式:==线程封闭==,即为每个线程实例化一个kafkaconsumer对象,这种线程称为消费线程。每个消费线程可以消费一个或多个分区的消息。这种实现方式的并发度受限于分区的实际个数,当==消费线程的个数==大于分区时,就会有部分消费线程一直处于空闲状态。==优点==是可以顺序地消费各个分区的消息。==缺点==也很明显,每个消费线程都要维护一个独立的tcp连接,如果分区数和消费线程的值都比较大,那么会造成不小的系统开销。
第二种与第一种对应,==多个消费线程同时消费一个分区==,这个通过assign(),seek()方法实现。==优点==是可以打破原有的消费线程个数不能超过分区数的限制,进一步提高消费能力。但这种方式对于==位移提交和顺序控制==的处理就会变得异常复杂,实际使用极少。
第三种:==利用多线程提高消息处理的能力==。因为如果消息处理得非常快,那么poll()拉取的频次也会更高,进而提高整体消费的性能。一般而言,poll()拉取消息的速度是相当快的,而整体消费的瓶颈也正在消息的逻辑处理上。所以将处理消息模块改为多线程的实现方式,能带动整体消费性能的提升。
- 注意kafkaConsumerThread(消费线程)可以横向扩展来提高整体的消费能力。消费线程中线程池(ThreadPoolExecutor)的最后一个参数拒绝策略设置为callerRunsPolicy,可以有效的防止==消费能力不及==poll()拉取能力时导致的异常。
- 缺点就是对于==消息的顺序处理==比较困难。这里引入一个共享的offsets来参与提交(Map\<TopicPatition, OffsetAndMetadata\> offsets) ,每一个消息处理完之后都会把位移保存到共享变量offsets中,kafkaConsumerThread在每一次poll()方法之后都读取Offsets中的内容并进行位称提交。
- 写入offsets时问题:==如何解决并发问题?==注意在实现过程中需要对offsets读写需要进行加锁处理,防止出现并发问题。
for(TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
sychronized(offsets) {
if(!offsets.containsKey(tp)) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
} else {
long position = offsets.get(tp).offset();
if(position < lastConsumeOffset - 1) {
offsets.put(tp, new OffsetMetaData(lastConsumedOffset + 1)) {
}
}
}
}
- 如何解决位移覆盖的问题?
synchronized(offsets) {
if(!offsets.isEmpty()) {
kafkaConsumer.commitAndSync(offsets);
offsets.clear();
}
}
- 这种移位提交的方式会有数据丢失的风险,什么情况下产生,如何解决?(扩展 滑动窗口解决方案//---todo)
- 上一篇: 认识kafka消费者(从kafka读取数据)
- 下一篇: 学Kafka,就必须了解的再均衡问题
猜你喜欢
- 2024-12-11 Kafka知识点总结 一篇读懂 建议收藏
- 2024-12-11 连 Kafka 的稳定性都不懂,也敢说自己会用Kafka
- 2024-12-11 从架构上详解(SLB,Redis,Mysql,Kafka,Clickhouse)热点问题
- 2024-12-11 Kafka最佳实践 - 合理安排kafka的broker、partition、consumer数量
- 2024-12-11 学Kafka,就必须了解的再均衡问题
- 2024-12-11 认识kafka消费者(从kafka读取数据)
- 2024-12-11 Kafka快速入门
- 2024-12-11 扫盲Kafka?看这一篇就够了!
- 2024-12-11 kafka启动顺序&命令启动
- 2024-12-11 超详细 Kafka 入门(最佳实践)
你 发表评论:
欢迎- 08-03 Docker 命令入门实战:搞懂这些才算真正入门!
- 08-03Docker 常用命令分类汇总
- 08-03docker常用命令大全,看这一篇就够了
- 08-03Docker命令大全详解(39个常用命令)
- 08-03Docker 常用命令手册
- 08-03Docker命令最全详解(39个最常用命令)
- 08-03Docker命令最全详解(29个最常用命令)
- 08-03C++语法进阶-字符:字符变量(char)
- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- flutterrun (59)
- powershellfor (73)
- messagesource (71)
- plsql64位 (73)
- vueproxytable (64)
- npminstallsave (63)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- nacos启动失败 (64)
- ssh-add (70)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- qcombobox样式表 (68)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)