Kafka: Introduction, Installation, and Cluster Setup

Posted by Geuni's Blog on December 1, 2023

What is Kafka?

Kafka is a distributed event streaming platform used for large-scale data collection, processing, storage, and integration. Its use cases include distributed data streams, stream processing, data integration, and pub/sub messaging. To understand Kafka better, we first need to know what an event is.

What is an event?

An event is any kind of action, occurrence, or change that an application identifies or records: a payment, a web page click, a temperature reading, or any other description of something that happened.

An event can be seen as the combination of a notification and state. The notification is the time element that triggers the next piece of work, while the state is the description of the event itself.

We usually express this state in a familiar data format such as JSON, and we tend to call the result a message.
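For example, a payment event can be modeled as a notification part (when it happened) plus a state part (what happened), then serialized to JSON. This is a conceptual sketch; the field names are hypothetical, not a Kafka-mandated schema:

```python
import json

# A hypothetical "payment" event: the timestamp is the notification part
# (the time element that triggers downstream work), and "state" is the
# description of the event itself.
event = {
    "event_type": "payment",
    "timestamp": 1701395564,   # notification: when it happened
    "state": {                 # state: what happened
        "order_id": "A-1001",
        "amount": 42.50,
        "currency": "USD",
    },
}

# Serialized to JSON, this is the "message" a producer would send to a topic.
message = json.dumps(event)
print(message)
```
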

Kafka Architecture and Core Concepts

Topic

Kafka's most fundamental unit of organization is the topic, which is roughly analogous to a table in a relational database.

We can create different topics to hold different kinds of events, or to hold filtered and transformed versions of the same kind of event.

Partition

Kafka is a distributed system. To improve read/write throughput and make horizontal scaling easy, it allows a topic to be split into partitions.

Partitioning breaks a single topic log into multiple logs, each of which can live on a separate node in the Kafka cluster. This way, the work of storing messages, writing new messages, and processing existing messages can be spread across many nodes in the cluster.

In other words, a topic consists of multiple partitions stored across multiple servers: the topic is the logical split of the data, while the partition is the physical one.
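Conceptually, a producer routes a keyed message to a partition by hashing the key modulo the partition count, so messages with the same key keep their relative order. Kafka's default partitioner actually uses a murmur2 hash; the sketch below uses md5 purely for illustration:

```python
import hashlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Sketch of key-based partition selection.

    Kafka's default partitioner uses murmur2; md5 is used here only
    to illustrate the hash-then-modulo idea."""
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return digest % num_partitions

# Messages with the same key always land in the same partition, so
# per-key ordering is preserved even though the topic is split across
# multiple brokers.
p1 = pick_partition(b"order-1001", 3)
p2 = pick_partition(b"order-1001", 3)
assert p1 == p2
```
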

Brokers

Each node/server in a Kafka cluster is called a broker. A broker hosts some set of the partitions of one or more topics.

Brokers handle every request to read from or write to a partition; replicating partition data between brokers is also handled by the brokers themselves.

Replication

If a partition is stored on only one broker, its data is lost when that node fails. For high availability, every partition therefore has redundant copies (replicas) managed by multiple brokers.

The primary copy of a partition is called the leader replica. Reads and writes are normally handled by the leader replica, while follower replicas only sync data from the leader and do not serve client requests.

Producers & Consumers

Kafka has two kinds of clients: producers and consumers.

A producer is the client responsible for producing messages; a consumer is responsible for consuming them.
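The two roles are decoupled through the topic log: producers append to the end, and each consumer reads at its own offset without removing anything. The toy in-memory sketch below (not a real Kafka client) illustrates that relationship:

```python
from typing import List

class MiniTopicLog:
    """Toy in-memory stand-in for a single partition log, used only to
    illustrate how producers and consumers relate to the log."""

    def __init__(self) -> None:
        self.records: List[str] = []

    def produce(self, value: str) -> int:
        # Producers append to the end of the log; the returned offset
        # uniquely identifies the record within this partition.
        self.records.append(value)
        return len(self.records) - 1

    def consume(self, offset: int) -> str:
        # Consuming does not remove the record, so multiple independent
        # consumers can each read the full log at their own pace.
        return self.records[offset]

log = MiniTopicLog()
log.produce("hello")
log.produce("world")
# Two consumers with independent offsets see the same data.
assert log.consume(0) == "hello"
assert log.consume(1) == "world"
```
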

Installation and Cluster Setup

I prepared three virtual machines for the cluster environment:

No.  host name      IP              node roles
#1   kafka-server1  172.25.254.131  broker, KRaft controller (or ZooKeeper)
#2   kafka-server2  172.25.254.132  broker, KRaft controller (or ZooKeeper)
#3   kafka-server3  172.25.254.133  broker, KRaft controller (or ZooKeeper)

Download and extract to the target directory:

# Download
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# Extract
sudo tar -xvf kafka_2.13-3.6.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-3.6.0

# Create the log directory
sudo mkdir -pv logs/kraft-combined-logs

Cluster Setup with KRaft

Edit the KRaft configuration file on each node:

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/kraft/server.properties

kafka-server1:

############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.131:9092,CONTROLLER://172.25.254.131:9093
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3


kafka-server2:

############################# Server Basics #############################
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.132:9092,CONTROLLER://172.25.254.132:9093
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

kafka-server3:

############################# Server Basics #############################
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.133:9092,CONTROLLER://172.25.254.133:9093
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
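The three KRaft configs above differ only in `node.id` and the node's own IP; everything else, including the `controller.quorum.voters` list, must be identical on every node. A small helper script (just a sketch using the values from this post) can render them and makes the per-node differences explicit:

```python
# Template for the per-node KRaft server.properties, using the IPs and
# paths from this post. Only node.id and the listener IPs vary per node.
TEMPLATE = """\
process.roles=broker,controller
node.id={node_id}
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
listeners=PLAINTEXT://{ip}:9092,CONTROLLER://{ip}:9093
advertised.listeners=PLAINTEXT://{ip}:9092
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
"""

NODES = {
    1: "172.25.254.131",
    2: "172.25.254.132",
    3: "172.25.254.133",
}

def render(node_id: int) -> str:
    """Render the config for one node; node.id must be unique per node."""
    return TEMPLATE.format(node_id=node_id, ip=NODES[node_id])

# The quorum voter list is identical everywhere; only the node identity
# and its own listener addresses change.
print(render(2))
```
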

Generate the cluster UUID and format storage

# Get a cluster uuid
./bin/kafka-storage.sh random-uuid
A_D5kj5zTbi2EDTeXHDH3g

# Format storage; run on all 3 nodes
sudo ./bin/kafka-storage.sh format -t A_D5kj5zTbi2EDTeXHDH3g -c ./config/kraft/server.properties
Formatting /usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs with metadata.version 3.6-IV2.


Start the service

# Start
sudo ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

# Log
tail -f logs/server.log 
[2023-12-01 01:52:44,231] INFO Awaiting socket connections on 172.25.254.131:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-12-01 01:52:44,244] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka startTimeMs: 1701395564243 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,247] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Verify cluster status

# Check quorum status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --status
ClusterId:              A_D5kj5zTbi2EDTeXHDH3g
LeaderId:               1
LeaderEpoch:            81
HighWatermark:          24195
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [1,2,3]
CurrentObservers:       []
# Check replica distribution
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --replication
NodeId	LogEndOffset	Lag	LastFetchTimestamp	LastCaughtUpTimestamp	Status  	
1     	22633       	0  	1701407094365     	1701407094365        	Leader  	
2     	22633       	0  	1701407093990     	1701407093990        	Follower	
3     	22633       	0  	1701407093989     	1701407093989        	Follower	

Cluster Setup with ZooKeeper

Configure the broker configuration file

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/server.properties

kafka-server1:

Edit server.properties

sudo vim config/server.properties
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://172.25.254.131:9092
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181


Edit zookeeper.properties

sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

kafka-server2:

Edit server.properties

sudo vim config/server.properties
############################# Server Basics #############################
broker.id=2
listeners=PLAINTEXT://172.25.254.132:9092
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181


Edit zookeeper.properties

sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

kafka-server3:

Edit server.properties

sudo vim config/server.properties
############################# Server Basics #############################
broker.id=3
listeners=PLAINTEXT://172.25.254.133:9092
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181

Edit zookeeper.properties

sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

Set the myid files

cd /usr/local/kafka_2.13-3.6.0
sudo mkdir zookeeper
# 172.25.254.131
echo 1|sudo tee zookeeper/myid
# 172.25.254.132
echo 2|sudo tee zookeeper/myid
# 172.25.254.133
echo 3|sudo tee zookeeper/myid

Start the services

# Start zookeeper
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
# Start kafka
sudo nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

tail -f logs/server.log 

Topic creation & produce/consume test

Create a topic

./bin/kafka-topics.sh --create --bootstrap-server 172.25.254.131:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.

Inspect the new topic

./bin/kafka-topics.sh --bootstrap-server 172.25.254.131:9092 --describe --topic test
Topic: test	TopicId: Fdm-gBQ9TcuvuCKjm1caWg	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: test	Partition: 1	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
	Topic: test	Partition: 2	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
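The Replicas column above shows a round-robin layout: Kafka picks a (randomized) starting broker and walks the broker list for each partition, so with 3 brokers and replication factor 3 every broker holds a copy of every partition and leadership (the first replica) is spread across brokers. A simplified sketch of the idea; Kafka's real algorithm also shifts follower placement, so the exact ordering per partition can differ from this:

```python
from typing import List

def assign_replicas(num_partitions: int, brokers: List[int],
                    replication_factor: int, start: int = 0) -> List[List[int]]:
    """Simplified round-robin replica assignment.

    The first replica of each partition is its leader. Kafka's actual
    algorithm randomizes the start index and shifts followers to spread
    load more evenly; this shows only the core round-robin idea."""
    n = len(brokers)
    assignment = []
    for p in range(num_partitions):
        replicas = [brokers[(start + p + i) % n]
                    for i in range(replication_factor)]
        assignment.append(replicas)
    return assignment

# 3 partitions, 3 brokers, replication factor 3: every broker stores a
# copy of every partition, and each broker leads exactly one partition.
print(assign_replicas(3, [0, 1, 2], 3))
# → [[0, 1, 2], [1, 2, 0], [2, 0, 1]]
```
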

Next we test producing and consuming messages. To exercise the cluster, we produce messages on server #1 and read them on servers #2 and #3.

Start a producer via the CLI

# 172.25.254.131
./bin/kafka-console-producer.sh --bootstrap-server 172.25.254.131:9092 --topic test

Start consumers via the CLI

# 172.25.254.132
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.132:9092 --topic test --from-beginning  
# 172.25.254.133
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.133:9092 --topic test --from-beginning  

After typing some messages into the producer terminal and pressing Enter, both consumer terminals show the messages, confirming that the cluster is configured correctly.

(Screenshot: kafka_produce_consume — messages produced on server #1 appearing in both consumer terminals)