Este artigo analisa principalmente o processo de leitura e gravação do bloco do cliente Hadoop, bem como o protocolo de comunicação entre o cliente e o datanode, formato do fluxo de dados, etc.
O cliente hadoop se comunica com o namenode por meio do protocolo RPC, mas o cliente não usa RPC para se comunicar com o datanode, mas usa diretamente o soquete. Os protocolos de leitura e gravação também são diferentes. Este artigo analisa o hadoop versão 0.20.2 (. versão 0.19 é a mesma) O princípio e protocolo de comunicação entre cliente e datanode Também deve ser enfatizado que o protocolo de comunicação entre cliente e datanode mudou nas versões 0.23 e posteriores, e o protobuf é usado como método de serialização.
Bloco de gravação
1. O cliente primeiro solicita que o namenode crie um arquivo por meio de namenode.create e, em seguida, inicia o thread dataStreamer.
2. O cliente inclui três threads. O thread principal é responsável por ler os dados locais na memória, encapsulando-os em um objeto Package e colocando-os na fila dataQueue.
3. O thread dataStreamer detecta se a fila dataQueue possui um pacote. Nesse caso, ele primeiro cria um objeto BlockOutPutStream (um bloco é criado uma vez e um bloco pode incluir vários pacotes. Quando criado, ele se comunica com o datanode correspondente e envia). as informações de DATA_TRANSFER_HEADER e obtém o retorno. Em seguida, crie um thread ResponseProcessor, que é responsável por receber as informações de confirmação de confirmação retornadas pelo datanode e tratar os erros.
4. O dataStreamer retira o objeto Package do dataQueue e o envia para o datanode. Em seguida, ele continua em loop para determinar se o dataQueue possui dados...
A figura abaixo mostra o processo de bloco de gravação.
A figura abaixo é o formato da mensagem
Ler bloco
Implementado principalmente na classe BlockReader.
Ao inicializar newBlockReader,
1. Crie um novo SocketOutputStream(socket, timeout) passando o parâmetro sock e a seguir escreva as informações de comunicação, que é diferente de escrever o cabeçalho do bloco.
//escreve o cabeçalho.
out.writeShort(DataTransferProtocol. DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong(blocoId);
out.writeLong(genStamp);
out.writeLong(startOffset);
out.writeLong(len);
Text.writeString (saída, nomedocliente);
fora.flush();
2. Crie o fluxo de entrada new SocketInputStream(socket, timeout)
3. Determine a mensagem de retorno in.readShort() != DataTransferProtocol .
4. Crie uma soma de verificação com base no fluxo de entrada: DataChecksum checksum = DataChecksum.newDataChecksum(in)
5. Leia a posição do primeiro pedaço: long firstChunkOffset = in.readLong()
Nota: 512 bytes são usados para calcular a soma de verificação de um pedaço (4 bytes)
6. Em seguida, leia os dados específicos no método read do BlockReader: result = readBuffer(buf, off, realLen)
7. Leia pedaço por pedaço
int packetLen = in.readInt();
deslocamento longoInBlock = in.readLong();
sequência longa = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (em, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils.readFully (in, buf, offset, chunkLen);
8. Verificação da soma de verificação após leitura dos dados;