Artikel ini terutama menganalisis proses membaca dan menulis blok klien Hadoop serta protokol komunikasi antara klien dan datanode, format aliran data, dll.
Klien hadoop berkomunikasi dengan namenode melalui protokol RPC, tetapi klien tidak menggunakan RPC untuk berkomunikasi dengan datanode, tetapi langsung menggunakan soket. Protokol untuk membaca dan menulis juga berbeda. versi 0.19 sama) Prinsip dan protokol komunikasi antara klien dan datanode. Perlu juga ditekankan bahwa protokol komunikasi antara klien dan datanode telah berubah di versi 0.23 dan yang lebih baru, dan protobuf digunakan sebagai metode serialisasi.
Blok tulis
1. Klien pertama-tama meminta namenode untuk membuat file melalui namenode.create, lalu memulai thread dataStreamer.
2. Klien menyertakan tiga thread. Thread utama bertanggung jawab untuk membaca data lokal ke dalam memori, merangkumnya ke dalam objek Paket, dan menempatkannya dalam antrian dataQueue.
3. Thread dataStreamer mendeteksi apakah antrian dataQueue memiliki paket. Jika demikian, pertama-tama ia membuat objek BlockOutPutStream (sebuah blok dibuat satu kali, dan sebuah blok dapat mencakup beberapa paket. Saat dibuat, ia berkomunikasi dengan datanode yang sesuai, mengirimkan informasi DATA_TRANSFER_HEADER dan mendapatkan return . Kemudian buat thread ResponseProcessor, yang bertanggung jawab untuk menerima informasi konfirmasi konfirmasi yang dikembalikan oleh datanode dan menangani kesalahan.
4. dataStreamer mengeluarkan objek Package dari dataQueue dan mengirimkannya ke datanode. Kemudian melanjutkan perulangan untuk menentukan apakah dataQueue memiliki data...
Gambar di bawah menunjukkan proses blok tulis.
Gambar di bawah ini adalah format pesannya
Baca blok
Terutama diimplementasikan di kelas BlockReader.
Saat menginisialisasi newBlockReader,
1. Buat SocketOutputStream(socket, timeout) baru dengan memasukkan parameter sock, lalu tulis informasi komunikasi, yang berbeda dengan penulisan header blok.
//tulis headernya.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
keluar.writeLong( blockId );
keluar.writeLong(genStamp);
keluar.writeLong( startOffset );
keluar.writeLong(len);
Text.writeString (keluar, nama klien);
keluar.flush();
2. Buat aliran input baru SocketInputStream(socket, timeout)
3. Tentukan pesan balasan di.readShort() != DataTransferProtocol
4. Buat checksum berdasarkan aliran input: DataChecksum checksum = DataChecksum.newDataChecksum( di )
5. Baca posisi Chunk pertama: long firstChunkOffset = in.readLong()
Catatan: 512 byte digunakan untuk menghitung checksum untuk sebuah potongan (4 byte)
6. Selanjutnya, baca data spesifik dalam metode baca BlockReader: result = readBuffer(buf, off, realLen)
7. Bacalah sepotong demi sepotong
int packetLen = masuk.readInt();
offsetInBlock panjang = in.readLong();
seqno panjang = in.readLong();
boolean lastPacketInBlock = di.readBoolean();
int dataLen = masuk.readInt();
IOUtils.readFully (dalam, checksumBytes.array(), 0,
checksumBytes.batas());
IOUtils.readFully (dalam, buf, offset, chunkLen);
8. Verifikasi checksum setelah membaca data;