在此任务中,您将在Apache Spark中实现UDF(用户定义的功能)结果缓存,这是在MapReduce模具中分布式计算的框架。该项目将说明数据汇和查询评估中的关键概念,并且您将获得一些动手修改Spark的体验,该体验在该领域中广泛使用。此外,您将获得一种基于JVM的语言Scala,它以其干净的功能风格而越来越受欢迎。
任务截止日期发表在类网站上。
如果选择,可以成对完成此操作。最后,此目录中有很多代码。请在此处查找代码所在的目录。
Spark是用Scala编写的开源分布式计算系统。该项目由博士学位发起。来自Amplab的学生,是伯克利数据分析堆栈(BDAS-发音为“坏蛋”)的组成部分。
像Hadoop MapReduce一样,Spark旨在通过支持一组简化的高级数据处理操作,类似于我们在课堂上学习的迭代器,可以在大量数据集上运行功能。这种系统的最常见用途之一是用高级语言(例如SQL)实现并行查询处理。实际上,SPARK的许多最新研发工作都用于支持可扩展和互动的关系数据库抽象。
我们将在此类中使用,修改和研究Spark的各个方面,以了解现代数据系统的关键概念。更重要的是,您会看到我们在课堂上涵盖的想法(其中一些已经有几十年的想法)今天仍然非常重要。具体来说,我们将添加功能为Spark SQL。
SPARK SQL的一个关键限制是它目前是一个仅使用内存的系统。作为此类课程的一部分,我们还将其扩展以包括一些核心外算法。
Scala是一种支持许多不同编程范式的静态语言。它的灵活性,功率和可移植性在分布式系统研究中变得特别有用。
Scala类似于Java,但它具有更广泛的语法功能,可以促进多个范式。了解Java将帮助您了解一些Scala代码,但并不多,并且不知道Scala会阻止您完全利用其表达能力。因为您必须在Scala中编写代码,所以我们强烈建议您至少获得对语言的熟悉程度。
Intellij Idea往往是在Spark中开发的最常用的IDE。 Intellij是具有Scala(和VIM!)插件的Java IDE。还有其他选项,例如Scala-ide。
您可能会发现以下教程很有用:
用户定义的功能使开发人员可以在表达式中定义和利用自定义操作。想象一下,例如,您拥有一个产品目录,其中包括产品包装的照片。您可能需要注册一个用户定义的函数extract_text
,该功能dectraxt调用OCR算法并在图像中返回文本,以便您可以从照片中获取可查询信息。在SQL中,您可以想象这样的查询:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
注册UDFS的能力非常强大 - 从本质上讲,您的数据处理框架变成了一般的分布式计算框架。但是UDF通常可以引入性能瓶颈,尤其是当我们运行数百万个数据项时。
如果输入列到UDF包含许多重复值,则通过确保仅调用UDF一次,每行一次只调用UDF一次,而不是每行一次。 (例如,在上面的产品示例中,特定PC的所有不同配置都可能具有相同的图像。)在此作业中,我们将实现此优化。我们将分阶段进行 - 首先使其用于适合内存的数据,然后再用于需要较大的方法的较大集合。我们将使用外部哈希作为技术来“集合”所有行的所有行,并具有相同的输入值的UDF。
如果您对该主题感兴趣,则以下论文将是一本有趣的阅读(包括超出我们在此作业中有时间的其他优化):
您将要访问的所有代码都将在三个文件中 - CS143Utils.scala
, basicOperators.scala
和DiskHashedRelation.scala
。但是,您可能需要在Spark或Scala API中咨询其他文件,以彻底完成作业。在开始编写自己的代码之前,请确保您查看上面提到的三个文件中提供的所有代码。 CS143Utils.scala
以及DiskHashedRelation.scala
中有很多有用的功能,可以节省您很多时间和诅咒 - 利用它们!
通常,我们定义了您需要的大多数(如果不是全部)的方法。和以前一样,在这个项目中,您需要填写骨架。您将编写的代码量不是很高 - 总人员解决方案少于100行代码(不包括测试)。但是,以内存有效的方式将正确的组件串在一起(即,不立即将整个关系读取为记忆)将需要一些思考和仔细的计划。
我们在课堂中使用的术语与SparkSQL代码基础中使用的术语之间存在一些可能令人困惑的差异:
我们在演讲中学到的“迭代器”概念在SparkSQL代码中称为“节点” - 一元简历和二进制词的代码中有一些定义。一个查询计划称为Spark Plans,实际上UnaryNode和BinaryNode扩展了Spark Pllan(毕竟,单个迭代器是一个小查询计划!)您可能需要在SparkSQL源中找到file SparkPlan.scala
,以查看这些api以查看这些api节点。
在SparkSQL中的某些评论中,它们还使用“操作员”一词来表示“节点”。 file basicOperators.scala
定义了许多特定节点(例如排序,独特等)。
不要将Scala接口Iterator
与我们在演讲中介绍的迭代概念混淆。您将在此项目中使用的Iterator
是Scala语言功能,您将使用它来实现SparkSQL节点。 Iterator
为Scala集合提供了一个接口,该界面可以强制执行特定的API: next
和hasNext
函数。
git
和github git
是一个版本控制系统,可帮助您跟踪代码的不同版本,在不同的机器上同步它们,并与其他机器协作。 GitHub是一个支持该系统的站点,将其作为服务托管。
如果您对git
了解不多,我们强烈建议您熟悉该系统;您将花费大量时间!在线使用git
的指南很多 - 这是一个很棒的阅读指南。
您应该首先设置一个远程私人存储库(例如,Spark-Homework)。 Github将私人存储库交给学生(但这可能需要一些时间)。如果您没有私人存储库,请三思而后行,在公共存储库中检查它,因为它可以让其他人进行置换。
$ cd ~
克隆您的个人存储库。它应该是空的。
$ git clone "https://github.com/xx/yy.git"
输入克隆的存储库,跟踪课程存储库并克隆。
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
注意:请不要对这里的代码数量不知所措。 Spark是一个具有许多功能的大型项目。我们将要触摸的代码将包含在一个特定目录中:SQL/CORE/SRC/MAIN/SCALA/ORG/APACHE/SPARK/SQL/SQL/Execution/。测试将全部包含在SQL/Core/src/test/scala/org/apache/spark/spark/sql/eccution/execution/
将克隆推到您的个人存储库。
$ git push origin master
每次添加一些代码时,都可以对远程存储库进行修改。
$ git commit -m 'update to homework'
$ git push origin master
可能有必要接收我们的作业的更新(即使我们尝试在第一次将它们释放为“完美”)。假设您正确设置了跟踪,则可以简单地运行以下命令接收分配更新:
$ git pull course master
当您需要找到文件的位置时,以下UNIX命令将派上用场。示例 - 在我当前的存储库中查找名为“ diskhashedrelation.scala”的文件的位置。
$ find ./ -name 'DiskHashedRelation.scala'
一旦将代码cd
{repo root}
并运行make compile
。您第一次运行此命令时,应该花费一段时间 - sbt
将下载所有依赖项并在Spark中编译所有代码(有很多代码)。初始组件命令完成后,您可以启动项目! (未来的构建不应花费很长时间 - sbt
足够聪明,只能重新编译更改的文件,除非您运行make clean
,否则将删除所有编译的类文件。)
我们为您提供了用于DiskHashedRelation.scala
的骨架代码。该文件有4个重要的事情:
trait DiskHashedRelation
定义了磁盘界面class GeneralDiskHashedRelation
是我们实施DiskedHashedRelation
特征class DiskPartition
代表磁盘上的一个分区object DiskHashedRelation
视为构建GeneralDiskHashedRelation
s的对象工厂DiskPartition
和GeneralDiskHashedRelation
首先,您将需要在此部分中实现DiskPartition
中的insert
, closeInput
和getData
方法。对于前两个,docstrings应该对您必须实施的内容进行全面描述。带有getData
的警告是,您无法一次将整个分区读取到内存中。我们执行此限制的原因是,没有一个好方法可以在JVM中执行自由记忆,并且当您将数据转换为不同的形式时,周围会有多个副本。因此,拥有多个副本的整个分区会导致磁盘溢出,并使我们所有人都很难过。相反,您应该一次将一个块流入内存中。
此时,您应该在DiskPartitionSuite.scala
中通过测试。
object DiskHashedRelation
您在本部分中的任务将是实现外部哈希的第1阶段 - 使用粗粒的哈希功能将输入传输到磁盘上的多个分区关系中。出于我们的目的,每个对象具有的hashCode
方法足以生成哈希值,而按分区的数量取模量是可以接受的哈希函数。
在这一点上,您应该在DiskHashedRelationSuite.scala
中通过所有测试。
在本节中,我们将在basicOperators.scala
中处理case class CacheProject
。您可能会注意到,此类中只有4行代码,更重要的是,没有// IMPLEMENT ME
。您实际上不必在此处编写任何代码。但是,如果您在第66行中跟踪函数调用,您会发现必须实现此堆栈的两个部分才能具有功能性的内存UDF实现。
CS143Utils
方法对于此任务,您需要在CachingIteratorGenerator#apply
中实现getUdfFromExpressions
和Iterator
方法。请在入门之前密切阅读docstrings - 尤其是apply
。
实施这些方法后,您应该在CS143UtilsSuite.scala
中通过测试。
提示:仔细考虑为什么这些方法可能是UTIT的一部分
现在是真理的时刻!我们已经实施了基于磁盘的哈希分区,并且已经实现了内存UDF缓存 - 有时称为备忘录。在许多情况下,纪念活动是非常强大的工具,但是在数据库领域,我们处理的数据量超过了记忆的处理。如果我们的独特值比在内存中的缓存中更具独特的值,我们的性能将迅速降低。因此,我们回到了悠久的划分和争议的数据库传统。如果我们的数据不适合内存,那么我们可以将其划分为磁盘一次,一次读取一个分区(请考虑为什么起作用(提示:rendezvous!)),然后执行UDF缓存,一次评估一个分区。
PartitionProject
最终任务要求您填写PartitionProject
的实现。您需要编写的所有代码都在generateIterator
方法中。仔细考虑如何组织实施。您不应在内存中缓冲所有数据或类似的数据。
在这一点上,您应该通过所有给定的测试。
您没有必须在这里编写的代码,但是为了自己的教育,请花一些时间思考以下问题:
Spark的主要卖点之一是它是“内存”。他们的意思是以下内容:当您将许多Hadoop(或任何其他MapReduce框架)作业串起时,Hadoop会写下每个阶段的结果以磁盘并再次阅读它们,这非常昂贵;另一方面,Spark将其数据保存在内存中。但是,如果我们的假设是,如果我们的数据不适合内存,那么为什么SPARK SQL不使用基于磁盘的实现这些运算符的实现?在这方面,为什么Spark与我们在课堂上学习的“传统”并行关系数据库不同?这个问题没有正确的答案!
我们为您提供了一些示例测试DiskHasedRelationSuite.scala
DiskPartitionSuite.scala
CS143UtilsSuite.scala
and ProjectSuite.scala
。完成此项目时,这些测试可以指导您。但是,请记住,它们并不全面,建议您编写自己的测试以捕捉错误。希望您可以将这些测试用作模型来生成自己的测试。
为了进行测试,我们提供了一个简单的makefile。为了运行任务1的测试,请运行make t1
。相应的任务,运行make t2
,并且所有其他测试都相同。 make all
将运行所有测试。
提交链接将在CCLE上创建,您可以在此之前提交代码。
非常感谢Matteo Interlandi。
祝你好运!