MQ系列1:消息中间件执行原理MQ系列2:消息中间件的技术选型MQ系列3:RocketMQ 架构分析MQ系列4:NameServer 原理解析MQ系列5:RocketMQ消息的发送模式MQ系列6:消息的消费MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障MQ系列9:高可用架构分析MQ系列10:如何保证消息幂等性消费MQ系列11:如何保证消息可靠性传输
1 介绍消息的有序性在很多业务场景中占有很重要的位置。比如购物场景,需要按照 创建订单 --> 订单付款 --> 完成订单 顺序执行。又比如出行场景,接单 --> 接送到达目的地 --> 付款 --> 完成订单。这种是严格按照顺序执行的,这样的顺序消费才不会出问题,而且各个订单之间是互相独立和并行执行的。所以,在MQ中,如何稳定地保证顺序性消息处理,是一个不可避免的话题。
2 消息的有序性说明消息的有序执行,一般不是单个组件的能力。而是整个消息从生产,排队,存储到消费都是有序的,比如上面提到的购物和出行场景。这就要求我们在消息队列(如果是Kafka,还是RocketMQ、RabbitMQ)中,保证以下前提:
(资料图片)
消息生产的有序性:即生产者组件有序发送消息消息入出队列的有序性:即消息是按照进入的先后顺序排队列放的,遵循FIFO原则。消息的存储的有序性:与上一点一致,部分场景下为了提高可用,就是要持久化到磁盘,这时候应该遵循有序存放,才能保证后续有序消费消息消费的有序性:即按照顺序进行消费。又分为全局顺序消息与部分顺序消息,全局是指Topic下的所有消息都要保证顺序;部分顺序消息保证每一组消息被顺序消费即可。这边还有个问题,如果想让全局都是顺序性消费,那么只能用一个消费者去消费队列(一般来说也是单个生产者),这是会严重影响整体性能的,一般没这个,都是分组顺序执行消费的。
2.1 消息生产的有序性要保证整个消息队列的有序性执行,首先要保证消息生产的有序性。RocketMQ在Broker中防止了很多Topic,主题(Topic)可以看做消息的归类,我们将消息进行类型划分,相同类型的消息称为一个 Topic。比如我们在淘宝或京东上购买商品的的过程,就可能产生:购物车消息、交易消息、物流消息等,1条消息必然归属于1个 Topic 。1个 Topic可以有0 ~ n 个生产者向其发送消息;也可以被 0~n 个消费者订阅和处理,于是就有出现了生产者组和消费者组,如下图:
或者同一个Topic中,创建不同的Queue,同一个消息生产者将消息隔离发送到不同的Queue中:
按照上述的模式,同理,我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储,比如一次完整的消费过程:创建订单、付款、完成订单按照顺序在一个队列(Queue)中执行那就可以了。
★ 同时我们要保证同一组的消息在消息生产的时候投送到一个组中。这个相对来说不难,可以这么做:
比如一个订单的多个子消息的父订单号是一致,我们把这些消息按照订单号取模,投送到对应的Queue中就行了,比如 订单号 % 队列数量( 163105015 % 9)发送消息自定义消息标签(消息标签可以用队列编号命名),一组消息使用同一个标签,改组标签对应的消息都投向标签所在的队列。★ 业务程序方面,必须使用同步发送的方式,这样才能保证生产者发送的消息有序,否则按照FIFO的原则,很可能 订单完成会被先消费。但是我们业务程序,比如Java代码中为了提升性能,可能使用多线程的模式进行事件触发。多线程下保证生产者顺序性,可以使用锁并配合 spring的publish event(按照顺序执行的内部队列),持久化之后,再按照先进先出的顺序推送消息进入MQ中。可以参考下 ,大概就是将你的事件进行顺序化一下。
★ 上述方法也不能完完全全的避免顺序化执行。如果broker服务发生故障,或者消息发生丢失,都有可能导致事件消费不完整,出现不一致的问题。
2.2 消息有序性存储Broker 存储架构采用文件存储机制(类似Kafka),即直接在磁盘上使用文件来保存消息,而不是采用Redis或者MySQL之类的持久化工具。它会把消息存储所属相关的文件存储在ROCKETMQ_HOME下,包含三个部分:
CommitLog 消息元数据ConsumeQueue 消息逻辑队列IndexFile 索引文件存储消息的元数据,所有消息都会顺序存入到CommitLog文件中。ConsumeQueue是指存储消息在CommitLog上的索引,一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。所以一切都是顺序性操作下来的,而且按照 MessageQueue 做了隔离了,不用担心乱序的问题。详细参考 《MQ系列8:数据存储,消息队列的高可用保障》
2.3 消息消费的有序性最后一步就是消费的有序性了,既然消息生产和消息持久化都可以做到有序性。那么只要保证消费的有序性,就能保证整个消息队列的有序执行。这边以RocketMQ为例子,RockerMQ采用MessageListener 回调函数进行监听,监听到消息之后进行数据处理。MessageListener主要提供了两种消费模式,如下:
有序消费模式MessageListenerOrderly并发消费模式MessageListenerConcurrently其中有序消费模式有序消费模式MessageListenerOrderly可以保证按照顺序进行消息处理。但是消费的业务代码实现是多线程并行的,依然是无法保证的。实际上RocketMQ也是这么做的,MessageListenerConcurrently拉到消息之后会提交到线程池去消费,而MessageListenerOrderly则是通过分布式锁和本地锁保证同时只有一条线程去消费一个队列(Queue)上的数据。这种消费模式就是使用以下3把锁来确保顺序性:
broker端的分布式锁messageQueue的本地synchronized锁ProcessQueue的本地consumeLock3 总结要消息的顺序性消费:需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue),然后使用线程池消费的时候使用分布式锁和本地锁保证同时只有一条线程去消费一个队列(Queue)上的数据。
- MQ系列12:如何保证消息顺序性
- 天天热资讯!确认禁赛了!至少41场!唉,这小子没救了…
- 企业所得税怎么算?营业利润的计算公式是什么?
- 头狼:黄金又到了见证奇迹的时刻,1938干多-天天微动态
- 去现场吧,与文博会来个亲密接触
- 诺基亚成国产手机恶梦 一年收百亿专利费:5G时代华为等翻身|微动态
- 苹果 iOS 17 相机引入“水平”辅助线,帮用户调正角度拍摄 每日视讯
- 双倍余额递减法计提折旧公式是什么?双倍余额递减法是什么?
- 中国驻英国使馆发言人就英方宣称将发布移除中国监控设备时间表答记者问 最新
- 个体工商户营业执照年检网上申报流程?个体工商户营业执照年检网上申报截止时间?
- 热门:江苏华辰06月07日主力资金大幅流出
- 国恩股份:公司专注于新材料纵向一体化平台发展,对于半导体领域业务暂无涉及
- 环球即时看!中集集风 “量海101” 正式下水 助力广东省“十四五”海上风场运维
- 毛利率计算公式是什么?毛利率怎么分类?
- 金塔县游泳馆即将开馆试运行 世界独家
- 会计的报考条件是什么?考会计证需要多少钱?
- 全球微资讯!SK-II、OLAY计划在王府井集团旗下门店卖出2.5亿元
- 克明食品:子公司5月生猪销售收入同比增38.4% 天天快播报
- 焦点热文:金埔园林:“金埔转债”于6月8日配售及网上申购
- 【天天新要闻】苏利股份拟定增募资不超6亿元 去年初发可转债募9.57亿
- 注意!出版传媒将于6月28日召开股东大会-最新消息
- 每日热讯!世界第6的钱天一为何打不上球?王艺迪被偏爱,马琳不该犯错
- 谁的水平更高?AI挑战写今年高考作文 你给打几分
- O药直击Seagen腹地:一场并不意外的意外
- 【新华500】新华500指数(989001)7日跌0.46%
- 今日快讯:海鸥股份:公司所属行业为通用设备制造
- 全球实时:华立股份06月07日主力资金大幅流出
- 中天精装06月07日主力资金大幅流出
- 天天观点:龙舟赛期间南昌免费发放1万张景区门票
- Lunaz展示其电动阿斯顿马丁DB6EV