我来说一下Kafka

  这些天正在准备一个技术分享的PPT,在MQ上想找一个RabbitMQ的同行作为对比。目前看来Kafka曝光率很高的,而且之前在技术沙龙的时候有同学说Kafka经常会发生消息消费延迟的问题,本着好学求实的态度(当然最主要乘着老婆孩子都回老家,自己有一些空闲时间了),找了一本Kafka的教材看了一遍,对其大致的结构原理有了一定的认识。
  先说结论吧:如果你要处理的是日志、用户行为记录等非核心数据,而且这些数据流量还挺大,那么用Kafka是比较合适的,毕竟他最初设计出来的初衷就是应对这类场景的;如果你是要处理交易、金融这类核心数据,对数据防丢失、尽量减少重复消费的可能性,那么Kafka真的不适合,要让Kafka实现这类需求也不是不可以,不过要对服务端做很多复杂的配置,而且客户端(包括生产者、消费者)也要做一些额外的编程手段,而这些配置、手段加上之后Kafka的吞吐量肯定会下降很多,那么此时再看Kafka的选择真的是得不偿失了。需要可靠性更强的MQ还是用RabbitMQ吧,他才是金融领域的大神。

一、Kafka原理简介

  Kafka是通过主题(topic)对消息进行分类的,然后主题被分成若干个分区,每一个分区就是一个提交日志,消息以追加的形式写入分区(所以写入的效率是很高的),然后消费者按照先入先出的顺序消费消息。分区是 Kafka最重要的概念,分区可以跨主机存在,通过分区可以实现数据的高性能写入读取、数据冗余和系统的伸缩。
  生产者在生产消息的时候,如果不指定消息键,那么Kafka会通过Round-Robin算法将消息均匀的分布到topic的所有分区上面,这个时候的性能应该是最好的;而如果发布消息的时候提供消息键,则Kafka客户端上的分区器会使用一种不依赖Java和系统升级的散列算法将其映射到一个指定的分区上面去,虽然通过这种方式可以保证消息一定的有序性,但是会大大降低消息的吞吐量,而且如果映射的分区不可用,那么此时发送消息就会出错误。
  消息以追加的形式写入分区后,消息会在设定的保留策略之内(策略是可配置的,比如限制保留时间长度,或者总共消息的大小)一直保存着,消费者通过消息偏移量记录该分区的消息将要消费的位置,消息偏移量是一个不断递增的整数值,在创建的时候Kafka会把它添加到消息里面,并在每个分区中保持唯一。因为消息偏移量标示了将要被消费的消息位置,在消费者崩溃、关闭、重启时都要保留该值的正确性,属于Kafka中重要关键数据,所以是保存在ZooKeeper里面的,而新版本中该偏移保存在Kafka集群自身了。
kafka-cluster
  Kafka的服务器也被称为Broker,其负责接收生产者的消息、为消息设置偏移量、然后提交到磁盘固化保存,同时Broker还能针对消费者的分区请求返回消息体作为响应。多个Broker可以组成集群,其数量可以根据单个Broker的数据容量、单个Broker网络吞吐量、分区的副本数目等因素计算得到。Kafka主题的每个分区可以在不同的Broker产生副本(即分区复制),而借助ZooKeeper所有的分区实例只有一个作为Leader,其余副本作为Fellower存在。

二、Kafka的特点和槽点

2.1 生产者

  Kafka消息的发送有发射不管、同步发送、异步发送的接口,不同的设置会导致生产者的可靠性、效率有巨大差异。
  acks表示需要有多少个分区副本收到消息,生产者才认为消息发送成功的:acks=0表示生产者在成功写入消息之前不会等待任何来自服务器的响应,这意味着如果某些因素导致服务器没有收到消息,生产者也不会发觉;acks=1表示只要集群的首领接收到消息,则生产者就会得到来自服务器的成功响应,如果首领节点崩溃、首领节点还未选举等情况下生产者就会收到一条错误响应,不过一个没有收到消息的节点成为新首领,那么消息还是可能会丢失(?);acks=all等待所有参与复制的及诶单收到消息,生产者才会认为消息是成功发送的,这种情况最安全,但是性能受限于最慢的分区副本的响应速度。
  生产者可以配置retries,如果发送失败生产者会(默认)每100ms重试一次,这种重发的逻辑不用业务端手动去写,还是挺方便的。如果对消息顺序有严格的要求,又想让客户端自动失败自动重发,则可以设置max.in.flight.requests.per.connection=1,此时当生产者重试发一条消息的时就不会尝试重发其他的消息了,相对于本生产者来说是有序的。

2.2 分区

  Kafka的分区可以动态增加,但是分区越多占用的内存就会越多,同时Leader选举也需要更长的时间才能完成,不过这倒也不是问题,但是Kafka的每个分区都只允许有一个消费者使用让人感觉很奇怪:这意味着消费者的最大并发量取决于分区的数目,创建多于分区数目的消费者会让多余的消费者闲置,而消费者小于分区数目的时候会有一个消费者消费对个分区的情况,这很可能会造成消费者负载不均衡的情况。相比而言RabbitMQ所有消费者都消费同一个队列,可以仍以扩充或者缩减消费者的数量,而且每个消费者都可以尽力消费消息,实现了最佳的负载均衡。
  虽然分区数目是可以动态增加(但不能缩减)的,但是如果生产者使用了消息键对消息进行散列映射,那么改变分区数目会导致原先的映射关系发生改变,因此如果需要消息键进行映射,那么分区数目实现就要规划好,争取永远不增加新分区。说到这里我还想到,如果使用消息键进行消息映射,那么特定类型的消息会被映射到同一个分区中,因为每个分区只能被一个消费者消费,那么消息分区更容易造成消费者负载不均衡的,而且如果分区生产速度超过单个消费者的消费速度又该怎么办?难道这个消费者只能做个代理取消息,将消息的处理放到其他的线程组里面去实现么?这样更加复杂化消费者的逻辑了。

2.3 消费者再均衡

  Kafka的消费者有消费者群组的概念,一个消费者群组订阅的是同一个topic的消息,群组中的每一个消费者都消费相同topic的消息。我们可以为同一个topic创建多个消费者群组,每个消费者群组都可以完整的消费该topic的所有消息。
  然后Kafka最头疼的就是消费者分区再均衡,对其真是又爱又恨:当一个消费者关闭、发生崩溃、或者和群组协调器心跳超时(默认3s)的时候,他就离开群组,他原本读取的分区就会有群组其他消费者来自动承担,当需要增加消费能力而增加消费者数目的时候,或者增加新的分区的时候,会导致消费者所有权的变化,这就叫做再均衡。再均衡是实现高可能、灵活伸缩性的基础,只不过在发生再均衡的时候消费者无法消费消息,整个群组会处于不可用的状态。
  除此之外,Java不恰当的垃圾回收配置也会导致集群产生停顿,甚至造成broker与ZooKeeper之间的连接断开、分区副本不同的状态,进而导致状态切换和甚至选举操作,需要注意这种情况。

2.4 消费者提交和偏移量

  Kafka和一般的MQ不同的是,他不会在消费者确认消息后删除该消息,而且会将消息一直保留直到过期策略生效才会将消息删除。消息在分区中按顺序排列,Kafka负责管理偏移量标识那些消息已经被消费过了,接下来需要消费哪些消息。
  如果消费者稳定存在倒没什么问题,而当消费者退出或者新的消费加入的时候,就会触发再均衡,此时就会涉及到偏移量一致性的问题,客户端需要提交偏移量来标识消费完成,而如果消费了一些消息但是在提交偏移量之前崩溃了,那么新消费者会重新消费那些没有提交的消息。Kafka对偏移权重的提交也提供了很多方法:
  enable.auto.commit可以打开自动提交,默认每5s消费者会自动把从poll()方法接收到的最大偏移量提交上去,这种情况是不会顾及哪些消息是真正被处理了的,所以最好调用poll()之前确保当前的消息都被处理完了;同时,如果在5s间隔之前发生了再均衡,那么最近的偏移就不会被提交,前面消费的消息就会被重复消费。
  如果上面的参数为false,那么就需要应用程序手动进行提交偏移量了。使用commitSync()会执行同步提交,该API会自动提交poll()返回的最新偏移量,如果成功就立即返回,失败就抛出异常。
  类似的还有一个commitAsync()异步提交接口,这个接口有一个坑,就是新提交的偏移量会覆盖旧提交的偏移量,即使新提交的偏移量值小于旧提交的偏移量,也会进行偏移量的覆盖。既然偏移量是递增的那完全可以在服务端进行累计提交的逻辑自动忽略偏移量小的提交,但是居然没有做,只能业务层在回调函数中记录最大提交的偏移量来过滤不需要的提交。

2.5 分区复制

  复制功能是Kafka实现分布式高可用的核心。主题的每个topic可以有若干个分区,每个分区可以有多个副本,这些副本被保存在Broker上。每个分区只有一个Leader,生产者和消费者的请求都会经过这个副本,而其余的副本都是Fellower副本,他们的任务就是从Leader那里复制消息,并保持和Leader的一致状态。而且Kafka会保持Broker之间的Leader分布是均衡的。
  Fellower副本为了和Leader副本保持一致,他会和消费者读取消息请求相同方式向Leader请求消息,其请求消息中包含了Fellower想要获取的消息偏移量,并且这个偏移量起到了累计确认的作用,标识着之前的消息都被成功接收了。如果Fellower在10s内没有请求到任何新消息,或者在10s内没有请求到最新的数据,那么它就被认为是不同步的,其不会成为新Leader的候选者,而不同步的副本通过和ZooKeeper重新建立连接再同步最新消息,是可能转化成同步状态的副本的。

三、小结

  之前很多Kafka的用户抱怨Kafka会有消息消费延迟的现象,在了解过其原理之后,在消费者崩溃、关闭、心跳超时、新加入、topic和分区变化等情况都会导致再均衡,此时Kafka的消费者群组是不可用的状态,会导致消费延迟;同时在集群负载过高、GC负担重的时候,也可能导致服务不可用的状态;再次和ZooKeeper交互的抖动也会导致复制副本状态的切换甚至重选举的操作,进一步导致消费的延迟。总体来讲,在Kafka中消费延迟是比较容易出现的事件,自然就见怪不怪了。
  Kafka的配置参数太多太复杂了,一个满足业务需求、资源充分利用的集群还真不简单折腾出来,对于没有经验的新用户来说上生产是比较危险的,而且出了问题配置参数也比较难排查。
  在Kafka的默认配置中,消息的丢失和重消费是比较容易发生的,如果要确保消息准确可靠那么很多参数和使用模式都需要作出变更,其吞吐量必定有很大缩水,原本的优势也就大打折扣了。不过这套MQ最初设计就是用来做日志和行为记录的,所以在这方面使用还是挺合适的,强求他用在核心消息上真的不太推荐。

参考