This article analyzes how the Hadoop client reads and writes blocks, including the protocol spoken between the client and the datanode and the on-the-wire data format.
The Hadoop client communicates with the namenode over the RPC protocol, but it does not use RPC to talk to the datanode; instead it uses a raw socket, and the read and write paths use different protocols. The analysis is based on Hadoop 0.20.2 (0.19 behaves the same). Note that the client–datanode protocol changed in 0.23 and later versions, which use protobuf as the serialization format.
Write block
1. The client first asks the namenode to create a file via namenode.create, then starts the DataStreamer thread.
2. The client runs three threads. The main thread reads local data into memory, packages it into Packet objects, and places them on the dataQueue.
3. The DataStreamer thread checks whether the dataQueue holds a packet. If so, it first creates the block output stream (created once per block; a block may contain multiple packets). On creation it contacts the corresponding datanode, sends the DATA_TRANSFER_HEADER information, and reads the response. It then starts a ResponseProcessor thread, which receives the ack confirmations returned by the datanode and handles errors.
4. The DataStreamer takes Packet objects off the dataQueue and sends them to the datanode, then loops back to check whether the dataQueue has more data.
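The three-thread structure above can be sketched as a producer/consumer pattern over two queues. This is a simplified illustration, not the actual DFSClient code; the names dataQueue, ackQueue, DataStreamer, and ResponseProcessor mirror the roles described in the steps, and the Packet class here is a stand-in for Hadoop's internal packet object.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of the client write pipeline (not the real implementation).
// The main thread enqueues packets on dataQueue; the DataStreamer sends them
// and parks them on ackQueue; the ResponseProcessor removes them once the
// datanode acknowledges the matching sequence number.
class WritePipelineSketch {
    static class Packet {
        final long seqno;
        final boolean lastPacketInBlock;
        Packet(long seqno, boolean last) { this.seqno = seqno; this.lastPacketInBlock = last; }
    }

    final BlockingQueue<Packet> dataQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<Packet> ackQueue = new LinkedBlockingQueue<>();

    // Main thread: split user data into packets and enqueue them.
    void enqueue(Packet p) throws InterruptedException { dataQueue.put(p); }

    // DataStreamer thread body: take a packet, "send" it, park it on ackQueue.
    Packet streamOne() throws InterruptedException {
        Packet p = dataQueue.take();
        // ... here the real code writes the packet bytes to the datanode socket ...
        ackQueue.put(p);
        return p;
    }

    // ResponseProcessor thread body: match the datanode's ack against ackQueue.
    boolean processAck(long ackedSeqno) throws InterruptedException {
        Packet p = ackQueue.take();
        return p.seqno == ackedSeqno; // a mismatch would trigger error handling
    }
}
```

The two-queue design lets the main thread keep producing packets while earlier packets are still in flight; only an unacknowledged packet occupies the ackQueue.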
The figure below shows the write-block process.
The figure below shows the format of the message.
Read block
Reading is implemented mainly in the BlockReader class.
When newBlockReader initializes the reader:
1. It creates a new SocketOutputStream(socket, timeout) from the sock parameter, then writes the request header, which differs from the header used when writing a block.
// write the header
out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
out.write(DataTransferProtocol.OP_READ_BLOCK);
out.writeLong(blockId);
out.writeLong(genStamp);
out.writeLong(startOffset);
out.writeLong(len);
Text.writeString(out, clientName);
out.flush();
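To make the header layout concrete, the sketch below serializes the same field sequence into a byte array and checks the offsets. The constant values are illustrative placeholders rather than the exact ones shipped in 0.20.2, and Text.writeString (which writes a vint-prefixed UTF-8 string) is approximated here by a short-prefixed string.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of the OP_READ_BLOCK request header layout written above.
// Constants are placeholders; Text.writeString is approximated by a
// 2-byte-length-prefixed UTF-8 string for illustration.
class ReadBlockHeader {
    static final short DATA_TRANSFER_VERSION = 14; // placeholder version value
    static final byte OP_READ_BLOCK = 81;          // read opcode (write uses a different one)

    static byte[] build(long blockId, long genStamp, long startOffset,
                        long len, String clientName) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeShort(DATA_TRANSFER_VERSION); // 2 bytes: protocol version
        out.write(OP_READ_BLOCK);              // 1 byte : operation code
        out.writeLong(blockId);                // 8 bytes
        out.writeLong(genStamp);               // 8 bytes
        out.writeLong(startOffset);            // 8 bytes
        out.writeLong(len);                    // 8 bytes
        byte[] name = clientName.getBytes(StandardCharsets.UTF_8);
        out.writeShort(name.length);           // stand-in for Text.writeString's vint prefix
        out.write(name);
        out.flush();
        return buf.toByteArray();
    }
}
```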
2. Create the input stream: new SocketInputStream(socket, timeout).
3. Check the returned status: if in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS, the request failed.
4. Create a checksum object from the input stream: DataChecksum checksum = DataChecksum.newDataChecksum(in).
5. Read the offset of the first chunk: long firstChunkOffset = in.readLong().
Note: by default, one checksum (4 bytes) is computed for every 512-byte chunk of data.
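The 512-bytes-per-4-byte-checksum ratio implies a fixed, easily computed checksum overhead per block. A quick calculation, assuming the default chunk size of 512 and a 4-byte checksum (both are configurable in Hadoop):

```java
// Checksum overhead for a stretch of data: one checksum per chunk,
// with a partial final chunk still getting a full checksum.
class ChecksumOverhead {
    static long checksumBytes(long dataLen, int bytesPerChecksum, int checksumSize) {
        long chunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum; // ceiling division
        return chunks * checksumSize;
    }
}
```

For a 64 MB block at the defaults this comes to 512 KB of checksum data, i.e. under 1% overhead.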
6. Next, the actual data is read in BlockReader's read method: result = readBuffer(buf, off, realLen).
7. Data arrives packet by packet, and each packet is consumed chunk by chunk. The packet header fields are read as follows:
int packetLen = in.readInt();
long offsetInBlock = in.readLong();
long seqno = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully(in, checksumBytes.array(), 0, checksumBytes.limit());
IOUtils.readFully(in, buf, offset, chunkLen);
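The packet framing above can be exercised against a hand-built byte stream. The field order follows the reads shown in step 7; the concrete values used in the round trip are made up for illustration.

```java
import java.io.DataInputStream;
import java.io.IOException;

// Round-trip sketch of the read-response packet header from step 7.
// Field order and types match the in.readXxx() calls shown above.
class PacketHeader {
    int packetLen;              // total length of this packet
    long offsetInBlock;         // where this packet's data sits within the block
    long seqno;                 // sequence number, matched against acks on the write path
    boolean lastPacketInBlock;  // true for the final packet of the block
    int dataLen;                // payload bytes carried by this packet

    static PacketHeader read(DataInputStream in) throws IOException {
        PacketHeader h = new PacketHeader();
        h.packetLen = in.readInt();
        h.offsetInBlock = in.readLong();
        h.seqno = in.readLong();
        h.lastPacketInBlock = in.readBoolean();
        h.dataLen = in.readInt();
        // the checksum bytes and the chunk data follow the header on the wire
        return h;
    }
}
```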
8. After the data is read, the checksum is verified: FSInputChecker.verifySum(chunkPos).
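Conceptually, this verification recomputes the checksum over each received chunk and compares it with the 4 bytes that accompanied it. A minimal stand-alone version using java.util.zip.CRC32 (FSInputChecker's actual logic handles buffering and chunk boundaries and is more involved):

```java
import java.util.zip.CRC32;

// Minimal chunk verification: recompute CRC32 over the chunk and compare it
// with the received checksum (the low 4 bytes of CRC32's long value).
class ChunkVerifier {
    static boolean verifyChunk(byte[] chunk, int receivedChecksum) {
        CRC32 crc = new CRC32();
        crc.update(chunk, 0, chunk.length);
        return (int) crc.getValue() == receivedChecksum;
    }
}
```

On a mismatch, the real client reports the corrupt replica to the namenode and retries the read from another datanode.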