[Kafka_CDC]_kafka CDC 기능으로 변경 데이터 - consumer MongoDB 저장(5단계. CDC log DB저장)

2022. 4. 17. 15:39[Kafka]/[Kafka_CDC]

728x90
반응형

구축 목표]

실시간으로 Source DB의 데이터가 변할 때 Debezium Kafka Connecter 를 통해 consumer 에게 메세지 보내고 해당 데이터 ( Json 형식의 string type) 을 paylaod 부분 추출하여 MongoDB에 저장.

 

주요 과제]

kafka connector 에서 제공되는 log는 json 형식의 굉장히 긴 String 값입니다.

debezium document에서 제공되는 key를 접근하는 방법 탐구, 및 디비 저장

 

구현 예상도]

구축 환경

window - Producer ] 

 

    - DB : MongoDB

    - zookeeper server

    - Kafka server

    - Kafka connect

    - Debezium connector

 

window- Consumer ]

    - Kafka server

    - DB: MongoDB

    - Kafka consumer

    - Springboot

 

기본구축환경은 지난 포스트 1,2,3,4 단계 참고

https://yn971106.tistory.com/87

 

[Kafka_CDC]_Kafka CDC 구현_(4단계- CDC 확인)

구축 목표] 실시간으로 DB의 데이터가 변할 때 kafka 를 통해 consumer 에게 메세지 보내기. 구현 예상도] 구축 환경 window - Producer ] - DB : MongoDB - zookeeper server - Kafka server - Kafka connect -..

yn971106.tistory.com

-> 4단계 완료시

kafka server, zookeeper server , Mongodb 설치 , consuemr, producer 연결테스트 완료

connector 설치 완료후 메시지 전송기능이 구축 되어 있어야 함.

 

----

Spring boot 로 Mongo DB 연결 및 데이터 삽입은 다음 포스트 참고

https://yn971106.tistory.com/89

 

[MongoDB]_Springboot 에 mongoDB연결 및 데이터 입력(_insert)

목표] Springboot 에 Mongodb 연결후 데이터 삽입 확인 Tool : Intellij DB: MongoDB 빌더 : Maven --- 1. Springboot project 생성 next 다음에는 3가지 Dependency 추가한다 pom.xml 에 다음 Dependency 추가..

yn971106.tistory.com

Spring boot 로 Kafka 연결은 다음 포스트 참고

https://yn971106.tistory.com/90

 

[Kafka]_Springboot에 Kafka 연결하기(feat.consumer)

목표] 1. Springboot 에 kafka 연결 후 consumer 기능 구현하기. 툴: IntelliJ 환경 : Springboot 빌더 : maven --- 1. Springboot 프로젝트를 생성한다] maven 으로 선택해야한다. 2. pom.xml에 dependency 추가..

yn971106.tistory.com

 

--

소스코드 ] 

@SpringBootApplication
public class MongoinsertApplication {

    @Autowired
    private BookRepository repository;



    @KafkaListener(topics = "book", groupId = "springtest")
    public void consume(String data) throws Exception{

        System.out.println("data = " + data);
        // Set<String> keySet = data.keySet();
        // Json 객체로 보이는 것을 변환하기.
        //1단계 String을 json으로 변환
        JSONObject json = new JSONObject(data);

        //엄청 긴 줄에서 key = payload를 가지는 jsonObject로 나눔.
        JSONObject payload = json.getJSONObject("payload");


        // 나눠진 json에서 op 라는 key를 가진 값을 받아옴
        String changestate = payload.get("op").toString();

        System.out.println("changestate = " + changestate);

        String after = payload.get("after").toString();


        System.out.println("after = " + after);


        //string 을 맵으로 변경해서 key 와 value 를 찾기
        Map<String, Object> res = new ObjectMapper().readValue(after, HashMap.class);
        String dbdata = res.get("data").toString();
        String dbid = res.get("id").toString();
        String time = res.get("time").toString();




        System.out.println("tttt = "+ dbdata+dbid+ time);


        switch (changestate){
            case "c":
                System.out.println("생성");
                Book book = new Book();
                book.setId(dbid);
                book.setData(dbdata);
                book.setTime(time);
                repository.insert(book);

                break;
            case "r":
                System.out.println("조회");
                break;
            case "u":
                System.out.println("업데이트");
                Book book2 = new Book();
                book2.setId(dbid);
                book2.setData(dbdata);
                book2.setTime(time);
                repository.insert(book2);

                break;
            case "d":
                System.out.println("삭제");
                break;

        }



    }

    public static void main(String[] args){
        SpringApplication.run(MongoinsertApplication.class, args);
    }

    @Bean
    public ApplicationRunner applicationRunner(){
        return args -> {
            


        };
    }
}

 

KafkaListener 가 가져온 String 데이터는 한줄로 출력이 되는데 양식은 풀어보면 다음과 같다

{
	"schema": {
			"type" : "struct" ,
			"fields" :[ 
							{
							"type":"string",
							"optional":true,
							"name":"io.debezium.data.Json",
							"version":1,
							"field":"after"
							},
							{"type":"string",
							"optional":true,
							"name":"io.debezium.data.Json",
							"version":1,
							"field":"patch"
							},
							{"type":"string",
							"optional":true,
							"name":"io.debezium.data.Json",
							"version":1,
							"field":"filter"
							},
							{"type":"struct",
							"fields":[
								{
								"type":"array",
								"items":{
									"type":"string",
									"optional":false
									},
								"optional":true,
								"field":"removedFields"
								},
								{
								"type":"string",
								"optional":true,
								"name":"io.debezium.data.Json",
								"version":1,
								"field":"updatedFields"
								},
								{
								"type":"array",
								"items":{
									"type":"struct",
									"fields":[
										{
										"type":"string",
										"optional":false,
										"field":"field"
										},
										{
										"type":"int32",
										"optional":false,
										"field":"size"
										}
										],
									"optional":false,
									"name":"io.debezium.connector.mongodb.changestream.truncatedarray"
									},
								"optional":true,
								"field":"truncatedArrays"
								}
								],
							"optional":true,
							"name":"io.debezium.connector.mongodb.changestream.updatedescription",
							"field":"updateDescription"
							},
							{
							"type":"struct",
							"fields":[
								{
								"type":"string",
								"optional":false,
								"field":"version"
								},
								{
								"type":"string",
								"optional":false,
								"field":"connector"
								},
								{
								"type":"string",
								"optional":false,
								"field":"name"
								},
								{
								"type":"int64",
								"optional":false,
								"field":"ts_ms"
								},
								{
								"type":"string",
								"optional":true,
								"name":"io.debezium.data.Enum",
								"version":1,
								"parameters":{
										"allowed":"true,last,false,incremental"
										},
										"default":"false","field":"snapshot"
								},
								{
								"type":"string",
								"optional":false,
								"field":"db"
								},
								{
								"type":"string",
								"optional":true,
								"field":"sequence"
								},
								{
								"type":"string",
								"optional":false,
								"field":"rs"
								},
								{
								"type":"string",
								"optional":false,
								"field":"collection"
								},
								{
								"type":"int32",
								"optional":false,
								"field":"ord"
								},
								{
								"type":"int64",
								"optional":true,
								"field":"h"
								},
								{
								"type":"int64",
								"optional":true,
								"field":"tord"
								},
								{
								"type":"string",
								"optional":true,
								"field":"stxnid"
								},
								{
								"type":"string",
								"optional":true,
								"field":"lsid"
								},
								{
								"type":"int64",
								"optional":true,
								"field":"txnNumber"
								}
								],
							"optional":false,
							"name":"io.debezium.connector.mongo.Source",
							"field":"source"
							},
							{
							"type":"string",
							"optional":true,
							"field":"op"
							},
							{
							"type":"int64",
							"optional":true,
							"field":"ts_ms"
							},
							{
							"type":"struct",
							"fields":[
										{
										"type":"string",
										"optional":false,
										"field":"id"
										},
										{
										"type":"int64",
										"optional":false,
										"field":"total_order"
										},
										{
										"type":"int64",
										"optional":false,
										"field":"data_collection_order"
										}
									],
							"optional":true,
							"field":"transaction"
							}
					],
			"optional":false,
			"name":"dbserver1.inventory.book.Envelope"
			},
	"payload":{
			"after":"{\"_id\": {\"$oid\": \"6257d0ba46a64f552c136e7d\"},\"data\": \"ffff\",\"id\": 10,\"time\": \"2022-04-14 16:43:54\"}",
			"patch":null,
			"filter":null,
			"updateDescription":{
					"removedFields":null,
					"updatedFields":"{\"data\": \"ffff\"}",
					"truncatedArrays":null
					},
			"source":{
					"version":"1.8.1.Final",
					"connector":"mongodb",
					"name":"dbserver1",
					"ts_ms":1649994836000,
					"snapshot":"false",
					"db":"inventory",
					"sequence":null,
					"rs":"rs0",
					"collection":"book",
					"ord":1,
					"h":null,
					"tord":null,
					"stxnid":null,
					"lsid":null,
					"txnNumber":null
					},
			"op": "u",
			"ts_ms":1649994836055,
			"transaction":null
			}
}

 

해당 로그에서 중요한 부분은 payload 의 부분이다.

debezium 문서 참고

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

코드 설명]

JSONObject json = new JSONObject(data);

//엄청 긴 줄에서 key = payload를 가지는 jsonObject로 나눔.
JSONObject payload = json.getJSONObject("payload");

json 형식의 String 을 key 와 value로 구분짓는 jsonObject로 전환 시키고

해당 jsonObject 에서 payload 의 Json 객체를 가져온다

 

// 나눠진 json에서 op 라는 key를 가진 값을 받아옴
String changestate = payload.get("op").toString();

그 객체를 key값으로 참조하여 안의 "op" 부분을 가져온다.

 

String after = payload.get("after").toString();
//string 을 맵으로 변경해서 key 와 value 를 찾기
Map<String, Object> res = new ObjectMapper().readValue(after, HashMap.class);
String dbdata = res.get("data").toString();
String dbid = res.get("id").toString();
String time = res.get("time").toString();

값의 변경은 payload Json 객체의 "after" 키를 가지는 값에 저장된다.

해당 키의 값은 Map 형식으로 되어 있다.

String 을 Map 형태로 풀어주고, 해당 key 에 맞는 값을 각각의 string 변수에 저장한다.

 

switch (changestate){
    case "c":
        System.out.println("생성");
        Book book = new Book();
        book.setId(dbid);
        book.setData(dbdata);
        book.setTime(time);
        repository.insert(book);

        break;
    case "r":
        System.out.println("조회");
        break;
    case "u":
        System.out.println("업데이트");
        Book book2 = new Book();
        book2.setId(dbid);
        book2.setData(dbdata);
        book2.setTime(time);
        repository.insert(book2);

        break;
    case "d":
        System.out.println("삭제");
        break;

}

"op" 부분은 CRUD 의 소문자 형태로 전달된다.

Switch case 문을 통하여 각각의 상태에 대해 핸들링한다.

위에서는 삽입과 업데이트에 대한 변경에만 값을 insert 하도록 하였다.

 

Mongodb model을 참조하여 값을 저장하고 이를 insert 한다.

 

값 확인]

 

감사합니다.

728x90
반응형