Cet article analyse principalement le processus de bloc de lecture et d'écriture du client Hadoop ainsi que le protocole de communication entre le client et le datanode, le format du flux de données, etc.
Le client hadoop communique avec le namenode via le protocole RPC, mais le client n'utilise pas RPC pour communiquer avec le datanode, mais utilise directement le socket. Les protocoles de lecture et d'écriture sont également différents. Cet article analyse la version 0.20.2 de Hadoop. la version 0.19 est la même)Le principe et le protocole de communication entre le client et le datanode Il convient également de souligner que le protocole de communication entre le client et le datanode a changé dans les versions 0.23 et ultérieures. Utilisez protobuf comme méthode de sérialisation.
Bloc d'écriture
1. Le client demande d'abord au namenode de créer un fichier via namenode.create, puis démarre le thread dataStreamer.
2. Le client comprend trois threads. Le thread principal est responsable de la lecture des données locales en mémoire, de leur encapsulation dans un objet Package et de leur placement dans la file d'attente dataQueue.
3. Le thread dataStreamer détecte si la file d'attente dataQueue a un package. Si tel est le cas, il crée d'abord un objet BlockOutPutStream (un bloc est créé une fois et un bloc peut inclure plusieurs packages. Une fois créé, il communique avec le datanode correspondant et envoie). les informations DATA_TRANSFER_HEADER et obtient le retour. Créez ensuite un thread ResponseProcessor, qui est responsable de la réception des informations de confirmation d'accusé de réception renvoyées par le datanode et de la gestion des erreurs.
4. Le dataStreamer retire l'objet Package de dataQueue et l'envoie au datanode. Il continue ensuite à boucler pour déterminer si le dataQueue contient des données...
La figure ci-dessous montre le processus de bloc d'écriture.
La figure ci-dessous est le format du message
Lire le bloc
Principalement implémenté dans la classe BlockReader.
Lors de l'initialisation de newBlockReader,
1. Créez un nouveau SocketOutputStream(socket, timeout) en passant le paramètre sock, puis écrivez les informations de communication, ce qui est différent de l'écriture de l'en-tête du bloc.
//écrit l'en-tête.
out.writeShort( DataTransferProtocol. DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol. OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong(genStamp);
out.writeLong( startOffset );
out.writeLong(len);
Text.writeString (sortie, clientName);
out.flush();
2. Créez le flux d'entrée nouveau SocketInputStream (socket, timeout)
3. Déterminez le message de retour dans.readShort() != DataTransferProtocol OP_STATUS_SUCCESS .
4. Créez une somme de contrôle basée sur le flux d'entrée : DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. Lisez la position du premier Chunk : long firstChunkOffset = in.readLong()
Remarque : 512 octets sont utilisés pour calculer la somme de contrôle d'un morceau (4 octets)
6. Ensuite, lisez les données spécifiques dans la méthode read de BlockReader : result = readBuffer(buf, off, realLen)
7. Lisez morceau par morceau
int packetLen = in.readInt();
long offsetInBlock = in.readLong();
long seqno = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (dans, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils.readFully (in, buf, offset, chunkLen) ;
8. Vérification de la somme de contrôle après avoir lu les données ; FSInputChecker.verifySum(chunkPos)