[Kafka_CDC]_Kafka CDC 구현_(4단계- CDC 확인)

2022. 4. 13. 17:33[Kafka]/[Kafka_CDC]

728x90
반응형

구축 목표]

실시간으로 DB의 데이터가 변할 때 kafka 를 통해 consumer 에게 메세지 보내기.

 

구현 예상도]

구축 환경

window - Producer ] 

 

    - DB : MongoDB

    - zookeeper server

    - Kafka server

    - Kafka connect

    - Debezium connector

 

Linux - virtualBox - Consumer ]

    - Kafka server

 

Connector가 정상적으로 연결이 되었다면,

DB 구조인 Inventory 안에 Customer 를 바라보고 CDC가 대기중 일 것입니다.

 

그럼 해당 topic을 확인해 보겠습니다.

 

1. consumer 부분인 Linux 환경]

 

kafka 가 설치된 곳에서 다음 명령어 실행

해당 kafka server에 무슨 토픽이 있는지 확인해 봅니다.

bin/kafka-topic.sh --list --bootstrap-server 열어둔 ip 주소:9092

정상적으로 토픽을 읽어와 줍니다.

connect-configs

connect-offsets

connect-status

는 커넥터의 정보등이 담겨있고,

s 가 없는건, 제가 오타를 내서 여러개가 생긴 것이니 무시하셔도 됩니다.

 

디비 구조랑 똑같은

customers

orders

products

가 있는데요

각각 토픽을 조회하면 그동안의 기록을 볼 수도, 연결중에 데이터가 바뀌면 바뀐 값을 알려주는 로그를 볼 수도 있습니다.

예비 데이터를 넣은 customers를 조회합니다.

$ bin/kafka-console-consumer.sh --topic customers --from-beginning --bootstrap-server 열어둔 ip 주소:9092

이러한 데이터가 보입니다.

그중 payload 라는 부분이 중요합니다.

1001 의 id를 가지는 값은 기본적으로 입력된 값입니다.

이제 해당 Consumer 를 켜두고 window 환경에서 데이터를 입력해 보도록 하겠습니다.

 

2. window 에서 데이터 입력]

디비에 쿼리를 입력해야 하기 때문에 compose 파일이 있는 곳에서 다음 명령어를 입력합니다.

$ docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'

해당 디비 셀에 들어간 것을 확인할 수 있습니다.

여기에서 다음 명령어로 값을 입력해 줍니다.

db.customer.insert([{_id : NumberLong("5555"), first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com', unique_id : UUID() }]);

들어갔습니다. 

이제 다시 consumer 를 보겠습니다.

하단에 새로운 로그가 보입니다.

payload 뒤로 입력된 값이 보입니다.

가장 중요한점은

맨 아래의 "op" 부분입니다

op : "c" 의 의미는 create 라는 뜻입니다.

생성이 된걸 감지하고 로그를 쏜 것입니다.

op 에는 CRUD 의 정보가 담겨있습니다.

 

이를 통해 데이터의 움직임을 알 수 있게 되었습니다.

 

로그의 더 자세한 정보는 공식문서에 있습니다. (snapshot 부분 참고)

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

감사합니다.

 

모자란 점

Tutorial DB를 사용한 이유]

 

우선 tutorial을 따라서 docker 에 올라간 kafka connect는 정상적으로 작동합니다.

하지만 권한이 없어서 외부연결이 불가하여 아에 처음부터 만들려고 하였습니다.

 

Mongodb를 설치하고 연결하니 연결은 된거 같은데, topic이 생성되지 않았습니다.

Document를 확인해보니 MongoDB를 사용하려면 다음과 같은 조건이 필요하다고 되어있습니다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

그래서 마지막 심정으로 tutorialDB에 연결하니 성공하였습니다.

결국 DB설정문제로 안된 것이므로...

 

다음시간에는 Tutorial DB 가 아니라 새로만든 DB에 적용 및, 데이터를 랜덤한 시간에 집어넣는 코드를 작성하고

consumer 부분에는 받아온 log를 DB에 저장하는 방법을 도전해 보겠습니다.

728x90
반응형