What is Kafka?
Kafka is a distributed event streaming platform used for large-scale data collection, processing, storage, and integration. Its use cases include distributed data streaming, stream processing, data integration, and pub/sub messaging. To understand Kafka, we first need to know what an event is.
What is an event?
An event is any kind of action, occurrence, or change that an application identifies or records: a payment, a web-page click, a temperature reading, or any other description of something that happened.
An event can be thought of as the combination of a notification and state. The notification is the time element that triggers the next action, while the state describes the event itself.
We usually express this state in a familiar data format such as JSON, and in that form we tend to call it a message.
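For example, a payment event might be expressed as a JSON message like this (the field names here are illustrative, not anything Kafka requires):

```python
import json

# An event as "notification + state": the timestamp is the notification
# element, the remaining fields describe the state of what happened.
payment_event = {
    "event_type": "payment",
    "timestamp": "2023-12-01T01:52:44Z",
    "order_id": "order-1001",
    "amount": 25.80,
    "currency": "CNY",
}

# Serialized to JSON -- in this form we usually call it a message.
message = json.dumps(payment_event)
print(message)
```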
Kafka architecture and core concepts
Topic
Kafka's most basic unit of organization is the topic, which is roughly analogous to a table in a relational database.
We can create different topics to hold different kinds of events, or to hold filtered or transformed versions of the same kind of event.
Partition
Kafka is a distributed system. To improve read/write throughput and make horizontal scaling easy, it partitions each topic.
Partitioning breaks a single topic log into multiple logs, each of which can live on a separate node in the Kafka cluster. The work of storing messages, writing new messages, and processing existing messages can thus be spread across many nodes in the cluster.
In other words, a topic consists of multiple partitions stored across multiple servers: the topic is the logical split of the data, while the partition is the physical one.
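How a message finds its partition can be sketched as follows: a keyed message is hashed to pick a partition, so all messages with the same key land in the same partition. This sketch uses an md5 digest as a deterministic stand-in for Kafka's actual murmur2-based partitioner; the principle (hash the key, take it modulo the partition count) is the same:

```python
import hashlib

NUM_PARTITIONS = 3  # matches num.partitions=3 used in the configs below

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition index (simplified illustration,
    not Kafka's real murmur2 partitioner)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# The same key always maps to the same partition, so per-key ordering
# is preserved within that partition.
print({k: partition_for(k) for k in ["order-1001", "order-1002", "order-1003"]})
```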
Brokers
Each node/server in a Kafka cluster is called a broker. A broker hosts sets of partitions belonging to one or more topics.
Brokers handle every request to read from or write to a partition; data replication between partitions is also handled by brokers.
Replication
If a partition were stored on only one broker, a node failure would mean data loss. For high availability, every partition has redundant copies (replicas) managed by multiple brokers.
The primary copy of a partition is called the leader replica; reads and writes are normally handled by the leader, while follower replicas only synchronize data from the leader and do not serve client requests.
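The placement idea can be sketched with a simple round-robin assignment: each partition gets a leader plus followers on distinct brokers, and leaders are spread across brokers to balance load. This mirrors the spirit of Kafka's default replica assignment, not its exact algorithm:

```python
def assign_replicas(num_partitions: int, num_brokers: int, replication_factor: int):
    """Round-robin replica placement: the first broker in each list acts as
    the leader, the rest are followers. Simplified illustration only."""
    assert replication_factor <= num_brokers, "one broker cannot hold two replicas"
    assignment = {}
    for p in range(num_partitions):
        assignment[p] = [(p + r) % num_brokers for r in range(replication_factor)]
    return assignment

# 3 partitions, 3 brokers, replication factor 3 -- as in the cluster below.
# Each partition's leader lands on a different broker.
print(assign_replicas(3, 3, 3))
```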
Producers & Consumers
Kafka has two kinds of clients: producers and consumers.
A producer is the client that publishes messages; a consumer is the client that reads and processes them.
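The two roles can be sketched with a minimal in-memory stand-in for one partition log: the producer appends records, and each consumer reads at its own offset. Real Kafka clients, of course, talk to brokers over the network; this toy model only illustrates the append/read relationship:

```python
class PartitionLog:
    """Toy append-only log standing in for one Kafka partition."""
    def __init__(self):
        self.records = []

    def append(self, value):           # what a producer does
        self.records.append(value)
        return len(self.records) - 1   # offset of the new record

    def read_from(self, offset):       # what a consumer does
        return self.records[offset:]

log = PartitionLog()
log.append("payment: order-1001")
log.append("payment: order-1002")

# Two independent consumers can read the same log at different offsets.
print(log.read_from(0))  # consumer A reads from the beginning
print(log.read_from(1))  # consumer B has already processed offset 0
```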
Installing and configuring a cluster
I have prepared three virtual machines for the cluster environment.

No. | host name | IP | node roles
--- | --- | --- | ---
#1 | kafka-server1 | 172.25.254.131 | broker, KRaft controller (or ZooKeeper)
#2 | kafka-server2 | 172.25.254.132 | broker, KRaft controller (or ZooKeeper)
#3 | kafka-server3 | 172.25.254.133 | broker, KRaft controller (or ZooKeeper)
Download and extract to the target directory:

# download
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# extract
sudo tar -xvf kafka_2.13-3.6.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-3.6.0
# create the log directory
sudo mkdir -pv logs/kraft-combined-logs
Setting up a KRaft-based cluster
Edit the KRaft configuration file on each node:

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/kraft/server.properties
kafka-server1:
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.131:9092,CONTROLLER://172.25.254.131:9093
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
kafka-server2:
############################# Server Basics #############################
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.132:9092,CONTROLLER://172.25.254.132:9093
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
kafka-server3:
############################# Server Basics #############################
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.133:9092,CONTROLLER://172.25.254.133:9093
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
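Note that the three files differ only in node.id and the listener addresses; everything else, including the controller.quorum.voters list (entries of the form id@host:port), is identical. As a sanity check, the three configs can be generated from one template, as in this sketch (a hypothetical helper, not part of Kafka):

```python
# Node id -> IP, as in the host table above.
NODES = {
    1: "172.25.254.131",
    2: "172.25.254.132",
    3: "172.25.254.133",
}

# controller.quorum.voters: one id@host:port entry per controller.
voters = ",".join(f"{nid}@{ip}:9093" for nid, ip in NODES.items())

TEMPLATE = """\
process.roles=broker,controller
node.id={nid}
controller.quorum.voters={voters}
listeners=PLAINTEXT://{ip}:9092,CONTROLLER://{ip}:9093
advertised.listeners=PLAINTEXT://{ip}:9092
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
"""

for nid, ip in NODES.items():
    print(f"--- kafka-server{nid} ---")
    print(TEMPLATE.format(nid=nid, ip=ip, voters=voters))
```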
Generate a cluster UUID and format the storage

# generate a cluster uuid (run once, on any node)
./bin/kafka-storage.sh random-uuid
A_D5kj5zTbi2EDTeXHDH3g
# format the storage; run this on each of the 3 nodes with the same uuid
sudo ./bin/kafka-storage.sh format -t A_D5kj5zTbi2EDTeXHDH3g -c ./config/kraft/server.properties
Formatting /usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs with metadata.version 3.6-IV2.
Start the service

# start the kafka server in daemon mode
sudo ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
# check the server log
tail -f logs/server.log
[2023-12-01 01:52:44,231] INFO Awaiting socket connections on 172.25.254.131:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-12-01 01:52:44,244] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka startTimeMs: 1701395564243 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,247] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
Verify the cluster status

# check the metadata quorum status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --status
ClusterId: A_D5kj5zTbi2EDTeXHDH3g
LeaderId: 1
LeaderEpoch: 81
HighWatermark: 24195
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [1,2,3]
CurrentObservers: []
# check the replica distribution across nodes
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --replication
NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
1 22633 0 1701407094365 1701407094365 Leader
2 22633 0 1701407093990 1701407093990 Follower
3 22633 0 1701407093989 1701407093989 Follower
Setting up a ZooKeeper-based cluster
Edit the broker configuration files

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/server.properties

kafka-server1:
Edit server.properties:

sudo vim config/server.properties
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://172.25.254.131:9092
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
Edit zookeeper.properties:

sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888
kafka-server2:
Edit server.properties:

sudo vim config/server.properties
############################# Server Basics #############################
broker.id=2
listeners=PLAINTEXT://172.25.254.132:9092
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
Edit zookeeper.properties:

sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888
kafka-server3:
Edit server.properties:

sudo vim config/server.properties
############################# Server Basics #############################
broker.id=3
listeners=PLAINTEXT://172.25.254.133:9092
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
Edit zookeeper.properties:

sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888
Set myid on each node
cd /usr/local/kafka_2.13-3.6.0
sudo mkdir zookeeper
# 172.25.254.131
echo 1|sudo tee zookeeper/myid
# 172.25.254.132
echo 2|sudo tee zookeeper/myid
# 172.25.254.133
echo 3|sudo tee zookeeper/myid
Start the services
# start zookeeper
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
# start kafka
sudo nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
tail -f logs/server.log
Creating a topic & testing message production and consumption
Create a topic
./bin/kafka-topics.sh --create --bootstrap-server 172.25.254.131:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.
Describe the created topic
./bin/kafka-topics.sh --bootstrap-server 172.25.254.131:9092 --describe --topic test
Topic: test TopicId: Fdm-gBQ9TcuvuCKjm1caWg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
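The describe output can also be consumed programmatically; here is a small parsing sketch for the per-partition lines, with the field positions assumed from the sample output above:

```python
def parse_partition_line(line: str) -> dict:
    """Parse one 'Topic: ... Partition: ...' line from kafka-topics.sh
    --describe output into a dict (field layout based on the sample above)."""
    fields = line.split()
    return {
        "topic": fields[1],
        "partition": int(fields[3]),
        "leader": int(fields[5]),
        "replicas": [int(x) for x in fields[7].split(",")],
        "isr": [int(x) for x in fields[9].split(",")],
    }

sample = "Topic: test Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2"
info = parse_partition_line(sample)
print(info)
# A healthy partition has every replica in the ISR (in-sync replica) list.
assert set(info["isr"]) == set(info["replicas"])
```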
Next, we test message production and consumption. To exercise the cluster, we produce messages on server #1 and read them on servers #2 and #3.
Start a producer via the CLI
# 172.25.254.131
./bin/kafka-console-producer.sh --bootstrap-server 172.25.254.131:9092 --topic test
Start consumers via the CLI
# 172.25.254.132
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.132:9092 --topic test --from-beginning
# 172.25.254.133
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.133:9092 --topic test --from-beginning
After entering some messages in the producer terminal and pressing Enter, both consumer terminals display them, confirming that the cluster is configured correctly.