この割り当てでは、UDF(ユーザー定義関数)結果キャッシュを実装します。これは、MapReduceの金型で分散コンピューティングのフレームワークです。このプロジェクトでは、データランデブーとクエリの評価における重要な概念を説明し、この分野で広く使用されているSparkを修正する実践的なエクスペリエンスを得ることができます。さらに、JVMベースの言語であるScalaに露出が得られ、そのクリーンな機能スタイルで人気を博しています。
割り当て期日は、クラスのWebサイトで公開されます。
選択した場合は、これをペアで完了できます。最後に、このディレクトリには多くのコードがあります。ここをご覧ください。コードがあるディレクトリを見つけてください。
Sparkは、Scalaで書かれたオープンソース分散コンピューティングシステムです。このプロジェクトは博士号によって開始されました。 Amplabの学生は、Berkeley Data Analytics Stack(BDAS-Affectionallyと発音される「Bad-Ass」)の不可欠な部分です。
Hadoop MapReduceと同様に、Sparkは、クラスで学んでいる反復ターに似た高レベルのデータ処理操作の単純化されたセットをサポートすることにより、大量のデータコレクションに対して関数を実行するように設計されています。このようなシステムの最も一般的な用途の1つは、SQLなどの高レベル言語で並列クエリ処理を実装することです。実際、Sparkにおける最近の多くの研究開発努力は、スケーラブルでインタラクティブなリレーショナルデータベースの抽象化をサポートすることになりました。
このクラスでSparkの側面を使用、変更、および研究して、最新のデータシステムの重要な概念を理解します。さらに重要なことは、私たちがクラスでカバーしているアイデア(何十年も前のもの)が今日でも非常に関連性があることがわかります。具体的には、Spark SQLに機能を追加します。
Spark SQLの重要な制限の1つは、現在メインメモリのみのシステムであることです。このクラスの一部として、コア外のアルゴリズムも含めるように拡張します。
Scalaは、さまざまなプログラミングパラダイムをサポートする静的なタイプの言語です。その柔軟性、パワー、および携帯性は、分散型システムの研究で特に有用になっています。
ScalaはJavaに似ていますが、複数のパラダイムを容易にするために、はるかに広範な構文機能セットを備えています。 Javaを知ることは、Scalaコードを理解するのに役立ちますが、それほど多くはありません。Scalaを知らないことで、その表現力を完全に活用することができません。 Scalaでコードを書く必要があるため、少なくとも言語に精通していることを強くお勧めします。
Intellijのアイデアは、Sparkで発達するために最も一般的に使用されるIDEである傾向があります。 Intellijは、Scala(およびVim!)プラグインを備えたJava Ideです。 Scala-Ideなどの他のオプションもあります。
次のチュートリアルが役立つ場合があります。
ユーザー定義の機能により、開発者は表現内でカスタム操作を定義および悪用することができます。たとえば、製品パッケージの写真を含む製品カタログがあることを想像してください。写真からクエリ情報を取得できるように、OCRアルゴリズムを呼び出してテキストを画像に返すユーザー定義の関数extract_text
を登録することをお勧めします。 SQLでは、次のようなクエリを想像できます。
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
UDFSを登録する機能は非常に強力です。基本的に、データ処理フレームワークを一般的な分散コンピューティングフレームワークに変えます。しかし、UDFは、特に何百万ものデータ項目を介して実行するため、パフォーマンスのボトルネックを導入することがよくあります。
UDFへの入力列に多くの重複値が含まれている場合、UDFが行ごとに1回ではなく、異なる入力値ごとに1回のみ呼び出されるようにすることで、パフォーマンスを改善することが有益です。 (たとえば、上記の製品の例では、特定のPCのすべての異なる構成が同じ画像を持っている可能性があります。)この割り当てでは、この最適化を実装します。私たちはそれを段階的に採用します - 最初にメモリに収まるデータで動作し、次にコア外アプローチを必要とするより大きなセットには後で動作します。外部ハッシュを手法として使用して、UDFの同じ入力値を持つすべての行を「ランデブー」します。
トピックに興味がある場合、次の論文は興味深い読み物になります(この宿題に時間があるものを超えて追加の最適化を含む):
接触するすべてのコードは、 CS143Utils.scala
、 basicOperators.scala
、およびDiskHashedRelation.scala
の3つのファイルにあります。ただし、割り当てを徹底的に完了するには、Sparkまたは一般的なScala API内の他のファイルを参照する必要がある場合があります。独自のコードを書き始める前に、上記の3つのファイルのすべての提供されたコードを確認してください。 CS143Utils.scala
とDiskHashedRelation.scala
には、多くの便利な機能があります。
一般に、必要な方法のほとんど(すべてではないにしても)を定義しました。前と同様に、このプロジェクトでは、スケルトンを埋める必要があります。書くコードの量はそれほど高くありません。スタッフの合計ソリューションは、100行のコード(テストを含めない)未満です。ただし、適切なコンポーネントをメモリ効率の高い方法で縛り付ける(つまり、一度にメモリへの関係全体を読み取らない)には、ある程度の思考と慎重な計画が必要です。
クラスで使用する用語と、SparksQLコードベースで使用される用語との間には、潜在的に混乱する違いがあります。
講義で学んだ「イテレーター」の概念は、SparksQLコードの「ノード」と呼ばれます。UnarynodeとBinaryNodeのコードに定義があります。クエリプランはスパークプランと呼ばれ、実際にはunarynodeとbinaryNode拡張スパークプラン(結局、単一のイテレーターは小さなクエリプランです!)Sparkplan.scalaを見つけて、これらのAPIを表示してSparksqlソースでSparkPlan.scala
見つけたいと思うかもしれません。ノード。
Sparksqlのコメントのいくつかでは、「演算子」という用語を使用して「ノード」を意味します。ファイルbasicOperators.scala
、多くの特定のノード(並べ替え、異なるなど)を定義します。
Scala Interface Iterator
講義で取り上げたIteratorの概念と混同しないでください。このプロジェクトで使用するIterator
は、SparkSQLノードを実装するために使用するScala言語機能です。 Iterator
、特定のAPIを強制するSCALAコレクションへのインターフェイスを提供します: next
APIとhasNext
関数。
git
とgithub git
はバージョン制御システムであり、さまざまなバージョンのコードを追跡し、異なるマシンでそれらを同期させ、他のマシンと協力するのに役立ちます。 GitHubは、このシステムをサポートし、サービスとしてホストするサイトです。
git
についてあまり知らない場合は、このシステムに慣れることを強くお勧めします。あなたはそれで多くの時間を費やすでしょう!オンラインでgit
を使用するためのガイドはたくさんあります - ここに読むのに最適なガイドがあります。
最初にリモートプライベートリポジトリ(スパークホームワークなど)を設定する必要があります。 GitHubは学生にプライベートリポジトリを提供します(ただし、これには時間がかかる場合があります)。プライベートリポジトリをお持ちでない場合は、他の人がチェックアウトするために利用できるようになるため、公開リポジトリでチェックすることをよく考えてください。
$ cd ~
個人リポジトリをクローンします。空になるはずです。
$ git clone "https://github.com/xx/yy.git"
クローンリポジトリを入力し、コースリポジトリを追跡してクローンします。
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
注:ここにあるコードの量に圧倒されないでください。 Sparkは、多くの機能を備えた大きなプロジェクトです。接触するコードは、1つの特定のディレクトリ(SQL/Core/SRC/Main/Scala/org/apache/spark/sql/execution/)に含まれます。テストはすべて、SQL/CORE/SRC/TEST/SCALA/ORG/APACHE/SPARK/SQL/実行/に含まれます。
クローンを個人リポジトリに押します。
$ git push origin master
コードを追加するたびに、リモートリポジトリの変更をコミットできます。
$ git commit -m 'update to homework'
$ git push origin master
割り当ての更新を受信する必要がある場合があります(可能な限り「完全に」リリースしようとする場合でも)。追跡を正しく設定したと仮定すると、この次のコマンドを実行して割り当ての更新を受信するだけです。
$ git pull course master
ファイルの場所を見つける必要がある場合、次のUNIXコマンドが役立ちます。例 - 現在のリポジトリで「diskhashedrelation.scala」という名前のファイルの場所を見つけます。
$ find ./ -name 'DiskHashedRelation.scala'
コードを引いたら、 cd
を{repo root}
にcdし、 make compile
。このコマンドを初めて実行したときは、しばらく時間がかかります。SBT sbt
すべての依存関係をダウンロードし、Sparkのすべてのコードをコンパイルします(かなりのコードがあります)。最初のアセンブリコマンドが終了したら、プロジェクトを開始できます! (将来のビルドにはこれほど長くかかるべきではありません。SBT sbt
、 make clean
を実行しない限り、変更されたファイルのみを再コンパイルするのに十分なほどスマートです。
DiskHashedRelation.scala
用のスケルトンコードを提供しました。このファイルには4つの重要なことがあります。
trait DiskHashedRelation
、ディスカシュドレレーションインターフェイスを定義しますclass GeneralDiskHashedRelation
は、 DiskedHashedRelation
の特性の実装ですclass DiskPartition
ディスク上の単一のパーティションを表しますobject DiskHashedRelation
GeneralDiskHashedRelation
を構築するオブジェクト工場と考えることができますDiskPartition
とGeneralDiskHashedRelation
実装まず、このパートのDiskPartition
にinsert
、 closeInput
、およびgetData
メソッドを実装する必要があります。前者の2つについては、Docstringsは、実装しなければならないものの包括的な説明を提供する必要があります。 getData
の注意点は、パーティション全体を1回メモリに読み込むことができないことです。この制限を実施している理由は、JVMで自由化メモリを強制する良い方法がないため、データをさまざまなフォームに変換すると、複数のコピーが横たわっているからです。そのため、パーティション全体の複数のコピーを使用すると、物事がディスクにこぼれ、私たち全員が悲しくなります。代わりに、一度に1つのブロックをメモリにストリーミングする必要があります。
この時点で、 DiskPartitionSuite.scala
でテストに合格するはずです。
object DiskHashedRelation
実装この部分のタスクは、外部ハッシュのフェーズ1を実装することです。粗粒ハッシュ関数を使用して、ディスク上の複数のパーティション関係に入力をストリーミングします。私たちの目的のために、すべてのオブジェクトが持っているhashCode
法はハッシュ値を生成するのに十分であり、moduloをパーティションの数だけに取ることは許容可能なハッシュ関数です。
この時点で、 DiskHashedRelationSuite.scala
のすべてのテストに合格するはずです。
このセクションでは、 basicOperators.scala
のcase class CacheProject
を扱います。このクラスには4行のコードしかないことに気付くかもしれませんが、さらに重要なことには、 // IMPLEMENT ME
。ここに実際にコードを書く必要はありません。ただし、66行目で関数呼び出しをトレースすると、機能的なメモリのUDF実装を行うために実装する必要があるこのスタックの2つの部分があることがわかります。
CS143Utils
メソッドの実装このタスクでは、 CachingIteratorGenerator#apply
にgetUdfFromExpressions
とIterator
メソッドを実装する必要があります。開始する前に、特にapply
のためにドキュメントを読んでください。
これらのメソッドを実装した後、 CS143UtilsSuite.scala
でテストに合格する必要があります。
ヒント:これらの方法がUTILSの一部である理由について慎重に考えてください
今、真実の瞬間が来ます!ディスクベースのハッシュパーティション化を実装し、メモリ内のUDFキャッシュを実装しました。これは、メモ化と呼ばれることもあります。メモは多くのコンテキストで非常に強力なツールですが、ここではデータベースランドでは、メモが処理できるよりも多くのデータを扱います。メモリ内キャッシュに収まるよりも一意の値がある場合、パフォーマンスは急速に低下します。したがって、私たちは、分裂と征服の昔ながらのデータベースの伝統に頼ります。データがメモリに収まらない場合は、ディスクに一度ディスクに分割し、一度に1つのパーティションを読んでください(これが機能する理由(ヒント:rendezvous!))を実行し、UDFキャッシュを実行し、一度に1つのパーティションを評価します。 。
PartitionProject
の実装この最後のタスクでは、 PartitionProject
の実装に記入する必要があります。記述する必要があるコードはすべて、 generateIterator
メソッドです。実装を整理する必要がある方法について慎重に考えてください。メモリ内のすべてのデータやそれに似たものをバッファリングしないでください。
この時点で、与えられたすべてのテストに合格する必要があります。
ここに書く必要があるコードはありませんが、あなた自身の啓発のために、次の質問について考えるのに時間を費やしてください。
Sparkの主なセールスポイントの1つは、「インメモリ」であることです。それらが意味するのは次のとおりです。多くのHadoop(またはその他のMapReduceフレームワーク)のジョブを一緒に編成すると、Hadoopは各フェーズの結果をディスクに書き、非常に高価なもので再び読み取ります。一方、Sparkはデータをメモリに維持します。ただし、データがメモリに適合しない場合、Spark SQLがこれらの演算子のディスクベースの実装を既に出荷しないのはなぜですか?この点で、なぜSparkがクラスで学んだ「従来の」並列リレーショナルデータベースと異なるのですか?この質問に対する正しい答えはありません!
DiskPartitionSuite.scala
、 DiskHasedRelationSuite.scala
、 CS143UtilsSuite.scala
およびProjectSuite.scala
でいくつかのサンプルテストを提供しました。これらのテストは、このプロジェクトを完了するときにガイドできます。ただし、それらは包括的ではないことに留意してください。バグをキャッチするために独自のテストを書くことをお勧めします。うまくいけば、これらのテストをモデルとして使用して、独自のテストを生成できることを願っています。
テストを実行するために、シンプルなメイクファイルを提供しました。タスク1のテストを実行するには、 make t1
実行します。それに応じてタスクについては、 make t2
、他のすべてのテストで同じものを実行します。 make all
すべてのテストを実行します。
送信リンクはCCLEで作成され、コードを期日までに送信できます。
Matteo Interlandiに感謝します。
幸運を!