メッセージ キューとは何ですか?
メッセージ キューは、メッセージ送信プロセス中にメッセージを保存するコンテナであり、本質的にはキュー (先入れ先出し) です。
消息
とは、送信する必要があるデータを指します。これには、テキスト、文字列、オブジェクトなどの情報が含まれます。
消息队列
、2 つのアプリケーション間の通信サービスです。メッセージの产生者
メッセージの接收者
の応答を待たずに、メッセージ キューにデータを保存した後、すぐにメッセージを返すことができます。つまり、生产者
データがキューに挿入されることを保証し、誰がメッセージを取得するかは問題ではありません。メッセージの接收者
、メッセージを受信して処理することだけに集中します。
どのような
分離を行うことができますか? 上で紹介したように、メッセージ キューはメッセージのプロデューサーとメッセージの受信者を分離し、どちらも他方の影響を受けません。
非同期非同期では、メッセージの作成者は単純なロジックを処理し、データをメッセージ キューに入れて返すだけで済み、データベース操作や IO 操作などの複雑なロジックは受信者によって処理されます。メッセージ。
ピーク シェービングメッセージ キュー アプリケーションがサービスを提供しているときに、リクエスト情報の瞬間的な流入をメッセージ キューに保存し、すぐに返すことができます。その後、リクエストはメッセージの受信者によってデータに基づいて処理されます。
適用シナリオ:瞬間的なトラフィックの急増を引き起こすゲーム活動、フラッシュセール活動、注文など。
メッセージ キューの基本情報を紹介した後、メッセージ キューを開発する前に、メッセージ キューの基本的な概念、つまり、
メッセージ プロデューサー (プロデューサー) とコンシューマー (顧客)生产者
消费者
します
。
リンク、チャネル、およびキューを提供します
。
接続: サービス プログラムとメッセージ キューの間のリンクを表します。サービス プログラムは複数のリンクを作成できます。
チャネル: メッセージ キュー リンク間のチャネル。リンクには複数のチャネルを含めることができます。
キュー: メッセージ キューにデータを格納するキュー。メッセージ キュー サービスは複数のキューを持つことができます。
まとめると、リンクとチャネルキューの関係は次のようになります
Exchange (交換)
メッセージ キューには、メッセージを送信するときに交換が必要です。指定しない場合は、デフォルトの交換が使用されます。スイッチの役割は、メッセージを対応するキューにプッシュすることです。メッセージキューには合計 4 種類のスイッチがあります
。 ダイレクト: メッセージが来たときに、指定されたキューにのみ送信され、他のキューは受信しません。
fanout: ブロードキャスト モード。メッセージが到着すると、すべてのキューに送信されます。
トピック: ファジー マッチング モード、ファジー マッチングによる対応する転送。
header: ダイレクト モードに似ています。
醸造インストールrabbitmq
次に、ローカルで http://localhost:15672/ にアクセスして、rabbitmq サービスの背景を確認します。初期アカウントのパスワードはguest
amqplib は、ノードでメッセージ キューを使用するためのツールのセットで、メッセージ キューをすばやく使用できるようにします。
プロデューサーの作成アドレス: https://www.npmjs.com/package/amqplib
/** product.js コンシューマー*/ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = 設定; (async () => { const connection = await amqplib.connect(connectUrl); const チャネル = 接続を待機します。createChannel(); const ExchangeName = 'testExchange'; const key = 'testQueue'; const sendMsg = 'こんにちは、rabbitmq'; // スイッチのタイプを知る await channel.assertExchange(exchangeName, 'fanout', { 耐久性: 真、 }); //待機キューを指定する channel.assertQueue(key); for (i = 0; i < 100; i++) { channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`)); } チャンネルを閉じるのを待ちます(); 接続を待ちます。close(); })();
実行後、バックグラウンドで 100 個のメッセージを含む新しいキューが追加されたことがわかります。
コンシューマの作成/** customer.js コンシューマー*/ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = 設定; (async () => { let connection = await amqplib.connect(connectUrl); const ExchangeName = 'testExchange'; const key = 'testQueue'; // 2 つのチャネルを作成します const channel1 = await connection.createChannel(); const channel2 = 接続を待ちます。createChannel(); //交換待機を指定します channel1.assertExchange(exchangeName, 'fanout', { 耐久性: 真、 }); //待機キューを指定する channel1.assertQueue(key); await channel1.bindQueue(key,exchangeName,key); channel1.consume(key, (msg) => { console.log('チャンネル 1', msg.content.toString()); }); await channel2.assertExchange(exchangeName, 'ファンアウト', { 耐久性: 真、 }); Channel2.assertQueue(key); を待ちます。 await channel2.bindQueue(key,exchangeName,key); channel2.consume(key, (msg) => { console.log('チャンネル 2', msg.content.toString()); }); })();
実行後、両方のチャネルが同時に動作してメッセージを受信できることがわかります。