O
OpenLog
Search
Log in
Back to Suggestions
New Suggest
Suggest edit for "Kafka listener container의 스레드 모델"
Suggestion title
Description
</>
Edit
Preview
## Summary ## Reason
</>
Edit
Preview
Spring API 서버는 보통 HTTP 요청당 스레드 하나가 생성되고, 응답이 반환되면 스레드를 반납하는 식으로 동작한다. 이때 Spring Kafka Listener의 경우에는 poll()을 한 번 했을 때 가져오는 모든 레코드에 대한 작업을 하나의 스레드가 맡게 되는 건지, 아니면 poll()로 가져온 레코드를 하나씩 풀어서 받는 listener 기준으로 스레드가 생성되는 건지가 궁금해졌다. 기본적으로 @KafkaListener는 메시지 하나가 들어올 때마다 새로운 스레드를 생성하지 않는다. Spring Kafka에서는 @KafkaListener가 붙은 메서드를 실행하기 위해 내부적으로 listener container가 만들어진다. 그리고 이 listener container 안에 Kafka consumer thread가 존재한다. ```text @KafkaListener ↓ Listener Container 생성 ↓ Kafka Consumer Thread 실행 ↓ poll() 반복 호출 ``` poll은 한 번에 여러 record를 브로커로부터 가져오는데 Spring Kafka는 기본 record listener 방식에서 이 record들을 하나씩 꺼내 `@KafkaListener`메서드에 전달한다. ```text consumer thread ├─ consume(A) ├─ consume(B) ├─ consume(C) ├─ consume(D) └─ consume(E) ``` 여기서 포인트는 A, B, C, D, E 마다 새로운 스레드가 생기는 것이 아닌 같은 consumer thread에서 listener 메서드를 반복 호출한다는 것이다. `@KafkaListener`하나당 생성되는 consumer thread의 개수는 해당 listener container의 concurrency 수와 같다. 예를 들어 ```kotlin @Component class OrderConsumer { @KafkaListener( topics = ["order-topic"], groupId = "order-group", concurrency = "3" ) fun consume(message: String) { println("${Thread.currentThread().name} - $message") } } ``` 위 케이스에서는 하나의 `@KafkaListener`가 있더라도 consumer thread가 3개만큼 생성된다. ```text @KafkaListener 1개 ↓ listener container 1개 ↓ consumer thread 3개 ``` concurrency가 있으면 리스너 메서드가 코드상으로는 하나여도 여러 consumer thread가 동시에 함수를 호출할 수 있다. 그래서 `@KafkaListener`내부에서 공유 변수를 사용할 때는 주의해야 한다. ```kotlin @Component class OrderConsumer { private var count = 0 @KafkaListener( topics = ["order-topic"], groupId = "order-group", concurrency = "3" ) fun consume(message: String) { count++ } } ``` 위 예시에서는 여러 스레드가 동시에 count++ 를 실행할 수 있기 때문에 thread-safe하지 않게 된다. ### Related - [[Kafka poll의 처리 방식]]
Files changed
No changes yet.
Cancel
Submit suggestion