-
2023.09.23 기록장TIL 2023. 9. 22. 23:15
스프링 카프카를 사용하면서 컨슈머를 만들어야 할 때 새로 생성하는 것이 아닌 디폴트 컨슈머를 사용하는 것이 기본 설정을 확장하는 것이 안전하다고 생각했습니다. 자동 세팅을 모두 재정의하는 것보다.
그래서 디폴트 컨슈머 재정의는 싫으니 properties에서 deserializer만 재정의 해줄 수 있습니다.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer #spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=com.example.demo.CustomDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class에 jsondeserializer를 재정의해줍니다.
여기서의 단점은 jsondeserializer가 사용하는 objectmapper를 커스텀할 수 없다는 것입니다.
jsondeserializer는 자체 생성한 objectmapper를 사용하기 때문입니다.
저는 recode의 key를 snake case를 사용하고 있어서 이를 변환해 주는 작업을 object mapper가 해줘야 합니다.
하지만 재정의가 불가능하니 오류가 날 뿐입니다.
그래서 jsondeserializer를 커스텀하고 이를 넣어줍니다.
기본 Jackson에서 제공하는 빌더를 통해 기본설정이 있는 objectmapper를 생성합니다.
단순히 objectmapper를 생성하면 json 직렬화/역직렬화시기본생성자가 있어야 하는 오류를 만날 수 있습니다.
++
coustomizer로는 불가능하지만 빈으로 받아서 set하는 것은 가능합니다.
기본 설정도 가져가고 이게 가장 베스트인 듯 합니다.
--------------------
이 모든 행위는 카프카 컨슈머에서 값을 멀티 타입으로 받기 위함의 초석입니다.
위처럼 재정의하지 않고 컨버터를 사용하는 방법도 있습니다.
kafka에서 deserializer와 컨버터의 기능은 무엇일까
컨슈머가 poll을 하면 deserializer가 먼저 역직렬화를 한다.
dopoll을 따라가면 deserializer가 실행되고 컨버터는 invokeIfHaveRecords에서 실행된다.
컨버터는 리스너가 사용할 수 있는 메시지 타입으로 컨슈머 레코드를 변환한다.
여기까지 온 이유는 카프카 컨슈머가 멀티타입을 어떻게 역직렬화를 하나를 보고자 함이었다.
이를 구현하는 방법은 2가지가 있는데.
1. 먼저 spring 예제에서 처럼 멀티 타입 컨버터를 사용할 수 있다. 이는 typeMapper에 mapping값을 보고 message로 만드는 것이다. 위에 스샷을 보면 extractAndConvertValue에서 jsonMessageConverter가 실행된다.
2. properties만 사용하기
properties에 타입을 명시하고 jsonSerialsizer를 사용한다면 serialsizer가 안에서 변환을 한다.
spring.kafka.consumer.properties.spring.json.type.mapping=CREATE:com.example.Sample
'TIL' 카테고리의 다른 글
2023.10.15 기록장 (0) 2023.10.15 2023.09.27 기록장 (0) 2023.09.27 2023.09.21 기록장 (0) 2023.09.21 2023.09.20 기록장 (0) 2023.09.20 2023.09.02 기록장 (0) 2023.09.02