消息为什么会丢失1. 在消息生产过程中丢失消息原因解决办法2. 在消息队列中丢失消息原因解决办法集群 Kafka 防止消息丢失的原理建议3. 在消费的过程中丢失消息原因注意点如何保证消息只被消费一次避免消息丢失的代价如何避免消息重复1. 什么是幂等2. 在生产、消费过程中增加幂等性的保证方案问题业务层面的处理总结
消息为什么会丢失

为了保证消息只被消费一次,首先要保证消息不会丢失。在
生产者 → 消息 → 消息队列 → 消费者 这个过程中,有如下场景存在消息丢失的可能:- 消息从生产者写入到消息队列的过程
- 消息在消息队列中的存储场景
- 消息被消费者消费的过程
1. 在消息生产过程中丢失消息
原因
首先,消息的生产者一般是我们的业务服务器,消息队列是独立部署在单独的服务器上的。二者之间的网络虽然是内网,但是也存在抖动的可能,一旦发生网络抖动,消息就有可能因为网络错误而丢失。
解决办法
消息重传:当发现发送超时后就将消息重新发一次,但是也不能无限制地重传消息。一般来说,如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。
2. 在消息队列中丢失消息

原因
以 Kafka 举例,消息在 Kafka 时存储在本地磁盘上的,为了减少消息存储时对磁盘的随机
I/O ,我们一般会将消息先写入到操作系统的 Page Cache 中,然后再找到合适的时机刷新到磁盘上。比如,Kafka 可以配置当达到某一时间间隔,或者积累了一定消息数量的时候再刷盘,也就是异步刷盘。
如果发生机器掉电或者机器异常重启,那么 Page Cache 中还没来得及刷盘的消息就会丢失。
解决办法
- 缩短刷盘间隔,但这样会对性能有较大影响,而且从经验来看,机器宕机或者掉电的概率不高,所以不建议这样做。
- 集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。
集群 Kafka 防止消息丢失的原理
集群 Kafka 中有一个 Leader 负责消息的写入和消费,同时可以有多个 Follower 负责数据的备份。Follower 中有一个特殊集群叫做 ISR(in-sync replicas),当 Leader 发生故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 中的数据会异步地复制给 Follower,这样在 Leader 掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。
由于默认消息时异步复制的,所以一旦 Leader 宕机,哪些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一个选项叫做
acks ,当这个选项被设置为 all 时,生产者发送的每一条消息除了发送给 Leader 之外还会发送给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。
从上面这张图来看,当设置
acks=all 时,需要同步执行 1,3,4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以在实际应用中需要仔细地权衡考量。建议
- 如果需要确保消息一条都不能丢失,那么不建议开始消息队列的同步刷盘,而是使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息菜返回成功
- 如果对消息的丢失有一定容忍度,那么可以不要部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了
- 如果我们的系统业务一般对于消息的丢失有一定容忍度,比如 tracking 系统的调度,如果某个 tracking 消息丢失了,我们后面只需要补发最近没有被调度的 tracking 任务即可。
3. 在消费的过程中丢失消息

以 Kafka 为例,一个消费者消费消息的进度是记录在消息队列集群总的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。
原因
接收消息和处理消息的过程都可能发生异常或者失败,比如:
- 接收消息时网络发生抖动,导致消息并没有被正确接收到
- 处理消息时可能发生业务异常导致处理流程未执行完成,这时如果更新消费进度,那么这条消息就永远不会被处理了,也可以认为时丢失了
注意点
一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题。比如一条消息处理之后,消费者恰好宕机,那么因为没有更新消费进度,所以这个消费者重启后,还会重复消费这条消息。
如何保证消息只被消费一次
避免消息丢失的代价
从上面的总结中可以发现,为了避免消息丢失,我们需要付出两方面的代价:
- 性能方面的损耗
- 消息重复消费
性能的损耗我们可以接受,因为一般业务系统只有在写请求时才会发送消息队列操作,而一般系统的写请求的量级并不高,但是消息一旦被重复消费,就会造成业务逻辑处理的错误。
如何避免消息重复
想要避免消息重复是很难做到的,因为网络的抖动、机器的宕机和处理的异常都是比较难避免的,在工业上没有成熟的方法。
因此,我们会把要求放宽,只要保证即使消费到了重复的消息,从消费的最终结果来看和只消费一次是等同的就好,也就是保证在消息的生产和消费的过程是 幂等 的。
1. 什么是幂等
幂等是一个数学上的概念,它的含义是多次执行同一个操作和执行一次操作,最终得到的结果是相同的。
说白了,你可以这么理解幂等: 一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。
比如,如果我们消费一条消息的时候,要给现有的库存数量减 1,那么如果消费两条相同的消息就会给库存数量减 2,这就不是幂等的。
而如果消费一条消息后,处理逻辑是将库存的数量设置为 0,或者是如果当前库存数量是 10 时则减 1,这样在消费多条消息时,所得到的结果就是相同的, 这就是幂等的。
2. 在生产、消费过程中增加幂等性的保证
方案
无论是在生产端的幂等姓保证方式,还是消费端通用的幂等姓保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先对比这个 ID 是否存在,如果存在,则认为消息已经被使用过。
这是一种标准的实现幂等的方式,可以在项目中直接拿来使用。逻辑上的伪代码如下:
is_id_existed = select_by_id(id) # 判断 ID 是否存在
if is_id_existed:
return # 存在则直接返回
else:
process(message) # 不存在,则处理消息
save(id) # 存储 ID问题
如果消息在处理之后,没有来得及写入数据库,消费者宕机重启后发现数据库中没有这条消息,还是会重复执行两次消费逻辑。
这时候就需要引入事务机制,保证消息处理和写入数据库必须 同时成功 或者 同时失败,但这样消息处理的成本就更高了,所以,如果对于消息重复没有特别严格的要求,可以直接使用这种通用方案,而不考虑引入事务。
业务层面的处理
增加乐观锁。比如,消息处理程序需要给一个人的账户加钱,可以通过乐观锁的方式解决。
具体方式:给每个人的账号数据中增加一个版本号的字段,在生产消息时,先查询这个账户的版本号,并将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号之后,在执行更新账户金额的 SQL 的时候带上版本号,类似于执行:
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;这样,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;
在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。
总结
- 消息的丢失可以通过生产端的重试、消息队列配置集群模式,以及消费端合理处理消费进度三个方式解决
- 为了解决消息的丢失,通常会造成性能上的问题以及消息重复的问题
- 通过保证消息处理的幂等性可以解决消息的重复问题
但值得注意的是,虽然有很多应对消息丢失的方法,但并不是说消息丢失一定不能接受。
毕竟可以看到,在允许消息丢失的情况下,消息队列的性能会更好,方案实现的复杂度更低。比如像日志处理这样的场景,日志存在的意义在于排查系统的问题,而系统出现问题的几率不高,偶发的丢失几条日志是可以接受的。
所以方案设计看场景,这是一切设计的原则。