2022. 4. 13. 17:01ㆍ[Kafka]/[Kafka_CDC]
구축 목표]
실시간으로 DB의 데이터가 변할 때 kafka 를 통해 consumer 에게 메세지 보내기.
구현 예상도]
구축 환경
window - Producer ]
- DB : MongoDB
- zookeeper server
- Kafka server
- Kafka connect
- Debezium connector
Linux - virtualBox - Consumer ]
- Kafka server
Zookeeper, Kafka Server 설치는 다음 포스트 참고
https://yn971106.tistory.com/81
Kafka Server를 외부에서 접속하게 하는 방법은 다음 포스트 참고
https://yn971106.tistory.com/82?category=1007788
포트를 열어둔 상태에서 Linux 환경에서는 topic 을 구독하여 읽어오기만 할 것이니 더이상 건들게 없다.
Window 환경에서 Connect 설치부터 시작]
1. Connect 설치 이해]
kafka Connect 위에 Connector를 설치하는것입니다.
Kafka Connect는 Kafka 설치파일 안에 전부 들어 있습니다.
Kafka Connect 를 구동시킬때 plugin path를 지정함으로써 Connector를 적용시킬 수 있습니다.
2. kafka Connector 다운로드]
https://debezium.io/releases/1.8/
여기에서 DB에 맞는 connector .tar.gz 파일 다운로드
다운로드 받은 파일을 kafka 를 설치한 Directory 로 이동 후 압축 해제
PowerShell을 켜서 kafka 가 설치된 config 파일 위치로 이동한다.
여기에서 connect-distributed.properties 를 수정
> notepad .\connect-distributed.properties
중요한 부분만 설명하겠습니다.
// 카프카 서버의 주소ip와 port 기입
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
// 데이터를 저장, 가져올 때 변환하는 정보 기입
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
//offset 커밋 주기 설정
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
위는 건드리지 않아도 됩니다.
중요한건 맨 아래의 이부분을 수정합니다.
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/Users/82105/Desktop/kafka/kafka
주석을 제거하고 위와 같이 아까 설치한 connector 파일의 상단 폴더명까지만 적어줍니다.
>> jar 파일이 들어있는 폴더의 상위 <<
이상하게하면 오류가 뜹니다.
저장을 하고, zookeeper , kafka server를 실행시킵니다.
그리고 나서 다음 명령어로 컨넥터를 실행시킵니다
.\bin\windows\connect-distributed.bat .\config\connect-distributed.properties
해당 포트 : 8083이 정상가동되었는지 확인은 새로운 창을 열어 다음 명령어로 할 수 있습니다
curl -H "Accept:application/json" localhost:8083/connectors/
아무것도 연결하지 않았기 때문에
[]
만 뜨면 정상입니다.
Connect 연결 콘솔은 그대로 두고
그럼 이제 DB에 연결할 json 파일을 생성하고 해당 설정파일이 있는 connector를 생성해 보겠습니다.
3. Json 파일로 설정값을 넣은 Connector 생성]
원하는 위치에 mongoconnect.json 파일을 생성합니다
생성한 json 파일을 수정합니다.
원본은 다음과 같습니다.
https://github.com/debezium/debezium-examples/blob/main/unwrap-mongodb-smt/mongodb-source.json
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"mongodb.hosts" : "localhost:27017",
"mongodb.name" : "dbserver1",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
"database.include.list" : "inventory",
"database.history.kafka.bootstrap.servers" : "여러분의 kafka ip 주소:9092",
"transforms": "route",
"transforms.route.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement" : "$3"
}
}
위의 코드를 설명하겠습니다.
최상단의 name -> 해당 connector 의 이름 = 겹치면 안됩니다.
connector.class -> plugin path 에 지정한 파일 중 해당 커넥터를 설치
mongodb.host -> 설치된 mongodb의 ip 주소
mongodb.user -> 해당 db의 user
mongodb.password -> 해당 db의 user password
database.include.list -> 해당 디비구조 중 inventroy 라는 곳만 받는다는 의미
지정하지 않는다면 전체 db를 기준으로 잡음 exclude와 같이 사용 불가
database.history.kafka.bootstrap.servers -> kafka의 advertised 에 열어둔 ip 주소
transform -> 받은 메시지를 아래와 같은 형식의 데이터 값으로 받겠다는 의미
자세한 내용은 다음 참고
json을 저장하고 새로운 명령어창을 엽니다.
해당 json 파일이 위치한 곳으로 이동한 뒤 다음 명령어로 적용시킵니다.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @mongoconnect.json
※ curl은 별도로 설치해야 할 수 있음.
정상적으로 연결되었는지 확인은 다음 명령어로 확인 가능하다
curl -H "Accept:application/json" localhost:8083/connectors/
정상적으로 등록된 것을 확인할 수 있습니다.
Connector를 끄는 방법은 다음과 같습니다
curl -X DELETE localhost:8083/connectors/debezium-connector
Connect는 zookeeper, kafka 와 동일한 방법으로 끌 수 있습니다.
Connector까지 설치하였습니다. 다음에는 CDC가 동작하는지 확인해보겠습니다.
'[Kafka] > [Kafka_CDC]' 카테고리의 다른 글
[Kafka_CDC]_kafka CDC 기능으로 변경 데이터 - consumer MongoDB 저장(5단계. CDC log DB저장) (0) | 2022.04.17 |
---|---|
[Kafka_CDC]_Kafka CDC 구현_(4단계- CDC 확인) (0) | 2022.04.13 |
[Kafka_CDC]_Kafka CDC 구현_(2단계- tutorial Mongodb 설치) (0) | 2022.04.13 |
[Kafka_CDC]_Kafka CDC 구현_(1단계- 설계) (0) | 2022.04.13 |