11-MQ(38)

1. RoMQ中事务消息的实现

RocketMQ 的事务消息也可以被认为是一个两阶段提交

  • 事务开始的时候会先发送一个半消息给 Broker
    • 半消息的意思就是这个消息此时对 Consumer 是不可见的,而且也不是存在真正要发送的队列中,而是一个特殊队列
  • 发送完半消息之后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,还是发送回滚消息

发送提交或者回滚消息失败了怎么办?

  • 影响不大,Broker 会定时的向 Producer 来反查这个事务是否成功,具体的就是 Producer 需要暴露一个接口,通过这个接口 Broker 可以得知事务到底有没有执行成功,没成功就返回未知,因为有可能事务还在执行,会进行多次查询
  • 如果成功那么就将半消息恢复到正常要发送的队列中,这样消费者就可以消费这条消息了

简化的官网示例代码:

  • 无非就是多加个反查事务结果的方法,然后把本地事务执行的过程写在 TransationListener 里面
79ecc60733f64f80816c0bf51b0b69e8
  • 第四步这个消息要么就是正常的消息,要么就是抛弃什么都不存在,此时这个事务消息已经结束它的生命周期
30e8f02e68414af4a2f4502a24b33757

1. RoMQ事务消息源码分析

  • sendMessageInTransaction() 方法,关系结构还是很清晰的
b7135e8c32944975a4c293fcd3d58113

流程:将消息塞入一些属性,标明此时这个消息还是半消息,然后发送至 Broker,然后执行本地事务,然后将本地事务的执行状态发送给 Broker

1. Broker处理消息

在 Broker 的 SendMessageProcessor#sendMessage 中会处理这个半消息请求,因为今天主要分析的是事务消息,所以其他流程不做分析,大致原理

  • sendMessage 中查到接受来的消息的属性里面 MessageConst.PROPERTY_TRANSACTION_PREPAREDtrue ,则这个消息是事务消息
  • 然后再判断一下这条消息是否超过最大消费次数,是否要延迟,Broker 是否接受事务消息等操作后,将这条消息真正的 topic 和队列存入属性中,然后重置消息的 topic 为 RMQ_SYS_TRANS_HALF_TOPIC ,并且队列是 0 的队列中,使得消费者无法读取这个消息
75f8a3df24234553b468d3a0e9af890f
  • 就是来了波狸猫换太子,其实延时消息也是这么实现的,最终将换了皮的消息入盘
  • Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest
ce60444d19874a9c9948de13a67006a7
  • 如果是提交事务就是把皮再换回来写入真正的 topic 所属的队列中,供消费者消费
  • 如果是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了
    • 那个后台服务就是 TransactionalMessageCheckService 服务,它会定时的扫描半消息队列,去请求反查接口看看事务成功了没,具体执行的就是 TransactionalMessageServiceImpl#check 方法
    • 首先取半消息 topic 即 RMQ_SYS_TRANS_HALF_TOPIC 下的所有队列,半消息写入的队列是 id 是 0 的这个队列,然后取出这个队列对应的 half_op 主题下的队列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC 主题下的队列
    • half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在 half_op 中
    • 然后调用 fillOpRemoveMap 方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在 half_op 里面的半消息调用 putBackHalfMsgQueue 又写入了 commitlog 中,然后发送事务反查请求,这个反查请求也是 oneWay,即不会等待响应。当然此时的半消息队列的消费 offset 也会推进
    • 然后 producer 中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到 TransactionMQProducer 的线程池中进行,最终会调用上面发消息时候定义的 checkLocalTransactionState 方法,然后将事务状态发送给 Broker,也是用 oneWay 的方式

为什么要有个 half_op,为什么半消息处理了还要再写入 commitlog 中

  • 首先 RocketMQ 的设计就是顺序追加写入,所以说不会更改已经入盘的消息,那事务消息又需要更新反查的次数,超过一定反查失败就判定事务回滚
  • 因此每一次要反查的时候就将以前的半消息再入盘一次,并且往前推进消费进度。而 half_op 又会记录每一次反查的结果,不论是提交还是回滚都会记录,因此下一次还循环到处理此半消息的时候,可以从 half_op 得知此事务已经结束了,因此就被过滤掉不需要处理了
  • 如果得到的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,因此还能再次反查,并且更新反查次数

Broker 的事务处理流程:

1ad0bb52deb04d0b9378bdf9c5a64a99

2. 消息队列?

维基百科的定义:

In computer science, message queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality.

在计算机科学领域,消息队列和邮箱都是软件工程组件,通常用于进程间或同一进程内的线程通信。它们通过队列来传递消息-传递控制信息或内容,群组通信系统提供类似的功能

  • 消息队列就是一个使用队列来通信的组件
  • 现在而言日常所说的消息队列:常常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题
    • 异步处理
    • 服务解耦
    • 流量控制

3. 消息队列背景

  • 从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。需要有一个「东西」来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰
  • 多引入一个中间件系统的稳定性就下降一层,运维的难度抬高一层。因此要权衡利弊,系统是演进的

1. 异步处理

  • 调用链路长、响应就慢了,并且相对于扣库存和下单,积分和短信没必要那么的 "及时"。只需要在下单结束那个流程,扔个消息到消息队列中就可以直接返回响应了。而且积分服务和短信服务可以并行的消费这条消息
  • 减少请求的等待,还能让服务异步并发处理,提升系统总体性能
261ca544f3e94fe891637d1d52d568a9

2. 服务解耦

消息队列来解决系统间耦合问题,订单服务把订单相关消息塞到消息队列中,下游系统谁需要谁就订阅这个主题

  • 增加了积分服务、短信服务,可能又要来个营销服务,之后领导又想做个大数据,又来个数据分析服务等
9b148424287b4aa58f91534da850bda2

3. 流量控制

「削峰填谷」,后端服务相对而言都是比较「弱」的。因为业务较重,处理时间较长(eg:秒杀活动爆发式流量打过来可能就顶不住了)。因此需要引入一个中间件来做缓冲,消息队列再适合不过了

  • 网关的请求先放入消息队列中,后端服务尽自己最大能力去消息队列中消费请求。超时的请求可以直接返回错误
  • 当然还有一些服务特别是某些后台任务,不需要及时地响应,并且业务处理复杂且流程长,那么过来的请求先放入消息队列中,后端服务按照自己的节奏处理

分别对应着生产者生产过快消费者消费过慢两种情况,消息队列都能在其中发挥很好的缓冲效果

ca653a4134984dcd8acc6d426f37d1de

4. 消息队列模型

1. 队列模型

生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,即每条消息只能被一个消费者消费

6dfbf004406c4188ae5db5c8d351b048

2. 发布/订阅模型

为了解决一条消息能被多个消费者消费的问题,发布/订阅模型就来了。该模型是将消息发往一个Topic 即主题中,所有订阅了这个 Topic 的订阅者都能消费这条消息

  • RabbitMQ 采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题
    • RocketMQ 和 Kafka 采用发布/订阅模型
  • 发布/订阅模型兼容队列模型,即只有一个消费者的情况下和队列模型基本一致
e2b685a829094866887a4a242cb68f08

5. 消息队列核心术语?

发送消息方为生产者 Producer,接受消费消息方为消费者 Consumer,消息队列服务端为Broker,还有个命名中心(RocketMQ 中叫 namesrv, Kafka 中用 zookeeper )存储 Broker、生产消费者等服务地址、主题信息等

  • 仅关注消息的发送和消费流程:
    • 消息从 Producer 发往 Broker
    • Broker 将消息存储至本地
    • 然后 Consumer 从 Broker 拉取消息,或者 Broker 推送消息至 Consumer,最后消费
56c2619792794d3aa0c0f7e12404cfcf
  • 为了提高并发度,往往发布/订阅模型还会引入队列或分区的概念。即消息是发往一个主题下的某个队列或某个分区中
    • RocketMQ 中叫队列
    • Kafka 叫分区
    • 某个主题下有 5 个队列,那么这个主题的并发度就提高为 5,同时可以有 5 个消费者并行消费该主题的消息。一般可以采用轮询或key hash 取余等策略来将同一个主题的消息分配到不同的队列中
  • 与之对应的消费者一般都有组的概念 Consumer Group,即消费者都是属于某个消费组的。一条消息会发往多个订阅了这个主题的消费组
    • 假设现在有两个消费组分别是 Group 1 和 Group 2,它们都订阅了 Topic-a。此时有一条消息发往 Topic-a ,那么这两个消费组都能接收到这条消息
    • 然后这条消息实际是写入 Topic 某个队列中,消费组中的某个消费者对应消费一个队列的消息
    • 在物理上除了副本拷贝之外,一条消息在 Broker 中只会有一份,每个消费组会有自己的 offset 即消费点位来标识消费到的位置。在消费点位之前的消息表明已经消费过了。当然这个 offset 是队列级别的。每个消费组都会维护订阅的 Topic 下的每个队列的 offset
2563e12150c742d4a3acc8884d650256

6. 保证消息不丢失?

常见的消息队列,只要配置得当,我们的消息就不会丢

  • 可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务。eg:日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应
670fdbfe47594249bde0184e2b43b2e8

消息的生命周期三个阶段:

1. 生产消息

  • 生产者发送消息至 Broker,需要处理 Broker 的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好 try-catch,妥善的处理响应,如果 Broker 返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等

2. 存储消息

  • 存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了
  • 如果 Broker 是集群部署,有多副本机制,即消息不仅仅要写入当前 Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了

3. 消费消息

  • 消费者真正执行完业务逻辑之后,再发送给 Broker 消费成功,才是真正的消费

7. 如何处理重复消息?

假设我们发送消息只管发,不管 Broker 的响应,那么发往 Broker 的消息是不会重复的但是不允许这样,因为无法保证消息的可靠性

  • 要确定 Broker 收到消息就得等 Broker 的响应,那么就可能存在 Broker 已经写入了,但是响应由于网络原因生产者没有收到,于是生产者又重发了一次,此时消息就重复了
  • 消费者消费时,假设消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新 Consumer offset 了,但是这一刻消费者挂了。另一个消费者顶上,业务又被执行了一遍

对平常业务而言消息重复是不可避免的,只能在业务上处理重复消息所带来的影响。关键点就是幂等

1. 幂等处理重复消息

幂等是数学上的概念。理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的

# 执行多少遍 money 都是 150,这就叫幂等
update t1 set money = 150 where id = 1 and money = 100;

因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果

  • 更通用的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号
  • 通过数据库的约束(唯一键)。eg:insert into update on duplicate key...
  • 记录关键的key。eg:处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个 ID 是否已经被处理过了,如果没处理再进行下一步
  • 也可以用全局唯一ID等

8. 如何保证消息的有序性

有序性分:全局有序和部分有序

1. 全局有序

如果要保证消息的全局有序,首先只能由一个生产者往 Topic 发送消息,并且一个 Topic 内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!

  • 不过一般情况下都不需要全局有序,即使是同步 MySQL Binlog 也只需要保证单表消息有序即可
d2f4274f90a04b3b8c8b7985744337e7

2. 部分有序

因此绝大部分的有序需求是部分有序,部分有序就可以将 Topic 内部划分成需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率

25eb539d6b7c43ffbd984051bc3ec361

9. 如何处理消息堆积?

消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配导致的

  • 有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了
  • 先定位消费慢的原因
    • 如果是 bug 则处理 bug
    • 如果是因为本身消费能力较弱,优化下消费逻辑。eg:批量处理
    • 水平扩容,增加 Topic 的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者

10. 推、拉模式优缺点

(热点)基本上会问 RocketMQ 采用的是推模式还是拉模式啊?是拉模式?不是有 PushConsumer 吗?

  • 推拉模式指的是 Comsumer 和 Broker 之间的交互
  • 默认认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息

1. 推模式

推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送

1. 好处

  • 消息实时性高,Broker 接受完消息之后可以立马推送给 Consumer
  • 对于消费者使用来说更简单

2. 缺点

  • 推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。推送速率过快就像 DDos 攻击一样
  • 并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。增加了 Broker 自身的复杂度
  • 所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下

2. 拉模式

Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer

1. 好处

  • 拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。根据一定的策略停止拉取,或间隔拉取
  • 拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费时自然由消费者主动发起
  • 拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送

2. 缺点

  • 消息延迟。毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率。eg:隔个2 秒请求一次
  • 消息忙请求。eg:消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功

3. 到底是推还是拉

RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ

  • 拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有存储功能,它的使命就是接受消息,保存好消息等待消费者消费即可
  • 而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向

4. 长轮询

RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式

  • 所谓的“长轮询”:通过消费者去 Broker 拉取消息时,当有消息的情况下 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,即保持连接,暂时 hold 主请求,然后在对应队列或分区有新消息到来的时候都会提醒消息来了,通过之前 hold 主的请求及时返回消息,保证消息的及时性
  • 一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住请求,避免了多次频繁的拉取动作,当消息一到就返回消息

5. RocketMQ长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已

因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了

  • 后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个 PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息
  • 然后 Broker 的 PullMessageProcessor 里的 processRequest() 用来处理拉消息请求的,有消息就直接返回
3f3a0bf7c01c401fbf72a452769911ea
  • suspendPullRequest() 做了什么
6505e8cba9bc4ba8b9bca1501d2cb402
  • PullRequestHoldService 这个线程会每 5 秒从 pullRequestTablePullRequest 请求,然后看看待拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,最终调用 PullMessageProcessorexecuteRequestWhenWakeup() 重新尝试处理这个消息的请求,也就是再来一次,整个长轮询的时间默认 30 秒
95b72a85e9e84891a822bb2f5487907f
  • 简单的说,就是 5 秒会检查一次消息是否到了,如果到了则调用 processRequest 再处理一次。不太实时啊? 5秒?
  • 别急,还有个 ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueueIndexFile 两种类型的数据,并且也会有唤醒请求的操作,来弥补每 5s 一次这么慢的延迟
  • 消息写入并且会调用 pullRequestHoldService#notifyMessageArriving

流程:

c1bc5ee3d1ee4bb3bb7da6931cb28378

6. Kafka长轮询

消费者请求在 “长轮询” 中阻塞等待

  • 就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求
  • 并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回

消费者端源码(突出重点,删减一些代码)

0c2ddef76adf4cdc916a4f00b311b56f

上面那个 poll 接口,其实从注解直接就知道确实是等待数据的到来或超时

5cc08de2a5944ea2a000d3c592439346

最终 client.poll 调用的是什么

4d8f34441d444299b6b998ef27fecdc2

最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)


Broker 处理所有请求的入口,在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest

4ec888247d514df2a945d34ab2aa6a70

截取最重要的部分

5cf3e37840654e96955cbac5829af10d

fetchMessages 方法内部实现,源码给的注释已经很清晰

74d6ac55b6a24dd7bb20596c2614ea90

利用时间轮,来执行定时任务,这里是 delayedFetchPurgatory,专门用来处理延迟拉取操作


延迟操作需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后消息到了之后该执行的方法,还需要有执行完毕之后方法,当然还得有个超时之后的方法

这几个方法其实对应的就是代码里的 DelayedFetch,这个类继承了 DelayedOperation 内部有:

  • isCompleted:检查条件是否满足的方法
  • tryComplete:条件满足之后执行的方法
  • onComplete:执行完毕之后调用的方法
  • onExpiration:过期之后需要执行的方法

判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?

  • 这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,在 ReplicaManager#appendRecords 方法内部再深入个两方法可以看到
f0d6c774084143ed8f33c19842761e51

11. RoMQ事务消息缺点

一个事务涉及 mysql 和 mq,到底哪个写入成功重要?(本质:mysql 和 mq 之间的写入顺序)

  • 在 RocketMQ 中,事务消息的实现方案是先发半消息(半消息对消费者不可见),待半消息发送成功之后,才能执行本地事务,等本地事务执行成功之后,再向 Broker 发送请求将半消息转成正常消息,这样消费者就可以消费此消息
  • 这种顺序等于先得成功写入 mq,然后再写入数据库
    • 问题:即 mq 集群挂了,事务就无法继续进行了,等于整个应用无法正常执行了

RocketMQ 事务消息流程图:

1021ce92895a4f218cc7c90856281790

1. 实现

参考下 qmq 的方法

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。目前在公司内部日常消息qps在60W左右,生产上承载将近4W+消息topic,消息的端到端延迟可以控制在10ms以内

1. 本地消息

  • 本地消息是分布式事务实现方式,就是利用了关系型数据库的事务能力,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息表的插入,即将业务的执行和将消息放入到消息表中的操作放在同一个事务中提交
  • 这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果其他服务调用成功就修改这条本地消息的状态
  • 如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理
  • 本地事务消息表还是很简单的,也是一种最大努力通知的思想
16fd08f03b8b499ba73d7cb55a27861a

2. qmq事务消息

数据库中建一张表

4599d2f6a9dc4caba6ca52feb15249b9
  • 核心思想:就是本地消息表!利用关系型数据库的事务能力,将业务的写入和消息表的写入融在一个事务中,这样业务成功则消息表肯定写入成功
  • 然后在事务提交之后,立刻发送事务消息

如果发送成功,则删除本地消息表中的记录

@Transactional // 在一个事务中
public void yes() {
    Order order = buildOrder();
    orderDao.insert(order);
    Message message = buildMessage(order);
    messageDao.insert(message);
    // 异步,在事务提交后执行

    triggerAfterTransactionCommit(() -> {
        messageClient.send(message);
        messageDao.delete(message);
    });
}

如果消息发送失败,也就是比如 mq 集群挂了,并不会影响事务的执行,业务的执行和事务消息的插入都已经成功了,那此时待消息已经安安静静的在消息库里等着,后台能会有一个补偿任务,会将这些消息捞出来重新发送,直到发送成功

  • 顺序就属于先写数据库,再发mq,即使 mq 集群挂了,也不会影响事务的进行,不会导致应用无法正常执行了

2. RocketMQ、QMQ

RocketMQ 和 QMQ 事务消息的区别,QMQ 事务消息更优的原因

  • RocketMQ 只支持单事务消息,也就是无法在一个事务内发送多种事务消息
  • 而 QMQ 可以在一次事务中发多个消息
  • QMQ 的消息模型多了个 pull log,便于解决 consumer 的动态扩容缩容问题,这也是比 RocketMQ 更灵活的一个地方
@Transactional // 在一个事务中
public void yes() {
    Order order = buildOrder();
    orderDao.insert(order);
    producer.sendMessage(buildMessageA(order));
    producer.sendMessage(buildMessageB(order));
    producer.sendMessage(buildMessageC(order));
}
  • 然后 RocketMQ 事务消息的实现还需要提供一个反查机制,因为 RocketMQ 事务消息的提交是 oneway 的发送方式,有可能 Broker 没有接收到事务提交的消息,所以 Broker 会定时去生产者那边查看事务是否已经执行完成,因此生产者需要保存本地事务执行结果,简单的就是用一个 map 保存,让 Broker 可以通过消息的事务 id 查找到事务执行的结果
    • 如果还要考虑发送事务消息的生产者挂了,那么 Broker 会找同个生产组的其他生产者来查询事务结果,所以这个存储还得提出来放到第三方,而不是本地内存保存
    • RocketMQ 得多维护一个本地事务执行结果,稍微有点麻烦
bb3877370e3c4ff284204db251d5f429
  • QMQ 还得建表呢,不过按照 QMQ 说的:如果公司方便的话,可以直接合并进 DBA 的初始化数据库的自动化流程中,这样就透明了

RocketMQ 的 api 不太友好,改造有点大,之后的迁移不太方便

  • 完整的使用 RocketMQ 事务消息的代码:
9e542561fa0e4ae5ad3001de1873c0cc
  • 如果想要搞事务消息,首先新建 transationMQproducer,然后再新建一个 transcationListenerImpl, 再覆盖 listener 执行事务的方法和回查事务的方法,等于你得把业务逻辑实现在 transcationListenerImpl 内部,这和我们平日里在 service 里面实现事务的差距就有点大了
  • 而 QMQ 提供了内置 Spring 事务的方式,所以就直接在 service 实现就行了
@Transactional // 在一个事务中
public void yes() {
    Order order = buildOrder();
    orderDao.insert(order);
    producer.sendMessage(buildMessageA(order));
}

12. Kafka事务消息实现

  • RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束
  • 而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败
757416110917497dae50279c3c35492b

Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,不是我们想的那种事务消息,RocketMQ 的才是

  • 消息可靠性有三种,分别是最多一次、恰好一次、最少一次,基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次
  • 消息恰好被消费一次当然我们所有人追求的,基本上难以达到
  • 它的恰好一次只能存在一种场景,就是从 Kafka 作为消息源,然后做了一番操作之后,再写入 Kafka 中
    • 通过幂等,和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次
    • Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次
cc69983e02984350ac200c0abdbeaad0

Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分

  • 在开始事务时,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中
  • 然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息
  • 然后发送完毕之后生产者会向事务协调者发送提交或回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了
  • 最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了
adf3846fc8b84209b51ecf8c2db6e0c7

13. Kafka时间轮实现

Kafka 中的时间轮是多层次时间轮实现

  • 添加任务的方法。在添加的时候就设置任务执行的绝对时间
7b11adce34514322b99429c6f02c928c
  • Netty 中是通过固定的时间间隔扫描,时候未到就等待来进行时间轮的推动。这样会有空推进的情况
  • 而 Kafka 就利用了空间换时间的思想,通过 DelayQueue,来保存每个槽,通过每个槽的过期时间排序。这样拥有最早需要执行任务的槽会有优先获取。如果时候未到,那么 delayQueue.poll 就会阻塞着,这样就不会有空推进的情况

推进的方法:

47160730be27436cbe1e0706cd889bcc
  • add 方法每次都是根据 expiration < currentTime + interval 来进行对比的,而 advanceClock 就是用来推进更新 currentTime

1. 小结

  • Kafka 用了多层次时间轮来实现,并且是按需创建时间轮,采用任务的绝对时间来判断延期,并且对于每个槽(槽内存放的也是任务的双向链表)都会维护一个过期时间,利用 DelayQueue 来对每个槽的过期时间排序,来进行时间的推进,防止空推进的存在
  • 每次推进都会更新 currentTime 为当前时间戳,当然做了点微调使得 currentTime 是 tickMs 的整数倍。并且每次推进都会把能降级的任务重新插入降级
  • 可以看到这里的 DelayQueue 的元素是每个槽,而不是任务,因此数量就少很多了,这应该是权衡了对于槽操作的延时队列的时间复杂度与空推进的影响

14. Kafka索引设计

1. 索引在Kafka中的实践

首先 Kafka 的索引是稀疏索引,避免索引文件占用过多的内存,从而可以在内存中保存更多的索引。对应的就是 Broker 端参数 log.index.interval.bytes 值,默认4KB,即4KB的消息建一条索引

Kafka 中有三大类索引:位移索引、时间戳索引、已中止事务索引。分别对应 .index.timeindex.txnindex 文件

相关的源码:

  • AbstractIndex.scala:抽象类,封装了所有索引的公共操作
  • OffsetIndex.scala:位移索引,保存了位移值和对应磁盘物理位置的关系
  • TimeIndex.scala:时间戳索引,保存了时间戳和对应位移值的关系
  • TransactionIndex.scala:事务索引,启用 Kafka 事务之后才会出现这个索引(本文暂不涉及事务相关内容)
05079c3c639c44ba8bb0e9e22929c621

AbstractIndex 的定义,在代码里已经注释了,成员变量里面还有个 entrySize 。这个变量其实是每个索引项的大小,每个索引项的大小是固定的

d2d6ab28376145a0a39d8b40b6994217

2. entrySize

  • 在 OffsetIndex 中是 override def entrySize = 8,8 个字节。 在 TimeIndex 中是 override def entrySize = 12,12 个字节
  • 在 OffsetIndex 中,每个索引项存储了位移值和对应的磁盘物理位置,因此 4 + 4 = 8,但是不对啊,磁盘物理位置是整型没问题,但是 AbstractIndex 的定义 baseOffset 来看,位移值是长整型,不是因为 8 个字节么?
  • 因此存储的位移值实际上是相对位移值,即真实位移值 -baseOffset 的值
  • 相对位移用整型存储够么?够,因为一个日志段文件大小的参数 log.segment.bytes 是整型,因此同一个日志段对应的 index 文件上的位移值 -baseOffset 的值的差值肯定在整型的范围内

为什么要这么麻烦,还要存个差值?

  1. 为了节省空间,一个索引项节省了4 字节,想想那些日消息处理数万亿的公司
  2. 因为内存资源是很宝贵的,索引项越短,内存中能存储的索引项就越多,索引项多了直接命中的概率就高了。这其实和 MySQL InnoDB 为何建议主键不宜过长一样。每个辅助索引都会存储主键的值,主键越长,每条索引项占用的内存就越大,缓存页一次从磁盘获取的索引数就越少,一次查询需要访问磁盘次数就可能变多

互相转化的源码:

65d930329264448493ac2731b9139231

上述解释了位移值是 4 字节,因此 TimeIndex 中时间戳 8 个字节 + 位移值 4 字节 = 12 字节

3. _warmEntries

我们能通过索引项快速找到日志段中的消息,如何快速找到想要的索引项呢?

  • 一个索引文件默认 10MB,一个索引项 8Byte,因此一个文件可能包含 100 多 W 条索引项
  • 不论是消息还是索引,其实都是单调递增,并且都是追加写入的,因此数据都是有序的。在有序的集合中快速查询,二分查找:
368caa5b53fa4c40a898af746a0a450f

这和 _warmEntries 有什么关系?首先想想二分有什么问题?

  • 就 Kafka 而言,索引是在文件末尾追加的写入的,并且一般写入的数据立马就会被读取。所以数据的热点集中在尾部。并且操作系统基本上都是用页为单位缓存和管理内存的,内存又是有限的,因此会通过类 LRU 机制淘汰内存
  • 看起来 LRU 非常适合 Kafka 的场景,但是使用标准的二分查找会有缺页中断的情况,毕竟二分是跳着访问的

when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary page faults (the thread is blocked to wait for reading some index entries from hard disk, as those entries are not cached in the page cache)

当我们查找索引的时候,标准的二分查找对缓存不友好,可能会造成不必要的缺页中断(线程被阻塞等待从磁盘加载没有被缓存到page cache 的数据)

注释还友好的给出了例子:

556cd98581e649458b5747e564bdb693
  • 简单的来讲,假设某索引占 page cache 13 页,此时数据已经写到了 12 页。按照 kafka 访问的特性,此时访问的数据都在第 12 页,因此二分查找的特性,此时缓存页的访问顺序依次是0,6,9,11,12。因为频繁被访问,所以这几页一定存在 page cache 中
  • 当第12页不断被填充,满了之后会申请新页第13页保存索引项,而按照二分查找的特性,此时缓存页的访问顺序依次是:0,7,10,12。7 和 10 很久没被访问到了,很可能已经不再缓存中了,然后需要从磁盘上读取数据。注释说:在他们的测试中,这会导致至少会产生从几毫秒跳到 1 秒的延迟

基于以上问题,Kafka 使用了改进版的二分查找,改的不是二分查找的内部,而且把所有索引项分为热区和冷区

  • 查询热数据部分时,遍历的 Page 永远是固定的,这样能避免缺页中断
  • 想到了一致性 hash,一致性 hash 相对于普通的 hash 不就是在 node 新增的时候缓存的访问固定,或者只需要迁移少部分数据

源码实现:

cdab7914fa894b308a03404f3af85fc6

实现并不难,但是为何是把尾部的8192作为热区?

This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section lookup. So that, the entire warm section is really "warm". When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N), and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS, SPARC, Power, ARM etc.).

  • 大致内容就是现在处理器一般缓存页大小是4096,那么8192可以保证页数小于等3,用于二分查找的页面都能命中

This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.

  • 8KB的索引可以覆盖 4MB(offset index)or 2.7MB(time index)的消息数据,足够让大部分在 insync 内的节点在热区查询

可以看到朴素的算法在真正工程上的应用还是需要看具体的业务场景的,不可生搬硬套。并且彻底的理解算法也是很重要的,例如死记硬背二分,怕是看不出来以上的问题。还有底层知识的重要性。不然也是看不出来对缓存不友好的

从 Kafka 的索引冷热分区到 MySQL InnoDB 的缓冲池管理

MySQL 的将缓冲池分为了新生代和老年代。默认是 37 分,即老年代占3,新生代占7。即看作一个链表的尾部 30% 为老年代,前面的 70% 为新生代。替换了标准的LRU淘汰机制

3076b16c0bd840b19028ea3510ab1a0b

MySQL的缓冲池分区是为了解决预读失效和缓存污染问题

  1. 预读失效:因为会预读页,假设预读的页不会用到,那么就白白预读了,因此让预读的页插入的是老年代头部,淘汰也是从老年代尾部淘汰。不会影响新生代数据
  2. 缓存污染:在类似 like 全表扫描时,会读取很多冷数据。并且有些查询频率其实很少,因此让这些数据仅仅存在老年代,然后快速淘汰才是正确的选择,MySQL为了解决这种问题,仅仅分代是不够的,还设置了一个时间窗口,默认是 1s,即在老年代被再次访问并且存在超过 1s,才会晋升到新生代,这样就不会污染新生代的热数据

15. Kafka控制器事件处理全流程

(不是热点,但可以是亮点)

Controller 是核心组件,作用是管理和协调整个 Kafka 集群

  • 主题的管理,创建和删除主题
  • 分区管理,增加或重分配分区
  • 分区 Leader 选举
  • 监听 Broker 相关变化,即 Broker 新增、关闭等
  • 元数据管理,向其他 Broker 提供元数据服务

为什么需要 Controller ?

理解:凡是管理或协调某样东西,都需要有个 Leader ,由他来把控全局,管理内部,对接外部。这其实对外也是好的,外部不需要和整体沟通,只要和一个决策者交流,效率更高

《深入理解Kafka:核心设计与实践原理》

在 Kafka 的早期版本中,并没有采用 Kafka Controller 这样一概念来对分区和副本的状态进行管理,而是依赖于 ZooKeeper,每个 broker 都会在 ZooKeeper 上为分区和副本注册大量的监听器(Watcher)。 当分区或副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖 ZooKeeper 的设计会有脑裂、羊群效应,以及造成 ZooKeeper 过载的隐患。在目前的新版本的设计中,只有 Kafka Controller 在 ZooKeeper 上注册相应的监听器,其他的 broker 极少需要再监听 ZooKeeper 中 的数据变化,这样省去了很多不必要的麻烦

1. Zookeeper

Controller 极度依赖 ZooKeeper 的(不过社区准备移除 ZooKeeper ,文末再提一下)

  • ZooKeeper 是一个开源的分布式协调服务框架,最常用来作为注册中心等。 ZooKeeper 的数据模型就像文件系统一样,以根目录 "/" 开始,结构上的每个节点称为 znode,可以存储一些信息。节点分为持久节点和临时节点,临时节点会随着会话结束而自动被删除
  • 并且有 Watcher 功能,节点自身数据变更、节点新增、节点删除、子节点数量变更都可以通过变更监听器通知客户端
8fee18d6a4154e50be3872bed8fa78f0

Controller 是如何依赖 Zookeeper?

  • 每个 Broker 在启动时会尝试向 ZooKeeper 注册 /controller 节点来竞选控制器,第一个创建 /controller 节点的 Broker 会被指定为控制器。这就是是控制器的选举
  • /controller 节点是个临时节点,其他 Broker 会监听着此节点,当 /controller 节点所在的 Broker 宕机之后,会话就结束了,此节点就被移除。其他 Broker 伺机而动,都来争当控制器,还是第一个创建 /controller 节点的 Broker 被指定为控制器。这就是控制器故障转移,即 Failover
  • 当然还包括各种节点的监听(eg:主题的增减等),都通过 Watcher 功能,来实现相关的监听,进行对应的处理
  • Controller 在初始化时,会从 ZooKeeper 拉取集群元数据信息,保存在自己的缓存中,然后通过向集群其他 Broker 发送请求的方式将数据同步给对方

Controller 主要用来管理和协调集群,具体是通过 ZooKeeper 临时节点和 Watcher 机制来监控集群的变化(当然还有来自定时任务或其他线程的事件驱动),更新集群的元数据,并且通知集群中的其他 Broker 进行相关的操作

2. Controller底层事件模型

不管是监听 Watcher 的 ZooKeeper Watcher 线程 ,还是定时任务线程亦或是其他线程都需要访问或更新 Controller 从集群拉取的元数据。多线程 + 数据竞争 = 线程不安全。因此需要加锁来保证线程安全

  • 一开始 Kafka 就是用大量的锁来保证线程间的同步,各种加锁使得性能下降,并且多线程加锁的方式使得代码复杂度急剧上升,一不小心就会出各种问题,bug难修复
  • 因此在 0.11 版本之后将多线程并发访问改成了单线程事件队列模式。将涉及到共享数据竞争相关方面的访问抽象成事件,将事件塞入阻塞队列中,然后单线程处理
  • 也就是说其它线程还是在的,只是把涉及共享数据的操作封装成事件由专属线程处理

由于集群元数据会有并发修改问题,因此将操作抽象成事件,由阻塞队列和单线程处理来替换之前的多线程处理,降低代码的复杂度,提升代码的可维护性和性能

3. Controller的请求发送

Controller 从 ZooKeeper 那儿得到变更通知之后,需要告知集群中的 Broker (包括它自身)做相应的处理

Controller 只会给集群的 Broker 发送以下三种请求:

1. LeaderAndIsrRequest

  • 告知 Broker 主题相关分区 Leader 和 ISR 副本都在哪些 Broker 上

2. StopReplicaRequest

  • 告知 Broker 停止相关副本操作,用于删除主题场景或分区副本迁移场景

3. UpdateMetadataRequest

  • 更新 Broker 上的元数据

4. 事件封装成对应的请求

  • Controller 事件处理线程会把事件封装成对应的请求,然后将请求写入对应的 Broker 的请求阻塞队列,然后 RequestSendThread 不断从阻塞队列中获取待发送的请求
89172fad77da42fab2641dcbd08601a0
  • 先解释下 ControllerBrokerStateInfo,它就是个 POJO 类,可以理解为集群每个 broker 对应一个 ControllerBrokerStateInfo
06fa2b8ed9d640789e73863137d8f46c
  • 然后再看下 ControllerChannelManager,从名字可以看出它管理 Controller 和集群 Broker 之间的连接,并为每个 Broker 创建一个 RequestSendThread 线程
f29500ccc68046acad17f5dd4dc193ef

16. Kafka中Zookeeper作用?

ZooKeeper 是一个开源的分布式协调服务框架,你也可以认为它是一个可以保证一致性的分布式(小量)存储系统。特别适合存储一些公共的配置信息、集群的一些元数据等

  • 它有持久节点和临时节点,而临时节点这个玩意再配合 Watcher 机制就很有用
  • 当创建临时节点的客户端与 ZooKeeper 断连之后,这个临时节点就会消失,并且订阅了节点状态变更的客户端会收到这个节点状态变更的通知
ca2c807cf4dc416989f8dad7463568e6
  • 所以集群中某一服务上线或下线,都可以被检测到。因此可以用来实现服务发现,也可以实现故障转移的监听机制
  • Kafka 就是强依赖于 ZooKeeper,没有 ZooKeeper 的话 Kafka 都无法运行

ZooKeeper 为 Kafka 提供了元数据的管理。eg:一些 Broker 的信息、主题数据、分区数据等

  • 在每个 Broker 启动的时候,都会和 ZooKeeper 进行交互,这样 ZooKeeper 就存储了集群中所有的主题、配置、副本等信息
  • 还有一些选举、扩容等机制也都依赖 ZooKeeper
2dc609607126488abaa3b26733c0dcf5
  • 例如:控制器的选举。每个 Broker 启动都会尝试在 ZooKeeper 注册 /controller 临时节点来竞选控制器,第一个创建 /controller 节点的 Broker 会被指定为控制器
  • 竞争失败的节点也会依赖 watcher 机制,监听这个节点,如果控制器宕机了,那么其它 Broker 会继续来争抢,实现控制器的 failover

17. Kafka抛弃Zookeeper?

kafka 2.8 版本移除了对 zk 的依赖。软件架构都是演进的,之所以要变更那肯定是因为出现了瓶颈

先来看看运维的层面的问题

  • 首先身为一个中间件,需要依赖另一个中间件。要说依赖 Netty 这种,那肯定是没问题的。但是 Kafka 的运行需要提供 ZooKeeper 集群,被动了增加了运维的复杂度。所以运维人员不仅得照顾 Kafka 集群,还得照顾 ZooKeeper 集群

再看性能层面的问题

  • ZooKeeper 有个特点,强一致性。如果 ZooKeeper 集群的某个节点的数据发生变更,则会通知其它 ZooKeeper 节点同时执行更新,就得等着大家(超过半数)都写完了才行,这写入的性能就比较差了
bc09bdb1695b41cda0b8c9da7e606e69
  • 小量存储系统,一般而言,ZooKeeper 只适用于存储一些简单的配置或者是集群的元数据,不是真正意义上的存储系统。如果写入的数据量过大,ZooKeeper 的性能和稳定性就会下降,可能导致 Watch 的延时或丢失
  • 所以在 Kafka 集群比较大,分区数很多的时候,ZooKeeper 存储的元数据就会很多,性能就差了
  • 还有,ZooKeeper 也是分布式的,也需要选举,它的选举也不快,而且发生选举的那段时候是不提供服务的!

基于 ZooKeeper 的性能问题 Kafka 之前就做了一些升级

  • 以前 Consumer 的位移数据是保存在 ZooKeeper 上的,所以当提交位移或者获取位移的时候都需要访问 ZooKeeper ,这量一大 ZooKeeper 就顶不住
  • 所以后面引入了位移主题(Topic是__consumer_offsets),将位移的提交和获取当做消息一样来处理,存储在日志中,避免了频繁访问 ZooKeeper 性能差的问题
  • 还有像一些大公司,可能要支持百万分区级别,这目前的 Kafka 单集群架构下是无法支持稳定运行的,也就是目前单集群可以承载的分区数有限。所以 Kafka 需要去 ZooKeeper

所以没了 Zookeeper 之后的 Kafka 是怎样的?

  • 没了 Zookeeper 的 Kafka 就把元数据存储到自己内部了,利用之前的 Log 存储机制来保存元数据。解决了之前元数据过多的问题,可以支持更多的分区
  • 就和上面说到的位移主题一样,会有一个元数据主题,元数据会像普通消息一样保存在 Log 中。所以元数据和之前的位移一样,利用现有的消息存储机制稍加改造来实现了功能
b683e754f13b44d999e8bcd7e81de97a
  • 然后还搞了个 KRaft 来实现 Controller Quorum,这个协议是基于 Raft 的,就理解为它能解决 Controller Leader 的选举,并且让所有节点达成共识
    • 在之前基于 Zookeeper,故障转移太慢了。当 Controller 变更时,需要重新加载所有的元数据到新的 Controller 身上,并且需要把这些元数据同步给集群内的所有 Broker
    • 而 Controller Quorum 中的 Leader 选举切换则很快,因为元数据都已经在 quorum 中同步了,也就是 quorum 的 Broker 都已经有全部了元数据,所以不需要重新加载元数据。并且其它 Broker 已经基于 Log 存储了一些元数据,所以只需要增量更新即可,不需要全量了

18. Kafka处理请求全流程

(不是热点,但是可以是亮点)

1. Reactor模式

基本上只要是底层的高性能网络通信就离不开 Reactor 模式。像 Netty、Redis 都是使用 Reactor 模式

  • 新来一个请求,要么在当前线程直接处理了,要么新起一个线程处理
f0f519583f854021acd01c40646ffd76
  • 池化技术确实能缓解资源问题,但是池子是有限的,池子里的一个线程不还是得候着某个连接,等待指示
705099cb249c4262a837057c0f5b74b5
  • 引入IO多路复用,由一个线程来监视一堆连接,同步等待一个或多个 IO 事件的到来,然后将事件分发给对应的 Handler 处理,这就叫 Reactor 模式

网络通信模型的发展:

单线程 => 多线程 => 线程池 => Reactor模型

Kafka 所采用的 Reactor 模型:

853c9df0a5624e6fb19330bd2426db48

2. Kafka Broker网络通信模型

  • Broker 中有个 Acceptor(mainReactor)监听新连接的到来,与新连接建连之后轮询选择一个 Processor(subReactor)管理这个连接
  • 而 Processor 会监听其管理的连接,当事件到达之后,读取封装成 Request,并将 Request 放入共享请求队列中
  • 然后 I/O 线程池不断的从该队列中取出请求,执行真正的处理。处理完之后将响应发送到对应的 Processor 的响应队列中,然后由 Processor 将 Response 返还给客户端
  • 每个 listener 只有一个 Acceptor 线程,因为它只是作为新连接建连再分发,没有过多的逻辑,很轻量,一个足矣

  • Processor 在 Kafka 中称之为网络线程,默认网络线程池有 3 个线程,对应的参数是 num.network.threads。并且可以根据实际的业务动态增减
  • 还有个 I/O 线程池,即 KafkaRequestHandlerPool,执行真正的处理,对应的参数是 num.io.threads,默认值是 8。I/O 线程处理完之后会将 Response 放入对应的 Processor 中,由 Processor 将响应返还给客户端

网络线程和 I/O 线程之间利用的经典的生产者 - 消费者模式,不论是用于处理 Request 的共享请求队列,还是IO处理完返回的 Response

  • 生产者和消费者之间解耦了,可以对生产者或消费者做独立的变更和扩展。并且可以平衡两者的处理能力
c5a37efd7ab5401bbf0652ff10f81b91

3. 源码级别剖析网络通信模型

Kafka 网络通信组件主要由两大部分构成:

  • SocketServer
  • KafkaRequestHandlerPool

1. SocketServer

3838e5048a844ba7a97eadeaf50a710b

SocketServer 旗下管理着:Acceptor 线程、Processor 线程、RequestChannel 等对象

1. RequestChannel
05598908b8324c05bf4b2819d89b67b1
2. Acceptor
88ea6e6d36764a6b897e87787ef33c50

继承了AbstractServerThread,run()

6cebb7c4603c4157b1a8e622a16769ff

accept(key)

09a43f481d1a473597812f38dd61a6a7
  • 标准 selector 的处理,获取准备就绪事件,调用 serverSocketChannel.accept() 得到 socketChannel,将 socketChannel 交给通过轮询选择出来的 Processor,之后由它来处理 I/O 事件
3. Processor

三个关键的成员

af2455e6676f4da0a024811af85115b9

主要的处理逻辑

6a2361a94d7d472092d160b8a53f405d

主要是将底层读事件 I/O 数据封装成 Request 存入队列中,然后将 I/O 线程塞入的 Response,返还给客户端,并处理 Response 的回调逻辑

2. KafkaRequestHandlerPool

I/O 线程池,实际处理请求的线程

5a99c5f269a14b3d8b8a09ec00b9f396

IO 线程作用:

5e9bdcd432b9452ab5e8d1b976bac44b

核心:不断的从 requestChannel 拿请求,然后调用 handle() 处理请求

  • handle() 是位于 KafkaApis 类中,可以理解为通过 switch,根据请求头里面不同的 apikey 调用不同的 handle 来处理请求
04aacbd6ce904799b302fbcac60a47e2

举例看下较为简单的处理 LIST_OFFSETS 的过程,即 handleListOffsetRequest,来完成一个请求的闭环

  • 红色箭头标示了调用链。表明处理完请求之后是塞给对应的 Processor 的
4030ae665fbb459fb7e0405ecc0e7ad7

更详细的总览图,源码分析到的类基本上都对应的加上去了:

503e2778eaa64774aa4957aa3bd93cac

4. 请求处理优先级

data-plane、control-plane 对应的就是数据类请求、控制类请求

为什么需要分两类请求呢?直接在请求里面用 key 标明请求是要读写数据啊还是更新元数据不就行了吗?

  • 我们想删除某个 topic,我们肯定是想这个 topic 马上被删除的,而此时 producer 还一直往这个 topic 写数据。删除请求排在第N个... 等前面的写入请求处理好了才轮到删除的请求。实际上前面那些往这个 topic 写入的请求都是没用的,平白的消耗资源
  • 进行 Preferred Leader 选举时,producer 将 ack 设置为 all 时,老 leader 还在等着 follower 写完数据向他报告呢,谁知 follower 已经成为了新 leader。而通知它 leader 已经变更的请求由于被一堆数据类型请求堵着呢,老 leader 还在等着,直到超时

那如何让控制类的请求优先被处理?优先队列?

  • 社区采取的是两套 Listener,即数据类型一个 listener,控制类一个 listener
  • 对应的就是网络通信模型,在 kafka 中有两套! kafka通过两套监听变相的实现了请求优先级,毕竟数据类型请求肯定很多,控制类肯定少,这样看来控制类肯定比大部分数据类型先被处理!

19. RaMQ无法路由的消息

RMQ无法路由的消息会去到哪里?

在 RabbitMQ 中,如果消息无法路由到任何队列,其去向取决于交换机的配置和消息的属性设置

  1. 如果配置了备用交换机,那么无法路由的消息将被发送到备用交换机中
  2. 如果将消息的 mandatory 标志设置为 true,且消息无法路由到任何队列,RabbitMQ 将会把消息返回给发布者,发布者通过回调函数(returnListener)接收消息。反之,如果 mandatory 标志设置为 false 则会直接将消息丢弃
  3. 如果配置了死信队列,则可以被投递到死信队列中

20. RaMQ中消息进入死信交换机

RMQ中消息什么时候会进入死信交换机?

  • 消息被拒绝(basic.rejectbasic.nack),且 requeue 参数为 false
  • 消息在队列中过期(TTL 到期),即超时未被消费
  • 队列达到最大长度(超过最大队列长度限制)

21. AMQP协议?

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种开放标准的应用层协议,定义了消息的格式、消息队列的行为以及客户端和消息中间件之间的通信方式

  • 作用:为不同平台上的消息传递提供标准化的、高效的和可靠的支持

1. 基本组成部分

  • Message Broker:消息中间件。负责接收、存储和转发消息。在 AMQP 中,RabbitMQ 是一个典型的消息代理实现
  • Exchange:交换机。负责接收消息并根据绑定规则将消息路由到相应的队列。AMQP 定义了多种交换机类型。eg:direct、topic、fanout、headers
  • Queue:队列。用于存储消息,消费者可以从队列中接收消息
  • Binding:绑定。定义交换机与队列之间的关系和消息路由规则
  • Message:消息。是在系统中传输的基本数据单元,包含消息头和消息体

2. 主要特性

  • 消息路由:AMQP 支持复杂的消息路由机制,通过交换机和绑定可以实现灵活的消息传递路径
  • 消息可靠性:通过消息确认、持久化、死信队列等机制保证消息的可靠性和持久性
  • 消息传递保证:提供至少一次(at least once)、至多一次(at most once)和正好一次(exactly once)三种传递保证
  • 消息顺序:确保消息在队列中的顺序传递

3. 优点

  • 跨平台:AMQP 是开放标准协议,可以在不同的编程语言和操作系统上实现和使用
  • 灵活性:支持多种消息路由模式,可以满足不同的消息传递需求
  • 高可靠性:通过消息确认、持久化和死信队列等机制保证消息传递的可靠性
  • 可扩展性:支持大规模的分布式消息传递系统,可以根据需求扩展系统容量

RabbitMQ 应用了 AMQP 协议,实现了 AMQP 协议的全部功能

22. RabbitMQ的事务机制?

RabbitMQ 支持的事务,不是我们理解的 ACID 的那类事务,而是保证消息发送的原子性

  • 它允许发布者在一个事务内发布多条消息,并通过提交(commit)或回滚(rollback)操作确保所有消息的原子性操作。这表示要么所有消息都被成功发布,要么没有消息被发布

使用方式其实和数据库的事务方式很像:

  1. 开始事务
  2. 发布消息
  3. 提交事务(如果成功)或回滚事务(如果失败)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TxProducer {
    private final static String QUEUE_NAME = "transaction_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            try {
                // 1. 开始事务
                channel.txSelect();

                // 2. 发布消息
                String message = "Hello World!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");

                // 3. 提交事务
                channel.txCommit();
            } catch (Exception e) {
                // 4. 回滚事务
                channel.txRollback();
                System.out.println(" [x] Transaction rolled back");
            }
        }
    }
}
  • 需要注意,事务消息的性能比较差,因为一开始事务需要请求 broker 并等待 broker 的响应,确认没问题后,再发送消息,且后续的提交事务也需要等待 broker 的确认,这个操作是同步的所以性能会比较低
  • 一般会采用 comfirm 模式替代,有批量 comfirm 和异步 comfirm 等机制

23. RabbitMQ主要角色

1. Producer(生产者)

生产者负责发送消息到 RabbitMQ 服务器。在发送消息时,生产者指定将消息发送到哪个交换机,并附带一个路由键

2. Exchange(交换机)

交换机接收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列。交换机本身不存储消息,它只是消息的转发中心

交换机类型:

  • Direct Exchange:根据消息的路由键精确匹配队列
  • Fanout Exchange:将消息广播到所有绑定的队列
  • Topic Exchange:根据路由键模式匹配队列,支持模糊匹配(eg:通配符 * 和 #)
  • Headers Exchange:根据消息头的属性匹配队列,而不是路由键

3. Queue(队列)

队列是消息的存储容器,消费者从队列中获取消息进行处理。队列可以有多个消费者,RabbitMQ 会根据负载均衡策略将消息分发给不同的消费者

4. Consumer(消费者)

消费者负责从队列中接收消息并进行处理。消费者可以是监听队列的应用程序,当队列中有消息时,RabbitMQ 会将消息分发给消费者

5. Broker(代理)

Broker 是 RabbitMQ 服务器本身,负责接收、存储、转发消息,并维护交换机、队列、绑定等资源。Broker 提供了一整套的消息队列服务,包括消息路由、存储、确认、重试等功能

6. 角色间数据流转流程

  1. 生产者发送消息:生产者创建一个消息,并指定将消息发送到某个交换机,通常会附带一个路由键
  2. 交换机路由消息:交换机根据绑定的路由规则决定将消息路由到哪个队列。不同类型的交换机会有不同的路由逻辑
  3. 队列存储消息:消息被路由到指定的队列。队列负责存储消息,直到消费者来消费
  4. 消费者消费消息:消费者监听某个队列,并从中获取消息进行处理。消费者可以是同步的也可以是异步的

在 RabbitMQ 中还有一个 VHost(虚拟主机)概念,它其实就是一个命名空间,将资源分组,使得不同虚拟主机之间的资源互不干涉,每个 VHost 拥有自己的交换机、队列、权限设置等

整体架构图:

f8ca4712bb074b76888115e600f91de6

24. RaMQ的key最大多少字节

RabbitMQ 的 routing key 和 binding key 的最大长度是多少字节?

RabbitMQ 的 Routing key(路由键)和 Binding key(绑定键)的最大长度都是 255 字节,超过这个长度的键会被 RabbitMQ 拒绝

扩展解释:

  • Routing Key(路由键):生产者发送消息时附带的一个键,用于交换机根据该键将消息路由到相应的队列
  • Binding Key(绑定键):在队列和交换机绑定时指定的键,用于定义交换机如何根据路由键将消息路由到队列

25. RaMQ保证消息顺序性

  • 首先生产者需要按序发送,确认上一条消息发送成功后再发送下一条,然后需要使得这类消息仅发送至一个队列中,且只有一个消费者消费这个队列,且消费者必须要消费完一条消息后,再消费另一条消息,不能是多线程并发消费,这样才能保证消息的顺序性
  • 如果要全局消息顺序,那么就是全局只使用一个队列,但是一般业务场景不会要求全局顺序,仅要求部分顺序(eg:按照订单或用户维度顺序即可,那么就可以利用 用户 ID订单 ID 对消息进行分区,即相同的用户(订单)的消息发往一个队列,保证局部顺序可提高并发度)

26. RabbitMQ工作模式

1. 简单模式

(Simple Queue)

  • 作用:生产者将消息发送到队列,消费者从队列中获取消息。每条消息只能被一个消费者消费
  • 适用场景:适用于消息量较小且不需要复杂路由逻辑的场景
1df6a51cf7d943d08f76b503a3f02ba9

2. 工作队列模式

(Work Queue)

  • 作用:通过多个消费者分担任务,达到负载均衡的目的。消息在多个消费者之间分配,每条消息只能被一个消费者处理
  • 适用场景:适用于需要并行处理大量独立任务的场景,如图像处理、视频转码等
24a7836ee45846059fb1aafc6d8530d6

3. 发布/订阅模式

(Publish/Subscribe)

  • 作用:消息发布到交换机,所有绑定到该交换机的队列都会收到消息。常用的交换机类型是 fanout
  • 适用场景:适用于广播消息的场景。eg:日志处理、事件通知等
23c28718126a417dafc9817f17872ea2

4. 路由模式

(Routing)

  • 作用:消息发布到交换机,并根据路由键将消息发送到相应的队列。常用的交换机类型是 direct
  • 适用场景:适用于需要对消息进行分类处理的场景,如日志系统中按严重级别分类的日志处理
c85d1f8aa151440fbbc7ac85a806fc5c

5. 主题模式

(Topics)

  • 作用:消息发布到交换机,并根据路由键模式(通配符匹配)将消息发送到相应的队列。常用的交换机类型是 topic
  • 适用场景:适用于需要根据复杂的路由规则对消息进行分类处理的场景。eg:日志系统中按模块和严重级别分类的日志处理
bf5260cc9bc149598f6d1cc975e6e40e

6. RPC模式

  • 作用:实现远程过程调用,客户端发送请求消息到队列,服务器端处理后返回响应消息
  • 适用场景:适用于需要实现远程服务调用的场景。eg:微服务之间的通信
1b135349e89149ab8ff96c76b6a5d2e6

7. 发布确认模式

  • 作用:用于确保消息已经成功地发布并被 broker 接收。它提供了一种轻量级的方法来确认消息的持久性和可靠性,适用于需要高可靠性消息传递的场景
  • 适用场景:金融交易、订单处理等确保消息被可靠地传递和处理,防止丢失的场景

27. RabbitMQ缺点

  1. 性能问题
    • 当队列中堆积了大量消息时,RabbitMQ 的性能会急剧下降。因为它需要处理和存储这些消息,尤其在内存和磁盘资源有限的情况下,性能影响更为明显
    • 相比 Kafka 和 RocketMQ,它的性能通常是最差的。尽管它可以处理中小企业的需求,但在处理大规模消息时,效率可能不高
  2. 开发语言限制
    • RabbitMQ 是用 Erlang 语言开发的。对于希望通过添加自定义功能或进行二次开发的团队来说,有点困难,毕竟语言比较小众,大部分程序员没有 Erlang 开发经验
  3. 功能性不足
    • 相比于 Kafka 和 RocketMQ,RabbitMQ 在一些高级功能上可能不如它们丰富(eg:Kafka 在大数据和实时计算场景中有更成熟的应用)。而 RocketMQ 在处理消息顺序和事务消息方面表现更好

28. RaMQ发送消息的过程?

  • 生产者发送消息时需要指定 exchange 和 route_key,根据 AMQP 协议格式构建其他内容,接着序列化为二进制格式发送给 Broker,由 Broker 的 exchange 根据路由规则将数据分发到不同的 Queue 中

29. RaMQ消息重复投递及消费

RabbitMQ 如何避免消息的重复投递以及重复消费?

  • 首先 RabbitMQ 有消息确认机制,所以生产者发送消息可等待 RabbitMQ 响应来确认消息是否投递成功。但如果消息被 RabbitMQ 存储后,返回生产者响应的这个过程由于网络原因超时了,那么生产者由于没接收到这个确认,会再次发送此消息,因此消息就已经重复了,这个问题无法避免
  • 消费者消费消息时,当消费完毕也需要通知 RabbitMQ,此时如果又因为网络原因,导致 RabbitMQ 没收到通知,那么也会导致消息重投,这个问题也无法避免
  • 所以不论怎么样,RabbitMQ 都无法保证消息不重复,因此只能保证消费重复的消息和消费一次消息得到的结果是一致的,这就是幂等性问题

1. 如何实现幂等操作

  1. 给消息设置一个全局唯一的 ID。eg:订单的订单号或者 UUID 之类的
  2. 消费者在消费消息时,添加分布式锁(保证互斥的都行),接着从数据库流水表中查看是否已经有这个唯一 ID 的消费记录
    • 如果有的话说明已经被消费过了,即跳过后续的业务执行即可
    • 如果没有,说明是第一次消费这条消息,那么就执行业务逻辑,且将其写入数据库流水表中(和业务处理在同一个事务下,这样保证业务执行成功则流水表一定被插入),然后再释放锁
  3. 当然加锁不是必须的,通过数据库唯一索引也可以避免重复数据的产生。只不过加锁可以解决高并发下的资源消耗问题(如果没有锁,每个线程都能执行完业务,然后等最后事务提交那一刻由数据库拦截,最后只有一个事务成功提交,其他都需要回滚,这挺费时和浪费资源)

30. RaMQ保证消息持久化

  1. 队列/交换机的持久化
    • 将队列和交换机的 Durable 设置为 True,这样创建的队列和交换机都是持久化的,即 RabbitMQ 服务器重启后,队列和交换机的元数据会保留
  2. 消息的持久化
    • 消息的持久化是在生产者发送消息时,将 Delivery Mode 设置为 2,表示消息会持久化消息,这样的消息在发送到持久化队列后,服务器重启后消息也还存在着
    • 扩展下 Delivery Mode
      • 默认此参数为 1,表示非持久化消息,当消息发送至 broker 时,不会将其写入磁盘,如果消费者还未消费到这条消息,broker 宕机了,那么消息可能就丢失了
      • 参数设置为 2,表示持久化消息,会写入到磁盘中,但是这个过程实际上是异步的,所以正常情况下即使 broker 宕机了,也是有可能丢消息的,只是能保证大部分消息的持久化

31. RaMQ中的Channel

RabbitMQ 在网络层有 Connection 和 Channel 两个概念

  • Connection 其实对应的就是一条 TCP 连接,而 Channel 是 Connection 中的虚拟连接,一个 Connection 可以创建多个虚拟连接
  • 在 RabbitMQ 中,客户端和 Broker 之间的通信都是基于 Channel 维度的,这样可以减少实际的 TCP 连接数,节省系统资源的使用
985b83e7b23b4a14a68a5f27b081e2cc

32. RaMQ消息如何进行路由

在 RabbitMQ 中,消息的路由是通过交换机(Exchange)来实现的。交换机根据不同的路由规则将消息分发到一个或多个队列

消息路由的基本流程:

  1. 生产者将消息发送到指定的交换机
  2. 交换机根据路由键(Routing Key)和绑定键(Binding Key)将消息发送到一个或多个队列当中

路由键(Routing Key)可以是任意字符串,也可以根据交换机的类型以及绑定的规则进行匹配。

1. 四种交换机类型

  1. Direct Exchange(直连交换机)
    • 通过完全匹配路由键(Routing Key)将消息路由到队列。消息的路由键必须与队列绑定时指定的绑定键(Binding Key)完全匹配
  2. Fanout Exchange(扇出交换机)
    • 将消息广播到所有绑定的队列,而不考虑路由键。适用于广播消息的场景
  3. Topic Exchange(主题交换机)
    • 通过模式匹配路由键,将消息路由到队列。路由键可以包含通配符 * 匹配一个单词,# 匹配零个或多个单词
  4. Header Exchange(头交换机)
    • 通过匹配消息的头部属性(Headers)将消息路由到队列。路由规则基于消息的头部属性,而不是路由键
    • 如果消息没有匹配的队列的话,消息会被丢弃或返回给生产者,根据生产者的配置决定消息的去留

33. RaMQ的Queue最多消息?

RaMQ 上一个 Queue 最多能存放多少条消息?

  • RabbitMQ 队列的消息数量没有固定的限制,但是可以设置队列的最大长度。这个最大长度可以是消息的数量限制(通过声明队列时使用的 x-max-length 参数来设置消息数量上限),也可以是消息体总字节数的限制(使用 x-max-length-bytes 参数来设置总字节数上限)
  • 如果同时设置了这两个参数,两个限制都将适用,队列将在达到任一限制时采取措施
  • 这里的措施就是当队列达到最大长度时,新发送的消息会被拒绝。如果开启了 publisher confirms,生产者会收到 basic.nack 消息来告知拒绝。或丢弃消息或移动死信队列

34. RaMQ保证高可用?

要实现 RabbitMQ 的高可用,单机部署肯定是不合适的,因为单机故障,消息必然无法发送和消费。三种集群模式:

1. 标准集群

  • 多台 RabbitMQ 服务器通过网络连接组成一个集群,所有节点共享元数据(eg:队列、交换机、绑定等),但一个消息仅存储在单个节点上
  • 客户端可以连接到集群中的任一节点,因为元数据是共享的,所以任何一个节点都知晓具体消息在哪个节点上,因此可以实现负载均衡。但消息仅会存储在一个节点上不会自动复制到其他节点,所以如果某个节点故障,那么实际上这部分消息就无法消费了,但别的节点还可以提供服务
0fddad7a744144b3a02b4059fc7ab999

2. 镜像集群模式

  • 相比于标准模式,镜像队列模式下,队列的主副本和一个或多个副本会同步到不同的节点上,即队列的消息会复制到多个节点,如果主队列所在的节点故障,副本节点会自动接管,保证队列的可用性
bad57f84ee194a75a9096f4f80d3b48a

3. Federated集群模式

  • 其实就是多活模式,主要用于异地的数据复制。eg:在北京、上海两个机房都部署 RabbitMQ 集群,它们之间依赖 RabbitMQ 的 federation 插件实现持续的可靠的 AMQP 数据通信
    • 如果北京机房出了故障,上海的机房还可以顶上
59eb14a4a9da4893acd0623f4bd44cce

35. RoMQ优缺点

优点:

  • 单机吞吐量达到了 10 万级,并且支撑 10 亿级别的消息堆积问题,不会因为消息堆积导致性能下降
  • 其底层的开发语言是 Java,可以较方便地结合公司业务进行二次开发
  • 消息可靠性非常高,在经过参数优化配置之后,消息几乎可以实现 0 丢失,并且支持扩展,适合使用分布式架构
  • 其背后有阿里背书,经过了多次双 11 的考验,如果业务中有金融、高并发的场景,非常适合使用 RocketMQ
  • 功能丰富,支持延迟消息、事务消息等,满足各种复杂需求

缺点:

  • 与 Kafka 相比,RocketMQ 的生态系统和社区相对较小,在一些特定场景和工具支持上不如 Kafka
  • 学习曲线不低,运维和管理相比而言会比较复杂

36. RoMQ自己实现NameServer

为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?而选择自己实现 NameServer?

RocketMQ 在开发的时候充分吸取了前人的教训,特意轻量化注册中心的实现

  1. 设计简洁且专用
    • RocketMQ 的 NameServer 设计相对简单,专门用于 RocketMQ 的需求。相比于通用的 Zookeeper,NameServer 更加轻量级且易于部署和维护
  2. 高可用性
    • RocketMQ 的 NameServer 是无状态的,多个 NameServer 实例之间是对等的,可以通过 DNS 或者 VIP 进行负载均衡,天然具备高可用性。而 Zookeeper 是一个强一致性系统,对节点之间的同步有严格要求,在某些极端情况下(如网络分区),Zookeeper 的可用性可能受到影响
  3. 性能优化
    • NameServer 只处理简单的配置和路由信息,不涉及复杂的状态同步和一致性协议,相比 Zookeeper 的 ZAB 协议,性能更高
    • 因为 Zookeeper 的写是不可扩张的,整个 Zookeeper 集群的写入只能在 Leader 节点,因此对于频繁写入操作,压力都打在 Leader 上,不好扩展。对于 RocketMQ 的大规模分布式消息系统,性能优化是非常重要的
  4. 降低依赖性
    • 使用自研的 NameServer 可以降低对外部系统的依赖,简化系统架构,减少维护复杂度。同时,RocketMQ 可以完全掌控注册中心的实现和优化
  5. 定制化需求
    • RocketMQ 需要实现一些特定的功能和优化(eg:针对消息队列的动态路由和管理),使用自研的 NameServer 可以更好地满足这些定制化需求,而不必受限于 Zookeeper 的实现和接口

37. Kafka性能高?

  • 顺序写:磁盘顺序写的性能是远远大于随机写的性能的。Kafka 采用不断追加写文件的方式来实现顺序写,从而提高磁盘性能
  • 页缓存:这个主要使用到了 Linux 系统底层的一个机制,即 Page Cache,又称页缓存,当消息写入到 Page Cache 之后立刻返回,然后等到系统的刷盘线程进行刷盘操作之后,页缓存将对应的内容一次性写入磁盘。之所以快是因为我们写入内存的速度是远远大于写入磁盘的速度的,然后我们写入内存之后,再将内存中的东西再批量一次性写入磁盘,这个过程就是聚集写的过程,从一定程度上减少了 IO 的次数,两种机制结合起来,从一定程度上提高了 kafka 的刷盘性能
  • Kafka 支持批量接收和发送消息,并且支持消息在压缩之后进行接收和发送。这样从一定程度上就减少了网络传输的次数和负担,提高了 Kafka 消息读写以及消息进行网络传输的效率
  • 零拷贝(重点):也是 Kakfa 和 RocketMQ 高效的原因。Kakfa 的 Borker 传递数据给消费者的过程使用了零拷贝技术,其底层主要就是使用了 sendfile 系统调用,减少了系统用户态与内核态的上下文切换以及数据拷贝,从而提高 Kafka 数据拷贝的性能
  • 采用分段与索引的策略来提高性能。即利用偏移量和时间索引文件实现快速消息查找,从而提高 Kafka 消息查找的效率

38. Kafka应用场景

Kafka 的吞吐很高,典型场景:

  1. 日志聚合:Kafka 可以用来收集应用日志、服务器日志、数据库日志等,将这些日志集中到一个地方进行分析和监控
  2. 监控数据:实时收集系统监控数据(如 CPU 使用率、内存使用率、网络流量等)并通过 Kafka 进行分析和报警
  3. 数据管道:Kafka 可以作为数据管道,将数据从一个系统传输到另一个系统。eg:将数据从数据库传输到数据仓库、数据湖、Elasticsearch 等
  4. 流处理相关:结合流处理框架(Apache Flink、Apache Storm、Apache Spark Streaming 等),使用 Kafka 作为数据流平台,进行实时数据处理和分析

可以看到一般都是大数据、实时大流量数据的场景用 Kafka,而业务的消息用 Kafka 的比较少,更多使用的是 RocketMQ 之类的,因为相比而言 RocketMQ 业务功能会多很多。eg:延迟消息、事务消息等