2026. 5. 23.·v2·
Kafka listener container의 스레드 모델
Kafka 개념 공부
Spring API 서버는 보통 HTTP 요청당 스레드 하나가 생성되고, 응답이 반환되면 스레드를 반납하는 식으로 동작한다.
이때 Spring Kafka Listener의 경우에는 poll()을 한 번 했을 때 가져오는 모든 레코드에 대한 작업을 하나의 스레드가 맡게 되는 건지, 아니면 poll()로 가져온 레코드를 하나씩 풀어서 받는 listener 기준으로 스레드가 생성되는 건지가 궁금해졌다.
기본적으로 @KafkaListener는 메시지 하나가 들어올 때마다 새로운 스레드를 생성하지 않는다.
Spring Kafka에서는 @KafkaListener가 붙은 메서드를 실행하기 위해 내부적으로 listener container가 만들어진다.
그리고 이 listener container 안에 Kafka consumer thread가 존재한다.
@KafkaListener
↓
Listener Container 생성
↓
Kafka Consumer Thread 실행
↓
poll() 반복 호출poll은 한 번에 여러 record를 브로커로부터 가져오는데 Spring Kafka는 기본 record listener 방식에서 이 record들을 하나씩 꺼내 @KafkaListener메서드에 전달한다.
consumer thread
├─ consume(A)
├─ consume(B)
├─ consume(C)
├─ consume(D)
└─ consume(E)여기서 포인트는 A, B, C, D, E 마다 새로운 스레드가 생기는 것이 아닌 같은 consumer thread에서 listener 메서드를 반복 호출한다는 것이다.
@KafkaListener하나당 생성되는 consumer thread의 개수는 해당 listener container의 concurrency 수와 같다.
예를 들어
@Component
class OrderConsumer {
@KafkaListener(
topics = ["order-topic"],
groupId = "order-group",
concurrency = "3"
)
fun consume(message: String) {
println("${Thread.currentThread().name} - $message")
}
}위 케이스에서는 하나의 @KafkaListener가 있더라도 consumer thread가 3개만큼 생성된다.
@KafkaListener 1개
↓
listener container 1개
↓
consumer thread 3개concurrency가 있으면 리스너 메서드가 코드상으로는 하나여도 여러 consumer thread가 동시에 함수를 호출할 수 있다. 그래서 @KafkaListener내부에서 공유 변수를 사용할 때는 주의해야 한다.
@Component
class OrderConsumer {
private var count = 0
@KafkaListener(
topics = ["order-topic"],
groupId = "order-group",
concurrency = "3"
)
fun consume(message: String) {
count++
}
}위 예시에서는 여러 스레드가 동시에 count++ 를 실행할 수 있기 때문에 thread-safe하지 않게 된다.
Related
Join the thread
Leave feedback, ask for clarification, or keep a focused discussion attached to this article.