Flume 설치 및 기본 설정, sink 테스트 - mongoDB sink 2 of 2

2020. 4. 24. 00:57Development/[Dev] 개발일반

728x90

몽고DB로 적재할 원천 데이터는 심플하게 키와 밸류로 이루어진 json이 들어있는 "로그파일"이며,

(이 로그파일은 웹서버 등에서 자동으로 적재될 것을 가정하였기에 테스트 단계에서는 수동으로 파일을 생성하였다)

 

전체적인 동작은

flume-ng 가 로그파일들을 읽어서(spooldir이라는 방식사용) sink로 보내면 sink가 몽고DB로 보내는 모양이며,

공식 레퍼런스 그림을 토대로 재구성하여 주석을 달면 아래와 같다. (빨간색)

 

기본 이미지출처 : https://flume.apache.org

 

 

오늘의 조리법은 크게 다음과 같다.

(1) 로컬 mongo db 셋팅 : flume database, admin 유저, json 컬렉션, 유저 권한 셋팅
(2) flume.conf 파일 셋팅 : spooldir 방식
(3) mongo db로 로그파일의 json을 insert하는 jar로 프로젝트 수정
(4) flume 실행 및 로그파일 넣기

 


(1) 로컬 mongo db 셋팅 : flume database, admin 유저, json 컬렉션, 유저 권한 셋팅
#db 생성
use flume

#collection 생성
db.createCollection("json")

#유저 생성 - 콘솔에서 비번 admin 타이핑
db.createUser(
  {
    user: "admin",
    pwd: passwordPrompt(),  // or cleartext password
    roles: [
       { role: "readWrite", db: "flume" }
    ]
  }
)

#유저 권한 셋팅
db.grantRolesToUser('admin',[{ role: "admin", db: "flume" }])

#샘플데이터 넣기 - 1건정도
db.json.insert({"CPU" : 0});

 


(2) flume.conf 파일 셋팅 : spooldir 방식 

*. (1)번 항목을 바탕으로 작성하게 된다.

agent.sources = r1
agent.channels = hdfschannel
agent.sinks = sink1

agent.sources.r1.type = spooldir				  #sink 방식
agent.sources.r1.spoolDir = /Users/freddy/log     #로그파일을 읽어올 디렉토리
agent.sources.r1.channels = hdfschannel
agent.sinks.sink1.type = com.sentilab.MongoSink
agent.sinks.sink1.channel = hdfschannel

agent.sinks.sink1.hostNames = localhost
agent.sinks.sink1.database = flume
agent.sinks.sink1.collection = json
agent.sinks.sink1.user = admin
agent.sinks.sink1.password = admin

agent.channels.hdfschannel.type = memory
agent.channels.hdfschannel.capacity = 100
------------------------------------------
:wq!

 

(3) mongo db로 로그파일의 json을 insert하는 jar로 프로젝트 수정 

1) MongoSink.java 파일을 아래와 같이 수정

package com.sentilab;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;

import static com.sentilab.MongoSinkConstants.*;

public class MongoSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MongoSink.class);
    private MongoDatabase database;
    private MongoClient client;
    private MongoCollection<BasicDBObject> collection;
    private List<ServerAddress> mongoHosts;
    private MongoCredential credential;

    private String databaseName;
    private String collectionName;
    private SinkCounter sinkCounter;

    public Status process() throws EventDeliveryException 
    {
        Status status = Status.READY;
        try 
        {
            status = receiveEvents();
            logger.info("status ", status.name());
        } 
        catch (Exception e) 
        {
            logger.info("mongo sink process error", e);
        }

        return status;

    }


    @Override
    public synchronized void start() 
    {
        logger.info("Starting MongoDB sink");
        sinkCounter.start();
        try 
        {
            client = MongoClients.create(
                    MongoClientSettings.builder()
                            .applyToClusterSettings(builder ->
                                    builder.hosts(mongoHosts))
                            .credential(credential)
                            .build());

            database = client.getDatabase(databaseName);
            collection = database.getCollection(collectionName, BasicDBObject.class);
            sinkCounter.incrementConnectionCreatedCount();
        } 
        catch (Exception e) 
        {
            logger.error("Exception while connecting to MongoDB", e);
            sinkCounter.incrementConnectionFailedCount();
            if (client != null) {
                client.close();
                sinkCounter.incrementConnectionClosedCount();
            }
        }
        super.start();
        logger.info("MongoDB sink started");
    }

    @Override
    public synchronized void stop() 
    {
        logger.info("Stopping MongoDB sink");
        if (client != null) 
        {
            client.close();
        }
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
        logger.info("MongoDB sink stopped");
    }

    /* flume.conf 에서 정보를 읽어와서 셋팅하게 된다 */
    @Override
    public void configure(Context context) 
    {
        mongoHosts = parseHostnames(context.getString(HOSTNAMES));
        credential = getCredential(context);
        databaseName = context.getString(DATABASE);
        collectionName = context.getString(COLLECTION);

        if (sinkCounter == null) 
        {
            sinkCounter = new SinkCounter(getName());
        }
    }

    public static List<ServerAddress> parseHostnames(String hostNames) 
    {
        List<ServerAddress> serverAddresses = new LinkedList<>();
        String[] seedStrings = StringUtils.deleteWhitespace(hostNames).split(",");
        for (String seed : seedStrings) 
        {
            String[] hostAndPort = seed.split(":");
            String host = hostAndPort[0];
            int port;
            if (hostAndPort.length == 2) 
            {
                port = Integer.parseInt(hostAndPort[1]);
            } 
            else 
            {
                port = 27017;
            }
            serverAddresses.add(new ServerAddress(host, port));
        }

        return serverAddresses;
    }

    /* flume.conf 에서 정보를 읽어와서 몽고 접속과 관련한 정보를 셋팅하게 된다 */
    private MongoCredential getCredential(Context context) 
    {
        String user = context.getString(USER);
        String database = context.getString(DATABASE);
        String password = context.getString(PASSWORD);
        return MongoCredential.createCredential(user, database, password.toCharArray());
    }



    //채널을 열고 이벤트를 받을 준비를 한다
    private Status receiveEvents() 
    {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction tx = null;

        try 
        {
            tx = channel.getTransaction();
            tx.begin();

            Event event = channel.take();
            if (event == null) 
            {
                status = Status.BACKOFF;
            } 
            else 
            {
                saveEvents(parseEventToObj(event));
            }

            tx.commit();
        } 
        catch (Exception e) 
        {
            logger.error("can't process events, drop it!", e);
            if (tx != null) {
                tx.commit();
            }
        } 
        finally 
        {
            if (tx != null) 
            {
                tx.close();
                logger.info("tx.close() ");
            }

        }

        return status;
    }


    /* json 정보 db에 저장  */
    private void saveEvents(LinkedList<BasicDBObject> dto) 
    {
        for(BasicDBObject mongoDTO : dto) 
        {
            try 
            {
                logger.info("documents saving...", mongoDTO.toString());
                collection.insertOne(mongoDTO);
            } 
            catch (Exception e) 
            {
                logger.error("documents save error", e);
                e.printStackTrace();
            }
        }
    }

    /* json 정보 db에 저장 - 싱글, 벌크 저장을 대비하여 오버로딩 처리함 */
    private void saveEvents(BasicDBObject docu) 
    {

        logger.info("documents save start :: dto :: " + docu.toString());

        LinkedList<BasicDBObject> mongoList = new LinkedList<>();
        mongoList.add(docu);
        saveEvents(mongoList);

    }

    //json 데이터를 BasicDBObject 형태로 변경
    private BasicDBObject parseEventToObj(Event event) 
    {
        byte[] body = event.getBody();
        BasicDBObject retMongoDocu = null;

        try 
        {
            String bodyStr = new String(body);

            logger.info("Mongo Sink BODY : " + bodyStr);

            //이 부분이 json 배열(여러건)일 경우 jsonarray로 로직을 변경하여야 한다.
            //아래는 단건 처리
            JSONObject jsonObject = JSON.parseObject(bodyStr);
            BasicDBObject document = new BasicDBObject();
            document.put("CPU", jsonObject.get("CPU"));


            retMongoDocu = document;
            if(retMongoDocu == null) 
            {
                logger.warn("Mongo Sink dto null");
            }
            else 
            {
                logger.warn("Mongo Sink dto not null :: " + retMongoDocu.toString());
            }
            return retMongoDocu;
        } 
        catch (Exception e) 
        {
            logger.error("Can't parse events: " + new String(body), e);
            return null;
        }
    }
}

2) 위와 같이 수정한뒤, sink 프로젝트를 gradle jar로 내보내어 플룸 홈의 lib 폴더로 복사한다.

(지난시간에 했던 플룸 라이브러리 복사 동작을 다시 수행하는 것)

jar로 내보내고 복사하는 방법을 어떻게 하는지 까먹었다면? (복습합시다..)

더보기

 1) java jar 만들기

 1. 빌드 옵션 일부 수정

  -  인텔리j의 우측 [Gradle] 에서 프로젝트 > Tasks > build > jar 를 따라가서 우클릭

-  Arguments 에 Flume Home 디렉토리 경로/lib/*을 넣는다 ( 이 내용은 빼도 되는지 아닌지 확인중)

*. 입력 : -cp /usr/local/Cellar/flume/1.9.0_1/libexec/lib/*

argu 설정

 - 빌드 : 창을 닫고 다시 jar 더블클릭

저 항목을 더블클릭하면 빌드가 시작된다.

*. 정상 빌드 완료시 다음 위치에 2가지 버전으로 jar 파일들이 생성된다.  (외부라이브러리가 함께 들어있는 버전, 아닌버전)

잘 해결되었으면 좋겠습니다..라는 진실된 마음으로 정말 근 몇년간만에 초집중하여 기술을 독학하여 익혔습니다..밤새 공부하느라 힘들었네요..진짜..개념이 없는 상태에서 정리하고 실행시키다보니 멘탈이 ... 그래도 뿌듯하네요~ 최애후배에게 모범된 개발자 선배로서의 늠름(?)함과 능력을 보여줄 수 있었던것만 같습니다~


(4) flume 실행 및 로그파일 넣기

1) 플룸 실행

$ cd $FLUME_HOME/bin/

 

$FLUME_HOME/bin/flume-ng agent -c conf/ --conf-file ../conf/flume.conf --name agent --classpath "$FLUME_HOME/lib/*" -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=27017

 

2) 로그파일 넣기

  1.  flume.conf에 설정했던 agent.sources.r1.spoolDir = /Users/freddy/log 값에 따라

  /Users/freddy/log 디렉토리에 로그파일을 넣는다.

$ cd /Users/freddy/log

$ vi test1.log

{"CPU": 100}

 

*. 복수의 로그 내용을 추출하려고 한다면?

더보기

*.  추후 복수의  배열로 json array 가 된 경우는 아래와 같이 로그내용이 변경되어야 한다. 

+  sink jar 프로젝트도 로그파일의 json 내용을 배열로 읽어올 수 있도록 JSONObject jsonObject = JSON.parseObject(bodyStr); 다음 부분부터 json array로 치환, for 반복문등을 통해 > json object 에 접근하여 값들을 추출하여야 한다. 

[{"CPU": 100},{"CPU": 101},{"CPU": 102}]

 

2) db에 데이터 적재 확인

> mongo

> use flume

> db.json.find()

 

 

-------------------------

후룸라이드에서 일어나며 :

언어와 코딩과 기술들은 단순히 컴퓨터의 문법입니다.

명사 + 조사가 합쳐지면 주어 역할을 한다고 우리가 배웠듯이 그렇게 하나씩 익혀나가면 된다고 생각합니다.

가보지 않아서 가보는 방법을 몰라서 가보는 길까지 어두컴컴하기에 어려운 것일뿐..

 

"와...잘한다..나는 왜이렇게 더디지"

라고 남과 비교하지 마세요- 

지금 어려운게, 더딘게, 모르겠는게 더 위대하고 아름다운겁니다.

앞으로 알게될 즐거움과 성취감과 행복함이 보상으로 주어지니까요!

사막 한가운데서 마신 포카리와 겨울찬바람속에 마신 포카리 중, 어떤게 더 갈증이 해소되고 더 행복할까요?

 

평일에는 항상 정진하는 자세로 하루 1시간은 스스로의 학습을 위해 시간을 투자하세요-

(주말에 공부하는 것도 좋지만...)

 

아시겠지만..이렇게 방송을 하고 글을쓰고 업으로 삼고 있는 저에게도 너무도 진지하게

"다른 길을 알아봐요. 실력이 너무 없네"라고 했던 사람이 있었습니다.

오기와 끈기로 버티면서 엉덩이가 무거운 사람이 결국 이깁니다..

 

포기하지마세요.

후회는 모든것을 다 쏟아붓지 않았을때 나오는 비겁한 합리화의 그림자 입니다.

후회없이 오늘을 살고 후회없이 누군가를 사랑했다면 후회없이 도전했다면

후회는 없을겁니다.

 

실패는 도전을 멈췄을때만 나타나는 단어입니다.

도전을 멈춘적이 없다면 실패할 틈도 없는것 아닐까요.

 

저는, 제 글을 읽고 제 방송을 보는 분들이

뒤로 도망칠 쥐구멍을 만들어 놓고 개발을 하시는 것이 아니길 바랍니다.

사람은 도망칠수 있는 방도가 있으면, 절실하지 않으면 포기하기 됩니다..

포기할 수 없는 상황, 돌아갈 곳이 없는 상황으로 본인을 한번쯤은 몰아세워보세요.

 

그럼 여러분은 어느샌가 저보다 더 센치한 개발자가 되어있으실겁니다.

 

- 그래도 차가운 개발자가 아니라 가슴 따뜻한 "사람"으로 기억되고 싶은 2020년 4월 어느날 밤에. 내일 출근해야하니까 이제는 잠을 청하러 도망치는 센치한 개발자로부터 

 

 

 

728x90