บทความนี้จะวิเคราะห์กระบวนการอ่านและเขียนบล็อกของไคลเอนต์ Hadoop เป็นหลัก รวมถึงโปรโตคอลการสื่อสารระหว่างไคลเอนต์และดาต้าโหนด รูปแบบการไหลของข้อมูล ฯลฯ
ไคลเอนต์ Hadoop สื่อสารกับเนมโหนดผ่านโปรโตคอล RPC แต่ไคลเอนต์ไม่ได้ใช้ RPC เพื่อสื่อสารกับดาต้าโหนด แต่ใช้ซ็อกเก็ตโดยตรงสำหรับการอ่านและการเขียนก็แตกต่างกันเช่นกัน เวอร์ชัน 0.19 เหมือนกัน )หลักการและโปรโตคอลการสื่อสารระหว่างไคลเอนต์และดาต้าโหนด ควรเน้นด้วยว่าโปรโตคอลการสื่อสารระหว่างไคลเอนต์และดาต้าโหนดมีการเปลี่ยนแปลงในเวอร์ชัน 0.23 และใหม่กว่า ใช้ protobuf เป็นวิธีการทำให้เป็นอนุกรม
เขียนบล็อก
1. ขั้นแรกไคลเอ็นต์จะร้องขอ namenode ให้สร้างไฟล์ผ่าน namenode.create จากนั้นจึงเริ่มเธรด dataStreamer
2. ไคลเอนต์ประกอบด้วยสามเธรด เธรดหลักมีหน้าที่ในการอ่านข้อมูลภายในเครื่องลงในหน่วยความจำ ห่อหุ้มลงในวัตถุแพ็คเกจ และวางลงในคิว dataQueue
3. เธรด dataStreamer ตรวจพบว่าคิว dataQueue มีแพ็คเกจหรือไม่ หากเป็นเช่นนั้น จะสร้างออบเจ็กต์ BlockOutPutStream ก่อน (บล็อกจะถูกสร้างขึ้นหนึ่งครั้ง และบล็อกอาจมีหลายแพ็คเกจ) เมื่อสร้างขึ้น บล็อกจะสื่อสารกับดาต้าโหนดที่เกี่ยวข้องและส่งข้อมูล ข้อมูล DATA_TRANSFER_HEADER และรับการส่งคืน จากนั้นสร้างเธรด ResponseProcessor ซึ่งมีหน้าที่รับข้อมูลการยืนยัน ack ที่ส่งคืนโดย datanode และการจัดการข้อผิดพลาด
4. dataStreamer นำออบเจ็กต์ Package ออกจาก dataQueue และส่งไปยัง datanode จากนั้นจะทำการวนซ้ำต่อไปเพื่อตรวจสอบว่า dataQueue มีข้อมูล...
รูปด้านล่างแสดงกระบวนการเขียนบล็อก
รูปด้านล่างคือรูปแบบของข้อความ
อ่านบล็อก
ใช้งานเป็นหลักในคลาส BlockReader
เมื่อเริ่มต้น newBlockReader
1. สร้าง SocketOutputStream ใหม่ (ซ็อกเก็ต การหมดเวลา) โดยส่งพารามิเตอร์ sock จากนั้นเขียนข้อมูลการสื่อสาร ซึ่งแตกต่างจากการเขียนส่วนหัวของบล็อก
//เขียนส่วนหัว.
out.writeShort( DataTransferProtocol. DATA_TRANSFER_VERSION );
ออก.write( DataTransferProtocol.OP_READ_BLOCK );
ออก.writeLong( blockId );
ออก.writeLong(genStamp);
ออก.writeLong( startOffset );
out.writeLong(เลน);
Text.writeString (ออก ชื่อลูกค้า);
ออก.ล้าง();
2. สร้างสตรีมอินพุตใหม่ SocketInputStream (ซ็อกเก็ต, หมดเวลา)
3. กำหนดข้อความส่งคืน in.readShort() != DataTransferProtocol
4. สร้างเช็คซัมตามสตรีมอินพุต: DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. อ่านตำแหน่งของ Chunk แรก: long firstChunkOffset = in.readLong()
หมายเหตุ: 512 ไบต์ใช้ในการคำนวณผลรวมตรวจสอบสำหรับก้อน (4 ไบต์)
6. ถัดไป อ่านข้อมูลเฉพาะในวิธีการอ่านของ BlockReader: result = readBuffer(buf, off, realLen)
7. อ่านทีละตอน
int packetLen = in.readInt();
ยาว offsetInBlock = in.readLong();
seqno ยาว = in.readLong();
บูลีนlastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (ใน, checksumBytes.array(), 0,
checksumBytes.จำกัด());
IOUtils.readFully (ใน, buf, ออฟเซ็ต, chunkLen);
8. การตรวจสอบผลรวมหลังจากอ่านข้อมูล FSInputChecker.verifySum(chunkPos)