Este artículo analiza principalmente el proceso de bloqueo de lectura y escritura del cliente Hadoop, así como el protocolo de comunicación entre el cliente y el nodo de datos, el formato del flujo de datos, etc.
El cliente hadoop se comunica con el nodo de nombre a través del protocolo RPC, pero el cliente no usa RPC para comunicarse con el nodo de datos, sino que usa directamente el protocolo de lectura y escritura del socket. Este artículo analiza la versión 0.20.2 de hadoop. la versión 0.19 es la misma) El principio y el protocolo de comunicación entre el cliente y el nodo de datos. También se debe enfatizar que el protocolo de comunicación entre el cliente y el nodo de datos ha cambiado en las versiones 0.23 y posteriores, y se utiliza protobuf como método de serialización.
Bloque de escritura
1. El cliente primero solicita al namenode que cree un archivo a través de namenode.create y luego inicia el hilo dataStreamer.
2. El cliente incluye tres subprocesos. El subproceso principal es responsable de leer los datos locales en la memoria, encapsularlos en un objeto Paquete y colocarlos en la cola dataQueue.
3. El subproceso dataStreamer detecta si la cola dataQueue tiene un paquete. Si es así, primero crea un objeto BlockOutPutStream (un bloque se crea una vez y un bloque puede incluir varios paquetes cuando se crea, se comunica con el nodo de datos correspondiente y lo envía). la información DATA_TRANSFER_HEADER y obtiene la devolución. Luego crea un hilo ResponseProcessor, que es responsable de recibir la información de confirmación devuelta por el nodo de datos y manejar los errores.
4. DataStreamer saca el objeto Paquete de dataQueue y lo envía al nodo de datos. Luego continúa realizando un bucle para determinar si dataQueue tiene datos...
La siguiente figura muestra el proceso de escritura del bloque.
La siguiente figura es el formato del mensaje.
Leer bloque
Implementado principalmente en la clase BlockReader.
Al inicializar newBlockReader,
1. Cree un nuevo SocketOutputStream (socket, tiempo de espera) pasando el parámetro sock y luego escriba la información de comunicación, que es diferente de escribir el encabezado del bloque.
//escribe el encabezado.
out.writeShort( DataTransferProtocol. DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol. OP_READ_BLOCK );
out.writeLong(blockId);
out.writeLong(genStamp);
out.writeLong(inicioOffset);
out.writeLong(len);
Text.writeString (fuera, nombre del cliente);
salida.flush();
2. Cree el flujo de entrada nuevo SocketInputStream (socket, tiempo de espera)
3. Determine el mensaje de retorno en.readShort()! = DataTransferProtocol OP_STATUS_SUCCESS .
4. Cree una suma de verificación basada en el flujo de entrada: DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. Lea la posición del primer fragmento: long firstChunkOffset = in.readLong()
Nota: se utilizan 512 bytes para calcular la suma de comprobación de un fragmento (4 bytes)
6. A continuación, lea los datos específicos en el método de lectura de BlockReader: resultado = readBuffer (buf, off, realLen)
7. Leer fragmento a fragmento
int paqueteLen = in.readInt();
largo offsetInBlock = in.readLong();
número de secuencia largo = in.readLong();
booleano lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (en, checksumBytes.array(), 0,
suma de comprobaciónBytes.limit());
IOUtils.readFully (en, buf, desplazamiento, trozoLen);
8. Verificación de la suma de verificación después de leer los datos; FSInputChecker.verifySum (chunkPos)