Preface
Development environment:
Spark: 3.2.1
Hive: 2.1
Hudi: 0.11.1
Scala: 2.12
Hudi table creation statement (Hive DDL)
CREATE EXTERNAL TABLE `tb1_trips_cow_w`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  begin_lat double,
  begin_lon double,
  driver string,
  end_lat double,
  end_lon double,
  fare double,
  rider string,
  ts bigint,
  uuid string,
  partitionpath string
)
PARTITIONED BY (area string, county string, city string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://10.254.21.4:51000/user/hdpu/warehouse/kyuubi_hudi.db/tb1_trips_cow_w';

ALTER TABLE hudi.tb1_trips_cow_w ADD IF NOT EXISTS PARTITION (area='asia', county='india', city='chennai')
  LOCATION '/hudidatas/hudi-warehouse/tb1_trips_cow_w/asia/india/chennai';
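
After the table and partition are registered, the registration can be checked from Spark SQL with Hive support enabled. Below is a minimal verification sketch, assuming the metastore URI that the Scala code later in this article uses and the hudi database name from the ALTER TABLE statement above; adjust both to your environment.

import org.apache.spark.sql.SparkSession

object VerifyHudiHiveTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("verifyHudiHiveTable")
      // Assumed metastore URI (same as in the Scala code below)
      .config("hive.metastore.uris", "thrift://10.254.21.3:53083")
      .enableHiveSupport()
      .getOrCreate()

    // List the partitions registered for the external table
    spark.sql("SHOW PARTITIONS hudi.tb1_trips_cow_w").show(truncate = false)

    // Read a few rows through the Hive table definition
    spark.sql("SELECT `_hoodie_commit_time`, uuid, fare FROM hudi.tb1_trips_cow_w LIMIT 10").show(truncate = false)

    spark.stop()
  }
}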
POM file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hudi-test-scala</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop.version>2.7.7</hadoop.version>
        <spark.version>3.2.1</spark.version>
        <hoodie.version>0.11.1</hoodie.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming + Kafka dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hudi-spark3 -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_2.12</artifactId>
            <version>${hoodie.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL integration with Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.12</version>
        </dependency>
        <!-- Hive JDBC driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Scala code
package com.hudi.spark.test

import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

object HudiSparkTest {

  def main(args: Array[String]): Unit = {
    // Create the Spark SQL runtime environment
    System.setProperty("HADOOP_USER_NAME", "hdpu")
    val conf = new SparkConf().setMaster("local[*]").setAppName("insertDatasToHudi")
    val spark = SparkSession.builder().config(conf)
      // Serialization: Kryo
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("hive.metastore.uris", "thrift://10.254.21.3:53083")
      .config("dfs.client.use.datanode.hostname", "true")
      .getOrCreate()

    // Define variables: table name and storage path
    val tableName: String = "tb1_trips_cow_w"
    val tablePath: String = "hdfs://10.254.21.4:51000/user/hdpu/warehouse/kyuubi_hudi.db/tb1_trips_cow_w"

    // Data generator
    val generator = new DataGenerator()

    // Insert data
    insertData(spark, tableName, tablePath, "append", generator)
    // Query data
    // queryData(spark, tablePath)
    // Query by time
    // queryDataByTime(spark, tablePath)
    // Update data
    // updateData(spark, generator, tableName, tablePath)
    // Incremental query
    // incrementQueryData(spark, tablePath)

    // Stop the session
    spark.stop()
  }

  /**
   * Insert data
   *
   * @param spark
   * @param tableName
   * @param tablePath
   * @param savemode
   */
  def insertData(spark: SparkSession, tableName: String, tablePath: String, savemode: String, dataGen: DataGenerator): Unit = {
    // Import implicit conversions
    import spark.implicits._
    // Step 1: generate simulated trip data
    // val generator: DataGenerator = new DataGenerator()
    val insertData = convertToStringList {
      dataGen.generateInserts(100)
    }
    val dataDF = spark.read.json(spark.sparkContext.parallelize(insertData, 2).toDS())
    // Save the data
    dataDF.write
      .format("hudi")
      .mode(savemode)
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), tableName)
      .save(tablePath)
  }

  /**
   * Query data
   *
   * @param spark
   * @param tablePath
   */
  def queryData(spark: SparkSession, tablePath: String): Unit = {
    // spark-shell
    val tripsSnapshotDF = spark.read
      .format("hudi")
      .load(tablePath)
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    tripsSnapshotDF.printSchema()
    // Query via Spark SQL
    spark.sql("select * from hudi_trips_snapshot where fare > 20.0").show()
    // Query trips with fare between 20 and 50 via the DataFrame API
    // tripsSnapshotDF
    //   .filter($"fare" >= 20 && $"fare" <= 50)
    //   .select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
    //   .orderBy($"fare".desc, $"_hoodie_commit_time".desc)
    //   .show(20, truncate = false)
  }

  /**
   * Query by date
   *
   * @param spark
   * @param tablepath
   */
  def queryDataByTime(spark: SparkSession, tablepath: String): Unit = {
    import org.apache.spark.sql.functions._
    // Option 1: a timestamp string in yyyyMMddHHmmss format
    // val df1 = spark.read
    //   .format("hudi")
    //   .option("as.of.instant", "20220902092804")
    //   .load(tablepath)
    //   .sort(col("_hoodie_commit_time").desc)
    // df1.show(numRows = 5, truncate = false)

    // Option 2: a timestamp string in yyyy-MM-dd HH:mm:ss format
    // val df2 = spark.read
    //   .format("hudi")
    //   .option("as.of.instant", "2022-09-02 09:28:04")
    //   .load(tablepath)
    //   .sort(col("_hoodie_commit_time").desc)
    // df2.show(numRows = 5, truncate = false)

    // Option 3: a date string in yyyy-MM-dd format
    val df3 = spark.read
      .format("hudi")
      .option("as.of.instant", "2022-09-02")
      .load(tablepath)
    df3.show(numRows = 5, truncate = false)
  }

  /*
   * Update Hudi data. The keys being updated must already exist when this runs,
   * so the same DataGenerator instance used for the inserts must be reused.
   */
  def updateData(spark: SparkSession, dataGen: DataGenerator, tableName: String, tablePath: String): Unit = {
    import org.apache.hudi.QuickstartUtils._
    import spark.implicits._
    import scala.collection.JavaConverters._
    val updates = convertToStringList(dataGen.generateUpdates(100)) // note: generateUpdates, not generateInserts
    val updateDF = spark.read.json(spark.sparkContext.parallelize(updates.asScala, 2).toDS())
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    updateDF.write
      .mode("append")
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), tableName)
      .save(tablePath)
  }

  /**
   * Incremental query
   *
   * @param spark
   */
  def incrementQueryData(spark: SparkSession, path: String): Unit = {
    import spark.implicits._
    spark.read
      .format("hudi")
      .load(path)
      .createOrReplaceTempView("view_temp_hudi_trips")
    // a. Collect the distinct commit times, newest first
    val commits: Array[String] = spark
      .sql(
        """
          |select
          |  distinct(_hoodie_commit_time) as commitTime
          |from
          |  view_temp_hudi_trips
          |order by
          |  commitTime DESC
          |""".stripMargin
      )
      .map(row => row.getString(0))
      .take(50)
    val beginTime = commits(commits.length - 1) // commit time we are interested in
    println(s"beginTime = ${beginTime}")

    // b. Set the Hudi commit-time threshold and run the incremental query
    val tripsIncrementalDF = spark.read
      .format("hudi")
      // Set the query type to incremental
      .option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
      // Set the begin instant time for the incremental read
      .option(BEGIN_INSTANTTIME.key(), beginTime)
      .load(path)

    // c. Register the incremental result as a temp view and query trips with fare > 20
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
    spark
      .sql(
        """
          |select
          |  `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
          |from
          |  hudi_trips_incremental
          |where
          |  fare > 20.0
          |""".stripMargin
      )
      .show(10, truncate = false)

    // spark.read()
    //   .format("org.apache.hudi")
    //   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
    //   .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), <beginInstantTime>)
    //   .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY(), "/year=2020/month=*/day=*") // Optional, use glob pattern if querying certain partitions
    //   .load(tablePath); // For incremental query, pass in the root/base path of table
    //
    // hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental")
    // spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
  }

}
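
In the DDL section above, the partition was registered manually with ALTER TABLE ... ADD PARTITION. Hudi can also sync the table and its partitions to the Hive metastore during the write itself. The method below is only a sketch that could be added to HudiSparkTest (it relies on the same imports): the hive_sync option keys are standard Hudi 0.11.x configs, while the sync mode (hms), the metastore URI, the kyuubi_hudi database and the area/county/city partition fields are assumptions taken from the environment and DDL shown earlier; adjust them as needed.

  // Sketch: the same write as insertData, extended with Hive-sync options so the Hive
  // table and partitions are created/updated automatically (HMS sync mode assumed).
  def insertDataWithHiveSync(spark: SparkSession, tableName: String, tablePath: String, dataGen: DataGenerator): Unit = {
    import spark.implicits._
    val inserts = convertToStringList(dataGen.generateInserts(100))
    val dataDF = spark.read.json(spark.sparkContext.parallelize(inserts, 2).toDS())
    dataDF.write
      .format("hudi")
      .mode("append")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), tableName)
      // Hive sync options (assumed values; adjust to your metastore and database)
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.mode", "hms")
      .option("hoodie.datasource.hive_sync.metastore.uris", "thrift://10.254.21.3:53083")
      .option("hoodie.datasource.hive_sync.database", "kyuubi_hudi")
      .option("hoodie.datasource.hive_sync.table", tableName)
      .option("hoodie.datasource.hive_sync.partition_fields", "area,county,city")
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .save(tablePath)
  }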
————————————————
Copyright notice: this is an original article by CSDN blogger "CarsonBigData", licensed under CC 4.0 BY-SA; please include the original source link and this notice when reposting.
Original article: https://blog.csdn.net/Carson073/article/details/127048279
Author: Jeebiz  Created: 2022-10-16 01:52
Updated: 2024-07-10 22:56