تحلل هذه المقالة بشكل أساسي عملية كتلة القراءة والكتابة لعميل Hadoop بالإضافة إلى بروتوكول الاتصال بين العميل وdatanode وتنسيق تدفق البيانات وما إلى ذلك.
يتواصل عميل hadoop مع namenode من خلال بروتوكول RPC، لكن العميل لا يستخدم RPC للتواصل مع datanode، ولكنه يستخدم المقبس مباشرةً. تختلف أيضًا بروتوكولات القراءة والكتابة في هذه المقالة بتحليل hadoop الإصدار 0.20.2. الإصدار 0.19 هو نفسه) مبدأ وبروتوكول الاتصال للاتصال بين العميل وdatanode، ويجب أيضًا التأكيد على أن بروتوكول الاتصال بين العميل وdatanode قد تغير في الإصدار 0.23 والإصدارات الأحدث، ويتم استخدام protobuf كطريقة تسلسل.
كتلة الكتابة
1. يطلب العميل أولاً من namenode إنشاء ملف من خلال namenode.create، ثم يبدأ سلسلة dataStreamer.
2. يتضمن العميل ثلاثة سلاسل رسائل، ويكون الخيط الرئيسي مسؤولاً عن قراءة البيانات المحلية في الذاكرة، وتغليفها في كائن حزمة، ووضعها في قائمة انتظار البيانات.
3. يكتشف مؤشر ترابط dataStreamer ما إذا كانت قائمة الانتظار dataQueue تحتوي على حزمة. إذا كان الأمر كذلك، فإنه يقوم أولاً بإنشاء كائن BlockOutPutStream (يتم إنشاء الكتلة مرة واحدة، وقد تتضمن الكتلة حزمًا متعددة عند إنشائها، وتتواصل مع عقدة البيانات المقابلة). معلومات DATA_TRANSFER_HEADER والحصول على الإرجاع، ثم قم بإنشاء مؤشر ترابط ResponseProcessor، وهو المسؤول عن تلقي معلومات تأكيد ACK التي يتم إرجاعها بواسطة datanode ومعالجة الأخطاء.
4. يقوم dataStreamer بإخراج كائن الحزمة من dataQueue ويرسله إلى datanode، ثم يستمر في التكرار لتحديد ما إذا كانت dataQueue تحتوي على بيانات...
يوضح الشكل أدناه عملية كتلة الكتابة.
الشكل أدناه هو شكل الرسالة
قراءة الكتلة
يتم تطبيقه بشكل رئيسي في فئة BlockReader.
عند تهيئة newBlockReader،
1. قم بإنشاء SwitchOutputStream(socket, timeout) جديد عن طريق تمرير معلمة sock، ثم اكتب معلومات الاتصال، والتي تختلف عن كتابة رأس الكتلة.
//اكتب الرأس.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong(blockId);
out.writeLong(genStamp);
out.writeLong(startOffset);
out.writeLong(len);
Text.writeString (خارج، اسم العميل)؛
out.flush();
2. قم بإنشاء دفق الإدخال الجديد JackInputStream(socket, timeout)
3. تحديد رسالة الإرجاع in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS
4. قم بإنشاء مجموع اختباري بناءً على دفق الإدخال: المجموع الاختباري DataChecksum = DataChecksum.newDataChecksum( in )
5. اقرأ موضع القطعة الأولى: long firstChunkOffset = in.readLong()
ملاحظة: يتم استخدام 512 بايت لحساب المجموع الاختباري للقطعة (4 بايت)
6. بعد ذلك، اقرأ البيانات المحددة في طريقة القراءة الخاصة بـ BlockReader: result = readBuffer(buf, off, realLen)
7. اقرأ قطعة قطعة
int packetLen = in.readInt();
long OffsetInBlock = in.readLong();
long seqno = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully (في، checksumBytes.array()، 0،
checksumBytes.limit());
IOUtils.readFully (in، buf، offset، ChunkLen)؛
8. التحقق من المجموع الاختباري بعد قراءة البيانات؛ FSInputChecker.verifySum(chunkPos)