佇列什麼是訊息佇列
訊息佇列就是訊息的傳送過程中保存訊息的容器,本質是一個佇列(先進先出)
消息
指的是需要傳輸的數據,可以是一些文本,字串,或是物件等資訊。
消息队列
則是兩個應用間的通訊服務,訊息的产生者
將資料存放到訊息佇列就可以立即返回,不需要等待訊息的接收者
應答。即:生产者
保證資料插入佇列,誰來取這則訊息不需要管。訊息的接收者
只專注於接受訊息並處理。
訊息佇列能做什麼
解耦上面介紹了,訊息佇列將訊息的生產者和訊息的接收者分開,彼此都不受影響。
非同步非同步就是為了減少請求的回應時間,訊息的生產者只需要處理簡單的邏輯,並將資料放到訊息佇列中即可返回,複雜的邏輯,例如:資料庫操作,IO操作由訊息的接收者處理。
削峰訊息佇列應用在服務時,能將瞬時大量湧入的請求資訊儲存到訊息佇列中,並立即回傳。再由訊息的接收者根據資料處理請求。
應用場景遊戲活動,秒殺活動,下單等會造成瞬時流量暴增的應用。
介紹完訊息佇列的基本訊息,在開發訊息佇列之前先介紹一下訊息佇列的一些基本概念~
訊息的生產者(producer)與消費者(customer)
上文提到的生产者
與消费者
,提供的是
鏈接,通道與隊列
鏈接(connection):表示服務程序與訊息隊列之間的一條鏈接。一個服務程式可以建立多個連結。
通道(channel):訊息佇列連結之間的一個通,一個連結可以有多個通道。
隊列(queue):訊息佇列中存放資料的佇列,一個訊息佇列服務可以有多個佇列。
總結一下,鏈接,通道隊列之間的關係是這樣的
交換器(exchange)
訊息佇列傳送訊息時必須要有一個交換機,如果沒有指定則用的是預設的交換器。交換器的作用就是將訊息才推到對應的佇列中。訊息佇列中一共有4種交換器
Direct: 指定佇列模式,訊息來了,只發給指定的Queue,其他Queue都收不到。
fanout: 廣播模式,訊息來了,就會傳送給所有的佇列。
topic: 模糊匹配模式,透過模糊匹配的方式進行相應轉送。
header: 與Direct模式類似。
brew install rabbitmq
然後再本地訪問http://localhost:15672/ 就可以看到rabbitmq服務的後台。初始的帳號密碼均為guest
amqplib是node中使用訊息佇列的一套工具,可以讓我們快速地使用訊息佇列
創建生產者網址:https://www.npmjs.com/package/amqplib
/** product.js 消費者*/ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { const connection = await amqplib.connect(connectUrl); const channel = await connection.createChannel(); const exchangeName = 'testExchange'; const key = 'testQueue'; const sendMsg = 'hello rabbitmq'; // 知道交換器型別await channel.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一個佇列await channel.assertQueue(key); for (let i = 0; i < 100; i++) { channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`)); } await channel.close(); await connection.close(); })();
運行後在後台可以看到新增了一個有100個訊息的佇列
創建消費者/** customer.js 消費者*/ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { let connection = await amqplib.connect(connectUrl); const exchangeName = 'testExchange'; const key = 'testQueue'; // 建立兩個通道const channel1 = await connection.createChannel(); const channel2 = await connection.createChannel(); // 指定一個交換器await channel1.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一個佇列await channel1.assertQueue(key); await channel1.bindQueue(key, exchangeName, key); channel1.consume(key, (msg) => { console.log('channel 1', msg.content.toString()); }); await channel2.assertExchange(exchangeName, 'fanout', { durable: true, }); await channel2.assertQueue(key); await channel2.bindQueue(key, exchangeName, key); channel2.consume(key, (msg) => { console.log('channel 2', msg.content.toString()); }); })();
執行後可以看到,兩個通道可以同時工作接收訊息