1. RabbitMQ 란?
서버간 메세지를 전달해주는 오픈소스 메세지 브로커입니다.
RabbitMQ는 AMQP프로토콜을 구현한 메세지 브로커입니다.
AMQP란?
AMQP란 Advanced Message Queuing Protocol로 client application과 middleware broker와의 메세지를 주고 받기 위한 프로토콜이다.
2. 구조
1) Producer
메시지를 보낼 때 RabbitMQ에 직접 연결하고 메시지를 publish(발행)합니다.
2) Exchanges
Exchange는 받은 메시지를 하나 이상의 Queue에 라우팅(전달)합니다.
라우팅 방식은 Exchange의 타입에 따라 다릅니다.
- Direct Exchange: Routing Key가 특정 Queue의 Binding Key와 정확히 일치할 때 메시지를 해당 Queue로 라우팅합니다.
- Topic Exchange: Routing Key와 Binding Key 사이에 와일드카드 매칭을 허용하여 메시지를 라우팅합니다.
- Fanout Exchange: Binding Key를 무시하고 모든 연결된 Queue에 메시지를 브로드캐스트합니다.
- Headers Exchange: Routing Key를 무시하고 메시지의 헤더 속성을 기반으로 메시지를 라우팅합니다.
3) Bindings
Binding은 Exchange와 Queue 사이의 연결을 정의합니다.
Binding을 통해 Exchange는 어떤 Queue에 메시지를 전달할지 결정합니다.
4) Queues
Queue는 메시지를 저장하는 버퍼입니다. 각 Queue는 메시지를 받아서 저장하고, Consumer가 처리할 준비가 될 때까지 메시지를 보관합니다.
5) Consumer
Consumer는 Queue에 연결되어 있으며, 메시지가 Queue에 도착하면 그 메시지를 가져와서 처리합니다.
3. 언제 쓰는건데?
사실 이 글을 쓰는 주요 목적입니다.
예시가 있다면 이해가 빠르죠!
1) 서비스 설계 상황
- MSA 서비스(계정, 쿠폰, 알림)로 구성되어 있습니다.
2) 비즈니스 요구사항
- 회원가입을 하면 가입 환영 쿠폰을 발급해줍니다.
- 회원가입을 하면 가입 인사 이메일을 전송합니다.
3) 해결방법
3-1) message Queue를 쓰지 않는다면?
가입 시 자동으로 이벤트 참여를 시키려면 회원가입(signUp)함수에 지속적으로 추가해야합니다.
이렇게 되면 회원가입이라는 함수는 많은 일을 하게되고 비즈니스 복잡도가 증가하게 됩니다.
물론 이 방법으로도 비즈니스 요구사항을 해결할 수 있습니다.
// 계정서버
fun signUp(userSignUpRequest: UserSignUpRequest){
...
val createdUserEntity = userRepository.save(userEntity)
couponClient.issueSignUpCoupon(userId = createdUserEntity.id)
notificationClient.notifySignUpEmail(userId = createdUserEntity.id)
}
3-2) message Queue를 쓴다면?
Producer(계정)서버에서 가입 이벤트를 발행하고, 가입한 행위를 필요로하는 Consumer(쿠폰, 알림)들이 메시지를 받아갑니다.
이렇게 작성하면 회원가입 함수는 회원가입을 하고 가입 이벤트를 발행하는 일만 할 수 있습니다.
회원가입 했을 때 추가적인 비즈니스 요구사항이 생기면 계정서버의 작업은 필요 없이, 메시지를 받아서 작업하면 됩니다.
이때는 Direct를 쓰는게 명확하다고 생각합니다
// 계정서버
fun signUp(userSignUpRequest: UserSignUpRequest){
...
val createdUserEntity = userRepository.save(userEntity)
userSignUpEventProducer.publish(
event = userEventFactory.userSignUpEvent(userEntity = createdUserEntity),
)
}
// 쿠폰 서버
fun receiveUserSignUpMessage(payload: Long, message: Message): Any? {
issueSignUpCoupon(userId = payload)
}
fun issueSignUpCoupon(userId: Long){
...
}
// 알림 서버
fun receiveUserSignUpMessage(payload: Long, message: Message): Any? {
notifySignUpEmail(userId = payload)
}
fun notifySignUpEmail(userId: Long){
...
}
4. 메시지 유실에 대해
1) 사례
이 챕터는 제가 일을 하던중 겪은 이슈를 정리하기위해 추가했습니다.
메시지를 수신하여 외부 서비스 api 호출을 하는 작업을 했는데, 인증 만료로 401 ERROR가 발생했습니다.
지속적으로 외부 서비스에 401 ERROR를 발생시켰고 뒤늦게 발견하게 됐습니다.
2) 분석
제 로직은 Exception을 던졌고, messageQueue에도 제대로 성공했다는 메시지를 보내지 못했습니다.
rabbitMQ는 성공 메시지를 받지 못해서 메시지를 유실시키지 않게 하기위해 실패한 메시지를 다시 큐에 쌓아둡니다.
이 상황이 계속 반복되어 실패하는 메시지를 통해 외부 서비스를 계속 호출하는 이슈를 만났습니다.
Consumer가 메시지를 성공적으로 처리했음을 RabbitMQ에 알리는 것은 중요합니다.
이것이 "acknowledgment"이며, 이를 통해 RabbitMQ는 메시지를 큐에서 안전하게 제거할 수 있습니다.
만약 이 ack가 전송되지 않으면, RabbitMQ는 메시지가 처리되지 않았다고 가정하고 다른 Consumer에게 메시지를 다시 전송할 것입니다.
'개발' 카테고리의 다른 글
pytorch one hot encoding (0) | 2021.09.15 |
---|