Kafka기본개념, 설치 및 Cluster구성하기

Posted by Geuni's Blog on December 1, 2023

kafka란?

Kafka는 분산 이벤트 스트리밍 플랫폼으로, 대규모 데이터 수집, 처리, 저장, 통합에 사용된다.

특히 데이터 파이프라인, 실시간 분석, 이벤트 스트리밍, pub/sub와 같은 환경에서 널리 사용된다.

Kafka를 더 잘 이해하기 위해서 이벤트란 무엇인지 알아야 할 필요가 있다.

이벤트란?

이벤트는 어플리케이션에 의해 식별되거나 기록되는 모든 유형의 작업, 사건 또는 변경 사항이다. 예를 들어, 결제, 웹페이지 클릭, 온도 측정 등과 같이 발생한 사건에 대한 데이터적인 표현이다.

이벤트는 통지(notification) 와 상태(state)의 조합으로 볼수 있다. 통지는 다음 이벤트를 트리거할수 있는 시간적인 요소이며 상태는 사건자체에 대한 데이터적인 표현이다.

우리는 일반적으로 JSON과 같은 익숙한 데이터 형태로 상태정보를 표현하며 메시지라는 단어로 이를 표현하는데 더 익숙하다.

Kafka아키텍처 및 기본개념

Topic

kafka의 가장 기본적인 구성 단위는 관계형 데이터베이스의 테이블과 같은 topic이다.

우리는 여러개의 topic을 만들어 다양한 종류의 이벤트를 저장하거나 같은 종류의 이벤트를 필터링 혹은 가공하여 다양한 topic에 저장한다.

Partition

Kafka는 분산식 시스템이며 시스템 확장성 및 데이터 R/W성능을 고려하여 topic에 대한 partition기능을 제공한다.

partition기능은 단일 topic의 데이터를 여러개의 그룹으로 쪼개어 Kafka Cluster중의 각각의 Node에 분산시켜 저장하는것이 가능하도록 해준다.

다시 말하면 topic은 여러개의 partition으로 분할되어 여러 서버에 분산되어 저장될수 있다. topic을 통하여 데이터를 논리적으로 분할했다면 partition은 데이터의 물리적으로 분할이 가능하도록 해준다.

Brokers

Kafka Cluster중 매개의 Node/서버를 Broker라고 한다. 여러그룹의 topic partition을 Broker가 관리를 해준다.

주로 producer 혹은 consumer의 request를 받아 partition에 데이터를 읽거나 쓰는 작업을 하게되며 partition의 데이터 복사(replication)작업도 broker가 처리해준다.

Replication

만일 partition하나가 한개의 broker에만 저장된다면 Node장애가 발생했을때 데이터 유실이 발생한다. 데이터 고가용성을 고려하여 일반적으로 매개partition은 여러개의 복사본(replica)을 가지게되며 이를 broker가 관리를 해준다. 。

메인 partition을 leader replica라고 부르며 client request는 일반적으로 leader replica가 처리하게된다. follower replica는 leader replica로부터 데이터 sync만 받으며 대외 서비스는 안한다.

Producers & Consumers

Kafka는 두가지 유형의 client가 있다. 하나는 Producer 또 하나는 Consumer라 부른다.

Producer는 메시지를 게시하는 client이고 Consumer는 메시지 수신/소비하는 client이다.

설치 및 Cluster구성하기

Cluster 구성을 위하여 3개의 VM을 준비한다.

No. host name IP node roles
#1 kafka-server1 172.25.254.131 broker,Kraft controller (or zookeeper)
#2 kafka-server2 172.25.254.132 broker,Kraft controller (or zookeeper)
#3 kafka-server3 172.25.254.133 broker, Kraft controller (or zookeeper)

파일 다운로드 및 압축해제:

1
2
3
4
5
6
# 다운로드
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# 압축해제
sudo tar -xvf kafka_2.13-3.6.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-3.6.0

Kraft 기반의 Cluster구성

instance별로 kraft config파일을 아래와 같이 변경한다.

1
2
3
4
5
cd /usr/local/kafka_2.13-3.6.0
# log directory 생성
sudo mkdir -pv logs/kraft-combined-logs
# kraft config변경
sudo vim config/kraft/server.properties

kafka-server1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.131:9092,CONTROLLER://172.25.254.131:9093
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3


kafka-server2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
############################# Server Basics #############################
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.132:9092,CONTROLLER://172.25.254.132:9093
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

kafka-server3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
############################# Server Basics #############################
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@172.25.254.131:9093,2@172.25.254.132:9093,3@172.25.254.133:9093
############################# Socket Server Settings #############################
listeners=PLAINTEXT://172.25.254.133:9092,CONTROLLER://172.25.254.133:9093
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs/kraft-combined-logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

cluster uuid생성 및 스토리지 포맷

1
2
3
4
5
6
7
8
9
# 임의의 한대 서버에서 cluster uuid 생성
./bin/kafka-storage.sh random-uuid
A_D5kj5zTbi2EDTeXHDH3g

# 위에서 생성된 cluster uuid로 instance별로 스토리지 포맷
sudo ./bin/kafka-storage.sh format -t A_D5kj5zTbi2EDTeXHDH3g -c ./config/kraft/server.properties
Formatting /usr/local/kafka_2.13-3.6.0/kraft-logs/kraft-combined-logs with metadata.version 3.6-IV2.


서버별로 각instance 구동

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# startup
sudo ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

# check log
tail -f logs/server.log 
[2023-12-01 01:52:44,231] INFO Awaiting socket connections on 172.25.254.131:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-12-01 01:52:44,243] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-12-01 01:52:44,244] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,244] INFO Kafka startTimeMs: 1701395564243 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-01 01:52:44,247] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Cluster상태 확인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# check cluster status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --status
ClusterId:              A_D5kj5zTbi2EDTeXHDH3g
LeaderId:               1
LeaderEpoch:            81
HighWatermark:          24195
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [1,2,3]
CurrentObservers:       []
# check node status
./bin/kafka-metadata-quorum.sh --bootstrap-server 172.25.254.131:9092,172.25.254.132:9092,172.25.254.133:9092 describe --replication
NodeId	LogEndOffset	Lag	LastFetchTimestamp	LastCaughtUpTimestamp	Status  	
1     	22633       	0  	1701407094365     	1701407094365        	Leader  	
2     	22633       	0  	1701407093990     	1701407093990        	Follower	
3     	22633       	0  	1701407093989     	1701407093989        	Follower	

Zookeeper 기반의 Cluster구성

instance별로 config/server.properties, config/zookeeper.properties 아래와 같이 변경

1
2
cd /usr/local/kafka_2.13-3.6.0
sudo vim config/server.properties

kafka-server1:

config/server.properties

1
sudo vim config/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://172.25.254.131:9092
advertised.listeners=PLAINTEXT://172.25.254.131:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181


config/zookeeper.properties

1
sudo vim config/zookeeper.properties
1
2
3
4
5
6
7
8
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

kafka-server2:

config/server.properties

1
sudo vim config/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
############################# Server Basics #############################
broker.id=2
listeners=PLAINTEXT://172.25.254.132:9092
advertised.listeners=PLAINTEXT://172.25.254.132:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181


config/zookeeper.properties

1
sudo vim config/zookeeper.properties
1
2
3
4
5
6
7
8
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

kafka-server3:

config/server.properties

1
sudo vim config/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
############################# Server Basics #############################
broker.id=3
listeners=PLAINTEXT://172.25.254.133:9092
advertised.listeners=PLAINTEXT://172.25.254.133:9092
############################# Log Basics #############################
log.dirs=/usr/local/kafka_2.13-3.6.0/logs
num.partitions=3
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper #############################
zookeeper.connect=172.25.254.131:2181,172.25.254.132:2181,172.25.254.133:2181

config/zookeeper.properties

1
sudo vim config/zookeeper.properties
1
2
3
4
5
6
7
8
dataDir=/usr/local/kafka_2.13-3.6.0/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.25.254.131:2888:3888
server.2=172.25.254.132:2888:3888
server.3=172.25.254.133:2888:3888

instance별로 myid설정(zookeeper node구분값, zookeeper.properties 중 자신의 server.{ID}와 일치해야 됨.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cd /usr/local/kafka_2.13-3.6.0
sudo mkdir zookeeper
# 172.25.254.131
echo 1|sudo tee zookeeper/myid
# 172.25.254.132
echo 2|sudo tee zookeeper/myid
# 172.25.254.133
echo 3|sudo tee zookeeper/myid

# root권한이 필요할 경우
# 172.25.254.131
echo 1|sudo tee zookeeper/myid
# 172.25.254.132
echo 2|sudo tee zookeeper/myid
# 172.25.254.133
echo 3|sudo tee zookeeper/myid

서비스 구동

1
2
3
4
5
6
# zookeeper구동
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
# kafka구동
sudo nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
# check log
tail -f logs/server.log 

topic생성 & 메시징 테스트

topic 생성

1
2
./bin/kafka-topics.sh --create --bootstrap-server 172.25.254.131:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.

생성된 topic 확인

1
2
3
4
5
6
./bin/kafka-topics.sh --bootstrap-server 172.25.254.131:9092 --describe --topic test
Topic: test	TopicId: Fdm-gBQ9TcuvuCKjm1caWg	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: test	Partition: 1	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
	Topic: test	Partition: 2	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

kafka가 제공하는 CLI 툴을 이용하여 메시지 전송테스트가 가능하다.

cluster 정상작동여부를 확인하기 위하여 #1 서버에서 메시지를 생성하여 #2, #3번에서 메시지를 받아보자.

#1서버에서 Producer CLI구동

1
2
# 172.25.254.131
./bin/kafka-console-producer.sh --bootstrap-server 172.25.254.131:9092 --topic test

#2, #3 서버에서 Consumer CLI구동

1
2
3
4
# 172.25.254.132
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.132:9092 --topic test --from-beginning  
# 172.25.254.133
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.254.133:9092 --topic test --from-beginning  

Producer 터미널창에 메시지를 입력후 Consumer창에서 메시지 확인이 가능하다면 Cluster구성이 정상적으로 됏다고 볼수있다.

kafka_produce_consume