메시지 큐란 무엇입니까?
메시지 큐는 메시지 전송 프로세스 중에 메시지를 저장하는 컨테이너입니다. 본질적으로 큐(선입선출)입니다.
消息
전송해야 하는 데이터를 의미하며 텍스트, 문자열, 개체 및 기타 정보일 수 있습니다.
消息队列
는 두 애플리케이션 간의 통신 서비스입니다. 메시지产生者
는 메시지接收者
의 응답을 기다리지 않고 메시지 큐에 데이터를 저장한 후 즉시 이를 반환할 수 있습니다. 즉,生产者
데이터가 대기열에 삽입되었는지 확인하고 누가 메시지를 받는지는 중요하지 않습니다. 메시지接收者
메시지 수신 및 처리에만 집중합니다.
어떤
분리 기능을 수행할 수 있습니까? 위에서 설명한 것처럼 메시지 대기열은 메시지 생성자와 메시지 수신자를 분리하며 어느 쪽도 다른 쪽의 영향을 받지 않습니다.
비동기식 비동기식은 요청의 응답 시간을 줄이는 것입니다. 메시지 생성자는 간단한 논리만 처리하고 데이터를 메시지 대기열 에 넣어 반환하면 됩니다. 메시지.
Peak Shaving 메시지 큐 애플리케이션이 서비스를 제공할 때 순간적으로 유입되는 요청 정보를 메시지 큐에 저장하고 즉시 반환할 수 있습니다. 그런 다음 메시지 수신자의 데이터를 기반으로 요청이 처리됩니다.
적용 시나리오: 순간적인 트래픽 급증을 유발하는 게임 활동, 반짝 세일 활동, 주문 배치 등.
메시지 큐의 기본 정보를 소개한 후, 메시지 큐를 개발하기 전에 메시지 큐의 기본 개념을 소개하겠습니다 ~
메시지 생성자(생산자)와 소비자(고객)
生产者
언급한消费者
링크, 채널 및 대기열을
제공합니다
.연결: 서비스 프로그램과 메시지 대기열 간의 링크를 나타냅니다. 서비스 프로그램은 여러 링크를 생성할 수 있습니다 .
채널: 메시지 대기열 링크 사이의 채널 은 여러 채널을 가질 수 있습니다 .
큐: 메시지 큐에 데이터를 저장하는 큐입니다. 메시지 큐 서비스에는 여러 개의 큐가 있을 수 있습니다.
링크와 채널 큐의 관계를 정리하면 이렇습니다.
교환(exchange)
메시지 대기열은 메시지를 보낼 때 교환이 있어야 합니다 . 지정하지 않으면 기본 교환이 사용됩니다. 스위치의 역할은 해당 대기열에 메시지를 푸시하는 것입니다. 메시지 대기열에는 총 4가지 유형의 스위치가 있습니다
. 직접: 대기열 모드를 지정합니다. 메시지가 오면 지정된 대기열로만 전송되고 다른 대기열에서는 메시지가 수신되지 않습니다.
팬아웃(fanout): 브로드캐스트 모드, 메시지가 오면 모든 대기열로 전송됩니다.
주제: 퍼지 매칭 모드, 퍼지 매칭을 통한 해당 전달.
헤더: 직접 모드와 유사합니다.
양조 설치 RabbitMQ
그런 다음 로컬로 http://localhost:15672/를 방문하여 Rabbitmq 서비스의 배경을 확인하세요. 초기 계정 비밀번호는 guest
amqplib는 노드에서 메시지 대기열을 사용하기 위한 도구 세트로, 이를 통해 메시지 대기열을 빠르게 사용할 수 있습니다.
생산자 만들기주소: https://www.npmjs.com/package/amqplib
/** product.js 소비자*/ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = 구성; (비동기 () => { const 연결 = amqplib.connect(connectUrl)를 기다립니다; const 채널 = 연결을 기다립니다.createChannel(); const exchangeName = 'testExchange'; const 키 = 'testQueue'; const sendMsg = '안녕하세요 Rabbitmq'; // 스위치 유형을 확인합니다. wait Channel.assertExchange(exchangeName, 'fanout', { 내구성 : 사실, }); //큐 지정 대기 채널.assertQueue(key); for (let i = 0; i < 100; i++) { Channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`)); } 채널을 기다리고 있습니다.닫기(); 연결을 기다립니다.닫기(); })();
실행 후에는 100개의 메시지가 포함된 새 대기열이 추가된 것을 백그라운드에서 볼 수 있습니다.
소비자 생성/** customer.js 소비자*/ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = 구성; (비동기 () => { 연결 허용 = amqplib.connect(connectUrl) 대기; const exchangeName = 'testExchange'; const 키 = 'testQueue'; //두 개의 채널을 생성합니다. const 채널1 = 대기 연결.createChannel(); const 채널2 = 연결을 기다립니다.createChannel(); //교환 지정 대기 채널1.assertExchange(exchangeName, 'fanout', { 내구성 : 사실, }); //큐 지정 대기 채널1.assertQueue(key); 채널1.bindQueue(key, exchangeName, key)를 기다립니다. 채널1.소비(키, (msg) => { console.log('채널 1', msg.content.toString()); }); 채널2.assertExchange(exchangeName, 'fanout', {를 기다리십시오. 내구성 : 사실, }); 채널2.assertQueue(키)를 기다립니다; 채널2.bindQueue(key, exchangeName, key)를 기다립니다. 채널2.소비(키, (msg) => { console.log('채널 2', msg.content.toString()); }); })();
실행 후에는 두 채널이 동시에 작동하여 메시지를 수신할 수 있는 것을 확인할 수 있습니다.