2020. 4. 29. 15:32ㆍDevelopment/[Dev] 개발일반
아래도 역시 mac을 기준으로 작성하였으면 수많은 블로그들과 레퍼런스, 오픈 소스를 참조하여 실제 동작이 가능한 부분들로 작성하였다.
오늘의 순서는 다음과 같다.
(1) 카프카, 주키퍼 설치 및 실행
(2) 토픽 생성
(3) 프로듀서, 컨슈머 클래스 생성 및 빌드(jar 만들기)
(4) jar 실행
*. 본 구조는 1 broker, 1 topic 이라는 아주 기본적인 환경으로 구성되어 있음.
(1) kafka & zookeeper 설치하기
1) 카프카 및 주키퍼 설치
$ brew install kafka
$ brew install zookeeper
2) 주키퍼 실행
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don't want/need a background service you can just run:
zkServer start
3) 카프카 실행
$ kafka-server-start /usr/local/etc/kafka/server.properties
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
(2) 토픽 생성
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sentilab
(결과 메세지 : Created topic sentilab.)
*. --replication-factor : 복제본 개수
*. --partitions : 파티션 개수
*. --topic : 토픽 이름 :: sentilab
*. localhost 의 포트는 카프카 서비스 실행시
[ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
이라는 부분에서 포트번호 확인
(3) 프로듀서, 컨슈머 클래스 생성 및 빌드
1) 프로듀서 클래스 생성
package com.sentilab;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class SimpleKafkaProducer {
private final static String TOPIC = "sentilab";
private final static String SERVERS = "localhost:9092";
private static final String FIN_MESSAGE = "exit";
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers", SERVERS);
prop.put("key.serializer", StringSerializer.class.getName()); //"org.apache.kafka.common.serialization.StringSerializer"
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
while(true) {
Scanner sc = new Scanner(System.in);
System.out.print("Producing > ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// some exception
}
});
} catch (Exception e) {
// exception
} finally {
producer.flush();
}
if(FIN_MESSAGE.equals(message)) {
producer.close();
break;
}
}
}
}
2) 컨슈머 클래스 생성
package com.sentilab;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class SimpleKafkaConsumer {
private final static String TOPIC = "sentilab";
private final static String SERVERS = "localhost:9092";
private static final String EXIT_MESSAGE = "exit";
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers", SERVERS);
prop.put("group.id", "test");
prop.put("enable.auto.commit", "true");
prop.put("auto.commit.interval.ms", "1000");
prop.put("session.timeout.ms", "30000");
prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = null;
consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singletonList(TOPIC));
String message = null;
try {
do {
ConsumerRecords<String, String> records = consumer.poll(1000000);
for (ConsumerRecord<String, String> record : records) {
message = record.value();
System.out.println(message);
}
} while (!EXIT_MESSAGE.equals(message));
} catch(Exception e) {
// exception
}
}
}
*. 빌드는 그간 해왔던 java 프로젝트의 jar 묶어 내보내기와 같다.
+
프로듀서와 컨슈머를 구분해서 관리하기 위해 jar 파일을 2부로 복사하여 이름을 각각 다음과 같이 변경한다.
kafka-producer-consumer-java-all.jar
> kafka-producer-consumer-java-producer.jar / kafka-producer-consumer-java-consumer.jar
+ 클래스에서 설정한 각종 컨슈머 설정값들의 의미
1. group.id
- 컨슈머 그룹을 식별하는 고유 아이디
- 주키퍼에서 그룹 아이디로 묶어서 offset을 관리
2. bootstrap.servers
- 연결할 서버의 ip + port 정보.
3. fetch.min.bytes
- 한번에 가져올 수 있는 최소한의 데이터 크기
- 1 = 즉시, 그외에는 대기
4. auto.offset.reset
- 카프카의 초기 offset이 없거나 데이터가 삭제되어 현존하지 않는 경우의 설정
- earliest : 가장 빠른 오프셋
- latest(기본) : 최신 오프셋
- none : 이전 오프셋이 없는 경우 컨슈머 그룹에 예외처리
- anything else : 컨슈머에 예외 처리
5. session.timeout.ms
- 컨슈머가 가져갔다고 통보하는 polling 대기 시간 - 이 시간이 지나도 응답이 없으면 실패 처리
- 컨슈머는 Broker에게 허트비트를 전송하여 연결 세션을 인지시킴 (아래 항목 참조)
6. heartbeat.interval.ms
- session.timeout.ms 값보다 낮게 설정하여 세션을 인지시키는 시간
7. max.poll.interval.ms
- 실제 데이터를 가져가는 polling은 없이 하트비트만 하고 있는 경우 허용할 최대 polling 기다림의 시간
* 더 자세한 정보는 공식 레퍼 : https://kafka.apache.org/documentation/#configuration
(4) 실행
1) 카프카 및 주키퍼 서비스 실행
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
[2020-04-27 11:28:00,340] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
$ kafka-server-start /usr/local/etc/kafka/server.properties
[2020-04-27 11:28:35,656] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100288d175c0000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2020-04-27 11:28:35,658] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
2) 프로듀서, 컨슈머 클래스 실행
1. 컨슈머 부터 실행
$ java -jar kafka-producer-consumer-java-consumer.jar
(송신 대기상태에서 프로듀서 콘솔에 문자를 입력하면 컨슈머쪽 콘솔에 해당 문자열이 출력된다)
123
2. 프로듀서 실행 / 콘솔 입력 > 컨슈머에 동일하게 출력됨
$ java -jar kafka-producer-consumer-java-producer.jar
Producing > 123
'Development > [Dev] 개발일반' 카테고리의 다른 글
인텔리제이(intelliJ) maven project / Gradle project jar 배포 (0) | 2020.05.11 |
---|---|
아파치 카프카(Kafka) : 스프링부트 환경 구축 - 3 of 3 (4) | 2020.05.08 |
아파치 카프카(Kafka) : 설치 및 실행 - 1 of 3 (1) | 2020.04.27 |
Flume 설치 및 기본 설정, sink 테스트 - mongoDB sink 2 of 2 (0) | 2020.04.24 |
Flume 설치 및 기본 설정, sink 테스트 - mongoDB sink 1 of 2 (0) | 2020.04.21 |