En esta tarea implementará el almacenamiento en caché de resultados UDF (función definida por el usuario) en Apache Spark, que es un marco para la computación distribuida en el molde de MapReduce. Este proyecto ilustrará conceptos clave en la evaluación de la cita y la consulta de datos, y obtendrá una experiencia práctica modificando Spark, que se usa ampliamente en el campo. Además, obtendrá exposición a Scala, un lenguaje basado en JVM que está ganando popularidad por su estilo funcional limpio.
La fecha de vencimiento de la tarea se publica en el sitio web de la clase.
Puede completar esto en parejas , si lo desea. Por último, hay mucho código en este directorio. Por favor, busque aquí para encontrar el directorio donde se encuentra el código.
Spark es un sistema informático distribuido de código abierto escrito en Scala. El proyecto fue iniciado por Ph.D. Estudiantes de la Amplab y es una parte integral de la pila de análisis de datos de Berkeley (BDAS, con el afecto pronunciado "malo").
Al igual que Hadoop MapReduce, Spark está diseñado para ejecutar funciones sobre grandes colecciones de datos, al admitir un conjunto simplificado de operaciones de procesamiento de datos de alto nivel similares a los iteradores que hemos estado aprendiendo en clase. Uno de los usos más comunes de tales sistemas es implementar el procesamiento de consultas paralelas en idiomas de alto nivel, como SQL. De hecho, muchos esfuerzos recientes de investigación y desarrollo en Spark se han destinado a apoyar una abstracción de base de datos relacional escalable e interactiva.
Usaremos, modificaremos y estudiaremos aspectos de Spark en esta clase para comprender los conceptos clave de los sistemas de datos modernos. Más importante aún, verá que las ideas que estamos cubriendo en clase, algunas de las cuales tienen décadas de edad, todavía son muy relevantes hoy en día. Específicamente, agregaremos características a Spark SQL.
Una limitación clave de Spark SQL es que actualmente es un sistema principal solo para memoria. Como parte de esta clase, la ampliaremos para incluir algunos algoritmos fuera del núcleo también.
Scala es un lenguaje de tipo estática que admite muchos paradigmas de programación diferentes. Su flexibilidad, potencia y portabilidad se han vuelto especialmente útiles en la investigación de sistemas distribuidos.
Scala se asemeja a Java, pero posee un conjunto mucho más amplio de características de sintaxis para facilitar múltiples paradigmas. Conocer Java lo ayudará a comprender algún código de Scala, pero no mucho de él, y no saber que Scala le impedirá aprovechar completamente su poder expresivo. Debido a que debe escribir código en Scala, le recomendamos encarecidamente que adquiera al menos una familiaridad pasajera con el idioma.
La idea de IntelliJ tiende a ser el IDE más utilizado para desarrollar en Spark. IntelliJ es un ide Java que tiene un complemento Scala (y Vim!). También hay otras opciones como Scala-IDE.
Puede encontrar que los siguientes tutoriales son útiles:
Las funciones definidas por el usuario permiten a los desarrolladores definir y explotar las operaciones personalizadas dentro de las expresiones. Imagine, por ejemplo, que tiene un catálogo de productos que incluye fotos del empaque del producto. Es posible que desee registrar una función definida por el usuario extract_text
que llame a un algoritmo OCR y devuelve el texto en una imagen, para que pueda obtener información de consulta de las fotos. En SQL, puedes imaginar una consulta como esta:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
La capacidad de registrar UDFS es muy poderosa: esencialmente convierte su marco de procesamiento de datos en un marco informático distribuido general. Pero los UDF a menudo pueden introducir cuellos de botella de rendimiento, especialmente cuando los ejecutamos en millones de elementos de datos.
Si las columnas de entrada a un UDF contienen muchos valores duplicados, puede ser beneficioso mejorar el rendimiento al garantizar que el UDF solo se llame una vez por valor de entrada distinto , en lugar de una vez por fila . (Por ejemplo, en nuestros productos de los productos anteriores, todas las diferentes configuraciones de una PC en particular podrían tener la misma imagen). En esta tarea, implementaremos esta optimización. Lo tomaremos en etapas: primero lo haga funcionar para datos que se ajusten en la memoria, y luego para conjuntos más grandes que requieren un enfoque fuera del núcleo. Usaremos el hashing externo como técnica para "reunirse" todas las filas con los mismos valores de entrada para el UDF.
Si está interesado en el tema, el siguiente documento será una lectura interesante (incluidas optimizaciones adicionales más allá de lo que tenemos tiempo en esta tarea):
Todo el código que tocará estará en tres archivos: CS143Utils.scala
, basicOperators.scala
y DiskHashedRelation.scala
. Sin embargo, es posible que deba consultar otros archivos dentro de Spark o las API Generales Scala para completar a fondo la tarea. Asegúrese de revisar todo el código proporcionado en los tres archivos mencionados anteriormente antes de comenzar a escribir su propio código. Hay muchas funciones útiles en CS143Utils.scala
, así como en DiskHashedRelation.scala
que le ahorrará mucho tiempo y maldecir, ¡aprovéngelos!
En general, hemos definido la mayoría (si no todos) de los métodos que necesitará. Como antes, en este proyecto, debe completar el esqueleto. La cantidad de código que escribirá no es muy alta: la solución total del personal es inferior a 100 líneas de código (sin incluir pruebas). Sin embargo, unir los componentes correctos de una manera eficiente de memoria (es decir, no leer toda la relación en la memoria a la vez) requerirá un poco de pensamiento y una planificación cuidadosa.
Hay algunas diferencias potencialmente confusas entre la terminología que usamos en la clase y la terminología utilizada en la base de código SparkSQL:
El concepto de "iterador" que aprendimos en la conferencia se llama "nodo" en el código SparkSQL: hay definiciones en el código para UnaryNode y BinaryNode. Un plan de consulta se llama Plan de Spark, y de hecho Unarynode y BinaryNode extienden Sparkplan (¡después de todo, un solo iterador es un pequeño plan de consulta!) Es posible que desee encontrar el archivo SparkPlan.scala
en la fuente SparkSQL para ver la API para estos nodos.
En algunos de los comentarios en SparkSQL, también usan el término "operador" para significar "nodo". El archivo basicOperators.scala
define una serie de nodos específicos (por ejemplo, clasificación, distinta, etc.).
No confunda el Iterator
de la interfaz de Scala con el concepto de iterador que cubrimos en la conferencia. El Iterator
que usará en este proyecto es una característica de lenguaje Scala que utilizará para implementar sus nodos SparkSQL. Iterator
proporciona una interfaz a las colecciones de Scala que hace cumplir una API específica: la next
y hasNext
Functions.
git
y Github git
es un sistema de control de versiones , que lo ayuda a rastrear diferentes versiones de su código, sincronizarlas en diferentes máquinas y colaborar con otros. GitHub es un sitio que admite este sistema, alojándolo como un servicio.
Si no sabe mucho sobre git
, le recomendamos encarecidamente que se familiarice con este sistema; ¡Pasará mucho tiempo con él! Hay muchas guías para usar git
Online; aquí hay una excelente para leer.
Primero debe configurar un repositorio privado remoto (por ejemplo, Spark-Homework). Github ofrece repositorio privado a los estudiantes (pero esto puede llevar algún tiempo). Si no tiene un repositorio privado, piense dos veces antes de verificarlo en el repositorio público, ya que estará disponible para que otros puedan checheckout.
$ cd ~
Clone su repositorio personal. Debería estar vacío.
$ git clone "https://github.com/xx/yy.git"
Ingrese el repositorio clonado, rastree el repositorio del curso y clona.
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
Nota: No se sienta abrumado por la cantidad de código que está aquí. Spark es un gran proyecto con muchas características. El código que tocamos estará contenido en un directorio específico: SQL/Core/Src/Main/Scala/Org/Apache/Spark/SQL/Ejecution/. Todas las pruebas estarán contenidas en SQL/Core/Src/Test/Scala/Org/Apache/Spark/SQL/Execution/
Empuje el clon a su repositorio personal.
$ git push origin master
Cada vez que agrega algún código, puede cometer las modificaciones al repositorio remoto.
$ git commit -m 'update to homework'
$ git push origin master
Puede ser necesario recibir actualizaciones de nuestra tarea (a pesar de que intentamos liberarlas lo más "perfectamente" posible la primera vez). Suponiendo que configure el seguimiento correctamente, simplemente puede ejecutar este siguiente comando para recibir actualizaciones de asignación:
$ git pull course master
El siguiente comando UNIX será útil, cuando necesite encontrar la ubicación de un archivo. Ejemplo: busque la ubicación de un archivo llamado 'diskhashedrelation.scala' en mi repositorio actual.
$ find ./ -name 'DiskHashedRelation.scala'
Una vez que haya extraído el código, cd
en {repo root}
y ejecute make compile
. La primera vez que ejecuta este comando, debería tomar un tiempo: sbt
descargará todas las dependencias y compilará todo el código en Spark (hay bastante código). Una vez que terminen los comandos iniciales de ensamblaje, ¡puede comenzar su proyecto! (Las construcciones futuras no deben tomar tanto: sbt
es lo suficientemente inteligente como para recompilar solo los archivos cambiados, a menos que ejecute make clean
, que eliminará todos los archivos de clase compilados).
Le hemos proporcionado un código de esqueleto para DiskHashedRelation.scala
. Este archivo tiene 4 cosas importantes:
trait DiskHashedRelation
define la interfaz de recelación de disco de discoclass GeneralDiskHashedRelation
es nuestra implementación del rasgo de la Relación DiskedHashedRelation
class DiskPartition
representa una única partición en el discoobject DiskHashedRelation
se puede considerar como una fábrica de objetos que construye GeneralDiskHashedRelation
de DiskhashedrelationDiskPartition
y GeneralDiskHashedRelation
Primero, deberá implementar los métodos insert
, closeInput
y getData
en DiskPartition
para esta parte. Para los dos anteriores, las documentos deben proporcionar una descripción completa de lo que debe implementar. La advertencia con getData
es que no puede leer la partición completa en la memoria una vez. La razón por la que estamos aplicando esta restricción es que no hay una buena manera de aplicar la memoria de liberación en el JVM, y a medida que transforma los datos en diferentes formas, habría varias copias por ahí. Como tal, tener múltiples copias de una partición completa causaría que las cosas se derramen al disco y nos pondría a todos tristes. En cambio, debe transmitir un bloque a la memoria a la vez.
En este punto, debe pasar las pruebas en DiskPartitionSuite.scala
.
object DiskHashedRelation
Su tarea en esta porción será implementar la fase 1 del hash externo, utilizando una función hash de grano grueso para transmitir una entrada en múltiples relaciones de partición en el disco. Para nuestros propósitos, el método hashCode
que tiene cada objeto es suficiente para generar un valor hash, y tomar el módulo por el número de particiones es una función hash aceptable.
En este punto, debe pasar todas las pruebas en DiskHashedRelationSuite.scala
.
En esta sección, lidiaremos con case class CacheProject
en basicOperators.scala
. Puede notar que solo hay 4 líneas de código en esta clase y, lo que es más importante, no // IMPLEMENT ME
. En realidad no tienes que escribir ningún código aquí. Sin embargo, si rastrea la llamada de función en la línea 66, encontrará que hay dos partes de esta pila que debe implementar para tener una implementación funcional en memoria UDF.
CS143Utils
Para esta tarea, deberá implementar getUdfFromExpressions
y los métodos Iterator
en CachingIteratorGenerator#apply
. Lea las documentos, especialmente para apply
, de cerca antes de comenzar.
Después de implementar estos métodos, debe pasar las pruebas en CS143UtilsSuite.scala
.
Sugerencia: Piense cuidadosamente sobre por qué estos métodos podrían ser parte de los UTILS
¡Ahora viene el momento de la verdad! Hemos implementado la partición hash basada en disco, y hemos implementado el almacenamiento en caché de UDF en memoria, lo que a veces se llama memoización. La memoización es una herramienta muy poderosa en muchos contextos, pero aquí en bases de datos-tierra, tratamos con mayores cantidades de datos de lo que la memoización puede manejar. Si tenemos valores más únicos de los que pueden encajar en un caché en memoria, nuestro rendimiento se degradará rápidamente. Por lo tanto, recurrimos a la tradición de las bases de datos tradicionales de dividir y concebir. Si nuestros datos no encajan en la memoria, entonces podemos dividirlos en el disco una vez, lea una partición a la vez (piense por qué funciona (sugerir: ¡Rendezvous!)) Y realice el almacenamiento en caché de UDF, evaluando una partición a la vez a la vez .
PartitionProject
Esta tarea final requiere que complete la implementación de PartitionProject
. Todo el código que necesitará escribir está en el método generateIterator
. Piense cuidadosamente sobre cómo necesita organizar su implementación. No debe almacenar todos los datos en la memoria ni nada similar a eso.
En este punto, deberías pasar todas las pruebas dadas.
No hay un código que tenga que escribir aquí, pero para su propia edificación, pase algún tiempo pensando en la siguiente pregunta:
Uno de los principales puntos de venta de Spark es que está "en memoria". Lo que significan es lo siguiente: cuando une una serie de trabajos de Hadoop (o cualquier otro marco de MapReduce) juntos, Hadoop escribirá los resultados de cada fase para disco y los leerá nuevamente, lo cual es muy costoso; Spark, por otro lado, mantiene sus datos en la memoria. Sin embargo, si suponemos que si nuestros datos no encajan en la memoria, ¿por qué SKIS SQL no se envía con la implementación basada en el disco de estos operadores? A este respecto, ¿por qué Spark es diferente de las bases de datos relacionales paralelas "tradicionales" que aprendemos en clase? ¡No hay una respuesta correcta a esta pregunta!
Le hemos proporcionado algunas pruebas de muestra en DiskPartitionSuite.scala
, DiskHasedRelationSuite.scala
, CS143UtilsSuite.scala
y ProjectSuite.scala
. Estas pruebas pueden guiarlo a medida que complete este proyecto. Sin embargo, tenga en cuenta que no son completos, y se le recomienda que escriba sus propias pruebas para atrapar errores. Con suerte, puede usar estas pruebas como modelos para generar sus propias pruebas.
Para ejecutar nuestras pruebas, hemos proporcionado un simple makefile. Para ejecutar las pruebas para la Tarea 1, ejecute make t1
. En consecuencia para la tarea, ejecute make t2
y lo mismo para todas las demás pruebas. make all
ejecutará todas las pruebas.
El enlace de envío se creará en CCLE, donde puede enviar su código antes de la fecha de vencimiento.
Muchas gracias a Matteo Interlandi.
¡Buena suerte!