本文主要分析了hadoop客戶端read和write block的流程. 以及client和datanode通信的協定, 資料流格式等.
hadoop客戶端與namenode通訊透過RPC協定, 但是client 與datanode通訊並沒有使用RPC, 而是直接使用socket, 其中讀寫時的協定也不同, 本文分析了hadoop 0.20.2版本的(0.19版本也是一樣的)client與datanode通訊的原理與通訊協定. 另外需要強調的是0.23及以後的版本中client與datanode的通訊協定有所變化,使用了protobuf作為序列化方式.
Write block
1. 客戶端首先透過namenode.create, 向namenode請求建立檔案, 然後啟動dataStreamer線程
2. client包括三個執行緒, main執行緒負責把本地資料讀入記憶體, 並封裝為Package物件, 放到佇列dataQueue中.
3. dataStreamer線程檢測隊列dataQueue是否有package, 如果有, 則先創建BlockOutPutStream對象(一個block創建一次, 一個block可能包括多個package), 創建的時候會和相應的datanode通信, 發送DATA_TRANSFER_HEADER信息並獲取返回. 然後創建ResponseProcessor線程, 負責接收datanode的返回ack確認訊息, 並進行錯誤處理.
4. dataStreamer從dataQueue中拿出Package物件, 發送給datanode. 然後繼續循環判斷dataQueue是否有資料…..
下圖展示了write block的流程.
下圖是報文的格式
Read block
主要在BlockReader類別中實作.
初始化newBlockReader時,
1. 透過傳入參數sock創建new SocketOutputStream(socket, timeout), 然後寫通信信息, 與寫block的header不大一樣.
//write the header.
out.writeShort( DataTransferProtocol. DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol. OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( genStamp );
out.writeLong( startOffset );
out.writeLong( len );
Text. writeString (out, clientName);
out.flush();
2. 建立輸入流new SocketInputStream(socket, timeout)
3. 判斷回傳訊息in.readShort() != DataTransferProtocol. OP_STATUS_SUCCESS
4. 依照輸入流建立checksum : DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. 讀取第一個Chunk的位置: long firstChunkOffset = in.readLong()
註: 512個位元組為一個chunk計算checksum(4個位元組)
6. 接下來在BlockReader的read方法中讀取具體資料: result = readBuffer(buf, off, realLen)
7. 一個一個chunk的讀取
int packetLen = in.readInt();
長 offsetInBlock = in.readLong();
long seqno = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils. readFully (in, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils. readFully (in, buf, offset, chunkLen);
8. 讀取資料後checksum驗證; FSInputChecker.verifySum(chunkPos)