Kafka
常见问题
## Java API
- 一个独立的Kafka服务器被称作broker,broker是集群的组成部分,每个集群都有一个broker同时充当了集群控制器的角色
- kafka的主题被分为多个分区,分区存储在磁盘中
- kafka的消息特点:消息会保留一段时间,即使应用程序下线,消息仍然会保存在kafka里,是基于磁盘的数据存储。
- API使用:
- producer
- record的参数,topic , key ,value key可以省略,当省略时就是一个没有key的value,key一般用于把相同key的数据写入同一个分区里
- send默认是发送并忘记,send会返回一个Future对象使用.get方法得到RecordMetadata对象,可以获取消息的偏移量
- 异步发送.send(record,回调函数类),回调函数需要实现Callback类,并且重写其中的方法
- 配置设置:
1. acks = 0 生产者无需等待服务器的响应,但消息丢失时不会知晓, = 1,只要集群的首领节点收到即可,= all需要所有参与复制的节点都受到消息才会受到服务器的响应,不建议
2. buffer.memory设置producer内存缓冲区大小,
3. compression.type设置消息的压缩格式,默认不会压缩
4. retries重试次数
5. batch.size 当多个消息被发送到同一个分区时,producer会把他们放在一起,当作一个批次,这个参数指定一个批次可以使用的最大内存
6. max.in.flight.requests.per.connection 生产者在接收到服务器响应之前可以发送多少个消息,设置为 1 时可以保证消息是按照顺序写入的,适合在银行等严格要求顺序的时候使用
- 自定义分区:
public class DemoPartitionser implements Partitioner {
private String myKey;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
// 获取分区数
int partitionNum = partitionInfos.size();
if((keyBytes == null) || (!(key instanceof String))){
throw new IllegalArgumentException("key is null or not a string");
}
// 如果key为key,则分配到最后一个分区
if("key".equals(key)){
return partitionNum;
}
//其余的消息都分配到最后一个分区
return Math.abs(key.hashCode()) % (partitionNum - 1);
}
@Override
public void close() {
}
/*
* @description:
* @author bronya
* @date: 2024/4/20 14:50
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
myKey = (String) map.get("key");
}
}
- consumer:
- API:
1. consumer类的创建方式和producer一致,但是推荐指定 group.id来指定属于哪个群组,
2. consumer.subscirbe()订阅相关的topic,同时支持正则表达式 consumer.subscribe(Arrays.asList("topic1", "topic2"));
3. consumer.poll(xxx) 参数是轮询的阻塞时间,会等待broker返回数据
4. 记得.close()
5. 建议一个线程中只有一个消费者
- 配置:
- 大部分和producer类似
- fetch.min.bytes消费者从服务器获取记录的最小字节数,当数据量大于等于这个值才会被返回给消费者
- 提交和偏移量:消费者往_consumer_offset特殊主题发送消息,消息包含每个分区的偏移量,当触发再均衡时,消费者会读取每个分区组后一次提交的偏移量,然后从偏移量指定的地方开始处理
- 提交方式:
1. 自动提交:每隔一段时间自动提交一次
2. 提交当前偏移量:设置auto.commit.offset为false.然后使用commitSync()提交偏移量,会提交最新的一次由poll获得的偏移量
3. 异步提交:commitAsync(),也可使用回调函数来处理
4. 可以提交特定偏移量,而不是最后一次poll得到的偏移量
- 通过实现 ConsumerRebalanceListener接口来定义在consumer失去对分区的所有权时需要处理的事件,可在这里使用.seek()方法加上自定义的函数来实现从数据库中获得偏移量
## Kafka本体
### 优势
高吞吐,高性能,持久化(将消息持久化到磁盘,通过将数据持久化硬盘,以及follower节点来防止数据丢失)
缺点是异步的不适合电商场景
### 架构设计
- Producer
- Consumer
- Topic 主题,由用户定义并配置在Kafka服务器,建立Producer和Consumer之间的订阅关系,身缠这发送消息到特定的Topic下,消费者从这个Topic下消费消息。逻辑概念,相当于数据库中的表
- Partition 消息分区,一个Topic可以分为多个partition,partition是一个有序的队列,partition的每条消息都会被分配一个有序的id(offset) 物理实际概念,每一个partition对应一个log,producer生产的数据会不断地追加到该log文件末端,且每条数据都有自己的offset。系哦啊飞着组中的每个消费者都会实时记录自己消费到哪个offset
- Broker 一台Kafka就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic
- ConsumerGroup 消费者组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者获取部分消息。
- 一个Partition对应一个唯一的文件夹,文件夹下使用的是Segment File的存储方式进行存储。将大文件拆成小文件,分为索引未见和数据文件
### 基本流程
- producer先从zookeeper的broker/**/state节点找到该partiton的leader
- producer将消息发送给该leader
- leader将消息写出本地log
- follower从leader pull 消息
- 写入本地log,后向leader发送ACK
- leader收到所有ISR中的replication的ACK,增加HT(high watermark ,最后commit 的offset)并向producer发送ACK
#### 生产过程
1. Producer创建时,先创建一个Sender线程并且设置守护线程
2. 生产的消息经过拦截器->序列化器->分区器,将消息存在缓冲区
3. 批量发送的条件:缓冲区数据大小达到batch.size或者linger.ms达到上限
4. 发往指定分区,最后到达broker
- acks = 0,消息放到缓冲区就认为发送完成
- acks = 1消息写到主分区即可完成,如果主分区收到消息之后宕机,副本分区来不及同步消息,消息就会丢失
- acks = all 等待所有的ISR副本的缺人记录
5. 如果设置了重试次数并且大于0,就会进行重试
6. 成功,返回元数据给生产者
ISR(In-Sync Replicas)是指与领导者副本保持同步的副本集合
#### 生产者Offset
消息写入的时候,每一个分区都有一个offset,即每个分区的最新最大的offset。
#### 消费者Offset
不同消费组中的消费者可以针对一个分区存储不同的Offset,互不影响。
#### LogSegment
日志文件的组成部分
#### Leader选举
- Kafka会在Zookeeper上针对每个Topic维护一个成为ISR的集合
- 当集合中副本都跟Leader同的副本同步之后,kafka才会认为消息已提交
- 只有这些跟Leader保持同步的Follower才应该被选作新的Leader
public class DemoPartitionser implements Partitioner {
private String myKey;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
// 获取分区数
int partitionNum = partitionInfos.size();
if((keyBytes == null) || (!(key instanceof String))){
throw new IllegalArgumentException("key is null or not a string");
}
// 如果key为key,则分配到最后一个分区
if("key".equals(key)){
return partitionNum;
}
//其余的消息都分配到最后一个分区
return Math.abs(key.hashCode()) % (partitionNum - 1);
}
@Override
public void close() {
}
/*
* @description:
* @author bronya
* @date: 2024/4/20 14:50
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
myKey = (String) map.get("key");
}
}