이전 포스팅에서 Docker 와 Docker-compose
를 설치했었다. Kafka 실습 (2) - Docker 및 Docker-compose 설치
이번에는 Zookeeper
, Kafka
, Kafka Manager
를 설치하고 로컬에서 간단한 실습을 해보자.
Install
앞서 Docker-compose
를 설치한 이유는 한번에 관리하기 위함이다. /home/user
경로에 kafka-docker-compose.yml
파일을 생성하고 다음 내용을 추가한 후 저장한다.
version: '3'
services:
zookeeper:
image: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-net
restart: always
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9091:9091"
- "9092:9092"
- "29092:29092"
- "29094:29094"
networks:
- kafka-net
restart: always
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_LOCALHOST://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_LOCALHOST://localhost:9092,PLAINTEXT_HOST://[ip address]:9091
#KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_LOCALHOST://:9092
KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_LOCALHOST://:9092,PLAINTEXT_HOST://:9091
#KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_JMX_PORT: "29094"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SOCKET_REQUEST_MAX_BYTES: 2147483647
KAFKA_MESSAGE_MAX_BYTES: 2147483647
KAFKA_HEAP_OPTS: "-Xms6g -Xmx8g"
akhq:
image: tchiotludo/akhq
container_name: akhq
ports:
- "18080:8080"
networks:
- kafka-net
restart: always
depends_on:
- kafka
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
kafka:
properties:
bootstrap.servers: kafka:29092
volumes:
esdata1:
driver: local
networks:
kafka-net:
driver: bridge
[ip address]
에는 본인의 가상머신으로 설치한 경우 가상머신의 IP를, 외부 서버로 한다면 Public IP를 입력 한다.
실습에서는 가상머신을 활용하였기 때문에(가상머신도 로컬PC에 설치했음) 사설 IP를 입력
Port 개방
firewall-cmd --zone=public --permanent --add-port=9092/tcp
firewall-cmd --reload
위 명령어들을 통해 Kafka 사용 포트인 9092를 개방한다.
docker-compose -f kafka-docker-compose.yml up
위 명령어를 입력하여 정상적으로 실행되는지 확인한다. Docker-compose를 이용하면 정의된 이미지를 찾아서 다운받아 실행까지 도와준다. 만약 백그라운드로 실행하고 싶다면 위 명령어 끝에 -d
를 추가로 붙이면된다.
Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))
만약 위 같은 에러가 나왔다면 다음을 실행한다.
sudo groupadd docker
sudo gpasswd -a $USER docker
newgrp docker
Error connecting: Error while fetching server API version: ('Connection aborted.', error(2, 'No such file or directory'))
sudo service docker status
만약 위 같은 에러가 나왔다면 Docker가 실행되어있는지 확인하고 실행안되어있다면 실행한다
sudo service docker start
bash를 통해 컨테이너 접근
sudo docker exec -it kafka /bin/bash
정상적으로 서비스가 실행되었다면 bash를 통해 접근한다.
컨테이너는 /opt/kafka/
경로에 있으므로 접근하여 bin
경로로 이동한다.
cd /opt/kafka/bin/
Topic 생성
메시지를 보내기 위해 Topic을 생성한다.
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
Producer & Consumer
메시지를 보낼 때는 Producer
를 사용하고 받을 때는 Consuemr
를 사용한다.
터미널 2개를 동시에 열어두고 각각 명령어를 입력한 뒤 메시지를 보내서 받아오는지 확인한다.
터미널 1
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
터미널 2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
왼쪽은 Producer
로 각각 hello
와 world
를 입력하였고 오른쪽은 Consumer
로 Producer
에서 입력한 메시지들이 출력되었다.
Kafka Manager
Kafka Manager
가 정상 실행되면 포트 9000
으로 웹 브라우저로 접근이 가능하다.
포스팅에서 사용한 IP는 192.168.145.130
이므로 해당 URL로 접근하면 다음 화면을 볼 수 있다.
상단메뉴를 클릭하여 Cluster
-> Add Cluster
클러스터를 추가한다.
- Cluster Name 은 본인 마음대로
- Cluster Zookeeper Hosts 는 docker-compose로 실행할 때 사용한 yml 파일 내에 선언한 hostname:port 로 지정한다
- JMX Polling 체크
- consumer information 체크(선택사항(?))
- Enable Active OffsetCache 체크
- brokerViewThreadPoolSize 4
- offsetCacheThreadPoolSize 4
- KafkaAdminClientThreadPoolSize 4
맨 아래에 가서 Save
를 누르면 추가가 된 것이다.
Go to cluster view
를 클릭한다.
상단메뉴 중 Topic
-> Create
를 클릭하여 새로 생성한다.
여기서 실습 예제를 하기위해 registered_user
로 생성한다.
실습예제는 Python으로 한다. 다른언어도 쉽게 예제가 구글링하면 찾을 수 있으니 본인의 주 언어가 아니면 찾아보길 바란다.
(추후 글쓴이는 여기에 언어별로 계속 추가할 예정)
pip
를 통해 kafka
와 faker
를 설치한다.
pip install kafka-python
pip install faker
producer.py
from kafka import KafkaProducer
import json
import time
from faker import Faker
fake = Faker()
def get_registered_user():
return {
"name": fake.name(),
"address": fake.address(),
"created_at": fake.year()
}
def json_serializer(data):
return json.dumps(data).encode("utf-8")
producer = KafkaProducer(bootstrap_servers=['192.168.145.130:9092'],
value_serializer=json_serializer)
if __name__ == "__main__":
while 1 == 1:
registered_user = get_registered_user()
print(registered_user)
producer.send("registered_user", registered_user)
time.sleep(4)
consumer.py
from kafka import KafkaConsumer
from json import loads
import time
if __name__ == "__main__":
while 1 == 1:
consumer = KafkaConsumer(
'registered_user',
bootstrap_servers=['192.168.145.130:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=1000)
for msg in consumer:
print("Registered user = {}".format(msg.value))
각각 파일을 생성 후 파일이 위치한 경로에서 Shift+우클릭
하여 PowerShell
창을 2개를 실행하고 각각 실행한다.
각 파일은 따로 첨부한다.
python producer.py
python consumer.py
위가 Producer
를 통해 메시지를 보내고 있는 것이고 아래가 Consumer
를 통해 메시지를 받고 있는 모습이다.
'DevOps > Kafka' 카테고리의 다른 글
Kafka 실습 (2) - Docker 및 Docker-compose 설치 (0) | 2021.02.15 |
---|---|
Kafka 실습 (1) - 환경설정 (1) | 2021.02.15 |
Apache Kafka Summary (1) | 2021.02.04 |