网站首页 > 博客文章 正文
要想实现消息有序,需要从Producer和Consumer两方面来考虑。
如果对Kafka不了解的话,可以先看这篇博客《一文快速了解Kafka》。
针对消息有序的业务需求,还分为全局有序和局部有序。
- 全局有序:一个Topic下的所有消息都需要按照生产顺序消费。
- 局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。
全局有序
由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。
因此要满足全局有序,需要1个Topic只能对应1个Partition。
而且对应的consumer也要使用单线程或者保证消费顺序的线程模型,否则会出现下图所示,消费端造成的消费乱序。
局部有序
要满足局部有序,只需要在发消息的时候指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。
如下图所示,在不增加partition数量的情况下想提高消费速度,可以考虑再次hash唯一标识(例如订单orderId)到不同的线程上,多个消费者线程并发处理消息(依旧可以保证局部有序)。
消息重试对顺序消息的影响
对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了。这时对于本身顺序为AB的消息顺序变成了BA。
针对这种问题,严格的顺序消费还需要max.in.flight.requests.per.connection参数的支持。
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,同时也会提升吞吐量。把它设为1就可以保证消息是按照发送的顺序写入服务器的。
此外,对于某些业务场景,设置max.in.flight.requests.per.connection=1会严重降低吞吐量,如果放弃使用这种同步重试机制,则可以考虑在消费端增加失败标记的记录,然后用定时任务轮询去重试这些失败的消息并做好监控报警。
猜你喜欢
- 2024-12-11 Kafka知识点总结 一篇读懂 建议收藏
- 2024-12-11 连 Kafka 的稳定性都不懂,也敢说自己会用Kafka
- 2024-12-11 从架构上详解(SLB,Redis,Mysql,Kafka,Clickhouse)热点问题
- 2024-12-11 Kafka最佳实践 - 合理安排kafka的broker、partition、consumer数量
- 2024-12-11 学Kafka,就必须了解的再均衡问题
- 2024-12-11 面试题之kafka:如何提高整体的消费能力
- 2024-12-11 认识kafka消费者(从kafka读取数据)
- 2024-12-11 Kafka快速入门
- 2024-12-11 扫盲Kafka?看这一篇就够了!
- 2024-12-11 kafka启动顺序&命令启动
你 发表评论:
欢迎- 08-03 Docker 命令入门实战:搞懂这些才算真正入门!
- 08-03Docker 常用命令分类汇总
- 08-03docker常用命令大全,看这一篇就够了
- 08-03Docker命令大全详解(39个常用命令)
- 08-03Docker 常用命令手册
- 08-03Docker命令最全详解(39个最常用命令)
- 08-03Docker命令最全详解(29个最常用命令)
- 08-03C++语法进阶-字符:字符变量(char)
- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- flutterrun (59)
- powershellfor (73)
- messagesource (71)
- plsql64位 (73)
- vueproxytable (64)
- npminstallsave (63)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- nacos启动失败 (64)
- ssh-add (70)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- qcombobox样式表 (68)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)