当前位置:首页 > > 架构师社区
[导读]既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题。在一些涉及到了金钱交易的场景下,消息丢失还是很致命的。那么在RocketMQ中存在哪几种消息丢失的场景呢?

作者:霁云HYY

来源:https://blog.csdn.net/LO_YUN/article/details/103949317


既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题。在一些涉及到了金钱交易的场景下,消息丢失还是很致命的。那么在RocketMQ中存在哪几种消息丢失的场景呢?
先来一张最简单的消费流程图:

RocketMQ消息丢失场景及解决办法

上图中大致包含了这么几种场景:
  • 生产者产生消息发送给RocketMQ
  • RocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失
  • 消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束
这三种场景都可能会产生消息的丢失,如下图所示:

RocketMQ消息丢失场景及解决办法

1、场景1中生产者将消息发送给Rocket MQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失
2、场景2中消息需要持久化到磁盘中,这时会有两种情况导致消息丢失
  • RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失
  • 如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失
3、消费者成功从RocketMQ中获取到了消息,还没有将消息完全消费完的时候,就通知RocketMQ我已经将消息消费了,然后消费者宕机,但是RocketMQ认为消费者已经成功消费了数据,所以数据依旧丢失了。
那么如何保证消息的零丢失呢?

RocketMQ消息丢失场景及解决办法

1、场景1中保证消息不丢失的方案是使用RocketMQ自带的事务机制来发送消息,大致流程为:
  • 首先生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑
  • half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路
  • 如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息
  • 如果生产者的核心链路执行成功,则通知RocketMQ commit half消息,让消费者可以消费这条数据
其中还有一些RocketMQ长时间没有收到生产者是要commit/rollback操作的响应,回调生产者接口的细节,感兴趣的可以参考《  RocketMQ分布式事务原理》( https://blog.csdn.net/LO_YUN/article/details/101673893)


在使用了RocketMQ事务将生产者的消息成功发送给RocketMQ,就可以保证在这个阶段消息不会丢失
2、在场景2中要保证消息不丢失,首先需要将os cache的异步刷盘策略改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘。
一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对RocketMQ采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障。
3、在场景3中,消息到达了消费者,RocketMQ在代码中就能保证消息不会丢失
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context){                                  
        //对消息进行处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
上面这段代码中,RocketMQ在消费者中注册了一个监听器,当消费者获取到了消息,就会去回调这个监听器函数,去处理里面的消息
当你的消息处理完毕之后,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS 只有返回了CONSUME_SUCCESS,消费者才会告诉RocketMQ我已经消费完了,此时如果消费者宕机,消息已经处理完了,也就不会丢失消息了
如果消费者还没有返回CONSUME_SUCCESS时就宕机了,那么RocketMQ就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失
为了保证消息不会丢失,在consumeMessage方法中就直接写消息消费的业务逻辑就可以了,如果非要搞一些骚操作,比如下面的代码
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context)
     //开启子线程异步处理消息
     new Thread() {
   public void run() {
    //对消息进行处理
   }
  }.start();                                 
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
如果新开子线程异步处理消息的话,就有可能出现消息还没有被消费完,消费者告诉RocketMQ消息已经被消费了,结果宕机丢失消息的情况。
使用上面一整套的方案就可以在使用RocketMQ时保证消息零丢失,但是性能和吞吐量也将大幅下降
  • 使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能
  • 同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级
  • 主从机构的话,需要Leader将数据同步给Follower
  • 消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成
消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景而定,选择合适的方案才是最好的

特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:

RocketMQ消息丢失场景及解决办法

RocketMQ消息丢失场景及解决办法

RocketMQ消息丢失场景及解决办法

长按订阅更多精彩▼

RocketMQ消息丢失场景及解决办法

如有收获,点个在看,诚挚感谢

免责声明:本文内容由21ic获得授权后发布,版权归原作者所有,本平台仅提供信息存储服务。文章仅代表作者个人观点,不代表本平台立场,如有问题,请联系我们,谢谢!

本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除( 邮箱:macysun@21ic.com )。
换一批
延伸阅读

当我们谈起C语言,很多人第一印象是面向底层、面向系统的编译型语言,写出来的程序一般都是从头到尾跑一遍就结束,很少和用户交互。但实际上,C语言从诞生开始就支持交互式的程序设计,通过标准输入输出和用户实时交互,接收用户输入、...

关键字: C语言 编程

在STM32嵌入式开发中,精确延时是非常基础但又极其关键的功能。无论是驱动单总线传感器(比如DS18B20)、控制LCD屏幕时序、还是生成精确的脉冲信号,都需要用到微秒级甚至纳秒级精度的延时。很多新手刚开始使用STM32...

关键字: STM32 嵌入式

在C语言开发中,位操作符是最容易被新手忽略,却能在嵌入式开发、底层驱动、算法优化中发挥巨大作用的工具。和常规的算术操作、逻辑操作相比,位操作直接操作二进制位,执行效率更高,占用代码空间更小,能轻松实现很多用常规方法很难实...

关键字: C语言 位操作符

在C语言开发中,原生字符串的使用一直存在诸多不便。传统C语言中,字符串本质是以'\0'结尾的固定字符数组,开发人员必须提前预估字符串的最大长度:如果预估过小,拼接或插入字符时会出现缓冲区溢出,引发内存越界错误;如果预估过...

关键字: C语言 字符串

随着半导体测试向更高复杂性与并行度演进,多工位自动测试设备(ATE)和SiC/GaN测试对电感、电容和电阻(LCR)测量的需求不断提升。然而,传统的外接台式LCR仪表和基于线缆的设置难以扩展,而且会降低可重复性。本文介绍...

关键字: 半导体 电阻 嵌入式

智能高尔夫球追踪系统是一项创新的嵌入式电子项目,旨在展示如何将紧凑型物联网硬件集成到体育科技应用中。在体育领域,高尔夫球扮演着主要角色,但在现代时代,所有设备都变得更加智能化,高尔夫球也由此演变为智能高尔夫球。本项目结合...

关键字: 嵌入式 物联网 NRF无线技术

在工业自动化、智能传感、嵌入式组网等分布式总线系统中,设备自动地址分配是实现节点互联互通、即插即用的核心技术。传统人工配置地址方式存在操作繁琐、扩展性差、地址冲突风险高、维护成本高等诸多问题,已无法适配大规模、动态化的总...

关键字: 总线 嵌入式 组网

2026年6月8日 – 专注于引入新品的全球电子元器件和工业自动化产品授权代理商贸泽电子 (Mouser Electronics) 正式宣布,首次荣获全球嵌入式应用安全连接解决方案知名供应商NXP® Semiconduc...

关键字: 物联网 移动设备 嵌入式

城市灯火通明、生活井然运转的背后,总有人在不被注意的地方,日复一日地坚持着。他们或许没有惊天动地的故事,却在漫长岁月里,用自己的方式守护着他人的生活。近日,乡村教师班爱花、爱心厨房运营者丫丫妈,以及“扛楼女工”云姐的故事...

关键字: 西门子家电 洗碗机 嵌入式

2026年5月15日,正值“世界无幽日”,一组数据再次引发公众关注:据《中国幽门螺杆菌感染防控》白皮书显示,我国幽门螺杆菌人群感染率已接近50%,涉及超过7亿人口,且家庭内传播特征极为显著——父母若感染,子女感染风险升高...

关键字: 洗碗机 AI 嵌入式
关闭