百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

重磅!Vertica集成Apache Hudi指南

lipiwang 2024-10-15 18:40 12 浏览 0 评论



1. 摘要

本文演示了使用外部表集成 Vertica 和 Apache Hudi。在演示中我们使用 Spark 上的 Apache Hudi 将数据摄取到 S3 中,并使用 Vertica 外部表访问这些数据。

2. Apache Hudi介绍

Apache Hudi 是一种变更数据捕获 (CDC) 工具,可在不同时间线将事务记录在表中。Hudi 代表 Hadoop Upserts Deletes and Incrementals,是一个开源框架。Hudi 提供 ACID 事务、可扩展的元数据处理,并统一流和批处理数据处理。以下流程图说明了该过程。使用安装在 Apache Spark 上的 Hudi 将数据处理到 S3,并从 Vertica 外部表中读取 S3 中的数据更改。

3. 环境准备

?Apache Spark 环境。使用具有 1 个 Master 和 3 个 Worker 的 4 节点集群进行了测试。按照在多节点集群上设置 Apache Spark 中的说明安装 Spark 集群环境[1]。启动 Spark 多节点集群。?Vertica 分析数据库。使用 Vertica Enterprise 11.0.0 进行了测试。?AWS S3 或 S3 兼容对象存储。使用 MinIO 作为 S3 存储桶进行了测试。?需要以下 jar 文件。将 jar 复制到 Spark 机器上任何需要的位置,将这些 jar 文件放在 /opt/spark/jars 中。?Hadoop - hadoop-aws-2.7.3.jar?AWS - aws-java-sdk-1.7.4.jar?在 Vertica 数据库中运行以下命令来设置访问存储桶的 S3 参数:SELECT SET_CONFIG_PARAMETER('AWSAuth', 'accesskey:secretkey');
SELECT SET_CONFIG_PARAMETER('AWSRegion','us-east-1');
SELECT SET_CONFIG_PARAMETER('AWSEndpoint',’<S3_IP>:9000');
SELECT SET_CONFIG_PARAMETER('
AWSEnableHttps','0');

endpoint可能会有所不同,具体取决于 S3 存储桶位置选择的 S3 对象存储。


4. Vertica和Apache Hudi集成

要将 Vertica 与 Apache Hudi 集成,首先需要将 Apache Spark 与 Apache Hudi 集成,配置 jars,以及访问 AWS S3 的连接。其次,将 Vertica 连接到 Apache Hudi。然后对 S3 存储桶执行 Insert、Append、Update 等操作。按照以下部分中的步骤将数据写入 Vertica。 在 Apache Spark 上配置 Apache Hudi 和 AWS S3[2] 配置 Vertica 和 Apache Hudi 集成[3]

4.1 在 Apache Spark 上配置 Apache Hudi 和 AWS S3

在 Apache Spark 机器中运行以下命令。这会下载 Apache Hudi 包,配置 jar 文件,以及 AWS S3

Bash
/opt/spark/bin/spark-shell \--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1

导入Hudi的读、写等所需的包:

Bash
import org.apache.hudi.QuickstartUtils._import scala.collection.JavaConversions._import org.apache.spark.sql.SaveMode._import org.apache.hudi.DataSourceReadOptions._import org.apache.hudi.DataSourceWriteOptions._import org.apache.hudi.config.HoodieWriteConfig._

使用以下命令根据需要配置 Minio 访问密钥、Secret key、Endpoint 和其他 S3A 算法和路径。

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "*****")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "*****")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://XXXX.9000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
sc.hadoopConfiguration.set("fs.s3a.signing-algorithm","S3SignerType")

创建变量来存储 MinIO 的表名和 S3 路径。

val tableName = “Trips
val basepath = “s3a://apachehudi/vertica/”

准备数据,使用 Scala 在 Apache spark 中创建示例数据

val df = Seq(
("aaa","r1","d1",10,"US","20211001"),
("bbb","r2","d2",20,"Europe","20211002"),
("ccc","r3","d3",30,"India","20211003"),
("ddd","r4","d4",40,"Europe","20211004"),
("eee","r5","d5",50,"India","20211005"),
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

将数据写入 AWS S3 并验证此数据

df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

使用 Scala 运行以下命令以验证是否从 S3 存储桶中正确读取数据。

spark.read.format("hudi").load(basePath).createOrReplaceTempView("dta")
spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare,ts, partitionpath from dta order by uuid").show()

4.2 配置 Vertica 和 Apache HUDI 集成

在 vertica 中创建一个外部表,其中包含来自 S3 上 Hudi 表的数据。我们创建了“旅行”表。

CREATE EXTERNAL TABLE Trips
(
_hoodie_commit_time TimestampTz,
uuid varchar,
rider varchar,
driver varchar,
fare int,
ts varchar,
partitionpath varchar
)
AS COPY FROM
's3a://apachehudi/parquet/vertica/*/*.parquet' PARQUET;

运行以下命令以验证正在读取外部表:

4.3 如何让 Vertica 查看更改的数据

以下部分包含为查看 Vertica 中更改的数据而执行的一些操作的示例。

4.3.1 写入数据

在这个例子中,我们使用 Scala 在 Apache spark 中运行了以下命令并附加了一些数据:

val df2 = Seq(
("fff","r6","d6",50,"India","20211005")
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

运行以下命令将此数据附加到 S3 上的 Hudi 表中:

df2.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

4.3.2 更新数据

在这个例子中,我们更新了一条 Hudi 表的记录。需要导入数据以触发并更新数据:

val df3 = Seq(
("aaa","r1","d1",100,"US","20211001"),
("eee","r5","d5",500,"India","20211001")
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

运行以下命令将数据更新到 S3 上的 HUDI 表:

df3.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

以下是 spark.sql 的输出:

以下是 Vertica 输出:

4.3.3 创建和查看数据的历史快照

执行以下指向特定时间戳的 spark 命令:

val dd = spark.read
.format("hudi")
.option("as.of.instant", "20211007092600")
.load(basePath)

使用以下命令将数据写入 S3 中的 parquet:

dd.write.parquet("s3a://apachehudi/parquet/p2")

在此示例中,我们正在读取截至“20211007092600”日期的 Hudi 表快照。

dd.show

通过在 parquet 文件上创建外部表从 Vertica 执行命令。


引用链接

[1] 多节点集群上设置 Apache Spark 中的说明安装 Spark 集群环境: [https://spark.apache.org/docs/](https://spark.apache.org/docs/)
[2] 在 Apache Spark 上配置 Apache Hudi 和 AWS S3: [https://www.vertica.com/kb/Apache_Hudi_TE/Content/Partner/Apache_Hudi_TE.htm#Configur](https://www.vertica.com/kb/Apache_Hudi_TE/Content/Partner/Apache_Hudi_TE.htm#Configur)
[3] 配置 Vertica 和 Apache Hudi 集成: [https://www.vertica.com/kb/Apache_Hudi_TE/Content/Partner/Apache_Hudi_TE.htm#Configur2](https://www.vertica.com/kb/Apache_Hudi_TE/Content/Partner/Apache_Hudi_TE.htm#Configur2)


相关推荐

如何在 Linux 中压缩文件和目录?(linux压缩文件夹到指定目录)

在Linux系统中,文件和目录的压缩是一项常见且重要的操作。无论是为了节省存储空间、便于文件传输,还是进行备份管理,掌握压缩技术都能极大地提升工作效率。Linux中常用的压缩工具1.tar:打...

什么是LIM模具?与普通硅胶模具有何本质区别?

要深入理解LIM模具及其与普通硅胶模具的本质区别,需从成型逻辑、技术架构、应用价值三个层面拆解,以下是系统性解析:一、LIM模具:定义与核心技术1.定义LIM模具(LiquidInj...

前后端安全机制(前后端分离安全的token)

一、密钥安全管理方案1.动态密钥分发机制密钥与会话绑定后端为每个用户会话生成临时密钥(如AES-256密钥),通过HTTPS加密传输给前端,会话结束后自动失效。例如:javascript//...

Switch 2芯片细节曝光,英伟达专门定制支持DLSS,网友:掌机模式相当于PS4

Switch2处理器,细节被实锤!数毛社(DigitalFoundry)消息,已经确定Switch2采用的是英伟达真·定制版芯片,包含8核CPU和12GBLPDDR5X内存。GPU则基于Amp...

独立站的PageSpeed Insights 指标在seo中的作用?

这是一个非常关键的问题,关于独立站(如Shopify、WordPress、自建FastAPI/Vue等网站)的PageSpeedInsights指标(Google的网页性能评分工具)在...

前端工程化-webpack 分包的方式有哪些?

Webpack的分包(CodeSplitting)是优化应用性能的重要手段,主要通过合理拆分代码减少首次加载体积、提升缓存利用率。以下是常见的分包方式及生产/开发环境配置建议:一、Webpack...

液态硅胶(LSR)套啤注塑件的关键技术难点与解决方案?

液态硅胶(LSR)套啤注塑件(即二次注塑成型,一次成型基材+二次LSR包胶)在医疗、电子、汽车等领域应用广泛,但其关键技术难点需从材料、模具、工艺等多维度突破。以下是核心难点及解决方案:一、关...

spa首屏加载慢怎样解决(spa首屏优化)

SPA(SinglePageApplication,单页应用)首屏加载慢是一个常见问题,主要原因通常是首次加载需要拉取体积较大的JavaScript文件、样式表、初始化数据等。以下是一些常见的...

揭秘|为什么新华三(H3C)要自主研发运维管理软件?

1概述1.1产生背景随着互联网技术的快速发展,企业对计算、网络的需求也越来越大。为了保证整个数据系统可靠、稳定地运行,相关企业对运维系统的要求越来越高,运维成本也在随之逐步增加。H3C公司自主研发的运...

动态主机配置协议——DHCP详解(dhcp动态主机配置协议的功能是?)

一、DHCP简介DHCP(DynamicHostConfigurationProtocol),动态主机配置协议,是一个应用层协议。当我们将客户主机ip地址设置为动态获取方式时,DHCP服务器就会...

OGG同步到Kafka(oggforbigdata到kafka)

目的:测试使用OGG将数据单向同步到Kafka上。简要说明:Kafka使用单节点单Broker部署;单独部署简单ZooKeeper;需要使用到JAVA1.8;OGG需要2个版本,一个fororacl...

Zabbix入门操作指南(zabbix4.0使用手册)

上篇:安装与配置一.概述在开始之前,一些概念和定义需要我们提前了解一下(以下内容摘自官方网站)。1.1几个概念架构Zabbix由几个主要的功能组件组成,其职责如下所示。ServerZabbixs...

绝对干货!升级MySQL5.7到MySQL8.0的最佳实践分享

一、前言事出必有因,在这个月的某个项目中,我们面临了一项重要任务,即每年一次的等保测评整改。这次测评的重点是Mysql的一些高危漏洞,客户要求我们无论如何必须解决这些漏洞。尽管我们感到无奈,但为了满足...

pytorch v2.7.0震撼发布!Blackwell GPU支持+编译性能狂飙,AI开发

重点内容测试版(Beta):oTorch.Compile支持Torch函数模式oMega缓存原型(Prototype):o支持NVIDIABlackwell架构oPyTorch...

kubernetes1.31.3集群搭建(上)(kubectl连接集群)

1集群规划1.1物理机环境电脑操作系统CPU内存硬盘网卡IP地址(静态)虚拟机软件服务器操作系统联想Windows11Intel12900K24核128GB4TBPcIE4.0无线网卡192...

取消回复欢迎 发表评论: