• 周六. 7 月 27th, 2024

    Kafka高可用,高吞吐量低延迟的高并发的特性背后实现机制

    root

    2 月 2, 2021 #kafka, #zookeeper

    1 概述

    Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式消息系统,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

    2 消息系统介绍

    一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式

    2.1 点对点传递模式

    生产者发送一条消息到queue,只有一个消费者能收到 ,这种架构描述示意图如下:

    2.2  发布-订阅消息传递模式

    发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息,这种架构描述示意图如下:

    3 Kafka HA设计解析

    3.1 如何将所有Replica均匀分布到整个集群

    为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。

    Kafka分配Replica的算法如下:

    1.将所有Broker(假设共n个Broker)和待分配的Partition排序

    2.将第i个Partition分配到第(i mod n)个Broker上

    3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

    3.2 消息传递同步策略

    Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication 数量为多少,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader向Producer发送ACK(前提消息生产者send消息是设置ack=all或-1)。

    为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。

    Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。

    Kafka Replication的数据流如下图所示:

    4 高可用机制

    4.1 kafka副本

    分区中的所有副本统称为AR(Assigned Repllicas)存储在zk中,所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集

    1、leader副本:响应客户端的读写请求

    2、follow副本:备份leader的数据,不进行读写操作

    3、ISR集合表:leader副本和所有能够与leader副本保持基本同步的follow副本,如 果follow副本和leader副本数据同步速度过慢(消息差值超过replica.lag.max.messages阈值 )或者卡住的节点( 心跳丢失超过replica.lag.time.max.ms阈值 ),该follow将会被T出ISR集合表

    ISR集合表中的副本必须满足的条件

    1. 副本所在的节点与zk相连
    2. 副本的最后一条消息和leader副本的最后一条消息的差值不能超过阈值replica.lag.time.max.ms如果该follower在此时间间隔之内没有追上leader,则该follower将会被T出ISR

    4.2 kafka通过两个手段容错

    • 数据备份:以partition为单位备份,副本数可设置。当副本数为N时,代表1个leader,N-1个followers,followers可以视为leader的consumer,拉取leader的消息,append到自己的系统中
    • failover
    1. 当leader处于非同步中时,系统从followers中选举新leader( kakfa采用一种轻量级的方式:从broker集群中选出一个作为controller,这个controller监控挂掉的broker,为该broker上面的leader分区重新选主 )
    2. 当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入 ISR

    5 kafka高吞吐机制

    5.1、零拷贝

    通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据

    5.2、分区

    kafka中的topic中的内容可以被分为多分partition存在,每个partition都以segment顺序存储方式存放可以提升读写效率,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力

    5.3、批量发送

    kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka

    1. 等消息条数到固定条数
    2. 一段时间发送一次

    5.4、数据压缩

    Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩 压缩的好处就是减少传输的数据量,减轻对网络传输的压力

    6 kafka防止消息丢失和重复

    6.1 ack简介

    Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。

    ack有3个可选值,分别是1,0,-1,ack的默认值就是1

    ack=1,简单来说就是,producer只要收到一个leader成功写入的通知就认为推送消息成功了。

    ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功

    ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。

    6.1 生产者丢失和重复

    6.2 消费者丢失和重复

    7 kafka消息如何确定发送到Partition分区

    7.1 生产者在发送消息时指定一个partition

    ProducerRecord(String topic, Integer partition, K key, V value)
    Creates a record to be sent to a specified topic and partition
    ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
    Creates a record to be sent to a specified topic and partition
    ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
    Creates a record with a specified timestamp to be sent to a specified topic and partition
    ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
    Creates a record with a specified timestamp to be sent to a specified topic and partition
    ProducerRecord(String topic, K key, V value)
    Create a record to be sent to Kafka
    ProducerRecord(String topic, V value)
    Create a record with no key

    7.2 kafka的消息内容包含了key-value键值对,key的作用之一就是确定消息的分区所在。默认情况下, kafka 采用的是 hash 取模的分区算法即 hash(key) % partitions.size

    7.3 既没指定partition也没有key 则”metadata.max.age.ms”的时间范围内轮询算法选择一个

    kafka消息如何确定Partition分区到Consumer

    如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。

    如果所有的消费者实例在不同的消费组中,每条消息记录会广播到消费组中所有的消费者进程。

    如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。

    c1、c2在一个group A中消费不同的P,同样group B也是这样,保证消费中某个节点丢失可以正常消费

    9 kafka强依赖于ZooKeeper的工作原理

    10 kafka如何保证数据的顺序消费——案例

    10.1 关于顺序消费的几点说明:

    ①、kafka的顺序消息仅仅是通过partitionKey,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。

    ②、除了发送消息需要指定partitionKey外,producer和consumer实例化无区别。

    ③、kafka broker宕机,kafka会有自选择,所以宕机不会减少partition数量,也就不会影响partitionKey的sharding。

    10.2 问题:在1个topic中,有3个partition,那么如何保证数据的消费?

    1、如顺序消费中的第①点说明,生产者在写的时候,可以指定一个 key, 比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

    2、消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。

    3、但是消费者里可能会有多个线程来并发来处理消息。因为如果消费者是单线程消费数据,那么这个吞吐量太低了。而多个线程并发的话,顺序可能就乱掉了。

    10.3 解决方案 

    写N个queue,将具有相同key%的数据都存储在同一个queue,然后对于N个线程,每个线程分别消费一个queue即可。
    注:在单线程中,一个 topic,一个 partition,一个 consumer,内部单线程消费,这样的状态数据消费是有序的。但由于单线程吞吐量太低,在数据庞大的实际场景很少采用。

    来源:https://my.oschina.net/u/4938149/blog/4939207

    root