In dieser Zuweisung implementieren Sie UDF-Ergebnisse (Benutzerdefinierte Funktion) in Apache Spark, ein Framework für verteiltes Computer in der Form von MapReduce. Dieses Projekt veranschaulicht die Schlüsselkonzepte in der Datenerklärung und der Bewertung von Abfragen. Darüber hinaus erhalten Sie Scala, eine JVM-basierte Sprache, die für den sauberen funktionalen Stil an Popularität gewinnt.
Das Fälligkeitsdatum der Zuordnung wird auf der Klassenwebsite veröffentlicht.
Sie können dies paarweise abschließen, wenn Sie möchten. Schließlich gibt es in diesem Verzeichnis viel Code. Bitte sehen Sie hier hier, um das Verzeichnis zu finden, in dem sich der Code befindet.
Spark ist ein in Scala geschriebenes Open-Source-Verteilungssystem. Das Projekt wurde von Ph.D. Studierende aus dem Amplab und sind ein wesentlicher Bestandteil des Berkeley Data Analytics Stack (BDAs-Affektion, ausgesprochen "Bad-Ass").
Wie Hadoop MapReduce wurde Spark so konzipiert, dass er Funktionen über große Datensammlungen ausführt, indem ein vereinfachter Satz von Datenverarbeitungsvorgängen hochrangiger Daten unterstützt wird, die den Iteratoren ähneln, über die wir im Unterricht gelernt haben. Eine der häufigsten Verwendungen solcher Systeme ist die Implementierung der parallele Abfrageverarbeitung in Sprachen auf hoher Ebene wie SQL. Tatsächlich haben viele neuere Forschungs- und Entwicklungsbemühungen in Spark auf die Unterstützung einer skalierbaren und interaktiven relationalen Datenbankabstraktion gegangen.
Wir werden in dieser Klasse Aspekte des Funkens verwenden, ändern und untersuchen, um Schlüsselkonzepte moderner Datensysteme zu verstehen. Noch wichtiger ist, dass Sie sehen, dass die Ideen, die wir im Unterricht abdecken - einige Jahrzehnte alt sind - heute noch sehr relevant sind. Insbesondere werden wir Funktionen zum Funken von SQL hinzufügen.
Eine wichtige Einschränkung von Spark SQL ist, dass es sich derzeit um ein wichtiges Hauptsystem handelt. Im Rahmen dieser Klasse werden wir sie auch auf einige Out-of-Core-Algorithmen erweitern.
Scala ist eine statisch typische Sprache, die viele verschiedene Programmierparadigmen unterstützt. Seine Flexibilität, Leistung und Portabilität sind in der Forschung verteilter Systeme besonders nützlich geworden.
Scala ähnelt Java, besitzt jedoch einen viel breiteren Satz von Syntaxfunktionen, um mehrere Paradigmen zu erleichtern. Wenn Sie Java kennen, können Sie einen Scala -Code verstehen, aber nicht viel davon, und nicht zu wissen, dass Scala Sie daran hindert, die ausdrucksstarke Kraft vollständig auszunutzen. Da Sie Code in Scala schreiben müssen, empfehlen wir Ihnen dringend, zumindest eine vorübergehende Vertrautheit mit der Sprache zu erwerben.
Die Intellij -Idee ist in der Regel die am häufigsten verwendete IDE für die Entwicklung in Spark. Intellij ist eine Java -IDE, die ein Scala (und ein Vim!) Plugin hat. Es gibt auch andere Optionen wie Scala-ide.
Möglicherweise finden Sie die folgenden Tutorials als nützlich:
Mit benutzerdefinierten Funktionen können Entwickler benutzerdefinierte Operationen innerhalb der Ausdrücke definieren und ausnutzen. Stellen Sie sich zum Beispiel vor, Sie haben einen Produktkatalog, der Fotos der Produktverpackung enthält. Möglicherweise möchten Sie eine benutzerdefinierte Funktion extract_text
registrieren, die einen OCR-Algorithmus aufruft und den Text in einem Bild zurückgibt, sodass Sie abfragbare Informationen aus den Fotos herausholen können. In SQL können Sie sich eine solche Frage vorstellen:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
Die Möglichkeit, UDFS zu registrieren, ist sehr leistungsfähig - es wird im Wesentlichen Ihr Datenverarbeitungsframework in ein allgemeines verteiltes Computergerüst. UDFs können jedoch häufig Leistungs Engpässe einführen, insbesondere wenn wir sie über Millionen von Datenelementen führen.
Wenn die Eingangsspalten zu einem UDF viele doppelte Werte enthalten, kann es vorteilhaft sein, die Leistung zu verbessern, indem sichergestellt wird, dass der UDF nur einmal pro eindeutiger Eingangswert und nicht einmal pro Zeile aufgerufen wird. (Zum Beispiel in unserem Produkt -Beispiel oben können alle verschiedenen Konfigurationen eines bestimmten PC das gleiche Bild haben.) In dieser Zuordnung werden wir diese Optimierung implementieren. Wir werden es in Phasen nehmen-lassen Sie es zuerst für Daten arbeiten, die in Speicher passen, und später für größere Sätze, die einen Out-of-Core-Ansatz erfordern. Wir werden externes Hashing als Technik verwenden, um alle Zeilen mit denselben Eingangswerten für die UDF zu "rendenzvous".
Wenn Sie sich für das Thema interessieren, wird das folgende Papier eine interessante Lektüre sein (einschließlich zusätzlicher Optimierungen über das, wofür wir Zeit in diesen Hausaufgaben haben):
Der gesamte Code, den Sie berühren, befinden sich in drei Dateien - CS143Utils.scala
, basicOperators.scala
und DiskHashedRelation.scala
. Möglicherweise müssen Sie jedoch andere Dateien innerhalb von Spark oder in der allgemeinen Scala -APIs konsultieren, um die Zuordnung gründlich abzuschließen. Bitte stellen Sie sicher, dass Sie in den drei oben genannten Dateien alle angegebenen Code durchsuchen, bevor Sie Ihren eigenen Code schreiben. Es gibt viele nützliche Funktionen in CS143Utils.scala
sowie in DiskHashedRelation.scala
, die Ihnen viel Zeit und Fluchen spart - nutzen Sie sie!
Im Allgemeinen haben wir die meisten (wenn nicht alle) der Methoden definiert, die Sie benötigen. In diesem Projekt müssen Sie nach wie vor das Skelett ausfüllen. Die Menge an Code, die Sie schreiben, ist nicht sehr hoch - die Gesamtlösung der Mitarbeiter beträgt weniger als 100 Codezeilen (ohne Tests). Das Zusammenlegen der richtigen Komponenten auf speichereffiziente Weise (dh nicht die gesamte Beziehung in das Gedächtnis) erfordert jedoch einige Gedanken und sorgfältige Planung.
Es gibt einige potenziell verwirrende Unterschiede zwischen der Terminologie, die wir in der Klasse verwenden, und der in der Sparksql -Codebasis verwendeten Terminologie:
Das "Iterator" -Konzept, das wir in Vortrag gelernt haben, wird im SparksQL -Code als "Knoten" bezeichnet - es gibt Definitionen im Code für Unarynode und BinaryNode. Ein Abfrageplan wird als Sparkplan bezeichnet, und tatsächlich SparkPlan.scala
Unarynode und BinaryNode das Sparkplan (schließlich ist ein einzelner Iterator ein kleiner Abfrageplan!) Möglich Knoten.
In einigen Kommentaren in Sparksql verwenden sie auch den Begriff "Operator", um "Knoten" zu bedeuten. Die Datei basicOperators.scala
definiert eine Reihe spezifischer Knoten (z. B. Sortierung, Unterscheidungsmerkmal usw.).
Verwechseln Sie den Iterator
der Skala -Schnittstelle nicht mit dem Iterator -Konzept, das wir in Vortrag behandelt haben. Der Iterator
, den Sie in diesem Projekt verwenden werden, ist eine Skala -Sprachfunktion, mit der Sie Ihre SparksQL -Knoten implementieren. Iterator
bietet eine Schnittstelle zu Scala -Sammlungen, die eine bestimmte API erzwingt: die next
und hasNext
-Funktionen.
git
und Github git
ist ein Versionskontrollsystem , das Ihnen hilft, verschiedene Versionen Ihres Codes zu verfolgen, sie über verschiedene Maschinen hinweg zu synchronisieren und mit anderen zusammenzuarbeiten. GitHub ist eine Website, die dieses System unterstützt und es als Dienst hostet.
Wenn Sie nicht viel über git
wissen, empfehlen wir Ihnen dringend, sich mit diesem System vertraut zu machen. Sie werden viel Zeit damit verbringen! Es gibt viele Anleitungen, git
online zu verwenden - hier ist eine großartige zum Lesen.
Sie sollten zuerst ein Remote Private Repository (z. B. Spark-Homework) einrichten. GitHub gibt den Schülern ein privates Repository (aber dies kann einige Zeit dauern). Wenn Sie kein privates Repository haben, überlegen Sie sich zweimal über die Überprüfung im öffentlichen Repository, da es für andere für T -TschetCeckout verfügbar sein wird.
$ cd ~
Klonen Sie Ihr persönliches Repository. Es sollte leer sein.
$ git clone "https://github.com/xx/yy.git"
Geben Sie das geklonte Repository ein, verfolgen Sie das Kursrepository und klonen Sie es.
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
Hinweis: Bitte lassen Sie sich nicht von der Menge an Code, die hier ist, überwältigt. Spark ist ein großes Projekt mit vielen Funktionen. Der Code, den wir berühren werden, wird in einem bestimmten Verzeichnis enthalten: SQL/Core/src/main/scala/org/apache/spark/sql/Execution/. Die Tests sind alle in SQL/Core/src/test/scala/org/apache/spark/sql/execution/enthalten sein
Schieben Sie den Klon in Ihr persönliches Repository.
$ git push origin master
Jedes Mal, wenn Sie einen Code hinzufügen, können Sie die Änderungen am Remote -Repository angeben.
$ git commit -m 'update to homework'
$ git push origin master
Es kann erforderlich sein, Updates für unsere Aufgabe zu erhalten (obwohl wir versuchen, sie beim ersten Mal so "perfekt" wie möglich zu veröffentlichen). Angenommen, Sie haben die Verfolgung korrekt eingerichtet, können Sie diesen folgenden Befehl einfach ausführen, um Zuordnungsaktualisierungen zu erhalten:
$ git pull course master
Der folgende UNIX -Befehl wird nützlich sein, wenn Sie den Speicherort einer Datei finden müssen. Beispiel: Suchen Sie den Speicherort einer Datei mit dem Namen "diskhashedrelation.scala" in meinem aktuellen Repository.
$ find ./ -name 'DiskHashedRelation.scala'
Sobald Sie den Code gezogen haben, cd
in {repo root}
und Run make compile
. Wenn Sie diesen Befehl zum ersten Mal ausführen, sollte es eine Weile dauern - sbt
lädt alle Abhängigkeiten herunter und kompiliert den gesamten Code in Spark (es gibt ziemlich viel Code). Sobald die ersten Montagebefehle fertig sind, können Sie Ihr Projekt starten! (Zukünftige Builds sollten nicht so lange dauern - sbt
ist klug genug, um nur die geänderten Dateien neu zu kompilieren, es sei denn, Sie führen make clean
zusammen, wodurch alle kompilierten Klassendateien entfernt werden.)
Wir haben Ihnen den Skelettcode für DiskHashedRelation.scala
bereitgestellt. Diese Datei hat 4 wichtige Dinge:
trait DiskHashedRelation
definiert die Diskhashedrelation -Schnittstelleclass GeneralDiskHashedRelation
ist unsere Umsetzung des DiskedHashedRelation
-Merkmalsclass DiskPartition
stellt eine einzige Partition auf der Festplatte darobject DiskHashedRelation
kann als Objektfabrik betrachtet werden, die GeneralDiskHashedRelation
s konstruiertDiskPartition
und GeneralDiskHashedRelation
Zunächst müssen Sie für diesen Teil die Methoden zur insert
, closeInput
und getData
-Methoden in DiskPartition
implementieren. Für die ersten beiden sollten die Docstrings eine umfassende Beschreibung dessen geben, was Sie implementieren müssen. Die Einschränkung mit getData
ist, dass Sie die gesamte Partition nicht einmal in das Gedächtnis lesen können . Der Grund, warum wir diese Einschränkung durchsetzen, ist, dass es keine gute Möglichkeit gibt, das Befreiungsgedächtnis in der JVM durchzusetzen, und wenn Sie Daten in verschiedene Formen verwandeln, würden mehrere Kopien herumliegen. Als solches würde es dazu führen, dass mehrere Kopien einer ganzen Partition zu einer Scheibe verschüttet werden und uns alle traurig machen. Stattdessen sollten Sie jeweils einen Block in den Speicher streamen.
Zu diesem Zeitpunkt sollten Sie die Tests in DiskPartitionSuite.scala
bestehen.
object DiskHashedRelation
Ihre Aufgabe in diesem Teil besteht darin, Phase 1 des externen Hashings zu implementieren. Verwenden einer grobkörnigen Hash-Funktion, um eine Eingabe in mehrere Partitionsbeziehungen auf der Festplatte zu streamen. Für unsere Zwecke reicht die hashCode
-Methode, die jedes Objekt hat, aus, um einen Hash -Wert zu generieren, und das Modulo durch die Anzahl der Partitionen ist eine akzeptable Hash -Funktion.
Zu diesem Zeitpunkt sollten Sie alle Tests in DiskHashedRelationSuite.scala
bestehen.
In diesem Abschnitt werden wir uns mit case class CacheProject
in basicOperators.scala
befassen. Sie werden vielleicht feststellen, dass es in dieser Klasse nur 4 Codezeilen und vor allem no // IMPLEMENT ME
. Sie müssen hier keinen Code schreiben. Wenn Sie jedoch den Funktionsaufruf in Zeile 66 verfolgen, werden Sie feststellen, dass Sie zwei Teile dieses Stapels implementieren müssen, um eine funktionale UDF-Implementierung zu haben.
CS143Utils
-Methoden Für diese Aufgabe müssen Sie getUdfFromExpressions
und die Iterator
in CachingIteratorGenerator#apply
implementieren. Bitte lesen Sie die Docstrings - insbesondere für apply
- genau vor dem Einstieg.
Nach der Implementierung dieser Methoden sollten Sie die Tests in CS143UtilsSuite.scala
bestehen.
Hinweis: Überlegen Sie sorgfältig, warum diese Methoden Teil der Utils sein könnten
Jetzt kommt der Moment der Wahrheit! Wir haben mit diskbasierter Hash-Partitionierung implementiert und in Memory UDF Caching implementiert-was manchmal als Memoisierung bezeichnet wird. Memoisierung ist in vielen Kontexten ein sehr leistungsfähiges Werkzeug, aber hier im Datenbanken-wir befassen uns mit größeren Datenmengen, als die Memoisierung verarbeiten kann. Wenn wir mehr einzigartige Werte haben als in einen Memory-Cache passen können, wird unsere Leistung schnell verschlechtert. Daher greifen wir auf die altehrwürdige Datenbanken der Tradition von Divide-and-Conquer zurück. Wenn unsere Daten nicht in den Speicher passen, können wir sie einmal auf Festplatten aufteilt, eine Partition gleichzeitig einlesen (überlegen Sie, warum dies funktioniert (Hinweis: Rendezvous!)) Und führen .
PartitionProject
Diese letzte Aufgabe erfordert, dass Sie die Implementierung von PartitionProject
ausfüllen. Der gesamte Code, den Sie schreiben müssen, befindet sich in der generateIterator
-Methode. Überlegen Sie sorgfältig, wie Sie Ihre Implementierung organisieren müssen. Sie sollten nicht alle Daten im Speicher oder etwas Ähnliches puffern.
Zu diesem Zeitpunkt sollten Sie alle gegebenen Tests bestehen.
Es gibt keinen Code, den Sie hier schreiben müssen, aber für Ihre eigene Erbauung sollten Sie einige Zeit über die folgende Frage nachdenken:
Einer von Sparks Hauptverkaufspunkten ist, dass es "in Memory" ist. Was sie bedeuten, ist Folgendes: Wenn Sie eine Reihe von Hadoop -Jobs (oder anderen MapReduce -Frameworks) zusammen anziehen, schreibt Hadoop die Ergebnisse jeder Phase in die Festplatte und lesen sie erneut ein, was sehr teuer ist. Spark hingegen pflegt seine Daten im Speicher. Wenn wir jedoch davon ausgehen, dass wenn unsere Daten nicht in den Speicher passen, warum ist Funken SQL dann nicht mit der diskbasierten Implementierung dieser Betreiber geliefert? Warum unterscheidet sich Funken in dieser Hinsicht von den "traditionellen" parallelen relationalen Datenbanken, über die wir im Unterricht erfahren? Es gibt keine richtige Antwort auf diese Frage!
Wir haben Ihnen einige Beispieltests in DiskPartitionSuite.scala
, DiskHasedRelationSuite.scala
, CS143UtilsSuite.scala
und ProjectSuite.scala
vorgelegt. Diese Tests können Sie beim Abschluss dieses Projekts führen. Denken Sie jedoch daran, dass sie nicht umfassend sind, und Sie sind gut geraten, Ihre eigenen Tests zu schreiben, um Fehler zu fangen. Hoffentlich können Sie diese Tests als Modelle verwenden, um Ihre eigenen Tests zu generieren.
Um unsere Tests durchzuführen, haben wir ein einfaches Makefile bereitgestellt. Um die Tests für Aufgabe 1 auszuführen, make t1
. Entsprechend für die Aufgabe make t2
und das gleiche für alle anderen Tests. make all
werden alle Tests durchführen.
Der Einreichungslink wird auf CCLE erstellt, wo Sie Ihren Code bis zum Fälligkeitsdatum einreichen können.
Vielen Dank an Matteo Interlandi.
Viel Glück!