В этой статье в основном анализируется процесс чтения и записи блока клиента Hadoop, а также протокол связи между клиентом и узлом данных, формат потока данных и т. д.
Клиент Hadoop связывается с узлом имени через протокол RPC, но клиент не использует RPC для связи с узлом данных, а напрямую использует сокет. В этой статье анализируется версия Hadoop 0.20.2. версия 0.19 такая же) Принцип и протокол связи между клиентом и узлом данных. Следует также подчеркнуть, что протокол связи между клиентом и узлом данных изменился в версиях 0.23 и более поздних, и в качестве метода сериализации используется protobuf.
Блок записи
1. Клиент сначала запрашивает у namenode создание файла через namenode.create, а затем запускает поток dataStreamer.
2. Клиент включает три потока. Основной поток отвечает за чтение локальных данных в память, инкапсуляцию их в объект Package и размещение в очереди dataQueue.
3. Поток dataStreamer определяет, есть ли в очереди dataQueue пакет. Если да, он сначала создает объект BlockOutPutStream (блок создается один раз, и блок может включать несколько пакетов. При создании он связывается с соответствующим узлом данных, отправляет его). информацию DATA_TRANSFER_HEADER и получает возврат. Затем создайте поток ResponseProcessor, который отвечает за получение информации подтверждения подтверждения, возвращаемой узлом данных, и обработку ошибок.
4. DataStreamer извлекает объект Package из dataQueue и отправляет его в datanode. Затем он продолжает цикл, чтобы определить, есть ли в dataQueue данные...
На рисунке ниже показан процесс записи блока.
На рисунке ниже показан формат сообщения.
Чтение блока
В основном реализовано в классе BlockReader.
При инициализации newBlockReader
1. Создайте новый SocketOutputStream(socket, timeout), передав параметр sock, а затем запишите информацию о связи, которая отличается от записи заголовка блока.
// пишем заголовок.
out.writeShort(DataTransferProtocol. DATA_TRANSFER_VERSION );
out.write(DataTransferProtocol. OP_READ_BLOCK );
out.writeLong(blockId);
out.writeLong(genStamp);
out.writeLong(startOffset);
out.writeLong(лен);
Text.writeString (out, clientName);
out.flush();
2. Создайте новый входной поток SocketInputStream(socket, timeout).
3. Определите возвращаемое сообщение в .readShort() != DataTransferProtocol .
4. Создайте контрольную сумму на основе входного потока: Контрольная сумма DataChecksum = DataChecksum.newDataChecksum( in )
5. Прочитайте позицию первого чанка: long firstChunkOffset = in.readLong()
Примечание. 512 байт используются для расчета контрольной суммы чанка (4 байта).
6. Далее читаем конкретные данные в методе чтения BlockReader: result = readBuffer(buf, off, realLen)
7. Читайте по частям
int packageLen = in.readInt();
длинное смещениеInBlock = in.readLong();
длинный seqno = in.readLong();
логическое значение LastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (in, checksumBytes.array(), 0,
контрольная суммаBytes.limit());
IOUtils.readFully (in, buf, offset, chunkLen);
8. Проверка контрольной суммы после чтения данных FSInputChecker.verifySum(chunkPos)