이 과제에서는 Apache Spark에서 UDF (사용자 정의 기능) 결과 캐싱을 구현합니다. 이는 MapReduce의 금형에서 분산 컴퓨팅을위한 프레임 워크입니다. 이 프로젝트는 데이터 랑데부 및 쿼리 평가의 주요 개념을 설명하며 현장에서 널리 사용되는 Spark를 수정하는 실습 경험을 얻을 수 있습니다. 또한 깨끗한 기능 스타일로 인기를 얻는 JVM 기반 언어 인 Scala에 노출됩니다.
과제 마감일은 클래스 웹 사이트에 게시됩니다.
선택한 경우 쌍으로 작성할 수 있습니다. 마지막 으로이 디렉토리에는 많은 코드가 있습니다. 코드가있는 디렉토리를 찾으려면 여기를 참조하십시오.
Spark는 Scala로 작성된 오픈 소스 분산 컴퓨팅 시스템입니다. 이 프로젝트는 Ph.D.에 의해 시작되었습니다. Amplab의 학생들은 Berkeley Data Analytics Stack의 필수 부분입니다 (BDA-"Bad-Ass").
Hadoop Mapreduce와 마찬가지로 Spark는 우리가 수업 시간에 배운 반복자와 유사한 단순화 된 고급 데이터 처리 작업 세트를 지원함으로써 대규모 데이터 컬렉션을 통해 기능을 실행하도록 설계되었습니다. 이러한 시스템의 가장 일반적인 용도 중 하나는 SQL과 같은 고급 언어로 병렬 쿼리 처리를 구현하는 것입니다. 실제로, Spark의 최근의 많은 연구 개발 노력은 확장 가능하고 대화식 관계형 데이터베이스 추상화를 지원하기 위해 노력했습니다.
우리는 현대 데이터 시스템의 주요 개념을 이해하기 위해이 클래스에서 Spark의 측면을 사용, 수정 및 연구 할 것입니다. 더 중요한 것은 수업 시간에 우리가 다루고있는 아이디어 (일부는 수십 년 전)가 오늘날에도 여전히 매우 관련이 있다는 것을 알게 될 것입니다. 특히 Spark SQL에 기능을 추가 할 것입니다.
Spark SQL의 주요 제한은 현재 메인 메모리 전용 시스템이라는 것입니다. 이 클래스의 일환으로, 우리는 코어 외 알고리즘도 포함하도록 확장 할 것입니다.
Scala는 다양한 프로그래밍 패러다임을 지원하는 정적으로 유형 된 언어입니다. 유연성, 전력 및 이식성은 분산 시스템 연구에서 특히 유용 해졌습니다.
Scala는 Java와 비슷하지만 여러 패러다임을 용이하게하기 위해 훨씬 더 광범위한 구문 기능 세트를 가지고 있습니다. Java를 아는 것은 스칼라 코드를 이해하는 데 도움이되지만 그다지 많지 않으면 스칼라를 알지 못하면 표현력을 완전히 활용하지 못하게됩니다. 스칼라로 코드를 작성해야하기 때문에 적어도 언어에 대한 친숙 함을 얻는 것이 좋습니다.
Intellij 아이디어는 Spark에서 개발하는 데 가장 일반적으로 사용되는 IDE 인 경향이 있습니다. Intellij는 스칼라 (및 vim!) 플러그인이있는 Java IDE입니다. Scala-ide와 같은 다른 옵션도 있습니다.
다음 튜토리얼이 유용하다는 것을 알 수 있습니다.
사용자 정의 기능을 통해 개발자는 표현식 내에서 사용자 정의 작업을 정의하고 이용할 수 있습니다. 예를 들어, 제품 포장 사진이 포함 된 제품 카탈로그가 있다고 상상해보십시오. OCR 알고리즘을 호출하고 이미지에서 텍스트를 반환하여 사진에서 쿼리 가능한 정보를 얻을 수있는 사용자 정의 기능 extract_text
등록 할 수 있습니다. SQL에서는 다음과 같은 쿼리를 상상할 수 있습니다.
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
UDFS 등록 기능은 매우 강력합니다. 본질적으로 데이터 처리 프레임 워크를 일반적인 분산 컴퓨팅 프레임 워크로 바꿉니다. 그러나 UDF는 종종 성능 병목 현상을 도입 할 수 있습니다. 특히 수백만 건의 데이터 항목을 실행할 때.
UDF 로의 입력 열에 많은 중복 값이 포함 된 경우, UDF가 행당 한 번이 아닌 별도의 입력 값 당 1 회만 호출되도록함으로써 성능을 향상시키는 데 도움이 될 수 있습니다. (예를 들어, 위의 제품 예제에서 특정 PC의 모든 다른 구성은 동일한 이미지를 가질 수 있습니다.)이 과제에서는이 최적화를 구현합니다. 우리는 그것을 단계적으로 가져갈 것입니다-먼저 메모리에 맞는 데이터를 위해 작동하고 나중에 코어 외 접근 방식이 필요한 더 큰 세트의 경우 작동합니다. UDF와 동일한 입력 값을 갖는 모든 행을 "랑데부"하는 기술로 외부 해싱을 사용합니다.
주제에 관심이 있다면 다음 논문은 흥미로운 읽기가 될 것입니다 (이 숙제에서 우리가 할 시간 이상의 추가 최적화 포함).
터치 할 모든 코드는 CS143Utils.scala
, basicOperators.scala
및 DiskHashedRelation.scala
의 세 가지 파일로됩니다. 그러나 과제를 철저히 완료하려면 Spark 또는 General Scala API 내의 다른 파일을 참조해야 할 수도 있습니다. 자신의 코드를 작성하기 전에 위에서 언급 한 세 가지 파일의 모든 제공된 코드를 살펴보십시오. CS143Utils.scala
에는 DiskHashedRelation.scala
에는 많은 시간과 저주를 절약 할 수있는 유용한 기능이 많이 있습니다.
일반적으로, 우리는 필요한 방법 중 대부분을 정의했습니다. 이전과 마찬가지로이 프로젝트에서는 골격을 채워야합니다. 당신이 쓸 코드의 양은 그다지 높지 않습니다. 전체 직원 솔루션은 100 줄의 코드 (테스트 제외)보다 작습니다. 그러나 메모리 효율적인 방식으로 올바른 구성 요소를 함께 묶어두면 (예 : 전체 관계를 한 번에 메모리에 읽지 않음) 약간의 생각과 신중한 계획이 필요합니다.
수업 시간에 사용하는 용어와 SparkSQL 코드 기반에 사용 된 용어 사이에는 잠재적으로 혼란스러운 차이가 있습니다.
강의에서 우리가 배운 "반복자"개념은 SparkSQL 코드에서 "노드"라고 불립니다. 단백질 및 BinaryNode 코드에는 정의가 있습니다. 쿼리 계획을 SparkPlan이라고하며 실제로는 단백질과 BinaryNode 확장 SparkPlan (결국 단일 반복기는 작은 쿼리 계획입니다!) SparkSQL 소스에서 SparkPlan.scala
파일을 찾아서 API를 확인할 수 있습니다. 노드.
SparkSQL의 일부 주석에서는 "연산자"라는 용어를 사용하여 "노드"를 의미합니다. 파일 basicOperators.scala
여러 특정 노드 (예 : 정렬, 별개 등)를 정의합니다.
Scala Interface Iterator
강의에서 다루는 반복자 개념과 혼동하지 마십시오. 이 프로젝트에서 사용할 Iterator
SparkSQL 노드를 구현하는 데 사용할 스칼라 언어 기능입니다. Iterator
특정 API : next
및 hasNext
기능을 시행하는 Scala 컬렉션에 대한 인터페이스를 제공합니다.
git
과 github git
는 버전 제어 시스템으로, 다른 버전의 코드를 추적하고 다른 컴퓨터에서 동기화하며 다른 시스템과 협력하는 데 도움이됩니다. Github는이 시스템을 지원하여 서비스로 호스팅하는 사이트입니다.
git
에 대해 많이 모른다면이 시스템에 익숙해지는 것이 좋습니다 . 당신은 그것으로 많은 시간을 보낼 것입니다! git
Online 사용에 대한 많은 안내서가 있습니다. 여기에 읽을 수있는 좋은 방법이 있습니다.
먼저 원격 개인 저장소 (예 : Spark-Homework)를 설정해야합니다. Github은 학생들에게 개인 저장소를 제공하지만 시간이 걸릴 수 있습니다. 개인 저장소가없는 경우 다른 사람이 Checheckout을 사용할 수 있도록 공개 저장소에서 확인하는 것에 대해 두 번 생각하십시오.
$ cd ~
개인 저장소를 복제하십시오. 비어 있어야합니다.
$ git clone "https://github.com/xx/yy.git"
복제 된 저장소를 입력하고 코스 저장소를 추적하고 복제하십시오.
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
참고 : 여기에있는 코드의 양에 압도되지 마십시오. Spark는 많은 기능을 갖춘 큰 프로젝트입니다. 우리가 터치 할 코드는 하나의 특정 디렉토리 인 SQL/Core/Src/Main/Scala/org/Apache/Spark/SQL/Execution/에 포함됩니다. 테스트는 모두 SQL/Core/SRC/Test/Scala/Org/Apache/Spark/SQL/Execution/에 포함됩니다.
클론을 개인 저장소로 푸시하십시오.
$ git push origin master
코드를 추가 할 때마다 원격 저장소에 수정을 커밋 할 수 있습니다.
$ git commit -m 'update to homework'
$ git push origin master
과제에 대한 업데이트를받을 필요가있을 수 있습니다 (처음으로 최대한 "완벽하게"릴리스하려고하더라도). 추적을 올바르게 설정한다고 가정하면이 다음 명령을 실행하여 할당 업데이트를받을 수 있습니다.
$ git pull course master
파일의 위치를 찾아야 할 때 다음 UNIX 명령은 편리합니다. 예- 현재 저장소에서 'diskhashedrelation.scala'라는 파일의 위치를 찾으십시오.
$ find ./ -name 'DiskHashedRelation.scala'
코드를 가져 오면 cd
{repo root}
에 넣고 make compile
실행하십시오. 이 명령을 처음 실행할 때는 시간이 걸릴 필요가 있습니다. sbt
모든 종속성을 다운로드하고 모든 코드를 Spark에서 컴파일합니다 (약간의 코드가 있습니다). 초기 어셈블리 명령이 완료되면 프로젝트를 시작할 수 있습니다! (미래 빌드는이 길을 오래 걸리지 않아야합니다. sbt
make clean
실행하지 않으면 모든 컴파일 된 클래스 파일을 제거하지 않는 한 변경된 파일을 다시 컴파일하기에 충분히 똑똑합니다.)
우리는 당신에게 DiskHashedRelation.scala
에 대한 골격 코드를 제공했습니다. 이 파일에는 4 가지 중요한 사항이 있습니다.
trait DiskHashedRelation
DiskhashedRelation 인터페이스를 정의합니다class GeneralDiskHashedRelation
은 DiskedHashedRelation
특성의 구현입니다class DiskPartition
디스크의 단일 파티션을 나타냅니다object DiskHashedRelation
GeneralDiskHashedRelation
s를 구성하는 물체 공장으로 생각할 수 있습니다.DiskPartition
및 GeneralDiskHashedRelation
구현 먼저이 부분에 DiskPartition
insert
, closeInput
및 getData
메소드를 구현해야합니다. 전 두 사람의 경우, Docstrings는 구현해야 할 사항에 대한 포괄적 인 설명을 제공해야합니다. getData
의 경고는 전체 파티션을 한 번만 기억으로 읽을 수 없다는 것입니다. 우리 가이 제한을 시행하는 이유는 JVM에서 메모리를 자유롭게하는 좋은 방법이없고 데이터를 다른 형태로 변환 할 때 여러 개의 사본이있을 것이기 때문입니다. 따라서 전체 파티션의 여러 사본을 갖추면 사물이 디스크에 쏟아져서 우리 모두를 슬프게 할 것입니다. 대신 한 번에 한 블록을 메모리로 스트리밍해야합니다.
이 시점에서 DiskPartitionSuite.scala
에서 테스트를 통과해야합니다.
object DiskHashedRelation
구현 이 부분의 작업은 거친 입자 해시 함수를 사용하여 디스크의 여러 파티션 관계에 입력을 스트리밍하는 외부 해싱의 1 단계를 구현하는 것입니다. 우리의 목적을 위해, 모든 객체가 가지고있는 hashCode
방법은 해시 값을 생성하기에 충분하며, 파티션 수에 의해 모듈로를 취하는 것은 허용 가능한 해시 함수입니다.
이 시점에서 DiskHashedRelationSuite.scala
에서 모든 테스트를 통과해야합니다.
이 섹션에서는 basicOperators.scala
의 case class CacheProject
다룰 것입니다. 이 클래스에는 4 줄의 코드가 있다는 것을 알 수 있으며, 더 중요한 것은 // IMPLEMENT ME
없습니다. 실제로 여기에 코드를 쓸 필요는 없습니다. 그러나 66 행에서 함수 호출을 추적하면 기능적 인 메모리 UDF 구현을 위해 구현 해야하는이 스택의 두 부분이 있음을 알게됩니다.
CS143Utils
방법 구현 이 작업을 위해서는 CachingIteratorGenerator#apply
에서 getUdfFromExpressions
및 Iterator
메소드를 구현해야합니다. 시작하기 전에 Docstrings (특히 apply
위해)를 자세히 읽으십시오.
이러한 방법을 구현 한 후에는 CS143UtilsSuite.scala
에서 테스트를 통과해야합니다.
힌트 :이 방법들이 왜 utils의 일부가 될 수 있는지주의 깊게 생각하십시오.
이제 진실의 순간이 온다! 우리는 디스크 기반 해시 파티셔닝을 구현했으며 때로는 메모리 UDF 캐싱 (memoization)을 구현했습니다. Memoization은 많은 상황에서 매우 강력한 도구이지만 여기서는 데이터베이스에서 메모가 처리 할 수있는 것보다 많은 양의 데이터를 처리합니다. 메모리 인 캐시에 맞는 것보다 고유 한 값을 가지고 있다면 성능이 빠르게 저하됩니다. 따라서, 우리는 시간이 지남에 따라 분할 및 정복의 전통으로 돌아갑니다. 데이터 데이터가 메모리에 맞지 않으면 한 번에 한 번 디스크에 파티션하고 한 번에 하나의 파티션을 읽고 (왜 작동하는지 생각해보십시오 (힌트 : rendezvous!)). .
PartitionProject
구현 이 최종 작업을 사용하려면 PartitionProject
의 구현을 작성해야합니다. 작성 해야하는 모든 코드는 generateIterator
메소드에 있습니다. 구현을 구성하는 방법에 대해주의 깊게 생각하십시오. 모든 데이터를 메모리로 버튼하거나 그와 유사한 것을 버리지 않아야 합니다.
이 시점에서 주어진 모든 테스트를 통과해야합니다.
여기에 써야 할 코드는 없지만 자신의 교화를 위해 다음 질문에 대해 생각하는 데 시간을 보내십시오.
Spark의 주요 판매 포인트 중 하나는 "메모리 인"이라는 것입니다. 그들이 의미하는 바는 다음과 같습니다. 여러 Hadoop (또는 다른 Mapreduce 프레임 워크) 작업을 함께 묶을 때 Hadoop은 각 단계의 결과를 디스크에 작성하여 다시 읽습니다. 반면에 Spark는 데이터를 메모리로 유지합니다. 그러나 우리의 데이터가 메모리에 맞지 않으면 Spark SQL이 왜이 연산자의 디스크 기반 구현으로 배송되지 않는가? 이와 관련하여 Spark가 왜 우리가 수업 시간에 배우는 "전통적인"병렬 관계 데이터베이스와 다른가? 이 질문에 대한 정답은 없습니다!
DiskPartitionSuite.scala
, DiskHasedRelationSuite.scala
, CS143UtilsSuite.scala
및 ProjectSuite.scala
에서 샘플 테스트를 제공했습니다. 이 테스트는이 프로젝트를 완료 할 때 안내 할 수 있습니다. 그러나 포괄적 이지 않으며 버그를 잡기 위해 자신의 테스트를 작성하는 것이 좋습니다. 바라건대,이 테스트를 모델로 사용하여 자신의 테스트를 생성 할 수 있습니다.
테스트를 실행하기 위해 간단한 makefile을 제공했습니다. 작업 1에 대한 테스트를 실행하려면 make t1
실행하십시오. 이에 따라 작업을 위해 make t2
실행하고 다른 모든 테스트에 대해 동일하게 실행하십시오. make all
됩니다.
제출 링크는 CCLE에서 생성되며 마감일까지 코드를 제출할 수 있습니다.
Matteo Interlandi에게 큰 감사를드립니다.
행운을 빌어요!