
Kafka Installation and Cluster Setup

· 7 min read

What is Kafka?

Kafka is a distributed event streaming platform designed for large-scale data collection, processing, storage, and integration. Its use cases include distributed data streams, stream processing, data integration, and pub/sub messaging.

To better understand Kafka, we first need to grasp the concept of an event.

What is an Event?

An event is any type of action, occurrence, or change that an application recognizes or records. Examples include a payment transaction, a webpage click, a temperature reading, or any description of what has occurred.

An event can be seen as a combination of notification and state: the notification is the time-sensitive signal that some processing should be triggered, while the state describes what actually happened.

We often use familiar data formats like JSON to express this state information, and we frequently refer to it as a message.
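For example, the state of a payment event might be serialized as a JSON message like the following (the field names here are purely illustrative):

```shell
# A hypothetical payment event serialized as a JSON message
EVENT='{"event_type":"payment","order_id":"A-1042","amount":19.99,"timestamp":"2023-12-01T01:52:44Z"}'
echo "$EVENT"
```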

Kafka Architecture and Core Concepts

Topic

The most fundamental unit in Kafka is a topic, which is analogous to a table in a relational database.

You can create different topics to store various types of events or filtered and transformed versions of the same events.

Partition

As a distributed system, Kafka offers partitioning for topics to achieve higher read/write throughput and enable horizontal scalability.

Partitions divide a single topic log into multiple logs, each residing on a separate node in the Kafka cluster. This allows the workload of storing messages, writing new messages, and processing existing messages to be distributed across many nodes in the cluster.

In essence, a topic consists of multiple partitions distributed across servers: the topic is the logical grouping of the data, while its partitions are the physical division.
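To see why partitioning distributes the write load, consider how a keyed record is assigned to a partition. Kafka's default partitioner computes murmur2(key) mod num_partitions; the sketch below uses `cksum` as a stand-in hash, so the resulting partition number differs from real Kafka, but the principle is the same:

```shell
# Sketch: same key always hashes to the same partition (cksum stands in
# for Kafka's murmur2; illustrative only, not Kafka's actual assignment)
key="user-42"
num_partitions=3
hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
partition=$(( hash % num_partitions ))
echo "key=$key -> partition $partition"
```

Because the mapping is deterministic, all events for a given key land in the same partition, which preserves their ordering.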

Brokers

Each node/server in a Kafka cluster is referred to as a broker, and brokers host multiple partitions of a topic.

Brokers handle all read/write requests for the partitions they host, as well as replication of partition data between brokers.

Replication

If a partition is stored on a single broker, node failures can result in data loss. To ensure high availability, each partition has redundant replicas managed by multiple brokers.

The primary replica of a partition is called the leader replica, which handles all read/write requests, while follower replicas synchronize data from the leader but do not serve external requests.

Installation and Cluster Configuration

Here, we set up a Kafka cluster environment using three virtual machines:

| No. | Hostname | IP | Node Roles |
| --- | --- | --- | --- |
| 1 | kafka-server1 | 172.25.254.131 | broker, KRaft controller (or ZooKeeper) |
| 2 | kafka-server2 | 172.25.254.132 | broker, KRaft controller (or ZooKeeper) |
| 3 | kafka-server3 | 172.25.254.133 | broker, KRaft controller (or ZooKeeper) |

Download and Extract Kafka

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
sudo tar -xvf kafka_2.13-3.6.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-3.6.0

# create a log directory
sudo mkdir -pv logs/kraft-combined-logs
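Kafka 3.6 runs on the JVM and needs a Java runtime (Java 8 or newer; 11 and 17 are the supported LTS versions). A quick preflight check on each node:

```shell
# Verify a Java runtime is available before starting Kafka
if command -v java >/dev/null 2>&1; then
  java -version
  java_ok=yes
else
  echo "Java not found: install a JDK (e.g. OpenJDK 17) first"
  java_ok=no
fi
```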

Kafka Cluster Setup Based on KRaft

Navigate to the Kafka directory and edit the KRaft server configuration file for each node:

cd /usr/local/kafka_2.13-3.6.0
sudo vim config/kraft/server.properties
config/kraft/server.properties
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.131:9092,CONTROLLER://172.25.254.131:9093
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
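The values above are for kafka-server1. On the other two nodes only `node.id` and the listener addresses change; for kafka-server2, for example:

```properties
node.id=2
listeners=PLAINTEXT://172.25.254.132:9092,CONTROLLER://172.25.254.132:9093
advertised.listeners=PLAINTEXT://172.25.254.132:9092
```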

Generate Cluster UUID and Format Storage

# generate cluster UUID
./bin/kafka-storage.sh random-uuid
A_D5kj5zTbi2EDTeXHDH3g

# format storage on each node, using the same cluster UUID
sudo ./bin/kafka-storage.sh format -t A_D5kj5zTbi2EDTeXHDH3g -c ./config/kraft/server.properties
Formatting /usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs with metadata.version 3.6-IV2.


Start the Services

sudo ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

# log
tail -f logs/server.log
[2023-12-01 01:52:44,231] INFO Awaiting socket connections on 172.25.254.131:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-12-01 01:52:44,244] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka startTimeMs: 1701395564243 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,247] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Check Cluster and Node Status


# check cluster status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --status
ClusterId: A_D5kj5zTbi2EDTeXHDH3g
LeaderId: 1
LeaderEpoch: 81
HighWatermark: 24195
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [1,2,3]
CurrentObservers: []
# check node status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --replication
NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
1 22633 0 1701407094365 1701407094365 Leader
2 22633 0 1701407093990 1701407093990 Follower
3 22633 0 1701407093989 1701407093989 Follower

Kafka Cluster Setup Based on ZooKeeper

sudo vim config/server.properties
config/server.properties
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://172.25.254.131:9092
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181
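As in KRaft mode, `broker.id` and the listener addresses must be unique per node; for kafka-server2, for example:

```properties
broker.id=2
listeners=PLAINTEXT://172.25.254.132:9092
advertised.listeners=PLAINTEXT://172.25.254.132:9092
```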

sudo vim config/zookeeper.properties
config/zookeeper.properties
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

Set myid for Each Node

cd /usr/local/kafka_2.13-3.6.0
sudo mkdir zookeeper
# for kafka-server1
echo 1 | sudo tee zookeeper/myid
# for kafka-server2
echo 2 | sudo tee zookeeper/myid
# for kafka-server3
echo 3 | sudo tee zookeeper/myid

Start the Services

# Start Zookeeper
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
# Start Kafka
sudo nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

tail -f logs/server.log
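Before creating topics, it is worth confirming that all three brokers registered with ZooKeeper. A sketch using the zookeeper-shell tool bundled with Kafka (run from the installation directory against the live ensemble):

```shell
# List broker ids registered in ZooKeeper; a healthy 3-node cluster shows [1, 2, 3]
ZK=172.25.254.131:2181
if [ -x ./bin/zookeeper-shell.sh ]; then
  ./bin/zookeeper-shell.sh "$ZK" ls /brokers/ids
  zk_checked=yes
else
  echo "run this from the Kafka installation directory"
  zk_checked=no
fi
```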

Topic Creation and Message Testing

Create a topic:

./bin/kafka-topics.sh --create --bootstrap-server 172.25.254.131:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.

View topic details:

./bin/kafka-topics.sh --bootstrap-server 172.25.254.131:9092 --describe --topic test
Topic: test TopicId: Fdm-gBQ9TcuvuCKjm1caWg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Start a Producer on Node 1:

# 172.25.254.131
./bin/kafka-console-producer.sh --bootstrap-server 172.25.254.131:9092 --topic test

Start Consumers on Node 2 and Node 3:

# 172.25.254.132
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.132:9092 --topic test --from-beginning
# 172.25.254.133
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.133:9092 --topic test --from-beginning

When messages are entered into the producer terminal, they will appear on both consumer terminals, confirming the cluster is functioning correctly.
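The same round trip can also be scripted as a non-interactive smoke test (assumes you run it from the Kafka installation directory against the running cluster):

```shell
# Produce one message, then consume it back from the beginning of the topic
BOOTSTRAP=172.25.254.131:9092
if [ -x ./bin/kafka-console-producer.sh ]; then
  echo "hello kafka" | ./bin/kafka-console-producer.sh --bootstrap-server "$BOOTSTRAP" --topic test
  ./bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" --topic test --from-beginning --max-messages 1
  smoke=ran
else
  smoke=skipped
fi
echo "smoke test: $smoke"
```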
