Preface

Development environment:

Spark: 3.2.1
Hive: 2.1
Hudi: 0.11.1
Scala: 2.12
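
Since these versions have to line up (Spark 3.2.x with the Hudi 0.11.1 spark3 bundle built for Scala 2.12), the minimal sketch below (not part of the original post) simply prints the versions a local session actually runs with, so they can be checked against the list above before running the Hudi examples:

    // Illustrative only: confirm the runtime matches the environment listed above.
    import org.apache.spark.sql.SparkSession

    object VersionCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("versionCheck").getOrCreate()
        println(s"Spark version: ${spark.version}")                       // expect 3.2.1
        println(s"Scala version: ${scala.util.Properties.versionString}") // expect version 2.12.x
        spark.stop()
      }
    }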

Hudi table creation statement (DDL)

    CREATE EXTERNAL TABLE `tb1_trips_cow_w`(
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key`   string,
      `_hoodie_partition_path` string,
      `_hoodie_file_name` string,
      begin_lat double,
      begin_lon double, 
      driver string,
      end_lat double, 
      end_lon double, 
      fare double,
      rider string, 
      ts bigint,
      uuid string,
      partitionpath string
      )
    PARTITIONED BY (area string,county string ,city string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION 'hdfs://10.254.21.4:51000/user/hdpu/warehouse/kyuubi_hudi.db/tb1_trips_cow_w';

    ALTER TABLE hudi.tb1_trips_cow_w ADD IF NOT EXISTS PARTITION (area='asia', county='india', city='chennai') LOCATION '/hudidatas/hudi-warehouse/tb1_trips_cow_w/asia/india/chennai';
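
Once the partition has been registered, it is worth checking that the Hive metastore actually sees it. The snippet below is a minimal sketch, not part of the original post: it reuses the metastore URI from the Scala code further down and assumes the table lives in a database named kyuubi_hudi (taken from the table LOCATION above; the ALTER statement refers to a database named hudi, so adjust to whichever name your environment uses).

    // Minimal sketch: verify the external Hudi table and its partitions through Spark SQL
    // with Hive support enabled. Database name and metastore URI are assumptions, see above.
    import org.apache.spark.sql.SparkSession

    object VerifyHivePartitions {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("verifyHivePartitions")
          .config("hive.metastore.uris", "thrift://10.254.21.3:53083")
          .enableHiveSupport()
          .getOrCreate()

        // Partitions the Hive metastore knows about for the external table
        spark.sql("SHOW PARTITIONS kyuubi_hudi.tb1_trips_cow_w").show(truncate = false)

        // Read a few rows through the Hive table definition
        spark.sql("SELECT uuid, rider, fare, area, county, city FROM kyuubi_hudi.tb1_trips_cow_w LIMIT 10")
          .show(truncate = false)

        spark.stop()
      }
    }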

POM file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>hudi-test-scala</artifactId>
        <version>1.0-SNAPSHOT</version>


        <properties>
            <scala.version>2.12.10</scala.version>
            <scala.binary.version>2.12</scala.binary.version>
            <hadoop.version>2.7.7</hadoop.version>
            <spark.version>3.2.1</spark.version>
            <hoodie.version>0.11.1</hoodie.version>
        </properties>
        <dependencies>

            <dependency>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
                <version>1.1.8.4</version>
            </dependency>

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>

            <!-- Spark Core dependency -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Spark SQL dependency -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Structured Streaming + Kafka dependency -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!-- Hadoop Client dependency -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>

            <!-- hudi-spark3 -->
            <dependency>
                <groupId>org.apache.hudi</groupId>
                <artifactId>hudi-spark3-bundle_2.12</artifactId>
                <version>${hoodie.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-avro_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!-- Spark SQL and Hive integration dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>4.4.13</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.12</version>
            </dependency>

            <!-- Hive JDBC driver -->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>3.1.3</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.49</version>
            </dependency>

        </dependencies>


        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>

                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>

                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

    </project>

Scala code

    package com.hudi.spark.test


    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    import scala.collection.JavaConversions._


    object HudiSparkTest {


      def main(args: Array[String]): Unit = {
        // Create the Spark SQL runtime environment
        System.setProperty("HADOOP_USER_NAME", "hdpu")
        val conf = new SparkConf().setMaster("local[*]").setAppName("insertDatasToHudi")
        val spark = SparkSession.builder().config(conf)
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("hive.metastore.uris", "thrift://10.254.21.3:53083")
          .config("dfs.client.use.datanode.hostname", "true")
          .getOrCreate()

        // The serializer was configured as Kryo above
        // Define variables: table name and data storage path
        val tableName: String = "tb1_trips_cow_w"
        val tablePath: String = "hdfs://10.254.21.4:51000/user/hdpu/warehouse/kyuubi_hudi.db/tb1_trips_cow_w"

        // Data generator
        val generator = new DataGenerator()

        // Insert data
        insertData(spark, tableName, tablePath, "append", generator)

        // Query data
    //    queryData(spark, tablePath)

        // Query by point in time
    //     queryDataByTime(spark,tablePath)

        // Update data
    //    updateData(spark,generator,tableName,tablePath)

        // Incremental query
    //    incrementQueryData(spark,tablePath)

        // Stop the Spark session
        spark.stop()
      }

      /**
       * Insert data
       *
       * @param spark
       * @param tableName
       * @param tablePath
       * @param savemode
       */
      def insertData(spark: SparkSession, tableName: String, tablePath: String, savemode: String, dataGen: DataGenerator): Unit = {
        // Import implicit conversions
        import spark.implicits._
        // Step 1: generate mock trip data
        //val generator: DataGenerator = new DataGenerator()
        val insertData = convertToStringList {
          dataGen.generateInserts(100)
        }
        val dataDF = spark.read.json(spark.sparkContext.parallelize(insertData, 2).toDS())
        // Save the data
        dataDF.write
          .format("hudi")
          .mode(savemode)
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          // Hudi table property settings
          .option(PRECOMBINE_FIELD.key(), "ts")
          .option(RECORDKEY_FIELD.key(), "uuid")
          .option(PARTITIONPATH_FIELD.key(), "partitionpath")
          .option(TBL_NAME.key(), tableName)
          .save(tablePath)
      }

      /**
       * Query data
       *
       * @param spark
       * @param tablePath
       */
      def queryData(spark: SparkSession, tablePath: String): Unit = {
        // spark-shell
        val tripsSnapshotDF = spark.read
          .format("hudi")
          .load(tablePath)

        tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
        tripsSnapshotDF.printSchema()

        // Query via Spark SQL
        spark.sql("select * from  hudi_trips_snapshot where fare > 20.0").show()


        // Query trips with fare between 20 and 50 via the DataFrame API
        //    tripsSnapshotDF
        //      .filter($"fare" >= 20 && $"fare" <= 50)
        //      .select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
        //      .orderBy($"fare".desc, $"_hoodie_commit_time".desc)
        //      .show(20, truncate = false)
      }

      /**
       * Query by date (time travel query)
       *
       * @param spark
       * @param tablepath
       */
      def queryDataByTime(spark: SparkSession, tablepath: String): Unit = {
        import org.apache.spark.sql.functions._
        // Option 1: specify a timestamp string in the format yyyyMMddHHmmss
    //    val df1 = spark.read
    //      .format("hudi")
    //      .option("as.of.instant", "20220902092804")
    //      .load(tablepath)
    //      .sort(col("_hoodie_commit_time").desc)
    //    df1.show(numRows = 5, truncate = false)

        // Option 2: specify a timestamp string in the format yyyy-MM-dd HH:mm:ss
    //    val df2 = spark.read
    //      .format("hudi")
    //      .option("as.of.instant", "2022-09-02 09:28:04")
    //      .load(tablepath)
    //      .sort(col("_hoodie_commit_time").desc)
    //    df2.show(numRows = 5, truncate = false)

        // Option 3: specify a date string in the format yyyy-MM-dd
        val df3 = spark.read
          .format("hudi")
          .option("as.of.instant", "2022-09-02")
          .load(tablepath)
        df3.show(numRows = 5, truncate = false)
      }

      /*
           Update Hudi data. The record keys being updated must already exist when the program runs,
           so the same DataGenerator instance used for the inserts must be reused here.
       */
      def updateData(spark: SparkSession, dataGen: DataGenerator, tableName: String, tablePath: String): Unit = {
        import org.apache.hudi.QuickstartUtils._
        import spark.implicits._

        import scala.collection.JavaConverters._

        val updates = convertToStringList(dataGen.generateUpdates(100)) // note: generateUpdates here, not generateInserts
        val updateDF = spark.read.json(spark.sparkContext.parallelize(updates.asScala, 2).toDS())

        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        updateDF.write
          .mode("append")
          .format("hudi")
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .option(PRECOMBINE_FIELD.key(), "ts")
          .option(RECORDKEY_FIELD.key(), "uuid")
          .option(PARTITIONPATH_FIELD.key(), "partitionpath")
          .option(TBL_NAME.key(), tableName)
          .save(tablePath)
      }

      /**
       * Incremental query
       *
       * @param spark
       */
      def incrementQueryData(spark: SparkSession, path: String): Unit = {
        import spark.implicits._
        spark.read
          .format("hudi")
          .load(path)
          .createOrReplaceTempView("view_temp_hudi_trips")
        val commits: Array[String] = spark
          .sql(
            """
              |select
              |  distinct(_hoodie_commit_time) as commitTime
              |from
              |  view_temp_hudi_trips
              |order by
              |  commitTime DESC
              |""".stripMargin
          )
          .map(row => row.getString(0))
          .take(50)
        val beginTime = commits(commits.length - 1) // commit time we are interested in
        println(s"beginTime = ${beginTime}")

        // TODO: b. Set the commit-time threshold and run an incremental query on the Hudi data
        val tripsIncrementalDF = spark.read
          .format("hudi")
          // Set the query type to incremental
          .option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
          // Set the begin instant time for the incremental read
          .option(BEGIN_INSTANTTIME.key(), beginTime)
          .load(path)

        // TODO: c. Register the incremental result as a temp view and query records with fare > 20.0
        tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
        spark
          .sql(
            """
              |select
              |  `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
              |from
              |  hudi_trips_incremental
              |where
              |  fare > 20.0
              |""".stripMargin
          )
          .show(10, truncate = false)


        //      spark.read()
        //      .format("org.apache.hudi")
        //      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
        //      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), <beginInstantTime>)
        //        .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY(), "/year=2020/month=*/day=*") // Optional, use glob pattern if querying certain partitions
        //        .load(tablePath); // For incremental query, pass in the root/base path of table
        //
        //        hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental")
        //        spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

      }


    }

————————————————
Copyright notice: this is an original article by CSDN blogger "CarsonBigData", licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/Carson073/article/details/127048279

Author: Jeebiz  Created: 2022-10-16 01:52
Updated: 2024-07-10 22:56