O
OpenLog
Search
Log in
Back to Suggestions
New Suggest
Suggest edit for "Kafka poll의 처리 방식"
Suggestion title
Description
</>
Edit
Preview
## Summary ## Reason
</>
Edit
Preview
Kafka 컨슈머에서 poll은 브로커로부터 메시지를 가져오는 메서드다. ```kotlin while (true) { val records = consumer.poll(Duration.ofMillis(1000)) for (record in records) { process(record) } } ``` 여기서 중요한 점은 poll이 메시지를 하나만 가져오는 것이 아니라는 것이다. poll 한 번의 결과는 ConsumerRecords이고, 이 안에는 여러 개의 ConsumerRecord가 들어갈 수 있다. 즉 poll은 하나 처리하고 하나를 새로 가져오는 식이 아니라 한 번 브로커로부터 메시지를 읽어올 때 여러 레코드를 가져오고 애플리케이션이 그 결과를 순회하면서 처리하는 구조가 된다. Spring에서 Kafka를 연동할 때는 보통 `@KafkaListener`를 사용하는데, 이 경우엔 개발자가 직접 poll을 호출하는 코드를 쓰지 않는다. ```kotlin @Component class OrderConsumer { @KafkaListener( topics = ["order-topic"], groupId = "order-group" ) fun listen(message: String) { println("message = $message") } } ``` 개발자는 토픽에 대한 리스너 메서드만 작성하고 실제 내부에서는 Spring Kafka의 Listener Container가 다음 작업을 대신 수행한다. ```text KafkaConsumer 생성 topic 구독 poll() 반복 호출 poll 결과를 listener 메서드에 전달 offset commit 처리 예외 처리 rebalance 대응 ``` `@KafkaListener`는 기본적으로 호출될 때마다 메시지 하나를 받는다. 아까 poll은 한번에 여러 레코드를 가져온다고 하지 않았나? 맞다. 브로커로부터 레코드를 읽어오는 단위는 여러개지만 가져온 레코드들에 대해 순회하며 listener를 호출하기 때문에 리스너 메서드는 한 번에 하나씩 처리하는 것처럼 보이게 된다. 예를 들어 poll 한 번에 메시지 10개가 반환되었다면, 기본 `@KafkaListener`메서드는 10번 호출될 수 있다. 반대로 Batch Listener를 쓴다면 kafka listener가 한번에 여러 메서지 묶음을 받을 수 있다. 그럼 대신 리스너 내부적으로 묶음을 풀어서 처리하는 식의 코드가 필요하게 된다. ```kotlin @Component class OrderBatchConsumer { @KafkaListener( topics = ["order-topic"], groupId = "order-group", containerFactory = "batchKafkaListenerContainerFactory" ) fun listen(messages: List<String>) { messages.forEach { message -> println("message = $message") } } } ```
Files changed
No changes yet.
Cancel
Submit suggestion