在此任務中,您將在Apache Spark中實現UDF(用戶定義的功能)結果緩存,這是在MapReduce模具中分佈式計算的框架。該項目將說明數據彙和查詢評估中的關鍵概念,並且您將獲得一些動手修改Spark的體驗,該體驗在該領域中廣泛使用。此外,您將獲得一種基於JVM的語言Scala,它以其乾淨的功能風格而越來越受歡迎。
任務截止日期發表在類網站上。
如果選擇,可以成對完成此操作。最後,此目錄中有很多代碼。請在此處查找代碼所在的目錄。
Spark是用Scala編寫的開源分佈式計算系統。該項目由博士學位發起。來自Amplab的學生,是伯克利數據分析堆棧(BDAS-發音為“壞蛋”)的組成部分。
像Hadoop MapReduce一樣,Spark旨在通過支持一組簡化的高級數據處理操作,類似於我們在課堂上學習的迭代器,可以在大量數據集上運行功能。這種系統的最常見用途之一是用高級語言(例如SQL)實現並行查詢處理。實際上,SPARK的許多最新研發工作都用於支持可擴展和互動的關係數據庫抽象。
我們將在此類中使用,修改和研究Spark的各個方面,以了解現代數據系統的關鍵概念。更重要的是,您會看到我們在課堂上涵蓋的想法(其中一些已經有幾十年的想法)今天仍然非常重要。具體來說,我們將添加功能為Spark SQL。
SPARK SQL的一個關鍵限制是它目前是一個僅使用內存的系統。作為此類課程的一部分,我們還將其擴展以包括一些核心外算法。
Scala是一種支持許多不同編程範式的靜態語言。它的靈活性,功率和可移植性在分佈式系統研究中變得特別有用。
Scala類似於Java,但它具有更廣泛的語法功能,可以促進多個範式。了解Java將幫助您了解一些Scala代碼,但並不多,並且不知道Scala會阻止您完全利用其表達能力。因為您必須在Scala中編寫代碼,所以我們強烈建議您至少獲得對語言的熟悉程度。
Intellij Idea往往是在Spark中開發的最常用的IDE。 Intellij是具有Scala(和VIM!)插件的Java IDE。還有其他選項,例如Scala-ide。
您可能會發現以下教程很有用:
用戶定義的功能使開發人員可以在表達式中定義和利用自定義操作。想像一下,例如,您擁有一個產品目錄,其中包括產品包裝的照片。您可能需要註冊一個用戶定義的函數extract_text
,該功能dectraxt調用OCR算法並在圖像中返回文本,以便您可以從照片中獲取可查詢信息。在SQL中,您可以想像這樣的查詢:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
註冊UDFS的能力非常強大 - 從本質上講,您的數據處理框架變成了一般的分佈式計算框架。但是UDF通常可以引入性能瓶頸,尤其是當我們運行數百萬個數據項時。
如果輸入列到UDF包含許多重複值,則通過確保僅調用UDF一次,每行一次只調用UDF一次,而不是每行一次。 (例如,在上面的產品示例中,特定PC的所有不同配置都可能具有相同的圖像。)在此作業中,我們將實現此優化。我們將分階段進行 - 首先使其用於適合內存的數據,然後再用於需要較大的方法的較大集合。我們將使用外部哈希作為技術來“集合”所有行的所有行,並具有相同的輸入值的UDF。
如果您對該主題感興趣,則以下論文將是一本有趣的閱讀(包括超出我們在此作業中有時間的其他優化):
您將要訪問的所有代碼都將在三個文件中 - CS143Utils.scala
, basicOperators.scala
和DiskHashedRelation.scala
。但是,您可能需要在Spark或Scala API中諮詢其他文件,以徹底完成作業。在開始編寫自己的代碼之前,請確保您查看上面提到的三個文件中提供的所有代碼。 CS143Utils.scala
以及DiskHashedRelation.scala
中有很多有用的功能,可以節省您很多時間和詛咒 - 利用它們!
通常,我們定義了您需要的大多數(如果不是全部)的方法。和以前一樣,在這個項目中,您需要填寫骨架。您將編寫的代碼量不是很高 - 總人員解決方案少於100行代碼(不包括測試)。但是,以內存有效的方式將正確的組件串在一起(即,不立即將整個關係讀取為記憶)將需要一些思考和仔細的計劃。
我們在課堂中使用的術語與SparkSQL代碼基礎中使用的術語之間存在一些可能令人困惑的差異:
我們在演講中學到的“迭代器”概念在SparkSQL代碼中稱為“節點” - 一元簡歷和二進制詞的代碼中有一些定義。一個查詢計劃稱為Spark Plans,實際上UnaryNode和BinaryNode擴展了Spark Pllan(畢竟,單個迭代器是一個小查詢計劃!)您可能需要在SparkSQL源中找到file SparkPlan.scala
,以查看這些api以查看這些api節點。
在SparkSQL中的某些評論中,它們還使用“操作員”一詞來表示“節點”。 file basicOperators.scala
定義了許多特定節點(例如排序,獨特等)。
不要將Scala接口Iterator
與我們在演講中介紹的迭代概念混淆。您將在此項目中使用的Iterator
是Scala語言功能,您將使用它來實現SparkSQL節點。 Iterator
為Scala集合提供了一個接口,該界面可以強制執行特定的API: next
和hasNext
函數。
git
和github git
是一個版本控制系統,可幫助您跟踪代碼的不同版本,在不同的機器上同步它們,並與其他機器協作。 GitHub是一個支持該系統的站點,將其作為服務託管。
如果您對git
了解不多,我們強烈建議您熟悉該系統;您將花費大量時間!在線使用git
的指南很多 - 這是一個很棒的閱讀指南。
您應該首先設置一個遠程私人存儲庫(例如,Spark-Homework)。 Github將私人存儲庫交給學生(但這可能需要一些時間)。如果您沒有私人存儲庫,請三思而後行,在公共存儲庫中檢查它,因為它可以讓其他人進行置換。
$ cd ~
克隆您的個人存儲庫。它應該是空的。
$ git clone "https://github.com/xx/yy.git"
輸入克隆的存儲庫,跟踪課程存儲庫並克隆。
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
注意:請不要對這裡的代碼數量不知所措。 Spark是一個具有許多功能的大型項目。我們將要觸摸的代碼將包含在一個特定目錄中:SQL/CORE/SRC/MAIN/SCALA/ORG/APACHE/SPARK/SQL/SQL/Execution/。測試將全部包含在SQL/Core/src/test/scala/org/apache/spark/spark/sql/eccution/execution/
將克隆推到您的個人存儲庫。
$ git push origin master
每次添加一些代碼時,都可以對遠程存儲庫進行修改。
$ git commit -m 'update to homework'
$ git push origin master
可能有必要接收我們的作業的更新(即使我們嘗試在第一次將它們釋放為“完美”)。假設您正確設置了跟踪,則可以簡單地運行以下命令接收分配更新:
$ git pull course master
當您需要找到文件的位置時,以下UNIX命令將派上用場。示例 - 在我當前的存儲庫中查找名為“ diskhashedrelation.scala”的文件的位置。
$ find ./ -name 'DiskHashedRelation.scala'
一旦將代碼cd
{repo root}
並運行make compile
。您第一次運行此命令時,應該花費一段時間 - sbt
將下載所有依賴項並在Spark中編譯所有代碼(有很多代碼)。初始組件命令完成後,您可以啟動項目! (未來的構建不應花費很長時間 - sbt
足夠聰明,只能重新編譯更改的文件,除非您運行make clean
,否則將刪除所有編譯的類文件。)
我們為您提供了用於DiskHashedRelation.scala
的骨架代碼。該文件有4個重要的事情:
trait DiskHashedRelation
定義了磁盤界面class GeneralDiskHashedRelation
是我們實施DiskedHashedRelation
特徵class DiskPartition
代表磁盤上的一個分區object DiskHashedRelation
視為構建GeneralDiskHashedRelation
s的對象工廠DiskPartition
和GeneralDiskHashedRelation
首先,您將需要在此部分中實現DiskPartition
中的insert
, closeInput
和getData
方法。對於前兩個,docstrings應該對您必須實施的內容進行全面描述。帶有getData
的警告是,您無法一次將整個分區讀取到內存中。我們執行此限制的原因是,沒有一個好方法可以在JVM中執行自由記憶,並且當您將數據轉換為不同的形式時,周圍會有多個副本。因此,擁有多個副本的整個分區會導致磁盤溢出,並使我們所有人都很難過。相反,您應該一次將一個塊流入內存中。
此時,您應該在DiskPartitionSuite.scala
中通過測試。
object DiskHashedRelation
您在本部分中的任務將是實現外部哈希的第1階段 - 使用粗粒的哈希功能將輸入傳輸到磁盤上的多個分區關係中。出於我們的目的,每個對象具有的hashCode
方法足以生成哈希值,而按分區的數量取模量是可以接受的哈希函數。
在這一點上,您應該在DiskHashedRelationSuite.scala
中通過所有測試。
在本節中,我們將在basicOperators.scala
中處理case class CacheProject
。您可能會注意到,此類中只有4行代碼,更重要的是,沒有// IMPLEMENT ME
。您實際上不必在此處編寫任何代碼。但是,如果您在第66行中跟踪函數調用,您會發現必須實現此堆棧的兩個部分才能具有功能性的內存UDF實現。
CS143Utils
方法對於此任務,您需要在CachingIteratorGenerator#apply
中實現getUdfFromExpressions
和Iterator
方法。請在入門之前密切閱讀docstrings - 尤其是apply
。
實施這些方法後,您應該在CS143UtilsSuite.scala
中通過測試。
提示:仔細考慮為什麼這些方法可能是UTIT的一部分
現在是真理的時刻!我們已經實施了基於磁盤的哈希分區,並且已經實現了內存UDF緩存 - 有時稱為備忘錄。在許多情況下,紀念活動是非常強大的工具,但是在數據庫領域,我們處理的數據量超過了記憶的處理。如果我們的獨特值比在內存中的緩存中更具獨特的值,我們的性能將迅速降低。因此,我們回到了悠久的劃分和爭議的數據庫傳統。如果我們的數據不適合內存,那麼我們可以將其劃分為磁盤一次,一次讀取一個分區(請考慮為什麼起作用(提示:rendezvous!)),然後執行UDF緩存,一次評估一個分區。
PartitionProject
最終任務要求您填寫PartitionProject
的實現。您需要編寫的所有代碼都在generateIterator
方法中。仔細考慮如何組織實施。您不應在內存中緩衝所有數據或類似的數據。
在這一點上,您應該通過所有給定的測試。
您沒有必須在這裡編寫的代碼,但是為了自己的教育,請花一些時間思考以下問題:
Spark的主要賣點之一是它是“內存”。他們的意思是以下內容:當您將許多Hadoop(或任何其他MapReduce框架)作業串起時,Hadoop會寫下每個階段的結果以磁盤並再次閱讀它們,這非常昂貴;另一方面,Spark將其數據保存在內存中。但是,如果我們的假設是,如果我們的數據不適合內存,那麼為什麼SPARK SQL不使用基於磁盤的實現這些運算符的實現?在這方面,為什麼Spark與我們在課堂上學習的“傳統”並行關係數據庫不同?這個問題沒有正確的答案!
我們為您提供了一些示例測試DiskHasedRelationSuite.scala
DiskPartitionSuite.scala
CS143UtilsSuite.scala
and ProjectSuite.scala
。完成此項目時,這些測試可以指導您。但是,請記住,它們並不全面,建議您編寫自己的測試以捕捉錯誤。希望您可以將這些測試用作模型來生成自己的測試。
為了進行測試,我們提供了一個簡單的makefile。為了運行任務1的測試,請運行make t1
。相應的任務,運行make t2
,並且所有其他測試都相同。 make all
將運行所有測試。
提交鏈接將在CCLE上創建,您可以在此之前提交代碼。
非常感謝Matteo Interlandi。
祝你好運!