ในการกำหนดนี้คุณจะใช้ UDF (ฟังก์ชั่นที่ผู้ใช้กำหนด) แคชใน Apache Spark ซึ่งเป็นกรอบสำหรับการคำนวณแบบกระจายในแม่พิมพ์ของ MapReduce โครงการนี้จะแสดงให้เห็นถึงแนวคิดหลักในการประเมินผลข้อมูลและการประเมินแบบสอบถามและคุณจะได้รับประสบการณ์การปรับเปลี่ยนประกายไฟซึ่งใช้กันอย่างแพร่หลายในสนาม นอกจากนี้คุณจะได้รับการสัมผัสกับ Scala ซึ่งเป็นภาษาที่ใช้ JVM ซึ่งได้รับความนิยมในรูปแบบการทำงานที่สะอาด
วันครบกำหนดที่กำหนดจะเผยแพร่ที่เว็บไซต์ชั้นเรียน
คุณสามารถทำสิ่งนี้ เป็นคู่ได้ หากคุณเลือก สุดท้ายมีรหัสจำนวนมากในไดเรกทอรีนี้ โปรดดูที่นี่ที่นี่เพื่อค้นหาไดเรกทอรีที่รหัสอยู่
Spark เป็นระบบคอมพิวเตอร์แบบกระจายโอเพนซอร์ซที่เขียนใน Scala โครงการเริ่มต้นโดยปริญญาเอก นักเรียนจากแอมป์และเป็นส่วนสำคัญของสแต็กการวิเคราะห์ข้อมูลของ Berkeley (BDAs-ออกเสียงว่า "Bad-Ass")
เช่น Hadoop Mapreduce, Spark ได้รับการออกแบบมาเพื่อเรียกใช้ฟังก์ชั่นผ่านคอลเลกชันข้อมูลขนาดใหญ่โดยสนับสนุนชุดการดำเนินการประมวลผลข้อมูลระดับสูงที่เรียบง่ายคล้ายกับตัววนซ้ำที่เราได้เรียนรู้ในชั้นเรียน หนึ่งในการใช้งานที่พบบ่อยที่สุดของระบบดังกล่าวคือการใช้การประมวลผลแบบสอบถามแบบขนานในภาษาระดับสูงเช่น SQL ในความเป็นจริงความพยายามในการวิจัยและพัฒนาเมื่อเร็ว ๆ นี้ใน Spark ได้ก้าวไปสู่การสนับสนุนฐานข้อมูลเชิงสัมพันธ์เชิงสัมพันธ์ที่ปรับขนาดและโต้ตอบได้
เราจะใช้การปรับเปลี่ยนและศึกษาแง่มุมของ Spark ในชั้นเรียนนี้เพื่อทำความเข้าใจแนวคิดหลักของระบบข้อมูลที่ทันสมัย ที่สำคัญคุณจะเห็นว่าความคิดที่เราครอบคลุมในชั้นเรียน - บางส่วนมีอายุหลายสิบปี - ยังคงมีความเกี่ยวข้องมากในปัจจุบัน โดยเฉพาะเราจะเพิ่มคุณสมบัติลงใน Spark SQL
ข้อ จำกัด ที่สำคัญอย่างหนึ่งของ Spark SQL คือปัจจุบันเป็นระบบหลักเฉพาะหน่วยความจำ เป็นส่วนหนึ่งของชั้นเรียนนี้เราจะขยายมันเพื่อรวมอัลกอริทึมนอกศูนย์ด้วยเช่นกัน
Scala เป็นภาษาแบบคงที่ซึ่งรองรับกระบวนทัศน์การเขียนโปรแกรมที่แตกต่างกันมากมาย ความยืดหยุ่นพลังงานและการพกพาได้มีประโยชน์อย่างยิ่งในการวิจัยระบบกระจาย
สกาล่ามีลักษณะคล้ายกับจาวา แต่มันมีคุณสมบัติไวยากรณ์ที่กว้างขึ้นเพื่ออำนวยความสะดวกในหลาย ๆ กระบวนทัศน์ การรู้ว่า Java จะช่วยให้คุณเข้าใจรหัส Scala บางอย่าง แต่ไม่มากนักและไม่รู้ว่า Scala จะป้องกันไม่ให้คุณใช้ประโยชน์จากพลังที่แสดงออกอย่างเต็มที่ เนื่องจากคุณต้องเขียนโค้ดใน Scala เราขอแนะนำให้คุณได้รับอย่างน้อยความคุ้นเคยกับภาษา
Intellij Idea มีแนวโน้มที่จะเป็น IDE ที่ใช้กันมากที่สุดสำหรับการพัฒนาใน Spark Intellij เป็น java ide ที่มีปลั๊กอิน scala (และ vim!) นอกจากนี้ยังมีตัวเลือกอื่น ๆ เช่น Scala-ide
คุณอาจพบว่าบทเรียนต่อไปนี้มีประโยชน์:
ฟังก์ชั่นที่ผู้ใช้กำหนดช่วยให้นักพัฒนาสามารถกำหนดและใช้ประโยชน์จากการดำเนินการที่กำหนดเองภายในนิพจน์ ลองนึกภาพตัวอย่างเช่นคุณมีแคตตาล็อกผลิตภัณฑ์ที่มีรูปถ่ายของบรรจุภัณฑ์ผลิตภัณฑ์ คุณอาจต้องการลงทะเบียนฟังก์ชั่นที่ผู้ใช้กำหนด extract_text
ที่เรียกอัลกอริทึม OCR และส่งคืนข้อความในภาพเพื่อให้คุณสามารถรับข้อมูลที่สามารถสอบถามได้จากภาพถ่าย ใน SQL คุณสามารถจินตนาการแบบสอบถามเช่นนี้:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
ความสามารถในการลงทะเบียน UDFs นั้นมีประสิทธิภาพมาก - โดยพื้นฐานแล้วมันจะเปลี่ยนกรอบการประมวลผลข้อมูลของคุณเป็นกรอบการคำนวณทั่วไป แต่ UDFs มักจะแนะนำคอขวดประสิทธิภาพโดยเฉพาะอย่างยิ่งเมื่อเราเรียกใช้รายการข้อมูลหลายล้านรายการ
หากคอลัมน์อินพุตไปยัง UDF มีค่าที่ซ้ำกันจำนวนมากมันจะเป็นประโยชน์ในการปรับปรุงประสิทธิภาพโดยทำให้มั่นใจได้ว่า UDF นั้นเรียกว่าเพียงครั้งเดียวต่อ ค่าอินพุตที่แตกต่างกัน มากกว่าหนึ่งครั้งต่อ แถว (ตัวอย่างเช่นในตัวอย่างผลิตภัณฑ์ของเราด้านบนการกำหนดค่าที่แตกต่างกันทั้งหมดของพีซีเฉพาะอาจมีภาพเดียวกัน) ในการกำหนดนี้เราจะใช้การเพิ่มประสิทธิภาพนี้ เราจะนำไปใช้ในขั้นตอน-ก่อนอื่นให้ทำงานกับข้อมูลที่เหมาะกับหน่วยความจำและจากนั้นสำหรับชุดขนาดใหญ่ที่ต้องใช้วิธีนอกคอร์ เราจะใช้การแฮชภายนอกเป็นเทคนิคในการ "นัดพบ" แถวทั้งหมดที่มีค่าอินพุตเดียวกันสำหรับ UDF
หากคุณสนใจในหัวข้อบทความต่อไปนี้จะเป็นการอ่านที่น่าสนใจ (รวมถึงการเพิ่มประสิทธิภาพเพิ่มเติมนอกเหนือจากที่เรามีเวลาในการบ้านนี้):
รหัสทั้งหมดที่คุณจะได้รับจะอยู่ในสามไฟล์ - CS143Utils.scala
, basicOperators.scala
และ DiskHashedRelation.scala
อย่างไรก็ตามคุณอาจต้องปรึกษาไฟล์อื่น ๆ ภายใน Spark หรือ General Scala APIs เพื่อให้การมอบหมายเสร็จสมบูรณ์ โปรดตรวจสอบให้แน่ใจว่าคุณดูรหัส ทั้งหมด ที่ให้ไว้ในสามไฟล์ที่กล่าวถึงข้างต้นก่อนที่จะเริ่มเขียนโค้ดของคุณเอง มีฟังก์ชั่นที่มีประโยชน์มากมายใน CS143Utils.scala
เช่นเดียวกับใน DiskHashedRelation.scala
ที่จะช่วยคุณประหยัดเวลาและสาปแช่ง - ใช้ประโยชน์จากพวกเขา!
โดยทั่วไปเราได้กำหนดวิธีการที่คุณต้องการมากที่สุด (ถ้าไม่ใช่ทั้งหมด) ก่อนหน้านี้ในโครงการนี้คุณต้องเติมลงในโครงกระดูก จำนวนรหัสที่คุณจะเขียนไม่สูงมาก - โซลูชันพนักงานทั้งหมดน้อยกว่า 100 บรรทัดของรหัส (ไม่รวมการทดสอบ) อย่างไรก็ตามการรวมส่วนประกอบที่เหมาะสมเข้าด้วยกันอย่างมีประสิทธิภาพ (เช่นการไม่อ่านความสัมพันธ์ทั้งหมดในหน่วยความจำในครั้งเดียว) จะต้องใช้ความคิดและการวางแผนอย่างรอบคอบ
มีความแตกต่างที่อาจทำให้เกิดความสับสนระหว่างคำศัพท์ที่เราใช้ในชั้นเรียนและคำศัพท์ที่ใช้ในฐานรหัส SparksQL:
แนวคิด "ตัววนซ้ำ" ที่เราเรียนรู้ในการบรรยายเรียกว่า "โหนด" ในรหัส SparksQL - มีคำจำกัดความในรหัสสำหรับ UnaryNode และ BinaryNode แผนการสืบค้นเรียกว่า Sparkplan และในความเป็นจริง Unarynode และ BinaryNode ขยาย Sparkplan (หลังจากทั้งหมดตัววนซ้ำเดียวเป็นแผนสืบค้นขนาดเล็ก!) คุณอาจต้องการค้นหาไฟล์ SparkPlan.scala
ในแหล่งกำเนิด Sparksql เพื่อดู API สำหรับสิ่งเหล่านี้ โหนด
ในความคิดเห็นบางอย่างใน SparksQL พวกเขายังใช้คำว่า "ตัวดำเนินการ" เพื่อหมายถึง "โหนด" ไฟล์ basicOperators.scala
กำหนดจำนวนโหนดเฉพาะจำนวน (เช่นการเรียงลำดับที่แตกต่าง ฯลฯ )
อย่าสับสนกับ Iterator
ซ้ำสกาล่ากับแนวคิดตัววนซ้ำที่เรากล่าวถึงในการบรรยาย Iterator
ที่คุณจะใช้ในโครงการนี้เป็นคุณสมบัติภาษา Scala ที่คุณจะใช้ในการใช้โหนด SparksQl ของคุณ Iterator
ให้อินเทอร์เฟซกับคอลเลกชัน Scala ที่บังคับใช้ API เฉพาะ: ฟังก์ชั่น next
และ hasNext
git
และ github git
เป็นระบบ ควบคุมเวอร์ชัน ช่วยให้คุณติดตามรหัสเวอร์ชันที่แตกต่างกันซิงโครไนซ์ในเครื่องจักรที่แตกต่างกันและร่วมมือกับผู้อื่น GitHub เป็นเว็บไซต์ที่รองรับระบบนี้โดยโฮสต์เป็นบริการ
หากคุณไม่รู้จัก git
มากนักเรา ขอแนะนำ ให้คุณทำความคุ้นเคยกับระบบนี้ คุณจะใช้เวลากับมันมาก! มีคำแนะนำมากมายในการใช้ git
Online - นี่เป็นคำแนะนำที่ยอดเยี่ยมในการอ่าน
คุณควรตั้งค่าที่เก็บ ส่วนตัว ระยะไกลก่อน (เช่น Spark-Homework) GitHub ให้พื้นที่เก็บข้อมูลส่วนตัวแก่นักเรียน (แต่อาจใช้เวลาสักครู่) หากคุณไม่มีที่เก็บส่วนตัวให้คิดสองครั้งเกี่ยวกับการตรวจสอบในที่เก็บสาธารณะเพราะผู้อื่นจะใช้ Checheckout
$ cd ~
โคลนที่เก็บส่วนตัวของคุณ มันควรจะว่างเปล่า
$ git clone "https://github.com/xx/yy.git"
ป้อนที่เก็บโคลนติดตามที่เก็บหลักสูตรและโคลน
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
หมายเหตุ: โปรดอย่าถูกครอบงำด้วยจำนวนรหัสที่อยู่ที่นี่ Spark เป็นโครงการขนาดใหญ่ที่มีคุณสมบัติมากมาย รหัสที่เราจะสัมผัสจะอยู่ในไดเรกทอรีเฉพาะหนึ่งรายการ: SQL/Core/Src/Main/Scala/org/Apache/Spark/SQL/Execution/ การทดสอบทั้งหมดจะอยู่ใน SQL/Core/Src/Test/Scala/org/Apache/Spark/SQL/Execution/
ผลักโคลนไปยังที่เก็บส่วนตัวของคุณ
$ git push origin master
ทุกครั้งที่คุณเพิ่มรหัสคุณสามารถทำการแก้ไขที่เก็บระยะไกลได้
$ git commit -m 'update to homework'
$ git push origin master
อาจจำเป็นต้องได้รับการอัปเดตการมอบหมายของเรา (แม้ว่าเราจะพยายามปล่อยให้เป็น "สมบูรณ์แบบ" ที่สุดเท่าที่จะเป็นไปได้ในครั้งแรก) สมมติว่าคุณตั้งค่าการติดตามอย่างถูกต้องคุณสามารถเรียกใช้คำสั่งต่อไปนี้เพื่อรับการอัปเดตการมอบหมาย:
$ git pull course master
คำสั่ง UNIX ต่อไปนี้จะมีประโยชน์เมื่อคุณต้องการค้นหาตำแหน่งของไฟล์ ตัวอย่าง- ค้นหาตำแหน่งของไฟล์ชื่อ 'diskhashedrelation.scala' ในที่เก็บปัจจุบันของฉัน
$ find ./ -name 'DiskHashedRelation.scala'
เมื่อคุณมีรหัสดึงแล้ว cd
ลงใน {repo root}
และเรียกใช้ make compile
ครั้งแรกที่คุณเรียกใช้คำสั่งนี้ควรใช้เวลาสักครู่ - sbt
จะดาวน์โหลดการอ้างอิงทั้งหมดและรวบรวมรหัสทั้งหมดใน Spark (มีรหัสค่อนข้างน้อย) เมื่อคำสั่งชุดประกอบเริ่มต้นเสร็จสิ้นคุณสามารถเริ่มโครงการของคุณ! (การสร้างในอนาคตไม่ควรใช้เวลานาน - sbt
นั้นฉลาดพอที่จะคอมไพล์ไฟล์ที่เปลี่ยนแปลงได้อีกครั้งเว้นแต่คุณจะเรียก make clean
ซึ่งจะลบไฟล์คลาสที่รวบรวมทั้งหมด)
เราได้ให้รหัสโครงกระดูกแก่คุณสำหรับ DiskHashedRelation.scala
ไฟล์นี้มี 4 สิ่งสำคัญ:
trait DiskHashedRelation
กำหนดส่วนต่อประสาน diskhashedrelationclass GeneralDiskHashedRelation
เป็นการดำเนินการตามลักษณะของ DiskedHashedRelation
ของเราclass DiskPartition
แสดงถึงพาร์ติชันเดียวบนดิสก์object DiskHashedRelation
สามารถถือได้ว่าเป็นโรงงานวัตถุที่สร้าง GeneralDiskHashedRelation
SDiskPartition
และ GeneralDiskHashedRelation
ก่อนอื่นคุณจะต้องใช้เมธอด insert
, closeInput
และ getData
ใน DiskPartition
สำหรับส่วนนี้ สำหรับอดีตสองคนเอกสารควรให้คำอธิบายที่ครอบคลุมเกี่ยวกับสิ่งที่คุณต้องนำไปใช้ ข้อแม้กับ getData
คือคุณ ไม่สามารถ อ่านพาร์ติชันทั้งหมดลงในหน่วยความจำได้ในครั้งเดียว เหตุผลที่เราบังคับใช้ข้อ จำกัด นี้คือไม่มีวิธีที่ดีในการบังคับใช้การปลดปล่อยหน่วยความจำใน JVM และเมื่อคุณแปลงข้อมูลเป็นรูปแบบที่แตกต่างกันจะมีสำเนาหลายชุดอยู่รอบ ๆ เช่นนี้การมีสำเนาพาร์ติชันทั้งหมดหลายชุดจะทำให้สิ่งต่าง ๆ ถูกหกลงในดิสก์และจะทำให้เราทุกคนเศร้า แต่คุณควรสตรีม หนึ่งบล็อก ลงในหน่วยความจำในแต่ละครั้ง
ณ จุดนี้คุณควรผ่านการทดสอบใน DiskPartitionSuite.scala
object DiskHashedRelation
งานของคุณในส่วนนี้คือการใช้เฟส 1 ของการแฮชภายนอก-โดยใช้ฟังก์ชันแฮชแบบหยาบเพื่อสตรีมอินพุตลงในความสัมพันธ์หลายพาร์ติชันบนดิสก์ สำหรับวัตถุประสงค์ของเราวิธีการ hashCode
ที่ทุกวัตถุมีเพียงพอสำหรับการสร้างค่าแฮชและการใช้โมดูโลด้วยจำนวนพาร์ติชันเป็นฟังก์ชันแฮชที่ยอมรับได้
ณ จุดนี้คุณควรผ่านการทดสอบทั้งหมดใน DiskHashedRelationSuite.scala
ในส่วนนี้เราจะจัดการกับ case class CacheProject
ใน basicOperators.scala
คุณอาจสังเกตเห็นว่ามีรหัสเพียง 4 บรรทัดในชั้นเรียนนี้และที่สำคัญกว่านั้นคือไม่มี // IMPLEMENT ME
คุณไม่จำเป็นต้องเขียนรหัสใด ๆ ที่นี่ อย่างไรก็ตามหากคุณติดตามการเรียกใช้ฟังก์ชันในบรรทัดที่ 66 คุณจะพบว่ามีสองส่วนของสแต็กนี้คุณต้องใช้เพื่อให้มีการใช้งาน UDF ในหน่วยความจำในหน่วยงาน
CS143Utils
สำหรับงานนี้คุณจะต้องใช้ getUdfFromExpressions
และวิธีการ Iterator
ใน CachingIteratorGenerator#apply
โปรดอ่านเอกสาร - โดยเฉพาะอย่างยิ่งสำหรับ apply
- อย่างใกล้ชิดก่อนเริ่มต้น
หลังจากใช้วิธีการเหล่านี้คุณควรผ่านการทดสอบใน CS143UtilsSuite.scala
คำแนะนำ: คิดอย่างรอบคอบว่าทำไมวิธีการเหล่านี้อาจเป็นส่วนหนึ่งของ Utils
ตอนนี้เป็นช่วงเวลาแห่งความจริง! เราได้ใช้การแบ่งพาร์ติชันแฮชที่ใช้ดิสก์และเราได้ใช้การแคช UDF ในหน่วยความจำ-บางครั้งสิ่งที่เรียกว่าการบันทึกความทรงจำ การบันทึกความทรงจำเป็นเครื่องมือที่ทรงพลังมากในหลายบริบท แต่ที่นี่ในฐานข้อมูล-ดินแดนเราจัดการกับข้อมูลจำนวนมากเกินกว่าการบันทึกความทรงจำที่สามารถจัดการได้ หากเรามีค่าที่เป็นเอกลักษณ์มากกว่าที่จะพอดีกับแคชในหน่วยความจำประสิทธิภาพของเราจะลดลงอย่างรวดเร็ว ดังนั้นเราจึงถอยกลับไปที่ฐานข้อมูลที่ได้รับการยกย่องตามกาลเวลาของการแบ่งแยกและพิชิต หากข้อมูลของเราไม่พอดีกับหน่วยความจำเราสามารถแบ่งพาร์ติชันไปยังดิสก์ได้ครั้งเดียวอ่านพาร์ติชันหนึ่งครั้งในเวลา (คิดว่าทำไมงานนี้ (คำแนะนำ: Rendezvous!)) และทำการแคช UDF ประเมินหนึ่งพาร์ติชันในแต่ละครั้ง .
PartitionProject
งานสุดท้ายนี้ต้องการให้คุณกรอกในการใช้งาน PartitionProject
รหัสทั้งหมดที่คุณจะต้องเขียนอยู่ในเมธอด generateIterator
คิดอย่างรอบคอบเกี่ยวกับวิธีที่คุณต้องการจัดระเบียบการนำไปปฏิบัติ คุณ ไม่ ควรบัฟเฟอร์ข้อมูลทั้งหมดในหน่วยความจำหรืออะไรก็ตามที่คล้ายกับนั้น
ณ จุดนี้คุณควรผ่านการทดสอบ ทั้งหมด
ไม่มีรหัสที่คุณต้องเขียนที่นี่ แต่สำหรับการแก้ไขของคุณเองใช้เวลาคิดเกี่ยวกับคำถามต่อไปนี้:
หนึ่งในจุดขายหลักของ Spark คือ "ในหน่วยความจำ" สิ่งที่พวกเขาหมายถึงมีดังต่อไปนี้: เมื่อคุณสตริง Hadoop จำนวนหนึ่ง (หรือกรอบ MapReduce อื่น ๆ ) งานร่วมกัน Hadoop จะเขียนผลลัพธ์ของแต่ละเฟสลงบนดิสก์และอ่านอีกครั้งซึ่งมีราคาแพงมาก ในทางกลับกัน Spark เก็บข้อมูลไว้ในหน่วยความจำ อย่างไรก็ตามหากสมมติฐานของเราคือถ้าข้อมูลของเราไม่เหมาะสมในหน่วยความจำทำไม Spark SQL ไม่ได้จัดส่งด้วยการใช้ดิสก์โดยใช้ตัวดำเนินการเหล่านี้อยู่แล้ว ในแง่นี้ทำไม Spark ถึงแตกต่างจากฐานข้อมูลเชิงสัมพันธ์แบบขนาน "ดั้งเดิม" ที่เราเรียนรู้เกี่ยวกับในชั้นเรียน? ไม่มีคำตอบที่ถูกต้องสำหรับคำถามนี้!
เราได้ให้การทดสอบตัวอย่างใน DiskPartitionSuite.scala
, DiskHasedRelationSuite.scala
, CS143UtilsSuite.scala
และ ProjectSuite.scala
การทดสอบเหล่านี้สามารถแนะนำคุณในขณะที่คุณทำโครงการนี้ให้เสร็จสมบูรณ์ อย่างไรก็ตามโปรดทราบว่าพวกเขา ไม่ ครอบคลุมและคุณควรเขียนการทดสอบของคุณเองเพื่อจับข้อบกพร่อง หวังว่าคุณสามารถใช้การทดสอบเหล่านี้เป็นแบบจำลองเพื่อสร้างการทดสอบของคุณเอง
เพื่อที่จะเรียกใช้การทดสอบของเราเราได้จัดทำ makefile อย่างง่าย ในการเรียกใช้การทดสอบสำหรับงาน 1 ให้เรียกใช้ make t1
ตามลำดับสำหรับงานเรียกใช้ make t2
และเหมือนกันสำหรับการทดสอบอื่น ๆ ทั้งหมด make all
จะเรียกใช้การทดสอบทั้งหมด
ลิงค์การส่งจะถูกสร้างขึ้นบน CCLE ซึ่งคุณสามารถส่งรหัสของคุณภายในวันที่ครบกำหนด
ขอบคุณมากสำหรับ Matteo Interlandi
ขอให้โชคดี!