이 글에서는 주로 Hadoop 클라이언트의 블록 읽기 및 쓰기 프로세스와 클라이언트와 데이터노드 간의 통신 프로토콜, 데이터 흐름 형식 등을 분석합니다.
hadoop 클라이언트는 RPC 프로토콜을 통해 네임노드와 통신하지만 클라이언트는 RPC를 사용하여 데이터노드와 통신하지 않고 직접 소켓을 사용합니다. 이 기사에서는 hadoop 버전 0.20.2를 분석합니다. 버전 0.19도 동일합니다.) 클라이언트와 데이터노드 간 통신의 원리와 통신 프로토콜은 버전 0.23 이상에서 클라이언트와 데이터노드 간 통신 프로토콜이 변경되었으며, 직렬화 방식으로 protobuf를 사용한다는 점도 강조해야 합니다.
쓰기 블록
1. 클라이언트는 먼저 namenode.create를 통해 파일 생성을 namenode에 요청한 후 dataStreamer 스레드를 시작합니다.
2. 클라이언트에는 세 개의 스레드가 포함되어 있습니다. 기본 스레드는 로컬 데이터를 메모리로 읽어 들여 이를 Package 객체로 캡슐화하고 큐 dataQueue에 배치하는 일을 담당합니다.
3. dataStreamer 스레드는 큐 dataQueue에 패키지가 있는지 여부를 감지합니다. 그렇다면 먼저 BlockOutPutStream 개체를 생성합니다(블록은 한 번 생성되며 블록에는 여러 패키지가 포함될 수 있음). DATA_TRANSFER_HEADER 정보를 가져오고 반환을 가져옵니다. 그런 다음 데이터 노드에서 반환된 확인 정보를 수신하고 오류를 처리하는 ResponseProcessor 스레드를 만듭니다.
4. dataStreamer는 dataQueue에서 Package 개체를 가져와 데이터 노드로 보냅니다. 그런 다음 dataQueue에 데이터가 있는지 확인하기 위해 계속 루프를 돌립니다.
아래 그림은 쓰기 블록 과정을 보여줍니다.
아래 그림은 메시지 형식입니다.
블록 읽기
주로 BlockReader 클래스에서 구현됩니다.
newBlockReader를 초기화할 때,
1. sock 매개변수를 전달하여 새로운 SocketOutputStream(socket, timeout)을 생성한 후 블록의 헤더 작성과 다른 통신 정보를 작성합니다.
//헤더를 씁니다.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong(genStamp);
out.writeLong( startOffset );
out.writeLong(len);
Text.writeString (출력, 클라이언트 이름);
out.flush();
2. 새로운 입력 스트림 생성 SocketInputStream(socket, timeout)
3. .readShort() != DataTransferProtocol에서 반환 메시지를 결정 합니다.
4. 입력 스트림을 기반으로 체크섬을 생성합니다. DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. 첫 번째 청크의 위치를 읽습니다. long firstChunkOffset = in.readLong()
참고: 청크(4바이트)에 대한 체크섬을 계산하는 데 512바이트가 사용됩니다.
6. 다음으로 BlockReader의 읽기 메소드에서 특정 데이터를 읽습니다. result = readBuffer(buf, off, realLen)
7. 덩어리 단위로 읽기
int packetLen = in.readInt();
긴 offsetInBlock = in.readLong();
긴 순서 번호 = in.readLong();
부울 lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (in, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils.readFully (in, buf, offset, ChunkLen);
8. 데이터를 읽은 후 체크섬 확인 FSInputChecker.verifySum(chunkPos)