Back to feed
ASH avatar
ASH

2026. 5. 23.·v1·

Kafka poll의 처리 방식

Kafka 개념 공부

kafka

Kafka 컨슈머에서 poll은 브로커로부터 메시지를 가져오는 메서드다.

kotlin
while (true) {
    val records = consumer.poll(Duration.ofMillis(1000))

    for (record in records) {
        process(record)
    }
}

여기서 중요한 점은 poll이 메시지를 하나만 가져오는 것이 아니라는 것이다.
poll 한 번의 결과는 ConsumerRecords이고, 이 안에는 여러 개의 ConsumerRecord가 들어갈 수 있다. 즉 poll은 하나 처리하고 하나를 새로 가져오는 식이 아니라 한 번 브로커로부터 메시지를 읽어올 때 여러 레코드를 가져오고 애플리케이션이 그 결과를 순회하면서 처리하는 구조가 된다.

Spring에서 Kafka를 연동할 때는 보통 @KafkaListener를 사용하는데, 이 경우엔 개발자가 직접 poll을 호출하는 코드를 쓰지 않는다.

kotlin
@Component
class OrderConsumer {

    @KafkaListener(
        topics = ["order-topic"],
        groupId = "order-group"
    )
    fun listen(message: String) {
        println("message = $message")
    }
}

개발자는 토픽에 대한 리스너 메서드만 작성하고 실제 내부에서는 Spring Kafka의 Listener Container가 다음 작업을 대신 수행한다.

sql
KafkaConsumer 생성
topic 구독
poll() 반복 호출
poll 결과를 listener 메서드에 전달
offset commit 처리
예외 처리
rebalance 대응

@KafkaListener는 기본적으로 호출될 때마다 메시지 하나를 받는다.
아까 poll은 한번에 여러 레코드를 가져온다고 하지 않았나? 맞다.
브로커로부터 레코드를 읽어오는 단위는 여러개지만 가져온 레코드들에 대해 순회하며 listener를 호출하기 때문에 리스너 메서드는 한 번에 하나씩 처리하는 것처럼 보이게 된다.

예를 들어 poll 한 번에 메시지 10개가 반환되었다면, 기본 @KafkaListener메서드는 10번 호출될 수 있다.

반대로 Batch Listener를 쓴다면 kafka listener가 한번에 여러 메서지 묶음을 받을 수 있다. 그럼 대신 리스너 내부적으로 묶음을 풀어서 처리하는 식의 코드가 필요하게 된다.

kotlin
@Component
class OrderBatchConsumer {

    @KafkaListener(
        topics = ["order-topic"],
        groupId = "order-group",
        containerFactory = "batchKafkaListenerContainerFactory"
    )
    fun listen(messages: List<String>) {
        messages.forEach { message ->
            println("message = $message")
        }
    }
}
0
Comments

Join the thread

Leave feedback, ask for clarification, or keep a focused discussion attached to this article.

0 comments
No comments yet. Start the first thread for this article.
Current user avatar
Styling with Markdown is supported