Dans cette affectation, vous implémenterez la mise en cache UDF (fonction définie par l'utilisateur) dans Apache Spark, qui est un cadre pour l'informatique distribuée dans le moule de MapReduce. Ce projet illustrera les concepts clés de l'évaluation des données de données et de requête, et vous obtiendrez une expérience pratique de modification de l'expérience, qui est largement utilisée dans le domaine. De plus, vous obtiendrez une exposition à Scala, un langage basé sur JVM qui gagne en popularité pour son style fonctionnel propre.
La date d'échéance de l'affectation est publiée sur le site Web de la classe.
Vous pouvez terminer cela par paires , si vous le souhaitez. Enfin, il y a beaucoup de code dans ce répertoire. Veuillez consulter ici pour trouver le répertoire où se trouve le code.
Spark est un système informatique distribué open source écrit dans Scala. Le projet a été lancé par Ph.D. Les étudiants de l'amplab et font partie intégrante de la pile d'analyse de données de Berkeley (BDAS - affecte de manière «méchante»).
Comme Hadoop MapReduce, Spark est conçu pour exécuter des fonctions sur de grandes collections de données, en soutenant un ensemble simplifié d'opérations de traitement de données de haut niveau semblables aux itérateurs que nous avons appris en classe. L'une des utilisations les plus courantes de ces systèmes consiste à implémenter le traitement parallèle des requêtes dans des langages de haut niveau tels que SQL. En fait, de nombreux efforts récents de recherche et de développement dans Spark ont pris en charge une abstraction de base de données relationnelle évolutive et interactive.
Nous allons utiliser, modifier et étudier les aspects de Spark dans cette classe pour comprendre les concepts clés des systèmes de données modernes. Plus important encore, vous verrez que les idées que nous couvrons en classe - dont certaines sont âgées de décennies - sont toujours très pertinentes aujourd'hui. Plus précisément, nous ajouterons des fonctionnalités pour Spark SQL.
Une limitation clé de Spark SQL est qu'il s'agit actuellement d'un système unique à mémoire principale. Dans le cadre de cette classe, nous l'étendrons également pour inclure certains algorithmes hors du cœur.
Scala est un langage de type statique qui prend en charge de nombreux paradigmes de programmation différents. Sa flexibilité, sa puissance et sa portabilité sont devenues particulièrement utiles dans la recherche sur les systèmes distribués.
Scala ressemble à Java, mais il possède un ensemble beaucoup plus large de fonctionnalités de syntaxe pour faciliter plusieurs paradigmes. Connaître Java vous aidera à comprendre un code Scala, mais pas beaucoup, et ne pas savoir Scala vous empêchera de profiter pleinement de son pouvoir expressif. Parce que vous devez écrire du code dans Scala, nous vous recommandons fortement d'acquérir au moins une familiarité passagère avec la langue.
Intellij Idea a tendance à être l'IDE la plus couramment utilisée pour se développer dans Spark. Intellij est un IDE Java qui a un plugin scala (et vim!). Il existe également d'autres options telles que Scala.
Vous pourriez trouver les tutoriels suivants utiles:
Les fonctions définies par l'utilisateur permettent aux développeurs de définir et d'exploiter les opérations personnalisées dans les expressions. Imaginez, par exemple, que vous ayez un catalogue de produits qui comprend des photos de l'emballage du produit. Vous souhaiterez peut-être enregistrer une fonction définie par l'utilisateur extract_text
qui appelle un algorithme OCR et renvoie le texte dans une image, afin que vous puissiez obtenir des informations interrogables sur les photos. Dans SQL, vous pouvez imaginer une question comme celle-ci:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
La possibilité d'enregistrer UDFS est très puissante - elle transforme essentiellement votre cadre de traitement des données en un cadre informatique distribué général. Mais les UDF peuvent souvent introduire des goulots d'étranglement de performances, d'autant plus que nous les exécutons sur des millions d'éléments de données.
Si la ou les colonnes d'entrée à un UDF contiennent beaucoup de valeurs en double, il peut être utile d'améliorer les performances en s'assurant que l'UDF n'est appelé qu'une seule fois par valeur d'entrée distincte , plutôt qu'une fois par ligne . (Par exemple, dans notre exemple de produits ci-dessus, toutes les différentes configurations d'un PC particulier peuvent avoir la même image.) Dans cette affectation, nous implémenterons cette optimisation. Nous le prendrons par étapes - faites d'abord fonctionner pour les données qui s'adaptent en mémoire, puis plus tard pour des ensembles plus grands qui nécessitent une approche hors noyau. Nous utiliserons le hachage externe comme technique pour "rendu" toutes les lignes avec les mêmes valeurs d'entrée pour l'UDF.
Si vous êtes intéressé par le sujet, le document suivant sera une lecture intéressante (y compris des optimisations supplémentaires au-delà de ce que nous avons le temps dans ces devoirs):
Tout le code que vous toucherez sera dans trois fichiers - CS143Utils.scala
, basicOperators.scala
et DiskHashedRelation.scala
. Vous pourriez cependant avoir besoin de consulter d'autres fichiers dans Spark ou les API General Scala afin de terminer soigneusement l'affectation. Veuillez vous assurer de regarder tout le code fourni dans les trois fichiers mentionnés ci-dessus avant de commencer à écrire votre propre code. Il existe de nombreuses fonctions utiles dans CS143Utils.scala
ainsi que dans DiskHashedRelation.scala
qui vous fera gagner beaucoup de temps et de malédiction - profitez-en!
En général, nous avons défini la plupart (sinon la totalité) des méthodes dont vous aurez besoin. Comme précédemment, dans ce projet, vous devez remplir le squelette. La quantité de code que vous écrirez n'est pas très élevée - la solution totale du personnel est inférieure à 100 lignes de code (sans compter les tests). Cependant, enchaîner les bons composants d'une manière économe en mémoire (c'est-à-dire ne pas lire toute la relation dans la mémoire à la fois) nécessitera une certaine réflexion et une planification minutieuse.
Il existe des différences potentiellement déroutantes entre la terminologie que nous utilisons en classe et la terminologie utilisée dans la base de code SPARKSQL:
Le concept "Iterator" que nous avons appris dans la conférence est appelé "nœud" dans le code SPARKSQL - il y a des définitions dans le code pour unarynode et binaryNode. SparkPlan.scala
plan de requête est appelé SparkPlan, et en fait unarynode et BinaryNode étendent SparkPlan (après tout, un seul itérateur est un petit plan de requête!) nœuds.
Dans certains commentaires dans SPARKSQL, ils utilisent également le terme "opérateur" pour signifier "nœud". Le fichier basicOperators.scala
définit un certain nombre de nœuds spécifiques (par exemple, distinct, etc.).
Ne confondez pas l' Iterator
de l'interface Scala avec le concept d'itérateur que nous avons couvert dans la conférence. L' Iterator
que vous utiliserez dans ce projet est une fonctionnalité de langage Scala que vous utiliserez pour implémenter vos nœuds SPARKSQL. Iterator
fournit une interface aux collections Scala qui applique une API spécifique: les fonctions next
et hasNext
.
git
et github git
est un système de contrôle de version , vous aidant à suivre différentes versions de votre code, à les synchroniser sur différentes machines et à collaborer avec d'autres. GitHub est un site qui prend en charge ce système, l'hébergeant en tant que service.
Si vous ne savez pas grand-chose sur git
, nous vous recommandons fortement de vous familiariser avec ce système; Vous passerez beaucoup de temps avec ça! Il existe de nombreux guides pour utiliser git
en ligne - en voici un excellent à lire.
Vous devez d'abord configurer un référentiel privé distant (par exemple, Spark-Homework). GitHub donne un référentiel privé aux étudiants (mais cela peut prendre un certain temps). Si vous n'avez pas de référentiel privé, réfléchissez à deux fois avant de le vérifier dans le référentiel public, car il sera disponible pour d'autres pour tchcheckout.
$ cd ~
Clone votre référentiel personnel. Il devrait être vide.
$ git clone "https://github.com/xx/yy.git"
Entrez le référentiel cloné, suivez le référentiel de cours et clonez-le.
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
Remarque: Veuillez ne pas être submergé par la quantité de code qui est ici. Spark est un grand projet avec beaucoup de fonctionnalités. Le code que nous toucherons sera contenu dans un répertoire spécifique: SQL / CORE / SRC / MAIN / Scala / Org / APACHE / SPARK / SQL / EXECUTION /. Les tests seront tous contenus dans SQL / CORE / SRC / TEST / Scala / Org / Apache / Spark / SQL / EXECUTION /
Poussez le clone vers votre référentiel personnel.
$ git push origin master
Chaque fois que vous ajoutez du code, vous pouvez commettre les modifications au référentiel distant.
$ git commit -m 'update to homework'
$ git push origin master
Il peut être nécessaire de recevoir des mises à jour de notre affectation (même si nous essayons de les libérer aussi "parfaitement" que possible la première fois). En supposant que vous configuriez correctement le suivi, vous pouvez simplement exécuter cette commande suivante pour recevoir des mises à jour de cession:
$ git pull course master
La commande UNIX suivante sera utile lorsque vous devrez trouver l'emplacement d'un fichier. Exemple - Trouvez l'emplacement d'un fichier nommé «diskhashedrelation.scala» dans mon référentiel actuel.
$ find ./ -name 'DiskHashedRelation.scala'
Une fois que vous avez tiré le code, cd
dans {repo root}
et exécutez make compile
. La première fois que vous exécutez cette commande, cela devrait prendre un certain temps - sbt
téléchargera toutes les dépendances et compilera tout le code dans Spark (il y a pas mal de code). Une fois les commandes d'assemblage initiales terminées, vous pouvez démarrer votre projet! (Les builds futurs ne devraient pas prendre autant de temps - sbt
est assez intelligent pour ne recompiler que les fichiers modifiés, sauf si vous exécutez make clean
, ce qui supprimera tous les fichiers de classe compilés.)
Nous vous avons fourni du code squelette pour DiskHashedRelation.scala
. Ce fichier a 4 choses importantes:
trait DiskHashedRelation
définit l'interface de diskhashedredrelationclass GeneralDiskHashedRelation
est notre mise en œuvre du trait de DiskedHashedRelation
class DiskPartition
représente une seule partition sur le disqueobject DiskHashedRelation
peut être considéré comme une usine d'objets qui construit GeneralDiskHashedRelation
SDiskPartition
et GeneralDiskHashedRelation
Tout d'abord, vous devrez implémenter les méthodes insert
, closeInput
et getData
dans DiskPartition
pour cette partie. Pour les deux premiers, les docstrings devraient fournir une description complète de ce que vous devez mettre en œuvre. La mise en garde avec getData
est que vous ne pouvez pas lire toute la partition en mémoire une fois. La raison pour laquelle nous appliquons cette restriction est qu'il n'y a aucun bon moyen d'appliquer la libération de la mémoire dans le JVM, et lorsque vous transformez les données en différentes formes, il y aurait plusieurs copies. En tant que tel, avoir plusieurs copies d'une partition entière entraînerait un renversement des choses et nous rendrait tous tristes. Au lieu de cela, vous devez diffuser un bloc dans la mémoire à la fois.
À ce stade, vous devriez passer les tests dans DiskPartitionSuite.scala
.
object DiskHashedRelation
Votre tâche dans cette partie sera de mettre en œuvre la phase 1 du hachage externe - en utilisant une fonction de hachage à grain grossier pour diffuser une entrée dans plusieurs relations de partition sur le disque. Pour nos besoins, la méthode hashCode
dont chaque objet a est suffisante pour générer une valeur de hachage, et prendre le modulo par le nombre de partitions est une fonction de hachage acceptable.
À ce stade, vous devriez passer tous les tests dans DiskHashedRelationSuite.scala
.
Dans cette section, nous allons faire face à case class CacheProject
dans basicOperators.scala
. Vous remarquerez peut-être qu'il n'y a que 4 lignes de code dans cette classe et, plus important encore, aucun // IMPLEMENT ME
. Vous n'avez pas vraiment à écrire de code ici. Cependant, si vous tracez l'appel de la fonction à la ligne 66, vous constaterez qu'il y a deux parties de cette pile que vous devez implémenter afin d'avoir une implémentation UDF en mémoire fonctionnelle.
CS143Utils
Pour cette tâche, vous devrez implémenter getUdfFromExpressions
et les méthodes Iterator
dans CachingIteratorGenerator#apply
. Veuillez lire les docstrings - en particulier pour apply
- de près avant de commencer.
Après la mise en œuvre de ces méthodes, vous devriez passer les tests dans CS143UtilsSuite.scala
.
Astuce: réfléchissez attentivement à la raison pour laquelle ces méthodes pourraient faire partie des utils
Vient maintenant le moment de la vérité! Nous avons mis en œuvre le partitionnement de hachage basé sur le disque et nous avons implémenté la mise en cache UDF en mémoire - ce qu'on appelle parfois la mémorisation. La mémorisation est un outil très puissant dans de nombreux contextes, mais ici dans les bases de données-terrains, nous traitons des quantités plus importantes de données que la mémoire ne peut en gérer. Si nous avons des valeurs plus uniques qui peuvent s'adapter dans un cache en mémoire, nos performances se dégradent rapidement. Ainsi, nous nous retournons à la tradition de bases de données séculaire de division et de conquête. Si nos données ne correspondent pas à la mémoire, alors nous pouvons la partitionner sur disque une fois, lisez une partition à la fois (réfléchissez à la raison pour laquelle cela fonctionne (indice: rendez-vous!)) Et effectuer la mise en cache UDF, évaluant une partition à la fois .
PartitionProject
Cette tâche finale nécessite de remplir la mise en œuvre de PartitionProject
. Tout le code que vous devrez écrire réside dans la méthode generateIterator
. Réfléchissez attentivement à la façon dont vous devez organiser votre implémentation. Vous ne devez pas être tampon toutes les données en mémoire ou quelque chose de similaire à cela.
À ce stade, vous devriez passer tous les tests donnés.
Il n'y a pas de code que vous devez écrire ici, mais pour votre propre édification, passez un peu de temps à réfléchir à la question suivante:
L'un des principaux arguments de vente de Spark est qu'il est "en mémoire". Ce qu'ils signifient, c'est ce qui suit: Lorsque vous enchaînez un certain nombre de travaux Hadoop (ou tout autre cadre MapReduce) ensemble, Hadoop écrira les résultats de chaque phase sur le disque et les rerira, ce qui est très cher; Spark, en revanche, maintient ses données en mémoire. Cependant, si notre hypothèse est que si nos données ne correspondent pas à la mémoire, pourquoi Spark SQL n'expédie-t-elle pas déjà avec la mise en œuvre basée sur le disque de ces opérateurs? À cet égard, pourquoi Spark est-il différent des bases de données relationnelles parallèles "traditionnelles" que nous apprenons en classe? Il n'y a pas de bonne réponse à cette question!
Nous vous avons fourni quelques échantillons de tests dans DiskPartitionSuite.scala
, DiskHasedRelationSuite.scala
, CS143UtilsSuite.scala
et ProjectSuite.scala
. Ces tests peuvent vous guider lorsque vous terminez ce projet. Cependant, gardez à l'esprit qu'ils ne sont pas complets et que vous êtes bien invité à rédiger vos propres tests pour attraper des bogues. J'espère que vous pourrez utiliser ces tests comme modèles pour générer vos propres tests.
Afin d'exécuter nos tests, nous avons fourni un simple makefile. Afin d'exécuter les tests de la tâche 1, exécutez make t1
. En conséquence pour la tâche, exécutez make t2
et la même pour tous les autres tests. make all
exécutera tous les tests.
Le lien de soumission sera créé sur CCLE, où vous pouvez soumettre votre code à la date d'échéance.
Un grand merci à Matteo Interlandi.
Bonne chance!