본문으로 바로가기
반응형

이전 포스팅에서 Docker 와 Docker-compose를 설치했었다. Kafka 실습 (2) - Docker 및 Docker-compose 설치

이번에는 Zookeeper, Kafka, Kafka Manager를 설치하고 로컬에서 간단한 실습을 해보자.


Install

앞서 Docker-compose를 설치한 이유는 한번에 관리하기 위함이다. /home/user 경로에 kafka-docker-compose.yml 파일을 생성하고 다음 내용을 추가한 후 저장한다.

version: '3'

services:

  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-net
    restart: always

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9091:9091"
      - "9092:9092"
      - "29092:29092"
      - "29094:29094"
    networks:
      - kafka-net
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      #KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_LOCALHOST://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_LOCALHOST://localhost:9092,PLAINTEXT_HOST://[ip address]:9091
      #KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_LOCALHOST://:9092
      KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_LOCALHOST://:9092,PLAINTEXT_HOST://:9091
      #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_JMX_PORT: "29094"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 2147483647
      KAFKA_MESSAGE_MAX_BYTES: 2147483647
      KAFKA_HEAP_OPTS: "-Xms6g -Xmx8g"

  akhq:
    image: tchiotludo/akhq
    container_name: akhq
    ports:
      - "18080:8080"
    networks:
      - kafka-net
    restart: always
    depends_on:
      - kafka
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            kafka:
              properties:
                bootstrap.servers: kafka:29092

volumes:
  esdata1:
    driver: local

networks:
  kafka-net:
    driver: bridge

[ip address]에는 본인의 가상머신으로 설치한 경우 가상머신의 IP를, 외부 서버로 한다면 Public IP를 입력 한다.
실습에서는 가상머신을 활용하였기 때문에(가상머신도 로컬PC에 설치했음) 사설 IP를 입력


Port 개방

firewall-cmd --zone=public --permanent --add-port=9092/tcp
firewall-cmd --reload

위 명령어들을 통해 Kafka 사용 포트인 9092를 개방한다.


docker-compose -f kafka-docker-compose.yml up

위 명령어를 입력하여 정상적으로 실행되는지 확인한다. Docker-compose를 이용하면 정의된 이미지를 찾아서 다운받아 실행까지 도와준다. 만약 백그라운드로 실행하고 싶다면 위 명령어 끝에 -d 를 추가로 붙이면된다.


Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))

만약 위 같은 에러가 나왔다면 다음을 실행한다.

sudo groupadd docker
sudo gpasswd -a $USER docker
newgrp docker

Error connecting: Error while fetching server API version: ('Connection aborted.', error(2, 'No such file or directory'))

sudo service docker status

만약 위 같은 에러가 나왔다면 Docker가 실행되어있는지 확인하고 실행안되어있다면 실행한다

sudo service docker start

bash를 통해 컨테이너 접근

sudo docker exec -it kafka /bin/bash

정상적으로 서비스가 실행되었다면 bash를 통해 접근한다.

컨테이너는 /opt/kafka/ 경로에 있으므로 접근하여 bin 경로로 이동한다.

cd /opt/kafka/bin/

Topic 생성

메시지를 보내기 위해 Topic을 생성한다.

./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test


Producer & Consumer

메시지를 보낼 때는 Producer를 사용하고 받을 때는 Consuemr를 사용한다.
터미널 2개를 동시에 열어두고 각각 명령어를 입력한 뒤 메시지를 보내서 받아오는지 확인한다.

터미널 1

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

터미널 2

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

왼쪽은 Producer로 각각 helloworld를 입력하였고 오른쪽은 ConsumerProducer에서 입력한 메시지들이 출력되었다.


Kafka Manager

Kafka Manager가 정상 실행되면 포트 9000 으로 웹 브라우저로 접근이 가능하다.

포스팅에서 사용한 IP는 192.168.145.130 이므로 해당 URL로 접근하면 다음 화면을 볼 수 있다.


상단메뉴를 클릭하여 Cluster -> Add Cluster 클러스터를 추가한다.

  • Cluster Name 은 본인 마음대로
  • Cluster Zookeeper Hosts 는 docker-compose로 실행할 때 사용한 yml 파일 내에 선언한 hostname:port 로 지정한다
  • JMX Polling 체크
  • consumer information 체크(선택사항(?))

  • Enable Active OffsetCache 체크
  • brokerViewThreadPoolSize 4
  • offsetCacheThreadPoolSize 4
  • KafkaAdminClientThreadPoolSize 4

맨 아래에 가서 Save를 누르면 추가가 된 것이다.

Go to cluster view를 클릭한다.


상단메뉴 중 Topic -> Create를 클릭하여 새로 생성한다.
여기서 실습 예제를 하기위해 registered_user로 생성한다.


실습예제는 Python으로 한다. 다른언어도 쉽게 예제가 구글링하면 찾을 수 있으니 본인의 주 언어가 아니면 찾아보길 바란다.
(추후 글쓴이는 여기에 언어별로 계속 추가할 예정)

pip를 통해 kafkafaker를 설치한다.

pip install kafka-python
pip install faker

producer.py

from kafka import KafkaProducer
import json
import time
from faker import Faker

fake = Faker()

def get_registered_user():
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year()
    }

def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['192.168.145.130:9092'],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    while 1 == 1:
        registered_user = get_registered_user()
        print(registered_user)
        producer.send("registered_user", registered_user)
        time.sleep(4)

consumer.py

from kafka import KafkaConsumer
from json import loads
import time

if __name__ == "__main__":
    while 1 == 1:
        consumer = KafkaConsumer(
            'registered_user',
            bootstrap_servers=['192.168.145.130:9092'],
            auto_offset_reset='latest',
            enable_auto_commit=True,
            value_deserializer=lambda x: loads(x.decode('utf-8')),
            consumer_timeout_ms=1000)

        for msg in consumer:
            print("Registered user = {}".format(msg.value))

각각 파일을 생성 후 파일이 위치한 경로에서 Shift+우클릭 하여 PowerShell 창을 2개를 실행하고 각각 실행한다.

각 파일은 따로 첨부한다.

producer.py
671 B
consumer.py
522 B

python producer.py
python consumer.py

위가 Producer를 통해 메시지를 보내고 있는 것이고 아래가 Consumer를 통해 메시지를 받고 있는 모습이다.

반응형

'DevOps > Kafka' 카테고리의 다른 글

Kafka 실습 (2) - Docker 및 Docker-compose 설치  (0) 2021.02.15
Kafka 실습 (1) - 환경설정  (1) 2021.02.15
Apache Kafka Summary  (1) 2021.02.04