RabbitMQ feat Nodejs
들어가면서
이번에 개발하고 있는 토이프로젝트를 점차 MSA 기반으로 만들려고 하다보니, 필요한 부분을 PoC 작업을 수행하고 있다. 그러다보니, 청약정보를 받고, 이를 메일 서비스에서 일일이 받는것보다 MQ서버에서 올려 놓으면 땡겨오는 형식으로 서비스를 만들면, 좀더 서비스간 유기적인 연계작업을 할수 있을 것 같아서 알아보았다.
참고자료
https://musclebear.tistory.com/139 https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/ https://semtax.tistory.com/20
RabbitMQ란?
메시지 브로커 서비스로, 다수의 사용자가 브로커에 메시지를 전달하면, 브로커가 메시지를 받아서 보관하고 있다가 해당 메시지를 받으려는 사람이 보관한 메시지를 꺼내는 형태의 서비스이다. (ex 우체통)
메시지 브로커는 큐기반의 자료구조를 기반으로 하고있다. 사실 kafka, activemq 등과같은 큐도 있고 심지어는 redis도 메시지큐로 사용가능하다.
AMQP
클라이언트가 메시지 미들웨어 브로커와 통신할 수 있게 해주는 메세징 프로토콜이다. 메시지를 발행하려는 Producers에서 Broker의 Exchange로 메시지를 전달하면, Binding 이라는 규칙에 의해 연결된 Queue로 메시지가 복사된다.
Broker
Producers -> [Exchange -- Binding --> Queue] -> Consumers
메시지를 받아가는 Consumer에서는 브로커의 Queue를 통해 메시지를 받아 처리한다.
AMQP에는 네트워크에 문제가 있거나, 메시지를 처리하지 못하는 경우를 대비해서 2가지 수신 확인 모델을 갖춘다.
- Consumer는 메시지를 받으면 명시적으로 broker에게 통지하고, 브로커는 이 알림을 받을때만 Queue에서 메시지를 삭제한다.
- Broker가 메시지를 전달하면 자동으로 삭제하는 방식이다.
Exchange와 Queue를 연결하는 Bindings
모든 메시지는 Queue로 직접 전달되지 않고, 반드시 Exchange에서 먼저 받는다. Exchange Type과 Binding 규칙에 따라 적절한 Queue로 전달
Exchange 종류
- Direct 메시지에 포함된 routing key를 기반으로 Queue에 메시지를 전달한다.
- Fanout routing key 관계없이 연결된 모든 Queue에 동일한 메시지를 전달한다.
- Topic routing key 전체가 일치하거나 일부 패턴과 일치하는 모든 Queue로 메시지가 전달된다. pub/sub 패턴 구현에 사용
- Headers binding key는 무시되고, 헤더값이 바인딩 시지정된 값과 같은 경우에만 일치하는 것으로 간주한다.
Durability
브로커가 재시작 될 때 남아 있는지 여부
- durable
- trasient
Auto-delete
마지막 Queue 연결이 해제되면 삭제
Bindings
생성된 Exchange에는 전달 받은 메시지를 원하는 Queue로 전달하기 위해 Bindings라는 규칙을 정의할 수 있다.
메시지를 보관하는 Queue
Consumer는 큐를 통해 메시지를 가져간다.
- Name queue의 이름
- Durablility durable은 브로커가 재시작되어도 디스크에 저장되어 남고, transient는 재시작시 사라진다.
- Auto delete 마지막 Consumer가 구독을 끝내면 자동 삭제
- Arguments 메시지 TTL, Max Length 같은 추가 기능을 명시한다.
하나의 연결을 공유하는 Channels
Consumer에서 Broker로 많은 연결을 맺는 것은 바람직하지 않다. RabbitMQ는 Channel이라는 개념을 통해 하나의 TCP 연결을 공유해서 사용할 수 있는 기능을 제공한다. 멀티스레드, 프로세스 사용 작업시엔 별도의 Channel을 열고 사용하는 것이 바람직하다.
RabbitMQ GCP에서 호스팅하기
우선, GCP상에서 VM 인스턴스를 만들고 docker를 설치했다. 이후엔, docker를 서비스에 올린 후 아래 명령어를 쳤다.
docker run -d --name rabbitmq -p 5672:5672 -p 8080:15672 --restart=unless-stopped rabbitmq:management
docker 옵션 정보
-d : 백그라운드로 실행 --name rabbitmq : 해당 컨테이너 이름을 rabbitmq로 실행 -p 5672:5672 -p 15672:15672 : HOST와 컨테이너간 포트 포워딩 (5672: rabbitmq 기본통신포트, 15672: rabbitmq-server의 통신포트 (이외에 클러스터 구성이 필요하다면 25672도 필요)) --restart=unless-stopped : 해당 컨테이너를 사용자가 멈추기 전까지 계속 재부팅 rabbitmq:management : rabbitmq이미지 중 management기능이 있는 rabbitmq-server까지 포함된 이미지를 실행
출처: https://musclebear.tistory.com/139 [곰돌이의 데이터 기록부]
여기까지하면, 8080포트로 외부에서 접속하면 다음과 같은 화면이 나온다.
Username, password는 모두 guest로 접속하면 된다.
여기까지 진행하면 MQ서버는 정상적으로 띄운것이다.
Nodejs로 RabbitMQ 이용하기
Nodejs에서도 RabbitMQ를 사용할 수 있는데, amqplib이 바로 그것이다. 사용법은 간단하다. 채널을 만들고 큐로 보내면 된다. 아래는 json 형식으로 큐에 정보를 등록하는 예제다.
Sender
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://외부서버주소ip',(err,connection)=>{
if(err){
throw err;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'hello';
channel.assertQueue(queue, {
durable: false
});
var msg = {testVal1:1111, testVal2:2222, testVal3:3333}
channel.sendToQueue(queue, Buffer.from(JSON.stringify(msg)));
console.log(" [x] Sent %s", msg);
});
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
});
Queue에 등록 확인
Receiver
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://외부서버주소ip', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'hello';
channel.assertQueue(queue, {
durable: false
});
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
var result = JSON.parse(msg.content.toString());
console.log(result.testVal1, result.testVal2, result.testVal3);
channel.ack(msg);
}, {
//noAck: true 이면 queue에서 데이터를 가져간다음 Ack를 바로 반환함으로 가져가자마자 queue에서 지워버림
noAck: false
});
});
});
Consumer가 받고 나선 Queue에 등록된 정보는 삭제된다.
참고자료 https://skarlsla.tistory.com/13
메시지 수신 자동 확인(ack , noAck)
noAck : true
사용하면 RabbitMQ가 고객에게 메시지를 전달하면 바로 삭제 표시 이 경우 작업자를 죽이면 처리중인 메시지가 손실 이 특정 작업자에게 발송되었지만 아직 처리되지 않은 모든 메시지도 손실
noAck : false
ack를 전송하지 않고 소비자가 죽거나 (채널이 닫히거나 연결이 끊어 지거나 TCP 연결이 끊어지는 경우),
RabbitMQ는 메시지가 완전히 처리되지 않았 음을 인식하고 다시 대기, 가끔씩 사망하더라도 메시지를 잃어 버리지 않음
작업을 마친 후에는 noAck : false
옵션을 사용하여 설정을 해제하고 작업자에게 적절한 응답을 보내야함
//ex) 자동 응답
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
}, secs * 1000);
}, {noAck: true});
//ex) 자동 미응답 & ack(msg)
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}, {noAck: false});