What is a message queue?
A message queue is a container that saves messages during the message transmission process. It is essentially a queue (first in, first out).
A message is the data to be transmitted. It can be text, an object, or other information.
A message queue is a communication service between two applications. The producer of a message can return immediately after storing the data in the message queue, without waiting for the receiver of the message to respond. That is, the producer only ensures that the data is inserted into the queue and does not care who gets the message. The receiver only focuses on receiving the message and processing it.
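To make the first-in, first-out behaviour concrete, here is a minimal in-memory sketch. SimpleQueue and its send/receive methods are hypothetical names invented for illustration; this is not RabbitMQ, and a real message queue runs as a separate service.

```javascript
// Illustrative in-memory stand-in for a message queue (not RabbitMQ).
class SimpleQueue {
  constructor() {
    this.messages = [];
  }

  // Producer side: store the message and return right away.
  send(message) {
    this.messages.push(message);
    return this.messages.length; // current queue depth
  }

  // Consumer side: take the oldest message, or undefined if the queue is empty.
  receive() {
    return this.messages.shift();
  }
}

const queue = new SimpleQueue();
queue.send('first');          // a string message
queue.send({ orderId: 2 });   // an object message

// Messages come out in the order they went in.
const received = [queue.receive(), queue.receive()];
console.log(received);
```

The producer never calls receive and the consumer never calls send, which is the separation described above.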
What can a message queue do?
Decoupling: As introduced above, the message queue separates the message producer from the message receiver, so neither is affected by the other.
Asynchronous: Asynchronous processing reduces the response time of requests. The producer of the message only needs to handle simple logic, put the data in the message queue, and return. Complex logic, such as database operations and I/O operations, is handled by the receiver of the message.
Peak shaving: While the application is serving requests, it can save an instantaneous influx of request data into the message queue and return immediately. The receiver of the messages then processes the requests from that data at its own pace.
Application scenarios: game events, flash sales, order placement, and other activities that cause an instantaneous traffic surge.
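Peak shaving can be sketched with a small simulation. The numbers are assumptions chosen for illustration: 10 requests arrive at once, and the consumer can handle 3 per tick. handleRequest, runTick and RATE_PER_TICK are hypothetical names, and the buffer is a plain array rather than a real message queue.

```javascript
// Hypothetical simulation of peak shaving with an in-memory buffer.
const buffer = [];
const RATE_PER_TICK = 3; // assumed consumer capacity per tick

// Producer: accept the request, enqueue it, and respond immediately.
function handleRequest(request) {
  buffer.push(request);
  return 'accepted';
}

// Consumer: drain at most RATE_PER_TICK messages per tick.
function runTick() {
  return buffer.splice(0, RATE_PER_TICK);
}

// A burst of 10 requests arrives at once; each gets an immediate response.
const responses = [];
for (let i = 0; i < 10; i++) {
  responses.push(handleRequest({ id: i }));
}

// The consumer works through the backlog at its own pace.
const processedPerTick = [];
while (buffer.length > 0) {
  processedPerTick.push(runTick().length);
}
console.log(processedPerTick);
```

The burst is absorbed by the buffer, so the consumer never sees more than its capacity in a single tick.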
Now that we have covered the basics, let's go over some core message queue concepts before we start developing with one ~
Message producer (producer) and consumer (consumer)
These are the producer and consumer mentioned above.
Connections, channels, and queues
Connection: represents a connection between the service program and the message queue. A service program can create multiple connections.
Channel: a channel within a message queue connection. A connection can have multiple channels.
Queue: a queue that stores data in the message queue. A message queue service can have multiple queues.
To sum up: a service program can open multiple connections, each connection can carry multiple channels, and the message queue service can hold multiple queues.
Exchange (exchange)
A message queue must have an exchange when sending messages. If none is specified, the default exchange is used. The role of the exchange is to push messages to the corresponding queues. There are four types of exchange in total:
direct: exact-match mode. When a message arrives, it is sent only to the queues whose binding key equals the message's routing key; other queues do not receive it.
fanout: broadcast mode. When a message arrives, it is sent to all queues bound to the exchange.
topic: pattern-matching mode. Messages are forwarded to the queues whose binding pattern matches the routing key.
headers: similar to direct mode, but matching is based on message header attributes instead of the routing key.
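The routing rules for direct, fanout and topic exchanges can be sketched in plain JavaScript. This is an in-memory illustration, not RabbitMQ's implementation: route, topicMatches and the sample bindings are hypothetical names, and the headers type is left out for brevity. In topic patterns, words are separated by dots, "*" matches exactly one word, and "#" matches zero or more words.

```javascript
// Does a topic binding pattern match a routing key?
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i, j) => {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // "#" may absorb zero or more words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

// Return the names of the queues that would receive the message.
// bindings: [{ queue, bindingKey }]
function route(type, bindings, routingKey) {
  return bindings
    .filter(({ bindingKey }) => {
      if (type === 'fanout') return true; // routing key is ignored
      if (type === 'direct') return bindingKey === routingKey;
      if (type === 'topic') return topicMatches(bindingKey, routingKey);
      throw new Error(`unsupported exchange type: ${type}`);
    })
    .map(({ queue }) => queue);
}

// Sample bindings, invented for illustration.
const bindings = [
  { queue: 'orders', bindingKey: 'order.created' },
  { queue: 'audit', bindingKey: 'order.#' },
  { queue: 'eu', bindingKey: '*.created' },
];

console.log(route('direct', bindings, 'order.created'));
console.log(route('fanout', bindings, 'anything'));
console.log(route('topic', bindings, 'order.paid.eu'));
```

With these bindings, the direct exchange delivers only to the queue bound with the exact key, the fanout exchange delivers to every bound queue, and the topic exchange delivers 'order.paid.eu' only to the queue bound with 'order.#'.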
brew install rabbitmq
Once the RabbitMQ server is running, visit http://localhost:15672/ locally to see the RabbitMQ management console. The initial username and password are both guest.
amqplib is a toolkit for using message queues in Node.js that lets us get started with message queues quickly.
Address: https://www.npmjs.com/package/amqplib
Create producer
/** product.js producer */
const amqplib = require('amqplib');
const config = require('./config');

const { connectUrl } = config;

(async () => {
  const connection = await amqplib.connect(connectUrl);
  const channel = await connection.createChannel();
  const exchangeName = 'testExchange';
  const key = 'testQueue';
  const sendMsg = 'hello rabbitmq';

  // Declare the exchange and its type
  await channel.assertExchange(exchangeName, 'fanout', { durable: true });
  // Declare the queue and bind it to the exchange; without a binding,
  // the exchange has nowhere to route the messages and drops them
  await channel.assertQueue(key);
  await channel.bindQueue(key, exchangeName, key);

  // A fanout exchange ignores the routing key passed to publish
  for (let i = 0; i < 100; i++) {
    channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`));
  }

  await channel.close();
  await connection.close();
})();
After running it, you can see in the management console that a new queue holding 100 messages has been added.
Create consumer
/** customer.js consumer */
const amqplib = require('amqplib');
const config = require('./config');

const { connectUrl } = config;

(async () => {
  const connection = await amqplib.connect(connectUrl);
  const exchangeName = 'testExchange';
  const key = 'testQueue';

  // Create two channels on the same connection
  const channel1 = await connection.createChannel();
  const channel2 = await connection.createChannel();

  // Declare the exchange and queue, then bind them (channel 1)
  await channel1.assertExchange(exchangeName, 'fanout', { durable: true });
  await channel1.assertQueue(key);
  await channel1.bindQueue(key, exchangeName, key);
  channel1.consume(key, (msg) => {
    console.log('channel 1', msg.content.toString());
    channel1.ack(msg); // acknowledge so the broker can remove the message
  });

  // Same declarations on channel 2; asserting again is idempotent
  await channel2.assertExchange(exchangeName, 'fanout', { durable: true });
  await channel2.assertQueue(key);
  await channel2.bindQueue(key, exchangeName, key);
  channel2.consume(key, (msg) => {
    console.log('channel 2', msg.content.toString());
    channel2.ack(msg);
  });
})();
After running it, you can see both channels working at the same time to receive messages. Each message is delivered to only one of the two consumers.
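When several consumers read from one queue, RabbitMQ by default hands messages out in round-robin fashion. The sketch below simulates that idea in memory. dispatchRoundRobin and the sample message names are hypothetical, and the real delivery order also depends on prefetch settings and acknowledgement timing, so treat this as an approximation.

```javascript
// Simulate round-robin delivery of messages to the consumers of one queue.
function dispatchRoundRobin(messages, consumerNames) {
  const deliveries = {};
  consumerNames.forEach((name) => {
    deliveries[name] = [];
  });

  // Message i goes to consumer (i mod number of consumers).
  messages.forEach((message, i) => {
    const target = consumerNames[i % consumerNames.length];
    deliveries[target].push(message);
  });

  return deliveries;
}

const deliveries = dispatchRoundRobin(
  ['m0', 'm1', 'm2', 'm3', 'm4'],
  ['channel 1', 'channel 2']
);
console.log(deliveries);
```

In this simulation no message appears in both lists, which mirrors how the two channels share the work rather than each receiving a full copy.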