아파치 카프카(Kafka) : 설치 및 실행 - 2 of 3

2020. 4. 29. 15:32Development/[Dev] 개발일반

728x90

아래도 역시 mac을 기준으로 작성하였으면 수많은 블로그들과 레퍼런스, 오픈 소스를 참조하여 실제 동작이 가능한 부분들로 작성하였다.

 

오늘의 순서는 다음과 같다.

(1) 카프카, 주키퍼 설치 및 실행 
(2) 토픽 생성
(3) 프로듀서, 컨슈머 클래스 생성 및 빌드(jar 만들기)
(4) jar 실행

*. 본 구조는 1 broker, 1 topic 이라는 아주 기본적인 환경으로 구성되어 있음.

 


(1) kafka & zookeeper 설치하기

1) 카프카 및 주키퍼 설치

$ brew install kafka

$ brew install zookeeper

 

2) 주키퍼 실행

$  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

To have launchd start zookeeper now and restart at login:
  brew services start zookeeper
Or, if you don't want/need a background service you can just run:
  zkServer start

 

3) 카프카 실행

$ kafka-server-start /usr/local/etc/kafka/server.properties

To have launchd start kafka now and restart at login:
  brew services start kafka

Or, if you don't want/need a background service you can just run:
  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

 


(2) 토픽 생성

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sentilab

(결과 메세지 : Created topic sentilab.)

 

*. --replication-factor : 복제본 개수

*. --partitions : 파티션 개수

*. --topic : 토픽 이름 :: sentilab

*. localhost 의 포트는 카프카 서비스 실행시

[ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)

이라는 부분에서 포트번호 확인

 


(3) 프로듀서, 컨슈머 클래스 생성 및 빌드

1) 프로듀서 클래스 생성

package com.sentilab;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class SimpleKafkaProducer {
    private final static String TOPIC = "sentilab";
    private final static String SERVERS = "localhost:9092";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", SERVERS);
        prop.put("key.serializer", StringSerializer.class.getName()); //"org.apache.kafka.common.serialization.StringSerializer"
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

        while(true) {
            Scanner sc = new Scanner(System.in);
            System.out.print("Producing > ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        // some exception
                    }
                });

            } catch (Exception e) {
                // exception
            } finally {
                producer.flush();
            }

            if(FIN_MESSAGE.equals(message)) {
                producer.close();
                break;
            }
        }
    }

}

2) 컨슈머 클래스 생성

package com.sentilab;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    private final static String TOPIC = "sentilab";
    private final static String SERVERS = "localhost:9092";
    private static final String EXIT_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", SERVERS);
        prop.put("group.id", "test");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("session.timeout.ms", "30000");
        prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String, String> consumer = null;
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singletonList(TOPIC));

        String message = null;
        try {
            do {
                ConsumerRecords<String, String> records = consumer.poll(1000000);

                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
            } while (!EXIT_MESSAGE.equals(message));
        } catch(Exception e) {
            // exception
        }

    }

}

 

*. 빌드는 그간 해왔던 java 프로젝트의 jar 묶어 내보내기와 같다.

+

프로듀서와 컨슈머를 구분해서 관리하기 위해 jar 파일을 2부로 복사하여 이름을 각각 다음과 같이 변경한다.

kafka-producer-consumer-java-all.jar 

> kafka-producer-consumer-java-producer.jar / kafka-producer-consumer-java-consumer.jar

 

+ 클래스에서 설정한 각종 컨슈머 설정값들의 의미

더보기

1. group.id 
  - 컨슈머 그룹을 식별하는 고유 아이디
  - 주키퍼에서 그룹 아이디로 묶어서 offset을 관리


2. bootstrap.servers 
 - 연결할 서버의 ip + port 정보.


3. fetch.min.bytes
  - 한번에 가져올 수 있는 최소한의 데이터 크기
  - 1 = 즉시, 그외에는 대기


4. auto.offset.reset
  - 카프카의 초기 offset이 없거나 데이터가 삭제되어 현존하지 않는 경우의 설정
  - earliest : 가장 빠른 오프셋
  - latest(기본) : 최신 오프셋
  - none : 이전 오프셋이 없는 경우 컨슈머 그룹에 예외처리
  - anything else : 컨슈머에 예외 처리

5. session.timeout.ms
  - 컨슈머가 가져갔다고 통보하는 polling 대기 시간 - 이 시간이 지나도 응답이 없으면 실패 처리
  - 컨슈머는 Broker에게 허트비트를 전송하여 연결 세션을 인지시킴 (아래 항목 참조)

6. heartbeat.interval.ms
  - session.timeout.ms 값보다 낮게 설정하여 세션을 인지시키는 시간

 

7. max.poll.interval.ms
   - 실제 데이터를 가져가는 polling은 없이 하트비트만 하고 있는 경우 허용할 최대 polling 기다림의 시간

 

* 더 자세한 정보는 공식 레퍼 : https://kafka.apache.org/documentation/#configuration


(4) 
실행

1) 카프카 및 주키퍼 서비스 실행

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

 

[2020-04-27 11:28:00,340] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

 

$ kafka-server-start /usr/local/etc/kafka/server.properties

 

[2020-04-27 11:28:35,656] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100288d175c0000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)

[2020-04-27 11:28:35,658] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)

 

2) 프로듀서, 컨슈머 클래스 실행

 1. 컨슈머 부터 실행

 $ java -jar kafka-producer-consumer-java-consumer.jar

 (송신 대기상태에서 프로듀서 콘솔에 문자를 입력하면 컨슈머쪽 콘솔에 해당 문자열이 출력된다)

 123

 

 2. 프로듀서 실행 / 콘솔 입력 > 컨슈머에 동일하게 출력됨

 $ java -jar kafka-producer-consumer-java-producer.jar

Producing > 123

728x90