paint-brush
Desarrollo de streaming de lagos de datos con Hudi y MinIOpor@minio
7,249 lecturas
7,249 lecturas

Desarrollo de streaming de lagos de datos con Hudi y MinIO

por MinIO14m2023/08/29
Read on Terminal Reader

Demasiado Largo; Para Leer

Apache Hudi fue el primer formato de tabla abierta para lagos de datos y es digno de consideración en arquitecturas de transmisión. El uso de MinIO para el almacenamiento de Hudi allana el camino para análisis y lagos de datos de múltiples nubes.
featured image - Desarrollo de streaming de lagos de datos con Hudi y MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Hudi es una plataforma de lago de datos de transmisión que brinda la funcionalidad central de base de datos y almacén directamente al lago de datos. No contento con llamarse a sí mismo un formato de archivo abierto como Delta o Apache Iceberg , Hudi proporciona tablas, transacciones, actualizaciones/eliminaciones, índices avanzados, servicios de ingesta de streaming, optimizaciones de agrupación/compactación de datos y concurrencia.


Introducido en 2016, Hudi está firmemente arraigado en el ecosistema de Hadoop, lo que explica el significado detrás del nombre: Hadoop Upserts and Incrementals. Fue desarrollado para gestionar el almacenamiento de grandes conjuntos de datos analíticos en HDFS. El objetivo principal de Hudi es disminuir la latencia durante la ingesta de datos de transmisión.


Mesa Hudi


Con el tiempo, Hudi ha evolucionado para utilizar almacenamiento en la nube y almacenamiento de objetos, incluido MinIO. El alejamiento de Hudi de HDFS va de la mano con la tendencia más amplia del mundo de dejar atrás HDFS heredado para un almacenamiento de objetos de alto rendimiento, escalable y nativo de la nube. La promesa de Hudi de proporcionar optimizaciones que aceleren las cargas de trabajo analíticas para Apache Spark, Flink, Presto, Trino y otros encaja muy bien con la promesa de MinIO de rendimiento de aplicaciones nativas de la nube a escala.


Las empresas que utilizan Hudi en la producción incluyen Uber , Amazon , ByteDance y Robinhood . Estos son algunos de los lagos de datos de transmisión más grandes del mundo. La clave de Hudi en este caso de uso es que proporciona una pila de procesamiento de datos incremental que realiza un procesamiento de baja latencia en datos en columnas. Normalmente, los sistemas escriben los datos una vez utilizando un formato de archivo abierto como Apache Parquet u ORC, y los almacenan encima de un almacenamiento de objetos altamente escalable o un sistema de archivos distribuido. Hudi sirve como un plano de datos para ingerir, transformar y administrar estos datos. Hudi interactúa con el almacenamiento mediante la API Hadoop FileSystem , que es compatible (pero no necesariamente óptima) con implementaciones que van desde HDFS hasta almacenamiento de objetos y sistemas de archivos en memoria.

Formato de archivo Hudi

Hudi utiliza un archivo base y archivos de registro delta que almacenan actualizaciones/cambios en un archivo base determinado. Los archivos base pueden ser Parquet (de columnas) o HFile (indexados). Los registros delta se guardan como Avro (fila) porque tiene sentido registrar los cambios en el archivo base a medida que ocurren.


Hudi codifica todos los cambios en un archivo base determinado como una secuencia de bloques. Los bloques pueden ser bloques de datos, bloques de eliminación o bloques de reversión. Estos bloques se fusionan para derivar archivos base más nuevos. Esta codificación también crea un registro autónomo.



Formato de archivo Hudi

Fuente .

Formato de tabla Hudi

Un formato de tabla consta del diseño del archivo de la tabla, el esquema de la tabla y los metadatos que rastrean los cambios en la tabla. Hudi aplica el esquema en escritura, de acuerdo con el énfasis en el procesamiento de flujo, para garantizar que las canalizaciones no se rompan debido a cambios no compatibles con versiones anteriores.


Hudi agrupa archivos para una tabla/partición determinada y asigna entre claves de registro y grupos de archivos. Como se mencionó anteriormente, todas las actualizaciones se registran en los archivos de registro delta para un grupo de archivos específico. Este diseño es más eficiente que Hive ACID, que debe fusionar todos los registros de datos con todos los archivos base para procesar consultas. El diseño de Hudi anticipa inserciones y eliminaciones rápidas basadas en claves, ya que funciona con registros delta para un grupo de archivos, no para un conjunto de datos completo.


Hudi agrupa archivos para una tabla/partición determinada y asigna entre claves de registro y grupos de archivos. Como se mencionó anteriormente, todas las actualizaciones se registran en los archivos de registro delta para un grupo de archivos específico. Este diseño es más eficiente que Hive ACID, que debe fusionar todos los registros de datos con todos los archivos base para procesar consultas. El diseño de Hudi anticipa inserciones y eliminaciones rápidas basadas en claves, ya que funciona con registros delta para un grupo de archivos, no para un conjunto de datos completo.


Formato de tabla Hudi

Fuente .


Es fundamental comprender la línea de tiempo porque sirve como fuente de registro de eventos veraz para todos los metadatos de las tablas de Hudi. La línea de tiempo se almacena en la carpeta .hoodie , o depósito en nuestro caso. Los eventos se conservan en la línea de tiempo hasta que se eliminen. La línea de tiempo existe para una tabla general así como para grupos de archivos, lo que permite la reconstrucción de un grupo de archivos aplicando los registros delta al archivo base original. Para optimizar las escrituras/confirmaciones frecuentes, el diseño de Hudi mantiene los metadatos pequeños en relación con el tamaño de toda la tabla.


Los nuevos eventos en la línea de tiempo se guardan en una tabla de metadatos interna y se implementan como una serie de tablas de fusión en lectura, lo que proporciona una baja amplificación de escritura. Como resultado, Hudi puede absorber rápidamente cambios rápidos en los metadatos. Además, la tabla de metadatos utiliza el formato de archivo base HFile, lo que optimiza aún más el rendimiento con un conjunto de búsquedas indexadas de claves que evita la necesidad de leer toda la tabla de metadatos. Todas las rutas de archivos físicos que forman parte de la tabla se incluyen en los metadatos para evitar listas de archivos en la nube costosas y que requieren mucho tiempo.

Escritores Hudi

Los escritores de Hudi facilitan arquitecturas en las que Hudi actúa como una capa de escritura de alto rendimiento con soporte de transacciones ACID que permite cambios incrementales muy rápidos, como actualizaciones y eliminaciones.


Una arquitectura típica de Hudi se basa en canalizaciones Spark o Flink para entregar datos a tablas de Hudi. La ruta de escritura de Hudi está optimizada para ser más eficiente que simplemente escribir un archivo Parquet o Avro en el disco. Hudi analiza las operaciones de escritura y las clasifica como incrementales ( insert , upsert , delete ) o por lotes ( insert_overwrite , insert_overwrite_table , delete_partition , bulk_insert ) y luego aplica las optimizaciones necesarias.


Los escritores de Hudi también son responsables de mantener los metadatos. Para cada registro, se escriben la hora de confirmación y un número de secuencia exclusivo de ese registro (esto es similar a un desplazamiento de Kafka), lo que permite derivar cambios a nivel de registro. Los usuarios también pueden especificar campos de hora del evento en flujos de datos entrantes y realizar un seguimiento utilizando metadatos y la línea de tiempo de Hudi. Esto puede suponer mejoras espectaculares en el procesamiento de flujos, ya que Hudi contiene tanto la llegada como la hora del evento para cada registro, lo que permite crear marcas de agua sólidas para tuberías complejas de procesamiento de flujos.

Lectores Hudi

El aislamiento de instantáneas entre escritores y lectores permite consultar instantáneas de tablas de manera consistente desde todos los principales motores de consulta de lagos de datos, incluidos Spark, Hive, Flink, Prest, Trino e Impala. Al igual que Parquet y Avro, las tablas Hudi pueden leerse como tablas externas en empresas como Snowflake y SQL Server .


Los lectores Hudi están desarrollados para ser livianos. Siempre que sea posible, se utilizan lectores vectorizados y almacenamiento en caché específicos del motor, como los de Presto y Spark. Cuando Hudi tiene que fusionar archivos base y de registro para una consulta, Hudi mejora el rendimiento de la fusión utilizando mecanismos como mapas derramables y lectura diferida, al mismo tiempo que proporciona consultas optimizadas para lectura.


Hudi incluye más de unas pocas capacidades de consulta incrementales notablemente poderosas. Los metadatos son el núcleo de esto, lo que permite consumir grandes confirmaciones como fragmentos más pequeños y desacoplar completamente la escritura y la consulta incremental de datos. Mediante el uso eficiente de metadatos, el viaje en el tiempo es simplemente otra consulta incremental con un punto de inicio y finalización definido. Hudi asigna claves de forma atómica a grupos de archivos únicos en cualquier momento dado, lo que admite capacidades CDC completas en tablas de Hudi. Como se analizó anteriormente en la sección de escritores de Hudi, cada tabla se compone de grupos de archivos y cada grupo de archivos tiene sus propios metadatos independientes.

¡Hurra por Hudi!

La mayor fortaleza de Hudi es la velocidad con la que ingiere datos en streaming y por lotes. Al brindar la capacidad de upsert , Hudi ejecuta tareas en órdenes de magnitud más rápido que reescribir tablas o particiones enteras.


Para aprovechar la velocidad de ingesta de Hudi, los lagos de datos requieren una capa de almacenamiento capaz de altos IOPS y rendimiento. La combinación de escalabilidad y alto rendimiento de MinIO es justo lo que Hudi necesita. MinIO es más que capaz de alcanzar el rendimiento necesario para alimentar un lago de datos empresariales en tiempo real: un punto de referencia reciente logró 325 GiB/s (349 GB/s) en GET y 165 GiB/s (177 GB/s) en PUT con solo 32 nodos de SSD NVMe disponibles en el mercado.


Un lago de datos empresarial activo de Hudi almacena una gran cantidad de pequeños archivos Parquet y Avro. MinIO incluye una serie de optimizaciones de archivos pequeños que permiten lagos de datos más rápidos. Los objetos pequeños se guardan en línea con los metadatos, lo que reduce las IOPS necesarias para leer y escribir archivos pequeños como índices y metadatos de Hudi.


El esquema es un componente crítico de cada tabla Hudi. Hudi puede hacer cumplir el esquema o puede permitir la evolución del esquema para que la canalización de datos de transmisión pueda adaptarse sin romperse. Además, Hudi aplica el esquema en el escritor para garantizar que los cambios no interrumpan las canalizaciones. Hudi confía en Avro para almacenar, gestionar y desarrollar el esquema de una tabla.


Hudi proporciona garantías transaccionales ACID a los lagos de datos. Hudi garantiza escrituras atómicas: las confirmaciones se realizan de forma atómica en una línea de tiempo y se les asigna una marca de tiempo que indica el momento en que se considera que ocurrió la acción. Hudi aísla instantáneas entre los procesos de escritura, tabla y lectura para que cada uno opere en una instantánea consistente de la tabla. Hudi completa esto con un control de concurrencia optimista (OCC) entre escritores y un control de concurrencia sin bloqueo basado en MVCC entre servicios de tabla y escritores y entre múltiples servicios de tabla.

Tutorial de Hudi y MinIO

Este tutorial lo guiará en la configuración de Spark, Hudi y MinIO y le presentará algunas funciones básicas de Hudi. Este tutorial se basa en la Guía Apache Hudi Spark , adaptada para trabajar con almacenamiento de objetos MinIO nativo de la nube.


Tenga en cuenta que trabajar con depósitos versionados agrega cierta sobrecarga de mantenimiento a Hudi. Cualquier objeto que se elimine crea un marcador de eliminación . A medida que Hudi limpia archivos usando la utilidad Cleaner, la cantidad de marcadores de eliminación aumenta con el tiempo. Es importante configurar correctamente la administración del ciclo de vida para limpiar estos marcadores de eliminación, ya que la operación de lista puede bloquearse si la cantidad de marcadores de eliminación llega a 1000. Los mantenedores del proyecto Hudi recomiendan limpiar los marcadores de eliminación después de un día usando reglas del ciclo de vida.

Requisitos previos

Descargue e instale Apache Spark.


Descargue e instale MinIO. Registre la dirección IP, el puerto TCP de la consola, la clave de acceso y la clave secreta.


Descargue e instale el cliente MinIO.


Descargue las bibliotecas de AWS y AWS Hadoop y agréguelas a su classpath para utilizar S3A para trabajar con el almacenamiento de objetos.

  • AWS: aws-java-sdk:1.10.34 (o superior)

  • Hadoop: hadoop-aws:2.7.3 (o superior)


Descargue los archivos Jar, descomprímalos y cópielos en /opt/spark/jars .

Crear un depósito MinIO

Utilice el cliente MinIO para crear un depósito para almacenar datos de Hudi:

 mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi

Inicie Spark con Hudi

Inicie Spark Shell con Hudi configurado para usar MinIO para almacenamiento. Asegúrese de configurar las entradas para S3A con su configuración de MinIO.


 spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'


Luego, inicializa Hudi dentro de Spark.

 import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord


Tenga en cuenta que simplificará el uso repetido de Hudi para crear unarchivo de configuración externo .

Crear una tabla

Pruébelo y cree una pequeña tabla Hudi sencilla utilizando Scala. Hudi DataGenerator es una forma rápida y sencilla de generar inserciones y actualizaciones de muestra basadas en el esquema de viaje de muestra .


 val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator

Inserte datos en Hudi y escriba la tabla en MinIO

Lo siguiente generará nuevos datos de viaje, los cargará en un DataFrame y escribirá el DataFrame que acabamos de crear en MinIO como una tabla Hudi. mode(Overwrite) sobrescribe y recrea la tabla en caso de que ya exista. Los datos de viajes se basan en una clave de registro ( uuid ), un campo de partición ( region/country/city ) y una lógica ( ts ) para garantizar que los registros de viajes sean únicos para cada partición. Usaremos la operación de escritura predeterminada, upsert . Cuando tenga una carga de trabajo sin actualizaciones, puede usar insert o bulk_insert , que podría ser más rápido.


 val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)


Abra un navegador e inicie sesión en MinIO en http://<your-MinIO-IP>:<port> con su clave de acceso y clave secreta. Verás la tabla Hudi en el cubo.


Consola MinIO


El depósito también contiene una ruta .hoodie que contiene metadatos y rutas americas y asia que contienen datos.


Metadatos


Eche un vistazo a los metadatos. Así es como se ve mi ruta .hoodie después de completar todo el tutorial. Podemos ver que modifiqué la tabla el martes 13 de septiembre de 2022 a las 9:02, 10:37, 10:48, 10:52 y 10:56.


El camino .hoodie después de completar el tutorial

Consultar datos

Carguemos datos de Hudi en un DataFrame y ejecutemos una consulta de ejemplo.

 // spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Viaje en el tiempo con Hudi

No, no estamos hablando de ir a ver un concierto de Hootie and the Blowfish en 1988.


Cada escritura en tablas Hudi crea nuevas instantáneas. Piense en las instantáneas como versiones de la tabla a las que se puede hacer referencia para consultas de viajes en el tiempo.


Pruebe algunas consultas sobre viajes en el tiempo (tendrá que cambiar las marcas de tiempo para que sean relevantes para usted).


 spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)

Actualizar datos

Este proceso es similar a cuando insertamos nuevos datos anteriormente. Para mostrar la capacidad de Hudi para actualizar datos, generaremos actualizaciones de registros de viajes existentes, los cargaremos en un DataFrame y luego escribiremos el DataFrame en la tabla de Hudi ya guardada en MinIO.


Tenga en cuenta que estamos usando el modo de guardar append . Una pauta general es utilizar el modo append a menos que esté creando una nueva tabla para que no se sobrescriban registros. Una forma típica de trabajar con Hudi es ingerir datos de transmisión en tiempo real, agregarlos a la tabla y luego escribir alguna lógica que combine y actualice los registros existentes en función de lo que se acaba de agregar. Alternativamente, escribir usando el modo overwrite elimina y vuelve a crear la tabla si ya existe.


 // spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)


Al consultar los datos se mostrarán los registros de viaje actualizados.

Consulta incremental

Hudi puede proporcionar un flujo de registros que cambiaron desde una marca de tiempo determinada mediante consultas incrementales. Todo lo que necesitamos hacer es proporcionar una hora de inicio a partir de la cual se transmitirán los cambios para ver los cambios hasta la confirmación actual, y podemos usar una hora de finalización para limitar la transmisión.


La consulta incremental es muy importante para Hudi porque le permite crear canalizaciones de transmisión a partir de datos por lotes.


 // spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

Consulta de momento determinado

Hudi puede consultar datos a partir de una fecha y hora específicas.


 // spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

Eliminar datos con eliminaciones temporales

Hudi admite dos formas diferentes de eliminar registros. Una eliminación temporal conserva la clave del registro y anula los valores de todos los demás campos. Las eliminaciones temporales persisten en MinIO y solo se eliminan del lago de datos mediante una eliminación definitiva.


 // spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

Eliminar datos con eliminaciones definitivas

Por el contrario, las eliminaciones definitivas son lo que consideramos eliminaciones. La clave de registro y los campos asociados se eliminan de la tabla.


 // spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

Insertar sobrescribir

El lago de datos se convierte en un lago de datos cuando adquiere la capacidad de actualizar los datos existentes. Vamos a generar algunos datos de viaje nuevos y luego sobrescribiremos nuestros datos existentes. Esta operación es más rápida que una upsert en la que Hudi calcula toda la partición de destino a la vez. Aquí especificamos la configuración para evitar la indexación, precombinación y reparticionamiento automáticos que upsert haría por usted.


 // spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)

Evolucionar esquema de tabla y particionamiento

La evolución del esquema le permite cambiar el esquema de una tabla Hudi para adaptarse a los cambios que se producen en los datos a lo largo del tiempo.


A continuación se muestran algunos ejemplos de cómo consultar y desarrollar esquemas y particiones. Para una discusión más profunda, consulte Evolución del esquema | Apache Hudi . Tenga en cuenta que si ejecuta estos comandos, alterarán el esquema de su tabla Hudi para diferir de este tutorial.


 -- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');


Actualmente, SHOW partitions solo funciona en un sistema de archivos, ya que se basa en la ruta de la tabla del sistema de archivos.


Este tutorial utilizó Spark para mostrar las capacidades de Hudi. Sin embargo, Hudi puede admitir múltiples tipos de tablas/consultas y las tablas de Hudi se pueden consultar desde motores de consulta como Hive, Spark, Presto y muchos más. El proyecto Hudi tiene un vídeo de demostración que muestra todo esto en una configuración basada en Docker con todos los sistemas dependientes ejecutándose localmente.

¡Ulular! ¡Ulular! ¡Construyamos lagos de datos de Hudi en MinIO!

Apache Hudi fue el primer formato de tabla abierta para lagos de datos y es digno de consideración en arquitecturas de transmisión. La comunidad y el ecosistema de Hudi están vivos y activos, con un creciente énfasis en reemplazar Hadoop/HDFS con Hudi/almacenamiento de objetos para lagos de datos de streaming nativos de la nube. El uso de MinIO para el almacenamiento de Hudi allana el camino para análisis y lagos de datos de múltiples nubes. MinIO incluye replicación activa-activa para sincronizar datos entre ubicaciones (en las instalaciones, en la nube pública/privada y en el borde), permitiendo las excelentes cosas que las empresas necesitan, como el equilibrio de carga geográfica y la rápida conmutación por error.


Pruebe Hudi en MinIO hoy. Si tiene alguna pregunta o desea compartir consejos, comuníquese con nuestro canal de Slack .


También publicado aquí .