跳到主要内容

Kafka介绍,安装及集群搭建

· 阅读需 9 分钟

什么是kafka?

Kafka是一个分布式的事件流平台,用于大规模数据的收集,处理,存储,集成。它的应用场景包含分布式数据流,流处理,数据集成,pub/sub消息。 为更好的理解kafka,我们需要先知道什么是事件。

什么是事件?

事件是应用程序用于识别或记录的任何类型的操作,事件,变更。例如一次支付,一次网页点击,一次温度读取以及任何对所发生事件的描述。

事件可以看作为通知与状态的组合。通知是为触发下一个事件的时间元素,而状态是事件自身的描述。

我们通常使用熟悉的数据格式,如JSON表达这个状态信息,而且我们更倾向称其为消息。

Kafka的架构及基础概念

Topic

Kafka的最基本的构成单元是topic,它类似于我们熟悉的关系型数据库中的一张表。

我们可以创建不同的topic来保存不同类型的事件,或创建不同的topic来保存同类事件的过滤,转换过的版本。

Partition

Kafka是分布式系统,为了读写吞吐量及水平扩展的便利性,它提供的对topic的分区功能。

分区将单个主题日志分解为多个日志,每个日志都可以驻留在 Kafka 集群中的单独节点上。这样,存储消息、写入新消息和处理现有消息的工作就可以分配给集群中的许多节点。

也就是一个topic包含多个partition分别存储在多个服务器上。topic是逻辑上的数据拆分,而partition是物理上的数据拆分。

Brokers

Kafka集群的每个节点/服务器成为我们Broker,Broker托管topic的多组分区。

Broker处理每个读取/写入到partition的事件请求,partition间的数据复制也是由broker处理。

Replication

如果一个partition只储存在一个broker,当发生节点故障的时候会发生数据丢失。为了实现数据的高可用,每个partition都会有冗余的副本(replica)由多个broker管理。

主分区的副本称为leader replica,读写请求一般都由leader replica处理,而follower replica只负责从leader replica中同步数据,不对外提供服务。

Producers & Consumers

Kafka有两种类型的客户端,一个叫Producer,另一个叫Consumer。

Producer是负责生产消息的客户端,Consumer负责消息的消费。

安装及配置集群

我这里准备了3个虚拟机用于构建集群环境。

No.host nameIPnode roles
#1kafka-server1172.25.254.131broker,Kraft controller (or zookeeper)
#2kafka-server2172.25.254.132broker,Kraft controller (or zookeeper)
#3kafka-server3172.25.254.133broker, Kraft controller (or zookeeper)

下载及解压到指定目录:

# 下载
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# 解压
sudo tar -xvf kafka_2.13-3.6.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-3.6.0

# 创建日志目录
sudo mkdir -pv logs/kraft-combined-logs

基于Kraft的集群搭建

编辑各节点kraft配置文件

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/kraft/server.properties
config/kraft/server.properties
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.131:9092,CONTROLLER://172.25.254.131:9093
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

生成cluster uuid及存储格式化

# 获取cluster uuid
./bin/kafka-storage.sh random-uuid
A_D5kj5zTbi2EDTeXHDH3g

# 格式化存储,分别在3个节点上执行
sudo ./bin/kafka-storage.sh format -t A_D5kj5zTbi2EDTeXHDH3g -c ./config/kraft/server.properties
Formatting /usr/local/kafka_2.13-3.6.0/kraft-logs/kraft-combined-logs with metadata.version 3.6-IV2.


启动服务

# 启动
sudo ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

# log
tail -f logs/server.log
[2023-12-01 01:52:44,231] INFO Awaiting socket connections on 172.25.254.131:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-12-01 01:52:44,244] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka startTimeMs: 1701395564243 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,247] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

确认集群状态


# 查看集群状态
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --status
ClusterId: A_D5kj5zTbi2EDTeXHDH3g
LeaderId: 1
LeaderEpoch: 81
HighWatermark: 24195
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [1,2,3]
CurrentObservers: []
# 查看节点分布
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --replication
NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
1 22633 0 1701407094365 1701407094365 Leader
2 22633 0 1701407093990 1701407093990 Follower
3 22633 0 1701407093989 1701407093989 Follower

基于Zookeeper的集群搭建

配置broker配置文件

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/server.properties
sudo vim config/server.properties
config/server.properties
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://172.25.254.131:9092
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181

sudo vim config/zookeeper.properties
config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

设置myid

cd /usr/local/kafka_2.13-3.6.0
sudo mkdir zookeeper
# 172.25.254.131
echo 1|sudo tee zookeeper/myid
# 172.25.254.132
echo 2|sudo tee zookeeper/myid
# 172.25.254.133
echo 3|sudo tee zookeeper/myid

启动服务

# 启动zookeeper
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
# 启动kafka
sudo nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

tail -f logs/server.log

topic创建 & 消息的生产,消费测试

创建topic

./bin/kafka-topics.sh --create --bootstrap-server 172.25.254.131:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.

查看生成的topic

./bin/kafka-topics.sh --bootstrap-server 172.25.254.131:9092 --describe --topic test
Topic: test TopicId: Fdm-gBQ9TcuvuCKjm1caWg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

下一步我们测试消息的生产及消费,为了测试集群我们在#1服务器上生产消息,在#2,#3服务器上读取消息。

通过cli命令开启Producer

# 172.25.254.131
./bin/kafka-console-producer.sh --bootstrap-server 172.25.254.131:9092 --topic test

通过cli命令开启Consumer

# 172.25.254.132
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.132:9092 --topic test --from-beginning
# 172.25.254.133
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.133:9092 --topic test --from-beginning

Producer终端中输入一些信息回车后,另外两个Consumer终端能看到消息,说明我们的集群配置正常。

kafka_produce_consume