[Kafka_CDC]_Kafka CDC 구현_(3단계- Connector설치)

2022. 4. 13. 17:01[Kafka]/[Kafka_CDC]

728x90
반응형

구축 목표]

실시간으로 DB의 데이터가 변할 때 kafka 를 통해 consumer 에게 메세지 보내기.

 

구현 예상도]

구축 환경

window - Producer ] 

 

    - DB : MongoDB

    - zookeeper server

    - Kafka server

    - Kafka connect

    - Debezium connector

 

Linux - virtualBox - Consumer ]

    - Kafka server

 

Zookeeper, Kafka Server 설치는 다음 포스트 참고

https://yn971106.tistory.com/81

 

[Kafka]_설치 와 메세지 전송 테스트

개발환경] 서버 : wsl2로 설치한 Centos * virtual box 로 Linux 설치하여도 무방함 zookeeper , kafka , producer , consumer 전부 하나의 centos 에서 실행함 wsl2 로 window 환경에서 설치하는 방법은 다음 포..

yn971106.tistory.com

Kafka Server를 외부에서 접속하게 하는 방법은 다음 포스트 참고

https://yn971106.tistory.com/82?category=1007788 

 

[Kafka]_실전 응용 환경 적용_(kafka 1대, consumer,producer 분리)

개발환경] 서버 : wsl2로 설치한 Centos , Window * virtual box 로 Linux 설치하여도 무방함 Zookeeper 실행 -> Centos 서버 Kafka 서버 실행 -> Centos 서버 Consumer, Producer -> window 서버  구상도] 위의..

yn971106.tistory.com

 

포트를 열어둔 상태에서 Linux 환경에서는 topic 을 구독하여 읽어오기만 할 것이니 더이상 건들게 없다.

 

Window 환경에서 Connect 설치부터 시작]

 

1. Connect 설치 이해]

 

kafka Connect 위에 Connector를 설치하는것입니다.

Kafka Connect는 Kafka 설치파일 안에 전부 들어 있습니다.

Kafka Connect 를 구동시킬때 plugin path를 지정함으로써 Connector를 적용시킬 수 있습니다.

 

2. kafka Connector 다운로드]

https://debezium.io/releases/1.8/

 

Debezium Release Series 1.8

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable a

debezium.io

여기에서 DB에 맞는 connector .tar.gz 파일 다운로드

 

다운로드 받은 파일을 kafka 를 설치한 Directory 로 이동 후 압축 해제

PowerShell을 켜서 kafka 가 설치된 config 파일 위치로 이동한다.

여기에서 connect-distributed.properties 를 수정

> notepad .\connect-distributed.properties

중요한 부분만 설명하겠습니다.

// 카프카 서버의 주소ip와 port 기입
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
// 데이터를 저장, 가져올 때 변환하는 정보 기입
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
//offset 커밋 주기 설정
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

위는 건드리지 않아도 됩니다. 

중요한건 맨 아래의 이부분을 수정합니다.

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/Users/82105/Desktop/kafka/kafka

주석을 제거하고 위와 같이 아까 설치한 connector 파일의 상단 폴더명까지만 적어줍니다.

>> jar 파일이 들어있는 폴더의 상위 <<

이상하게하면 오류가 뜹니다.

 

저장을 하고, zookeeper , kafka server를 실행시킵니다.

그리고 나서 다음 명령어로 컨넥터를 실행시킵니다

 .\bin\windows\connect-distributed.bat .\config\connect-distributed.properties

 

 

해당 포트 : 8083이 정상가동되었는지 확인은 새로운 창을 열어 다음 명령어로 할 수 있습니다

curl -H "Accept:application/json" localhost:8083/connectors/

 

아무것도 연결하지 않았기 때문에 

[]

만 뜨면 정상입니다.

 

Connect 연결 콘솔은 그대로 두고

그럼 이제 DB에 연결할 json 파일을 생성하고 해당 설정파일이 있는 connector를 생성해 보겠습니다.

 

3. Json 파일로 설정값을 넣은 Connector 생성]

 

원하는 위치에 mongoconnect.json 파일을 생성합니다

생성한 json 파일을 수정합니다.

원본은 다음과 같습니다.

https://github.com/debezium/debezium-examples/blob/main/unwrap-mongodb-smt/mongodb-source.json

 

GitHub - debezium/debezium-examples: Examples for running Debezium (Configuration, Docker Compose files etc.)

Examples for running Debezium (Configuration, Docker Compose files etc.) - GitHub - debezium/debezium-examples: Examples for running Debezium (Configuration, Docker Compose files etc.)

github.com

{
  "name": "inventory-connector",
  "config": {
      "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
      "tasks.max" : "1",
      "mongodb.hosts" : "localhost:27017",
      "mongodb.name" : "dbserver1",
      "mongodb.user" : "debezium",
      "mongodb.password" : "dbz",
      "database.include.list" : "inventory",
      "database.history.kafka.bootstrap.servers" : "여러분의 kafka ip 주소:9092",
      "transforms": "route",
      "transforms.route.type" : "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.route.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
      "transforms.route.replacement" : "$3"
  }
}

위의 코드를 설명하겠습니다.

최상단의 name -> 해당 connector 의 이름 = 겹치면 안됩니다.

connector.class -> plugin path 에 지정한 파일 중 해당 커넥터를 설치

mongodb.host -> 설치된 mongodb의 ip 주소 

mongodb.user -> 해당 db의 user

mongodb.password -> 해당 db의 user password

database.include.list -> 해당 디비구조 중 inventroy 라는 곳만 받는다는 의미

지정하지 않는다면 전체 db를 기준으로 잡음 exclude와 같이 사용 불가

database.history.kafka.bootstrap.servers -> kafka의 advertised 에 열어둔 ip 주소

transform -> 받은 메시지를 아래와 같은 형식의 데이터 값으로 받겠다는 의미

 

자세한 내용은 다음 참고

https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-connector-properties

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

json을 저장하고 새로운 명령어창을 엽니다.

 

해당 json 파일이 위치한 곳으로 이동한 뒤 다음 명령어로 적용시킵니다.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @mongoconnect.json

※ curl은 별도로 설치해야 할 수 있음.

 

정상적으로 연결되었는지 확인은 다음 명령어로 확인 가능하다

curl -H "Accept:application/json" localhost:8083/connectors/

정상적으로 등록된 것을 확인할 수 있습니다.

Connector를 끄는 방법은 다음과 같습니다

curl -X DELETE localhost:8083/connectors/debezium-connector

Connect는 zookeeper, kafka 와 동일한 방법으로 끌 수 있습니다.

 

Connector까지 설치하였습니다. 다음에는 CDC가 동작하는지 확인해보겠습니다.

728x90
반응형