この記事では、主に Hadoop クライアントの読み取りおよび書き込みブロックのプロセス、およびクライアントとデータノード間の通信プロトコル、データ フロー形式などを分析します。
Hadoop クライアントは RPC プロトコルを介してネームノードと通信しますが、クライアントはデータノードとの通信に RPC を使用せず、ソケットを直接使用します。この記事では、Hadoop バージョン 0.20.2 を分析します。バージョン 0.19 は同じです) クライアントとデータノード間の通信の原理と通信プロトコル クライアントとデータノード間の通信プロトコルがバージョン 0.23 以降で変更されていることも強調しておく必要があります。シリアル化メソッドとして protobuf を使用します。
書き込みブロック
1. クライアントはまず、namenode.create を通じて namenode にファイルの作成を要求し、次に dataStreamer スレッドを開始します。
2. クライアントには 3 つのスレッドが含まれており、メイン スレッドはローカル データをメモリに読み取り、それを Package オブジェクトにカプセル化し、キュー dataQueue に配置する役割を果たします。
3. dataStreamer スレッドは、キュー dataQueue にパッケージがあるかどうかを検出し、存在する場合は、最初に BlockOutPutStream オブジェクトを作成します (ブロックは一度作成され、ブロックには複数のパッケージが含まれる場合があります)。 DATA_TRANSFER_HEADER 情報を取得し、 return を取得します。次に、datanode から返された ack 確認情報を受信してエラーを処理する ResponseProcessor スレッドを作成します。
4. dataStreamer は、dataQueue から Package オブジェクトを取り出して datanode に送信し、dataQueue にデータがあるかどうかを判断するループを続けます。
以下の図にブロック書き込みの処理を示します。
下図はメッセージのフォーマットです
読み取りブロック
主に BlockReader クラスに実装されます。
newBlockReaderを初期化するときに、
1. sock パラメーターを渡して新しい SocketOutputStream(socket, timeout) を作成し、ブロックのヘッダーを書き込むのとは異なり、通信情報を書き込みます。
// ヘッダーを書き込みます。
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong(genStamp);
out.writeLong( startOffset );
out.writeLong(len);
Text.writeString (out, clientName);
out.flush();
2. 入力ストリーム new SocketInputStream(socket, timeout) を作成します。
3. 戻りメッセージ in.readShort() != DataTransferProtocolを決定します。
4. 入力ストリームに基づいてチェックサムを作成します: DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. 最初のチャンクの位置を読み取ります:long firstChunkOffset = in.readLong()
注:チャンク (4 バイト) のチェックサムを計算するために 512 バイトが使用されます。
6. 次に、BlockReader の read メソッドで特定のデータを読み取ります。 result = readBuffer(buf, off, realLen)
7. チャンクごとに読み取ります
int packetLen = in.readInt();
ロングオフセットInBlock = in.readLong();
長いシーケンス番号 = in.readLong();
ブール値 lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (in, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils.readFully (in、buf、offset、chunkLen);
8. データ読み取り後のチェックサム検証; FSInputChecker.verifySum(chunkPos)