-
Spring kafka에서 Multi Type Converter 여행기글또 2023. 9. 25. 22:50
안녕하세요.
이번 글은 spring kafka에 대한 글입니다.
kafka consumer를 구현하다가 이벤트의 페이로드가 제각각인 이벤트를 만나게 되어서 이를 spring consumer에서는 어떻게 받을 수 있나. 또 받을 수 있는 여러 가지 방법들을 소개하고자 합니다.
Intro
kafka는 하나의 토픽 안에 여러 이벤트를 넣을 수 있습니다.
이를 spring kafka consumer에서 받을 때에는 적절하게 구현체를 찾도록 해줘야 합니다.
이를 가능하게 해주는 것이 Deserializer와 messageConverter입니다.
먼저 consumer의 동작을 보면 다음과 같습니다.Consumer의 동작 순서
1. dopoll을 따라가면 deserializer가 실행되고 converter는 invokeIfHaveRecords에서 실행됩니다.
(poll) 컨슈머가 poll을 하면 deserializer가 먼저 역직렬화를 합니다.
(converter) converter는 리스너가 사용할 수 있는 메시지 타입으로 consumer record를 변환한다.
(요약) Deserializer 이후에 messageConverter로 리스너에서 사용가능한 메시지 타입으로 만듭니다.
그래서 이벤트 payload를 Deserializer에서 타입을 변환해 주던가 converter에서 변환해 주던가 2가지 선택지가 있습니다.
1. jsonDeserializer를 사용할 때
2. messageConverter를 사용할 때
로 나눠 설명할 수 있습니다.
1. JsonDeserializer를 사용할 때
JsonDeserializer는 타입 변환을 할 때, 내부의 typeMapper를 사용합니다.
기본 typemapper는 DefaultJackson2JavaTypeMapper 이며 내부에서 __TypeId__라는 헤더를 읽어서 타입 변환을 합니다.
__TypeId__헤더는 token:ClassName 형식의 값을 가지고 있고 spring producer가 JsonSerializer를 사용했다면 DefaultJackson2JavaTypeMapper 에 의해 자동으로 들어가게 됩니다.하지만 다른 프로젝트의 consumer가 이벤트를 받는 경우가 많기에 className은 producer의 className과 consumer의 className이 같을 경우는 적습니다.
그래서 token만 약속하고 className은 각자의 className을 적게 됩니다. token만 약속하는 것이지요.
약속은 properties에 적을 수 있습니다.
producer 프로젝트 spring.kafka.producer.properties.spring.json.type.mapping=CREATE:com.example.kafka.CustomProducer$UserInfo,UPDATE:com.example.kafka.CustomProducer$OtherUserInfo consumer 프로젝트 spring.kafka.consumer.properties.spring.json.type.mapping=CREATE:com.example.kafka.ConsumerController$UserInfo,UPDATE:com.example.kafka.ConsumerController$OtherUserInfo
이 값은 json(De)Serializer의 mapper인 DefaultJackson2JavaTypeMapper가 사용하게 됩니다.
jsonDeserializer를 defaultConsumer에 세팅하는 방법은 properties 또는 코드에서도 할 수 있습니다.code properties 고려할 점은 properties에서 jsonDeserializer를 사용할 때 내부 objectMapper는 bean이 아닌 자체 생성한 것을 사용합니다.
이것 때문에 여러가지 이슈가 발생할 수 있습니다. 예로 jackson.deserialization.fail-on-unknown-propertie는 default true라서 없는 key 값이 들어온다면 에러가 발생합니다.
만약 json이 snake case 등을 사용하고 있었다면 CustomJsonDeserializer를 생성하고 properties에 명시해야 합니다.customDeserializer jackson사용 시 기본 생성자를 생성해야 하는 하는데(기본생성자 생성 후 setter, 없다면 reflection.. allAag생성자는 jsonCreator명시..) kotlin data type은 기본생성자가 없습니다. 이를 해결하는 jackson-module-kotlin라이브러리를 씁니다. 이때 jackson을 사용해서 objectmapper를 만들어줘야 해당 이슈가 생기지 않습니다.
2. stringSerializer를 사용할 때
기본적으로 spring kafka는 string(De)Serializer를 사용합니다.
string은 다 받을 수 있기 때문에 Json(De)Serializer를 사용할 때처럼 ErrorHandlingDeserializer (역직렬화 시 발생하는 오류를 잡기 위함)를 사용할 필요는 없습니다.
이때 구현하는 여러 가지 방법들입니다.
2-1. jsonSubType
먼저 kafka는 기본적으로 string(De)Serializer를 사용합니다.
jsonDeserializer를 사용하지 않고 stringDeserializer을 사용하고 jsonConverter를 사용하는 방법입니다.
jackson에서 제공하는 어노테이션을 사용할 수 있습니다.
user interface messageConverter
이것의 단점은 kafkahandler를 사용할 수 없습니다.
아래와 같이 리스너에 달고 사용할 때 직접 타입캐스팅을 해서 사용해야 합니다.method listener
2-2. multi type message converter 사용
컨슈머는 poll()한 이후에 메시지를 deserializer 하고 리스너가 쓸 수 있는 message타입으로 메시지를 변환합니다.
이때 string을 알맞게 타입변환을 시켜주는 converter를 넣어주면 됩니다.
이 또한 DefaultJackson2JavaTypeMapper를 사용하여 구성합니다. jsonDeserializer는 properties에 정의된 값을 읽어서 typemapper를 구성하는데 예제는 하나하나 명시하도록 구현하였습니다.
ps.
위 방법들 모두 properties에 하나하나 명시하는 것이 불편하니 consumer와 producer는 updateConfig라는 메서드를 제공합니다.
이를 통해 사용자 어노테이션 또는 패키지 레벨을 읽어서 런타임시점에 동적으로 넣어줄 수 있습니다.
글 속의 예제GitHub - ChoiGiSung/spring-kafka-json-deserializer-in-kotlin
Contribute to ChoiGiSung/spring-kafka-json-deserializer-in-kotlin development by creating an account on GitHub.
github.com
ReferenceSpring for Apache Kafka
When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka
docs.spring.io
[Jackson] Subtype 별 Polymorphic De/Serialization 을 제공하는 Deduction
Jackson 은 JVM 진영에서 주로 사용하는 JSON 역/직렬화 라이브러리로 JSON 은 이제 Web 에서 빼놓을수없는 포맷이므로.. 자주 사용하게 된다. 그러나 JSON 은 근본이 동적언어인 js 의 (사실상) 표준 포맷
see-ro-e.tistory.com
GitHub - spring-projects/spring-kafka: Provides Familiar Spring Abstractions for Apache Kafka
Provides Familiar Spring Abstractions for Apache Kafka - GitHub - spring-projects/spring-kafka: Provides Familiar Spring Abstractions for Apache Kafka
github.com
'글또' 카테고리의 다른 글
Spring Async에서 SecurityContextHolder 접근하기 (0) 2023.11.04 spring security white list 어노테이션 만들기 (0) 2023.11.01 인프콘 2023 (0) 2023.08.22 스프링 카프카 리트라이 정책 정하기 (0) 2023.08.08 글또 8기 회고 (0) 2023.07.15