Kafka Installation and Cluster Setup
What is Kafka?
Kafka is a distributed event streaming platform designed for large-scale data collection, processing, storage, and integration. Its use cases include distributed data streams, stream processing, data integration, and pub/sub messaging.
To better understand Kafka, we first need to grasp the concept of an event.
What is an Event?
An event is any type of action, occurrence, or change that an application recognizes or records. Examples include a payment transaction, a webpage click, a temperature reading, or any description of what has occurred.
An event can be seen as a combination of notification and state: the notification is the time-sensitive signal that something has happened, which downstream systems react to, while the state describes what actually occurred.
We often use familiar data formats like JSON to express this state information, and we frequently refer to it as a message.
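As a sketch, an event's state is typically serialized to JSON before being produced to Kafka. The field names below are purely illustrative, not a Kafka requirement:

```python
import json
from datetime import datetime, timezone

# An illustrative payment event: the timestamp carries the notification
# aspect (when it happened), the remaining fields describe the state.
event = {
    "event_type": "payment",  # hypothetical field names
    "timestamp": datetime(2023, 12, 1, 1, 52, 44, tzinfo=timezone.utc).isoformat(),
    "order_id": "order-1001",
    "amount": 49.90,
    "currency": "USD",
}

# Serialize to JSON -- this string is what would be sent as the message value.
message = json.dumps(event)
print(message)

# The consumer side decodes it back into a structured record.
decoded = json.loads(message)
assert decoded["event_type"] == "payment"
```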
Kafka Architecture and Core Concepts
Topic
The most fundamental unit in Kafka is a topic, which is analogous to a table in a relational database.
You can create different topics to store various types of events or filtered and transformed versions of the same events.
Partition
As a distributed system, Kafka offers partitioning for topics to achieve higher read/write throughput and enable horizontal scalability.
Partitions divide a single topic log into multiple logs, each residing on a separate node in the Kafka cluster. This allows the workload of storing messages, writing new messages, and processing existing messages to be distributed across many nodes in the cluster.
In essence, a topic consists of multiple partitions distributed across servers: the topic is the logical grouping of the data, while its partitions are the physical units in which the data is stored.
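To see how records land on partitions, here is a simplified sketch of key-based partitioning. Note that Kafka's default partitioner actually hashes the key bytes with murmur2; CRC32 is used here only to keep the example deterministic and dependency-free:

```python
import zlib

NUM_PARTITIONS = 3  # matches num.partitions=3 used later in this setup

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a partition (simplified: Kafka uses murmur2, not CRC32)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# All records with the same key go to the same partition,
# which is what preserves per-key ordering.
assert partition_for("order-1001") == partition_for("order-1001")

for key in ("order-1001", "order-1002", "order-1003"):
    print(key, "-> partition", partition_for(key))
```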
Brokers
Each node/server in a Kafka cluster is referred to as a broker, and brokers host multiple partitions of a topic.
Brokers handle all read/write requests for events in partitions, as well as replicating partition data across brokers.
Replication
If a partition is stored on a single broker, node failures can result in data loss. To ensure high availability, each partition has redundant replicas managed by multiple brokers.
The primary replica of a partition is called the leader replica, which handles all read/write requests, while follower replicas synchronize data from the leader but do not serve external requests.
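A minimal in-memory model of leader/follower replication (purely illustrative; real Kafka replication involves ISR tracking, high watermarks, and follower fetch requests):

```python
class Replica:
    """A toy partition replica: just an append-only log on one broker."""
    def __init__(self, broker_id: int):
        self.broker_id = broker_id
        self.log: list[str] = []

class Partition:
    """One partition with a leader replica and follower replicas on other brokers."""
    def __init__(self, leader: Replica, followers: list[Replica]):
        self.leader = leader
        self.followers = followers

    def write(self, record: str) -> None:
        # All writes go through the leader ...
        self.leader.log.append(record)
        # ... and followers synchronize from the leader.
        for follower in self.followers:
            follower.log.append(record)

    def read(self, offset: int) -> str:
        # Reads are also served by the leader.
        return self.leader.log[offset]

# Replication factor 3: leader on broker 1, followers on brokers 2 and 3.
p = Partition(Replica(1), [Replica(2), Replica(3)])
p.write("event-0")
p.write("event-1")
assert p.read(0) == "event-0"
# Every follower holds a full copy, so losing the leader loses no data.
assert all(f.log == p.leader.log for f in p.followers)
```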
Installation and Cluster Configuration
Here, we set up a Kafka cluster environment using three virtual machines:
No. | Host name | IP | Node roles |
---|---|---|---|
#1 | kafka-server1 | 172.25.254.131 | broker, KRaft controller (or ZooKeeper) |
#2 | kafka-server2 | 172.25.254.132 | broker, KRaft controller (or ZooKeeper) |
#3 | kafka-server3 | 172.25.254.133 | broker, KRaft controller (or ZooKeeper) |
Download and Extract Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
sudo tar -xvf kafka_2.13-3.6.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-3.6.0
# create a log directory
sudo mkdir -pv logs/kraft-combined-logs
Kafka Cluster Setup Based on KRaft
Navigate to the Kafka directory and edit the KRaft server configuration file on each node:
cd /usr/local/kafka_2.13-3.6.0
sudo vim config/kraft/server.properties
Configuration for server #1 (kafka-server1):
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.131:9092,CONTROLLER://172.25.254.131:9093
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
Configuration for server #2 (kafka-server2):
############################# Server Basics #############################
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.132:9092,CONTROLLER://172.25.254.132:9093
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
Configuration for server #3 (kafka-server3):
############################# Server Basics #############################
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.133:9092,CONTROLLER://172.25.254.133:9093
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
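The three server.properties files above differ only in node.id and the listener IP. A small helper script (hypothetical, not part of Kafka) can render them from one template to avoid copy/paste mistakes:

```python
# Hypothetical helper: render per-node KRaft server.properties from one template.
NODES = {1: "172.25.254.131", 2: "172.25.254.132", 3: "172.25.254.133"}

TEMPLATE = """\
process.roles=broker,controller
node.id={node_id}
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
listeners=PLAINTEXT://{ip}:9092,CONTROLLER://{ip}:9093
advertised.listeners=PLAINTEXT://{ip}:9092
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
"""

def render(node_id: int) -> str:
    """Fill in the per-node values for one server."""
    return TEMPLATE.format(node_id=node_id, ip=NODES[node_id])

for node_id in NODES:
    # In practice, write this to config/kraft/server.properties on each host.
    print(f"--- server #{node_id} ---")
    print(render(node_id))
```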
Generate Cluster UUID and Format Storage
# generate cluster UUID
./bin/kafka-storage.sh random-uuid
A_D5kj5zTbi2EDTeXHDH3g
# format storage on each node, using the same cluster UUID on all three nodes
sudo ./bin/kafka-storage.sh format -t A_D5kj5zTbi2EDTeXHDH3g -c ./config/kraft/server.properties
Formatting /usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs with metadata.version 3.6-IV2.
Start the Services
Run the following on each of the three nodes:
sudo ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
# log
tail -f logs/server.log
[2023-12-01 01:52:44,231] INFO Awaiting socket connections on 172.25.254.131:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-12-01 01:52:44,244] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka startTimeMs: 1701395564243 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,247] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
Check Cluster and Node Status
# check cluster status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --status
ClusterId: A_D5kj5zTbi2EDTeXHDH3g
LeaderId: 1
LeaderEpoch: 81
HighWatermark: 24195
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [1,2,3]
CurrentObservers: []
# check node status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --replication
NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
1 22633 0 1701407094365 1701407094365 Leader
2 22633 0 1701407093990 1701407093990 Follower
3 22633 0 1701407093989 1701407093989 Follower
Kafka Cluster Setup Based on ZooKeeper
Note: ZooKeeper mode is deprecated in recent Kafka releases; KRaft is the recommended choice for new clusters.
Configuration for server #1 (kafka-server1):
sudo vim config/server.properties
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://172.25.254.131:9092
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888
Configuration for server #2 (kafka-server2):
sudo vim config/server.properties
############################# Server Basics #############################
broker.id=2
listeners=PLAINTEXT://172.25.254.132:9092
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888
Configuration for server #3 (kafka-server3):
sudo vim config/server.properties
############################# Server Basics #############################
broker.id=3
listeners=PLAINTEXT://172.25.254.133:9092
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
sudo vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888
Set the myid for each node; the value must match the server.N entries in zookeeper.properties:
cd /usr/local/kafka_2.13-3.6.0
sudo mkdir zookeeper
# for kafka-server1
echo 1 | sudo tee zookeeper/myid
# for kafka-server2
echo 2 | sudo tee zookeeper/myid
# for kafka-server3
echo 3 | sudo tee zookeeper/myid
Start the Services
# start ZooKeeper first, on every node
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
# Start Kafka
sudo nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
tail -f logs/server.log
Topic Creation and Message Testing
Create a topic:
./bin/kafka-topics.sh --create --bootstrap-server 172.25.254.131:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.
View topic details:
./bin/kafka-topics.sh --bootstrap-server 172.25.254.131:9092 --describe --topic test
Topic: test TopicId: Fdm-gBQ9TcuvuCKjm1caWg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Start a Producer on Node 1:
# 172.25.254.131
./bin/kafka-console-producer.sh --bootstrap-server 172.25.254.131:9092 --topic test
Start Consumers on Node 2 and Node 3:
# 172.25.254.132
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.132:9092 --topic test --from-beginning
# 172.25.254.133
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.133:9092 --topic test --from-beginning
When messages are entered into the producer terminal, they will appear on both consumer terminals, confirming the cluster is functioning correctly.
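Both consumers see every message because each kafka-console-consumer.sh instance started without an explicit group joins its own auto-generated consumer group, and every group independently reads the full topic. A toy model of that behavior:

```python
class Topic:
    """Toy topic: a single shared append-only log."""
    def __init__(self):
        self.log: list[str] = []

class Consumer:
    """Toy consumer in its own group: each group tracks its own offset."""
    def __init__(self, topic: Topic, from_beginning: bool = True):
        self.topic = topic
        # --from-beginning corresponds to starting at offset 0.
        self.offset = 0 if from_beginning else len(topic.log)

    def poll(self) -> list[str]:
        records = self.topic.log[self.offset:]
        self.offset = len(self.topic.log)
        return records

topic = Topic()
consumer_a = Consumer(topic)  # models the consumer on kafka-server2
consumer_b = Consumer(topic)  # models the consumer on kafka-server3

topic.log.append("hello kafka")     # typed into the producer on kafka-server1
topic.log.append("second message")

# Independent groups: each consumer receives every message.
assert consumer_a.poll() == ["hello kafka", "second message"]
assert consumer_b.poll() == ["hello kafka", "second message"]
```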