구축 목표]

실시간으로 DB의 데이터가 변할 때 kafka 를 통해 consumer 에게 메세지 보내기.


구현 예상도]

구축 환경

window - Producer ] 


    - DB : MongoDB

    - zookeeper server

    - Kafka server

    - Kafka connect

    - Debezium connector


Linux - virtualBox - Consumer ]

    - Kafka server


포트를 열어둔 상태에서 Linux 환경에서는 topic 을 구독하여 읽어오기만 할 것이니 더이상 건들게 없다.


Window 환경에서 Connect 설치부터 시작]


1. Connect 설치 이해]


kafka Connect 위에 Connector를 설치하는것입니다.

Kafka Connect는 Kafka 설치파일 안에 전부 들어 있습니다.

Kafka Connect 를 구동시킬때 plugin path를 지정함으로써 Connector를 적용시킬 수 있습니다.


2. kafka Connector 다운로드]



여기에서 DB에 맞는 connector .tar.gz 파일 다운로드


다운로드 받은 파일을 kafka 를 설치한 Directory 로 이동 후 압축 해제

PowerShell을 켜서 kafka 가 설치된 config 파일 위치로 이동한다.

여기에서 connect-distributed.properties 를 수정

> notepad .\connect-distributed.properties

중요한 부분만 설명하겠습니다.

// 카프카 서버의 주소ip와 port 기입
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
// 데이터를 저장, 가져올 때 변환하는 정보 기입
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
//offset 커밋 주기 설정
# Flush much faster than normal, which is useful for testing/debugging

위는 건드리지 않아도 됩니다. 

중요한건 맨 아래의 이부분을 수정합니다.

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

주석을 제거하고 위와 같이 아까 설치한 connector 파일의 상단 폴더명까지만 적어줍니다.

>> jar 파일이 들어있는 폴더의 상위 <<

이상하게하면 오류가 뜹니다.


저장을 하고, zookeeper , kafka server를 실행시킵니다.

그리고 나서 다음 명령어로 컨넥터를 실행시킵니다

 .\bin\windows\connect-distributed.bat .\config\connect-distributed.properties



해당 포트 : 8083이 정상가동되었는지 확인은 새로운 창을 열어 다음 명령어로 할 수 있습니다

curl -H "Accept:application/json" localhost:8083/connectors/


아무것도 연결하지 않았기 때문에 


만 뜨면 정상입니다.


Connect 연결 콘솔은 그대로 두고

그럼 이제 DB에 연결할 json 파일을 생성하고 해당 설정파일이 있는 connector를 생성해 보겠습니다.


3. Json 파일로 설정값을 넣은 Connector 생성]


원하는 위치에 mongoconnect.json 파일을 생성합니다

생성한 json 파일을 수정합니다.

원본은 다음과 같습니다.



  "name": "inventory-connector",
  "config": {
      "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
      "tasks.max" : "1",
      "mongodb.hosts" : "localhost:27017",
      "mongodb.name" : "dbserver1",
      "mongodb.user" : "debezium",
      "mongodb.password" : "dbz",
      "database.include.list" : "inventory",
      "database.history.kafka.bootstrap.servers" : "여러분의 kafka ip 주소:9092",
      "transforms": "route",
      "transforms.route.type" : "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.route.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
      "transforms.route.replacement" : "$3"

위의 코드를 설명하겠습니다.

최상단의 name -> 해당 connector 의 이름 = 겹치면 안됩니다.

connector.class -> plugin path 에 지정한 파일 중 해당 커넥터를 설치

mongodb.host -> 설치된 mongodb의 ip 주소 

mongodb.user -> 해당 db의 user

mongodb.password -> 해당 db의 user password

database.include.list -> 해당 디비구조 중 inventroy 라는 곳만 받는다는 의미

지정하지 않는다면 전체 db를 기준으로 잡음 exclude와 같이 사용 불가

database.history.kafka.bootstrap.servers -> kafka의 advertised 에 열어둔 ip 주소

transform -> 받은 메시지를 아래와 같은 형식의 데이터 값으로 받겠다는 의미


자세한 내용은 다음 참고



json을 저장하고 새로운 명령어창을 엽니다.


해당 json 파일이 위치한 곳으로 이동한 뒤 다음 명령어로 적용시킵니다.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @mongoconnect.json

※ curl은 별도로 설치해야 할 수 있음.


정상적으로 연결되었는지 확인은 다음 명령어로 확인 가능하다

curl -H "Accept:application/json" localhost:8083/connectors/

정상적으로 등록된 것을 확인할 수 있습니다.

Connector를 끄는 방법은 다음과 같습니다

curl -X DELETE localhost:8083/connectors/debezium-connector

Connect는 zookeeper, kafka 와 동일한 방법으로 끌 수 있습니다.


Connector까지 설치하였습니다. 다음에는 CDC가 동작하는지 확인해보겠습니다.
