From 72e1840799366cdd38cd30ebd5e3643be8a2771f Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 8 Oct 2025 12:52:49 -0400 Subject: [PATCH 1/5] remove glob paths and deprecate read paths support --- .../org/apache/hudi/util/PathUtils.scala | 86 --------- .../apache/hudi/BaseFileOnlyRelation.scala | 5 +- .../org/apache/hudi/DataSourceOptions.scala | 1 + .../scala/org/apache/hudi/DefaultSource.scala | 46 ++--- .../org/apache/hudi/HoodieBaseRelation.scala | 15 +- .../hudi/HoodieBootstrapMORRelation.scala | 11 +- .../apache/hudi/HoodieBootstrapRelation.scala | 6 +- .../MergeOnReadIncrementalRelationV1.scala | 6 +- .../MergeOnReadIncrementalRelationV2.scala | 6 +- .../hudi/MergeOnReadSnapshotRelation.scala | 14 +- .../datasources/HoodieInMemoryFileIndex.scala | 178 ------------------ .../org/apache/hudi/util/TestPathUtils.scala | 82 -------- 12 files changed, 36 insertions(+), 420 deletions(-) delete mode 100644 hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/PathUtils.scala delete mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestPathUtils.scala diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/PathUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/PathUtils.scala deleted file mode 100644 index 4165c24415343..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/PathUtils.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.util - -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.storage.{HoodieStorage, StoragePath} - -import scala.collection.JavaConverters._ - -/** - * TODO convert to Java, move to hudi-common - */ -object PathUtils { - - /** - * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. - * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. - */ - def isGlobPath(pattern: StoragePath): Boolean = { - pattern.toString.exists("{}[]*?\\".toSet.contains) - } - - /** - * This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like - * skipping meta paths. 
- */ - def globPath(storage: HoodieStorage, pattern: StoragePath): Seq[StoragePath] = { - // find base path to assist in skipping meta paths - var basePath = pattern.getParent - while (basePath.getName.equals("*")) { - basePath = basePath.getParent - } - - Option(storage.globEntries(pattern)).map { pathInfoList => { - val nonMetaStatuses = pathInfoList.asScala.filterNot(entry => { - // skip all entries in meta path - var leafPath = entry.getPath - // walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped - while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) { - leafPath = leafPath.getParent - } - leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME) - }) - nonMetaStatuses.map(e => e.getPath.makeQualified(storage.getUri)) - }.toSeq - }.getOrElse(Seq.empty[StoragePath]) - } - - /** - * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. - * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. - */ - def globPathIfNecessary(storage: HoodieStorage, pattern: StoragePath): Seq[StoragePath] = { - if (isGlobPath(pattern)) globPath(storage, pattern) else Seq(pattern) - } - - /** - * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths - * which match the glob pattern. Otherwise, returns original path - * - * @param paths List of absolute or globbed paths - * @param fs {@link HoodieStorage} instance - * @return list of absolute file paths - */ - def checkAndGlobPathIfNecessary(paths: Seq[String], storage: HoodieStorage): Seq[StoragePath] = { - paths.flatMap(path => { - val qualified = new StoragePath(path).makeQualified(storage.getUri); - globPathIfNecessary(storage, qualified) - }) - } -} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index f28da81417c4c..301f9c904fedc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -51,7 +51,6 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext, override val metaClient: HoodieTableMetaClient, override val optParams: Map[String, String], private val userSchema: Option[StructType], - private val globPaths: Seq[StoragePath], private val prunedDataSchema: Option[StructType] = None) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) with SparkAdapterSupport { @@ -110,7 +109,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext, } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { - val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters) + val fileSlices = listLatestFileSlices(partitionFilters, dataFilters) val fileSplits = fileSlices.flatMap { fileSlice => // TODO fix, currently assuming parquet as underlying format val pathInfo: StoragePathInfo = fileSlice.getBaseFile.get.getPathInfo @@ -139,7 +138,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext, def toHadoopFsRelation: HadoopFsRelation = { val enableFileIndex = HoodieSparkConfUtils.getConfigValue(optParams, sparkSession.sessionState.conf, 
ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean - if (enableFileIndex && globPaths.isEmpty) { + if (enableFileIndex) { // NOTE: There are currently 2 ways partition values could be fetched: // - Source columns (producing the values used for physical partitioning) will be read // from the data file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 310a170e1ad18..03131bb094f27 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -79,6 +79,7 @@ object DataSourceReadOptions { val REALTIME_PAYLOAD_COMBINE_OPT_VAL = HoodieReaderConfig.REALTIME_PAYLOAD_COMBINE val REALTIME_MERGE: ConfigProperty[String] = HoodieReaderConfig.MERGE_TYPE + @Deprecated val READ_PATHS: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.paths") .noDefaultValue() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index fb78db78c51f0..67f8c321b7475 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -35,7 +35,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.io.storage.HoodieSparkIOFactory import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath} import org.apache.hudi.storage.hadoop.HoodieHadoopStorage -import org.apache.hudi.util.{PathUtils, SparkConfigUtils} +import org.apache.hudi.util.SparkConfigUtils import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} @@ -107,27 +107,15 @@ class DefaultSource extends RelationProvider val storage = HoodieStorageUtils.getStorage( allPaths.head, HadoopFSUtils.getStorageConf(sqlContext.sparkContext.hadoopConfiguration)) - val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) { - PathUtils.checkAndGlobPathIfNecessary(allPaths, storage) - } else { - Seq.empty + if (path.exists(_.contains("*")) || readPaths.nonEmpty) { + throw new HoodieException("Glob paths and the 'hoodie.datasource.read.paths' option are not supported as of Hudi 1.2.0") } // Add default options for unspecified read options keys. 
- val parameters = (if (globPaths.nonEmpty) { - Map( - "glob.paths" -> globPaths.mkString(",") - ) - } else { - Map() - }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams) + val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams) // Get the table base path - val tablePath = if (globPaths.nonEmpty) { - DataSourceUtils.getTablePath(storage, globPaths.asJava) - } else { - DataSourceUtils.getTablePath(storage, Seq(new StoragePath(path.get)).asJava) - } + val tablePath = DataSourceUtils.getTablePath(storage, Seq(new StoragePath(path.get)).asJava) log.info("Obtained hudi table path: " + tablePath) val metaClient = HoodieTableMetaClient.builder().setMetaserverConfig(parameters.toMap.asJava) @@ -141,7 +129,7 @@ class DefaultSource extends RelationProvider parameters } - DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, options.toMap) + DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap) } /** @@ -279,7 +267,6 @@ object DefaultSource { def createRelation(sqlContext: SQLContext, metaClient: HoodieTableMetaClient, schema: StructType, - globPaths: Seq[StoragePath], parameters: Map[String, String]): BaseRelation = { val tableType = metaClient.getTableType val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent @@ -309,7 +296,7 @@ object DefaultSource { lazy val enableFileGroupReader = SparkConfigUtils .getStringWithAltKeys(parameters, HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean && - !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty) + !metaClient.isMetadataTable lazy val tableVersion = if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) { Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key)) } else { @@ -340,7 +327,7 @@ object DefaultSource { new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory( sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() } else { - resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters) + resolveBaseFileOnlyRelation(sqlContext, userSchema, metaClient, parameters) } case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => (hoodieTableSupportsCompletionTime, enableFileGroupReader) match { @@ -357,7 +344,7 @@ object DefaultSource { new HoodieMergeOnReadSnapshotHadoopFsRelationFactory( sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() } else { - new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema) + new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, userSchema) } case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) => @@ -365,7 +352,7 @@ object DefaultSource { new HoodieMergeOnReadSnapshotHadoopFsRelationFactory( sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build() } else { - HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters) + HoodieBootstrapMORRelation(sqlContext, userSchema, metaClient, parameters) } case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => @@ -383,7 +370,7 @@ object DefaultSource { new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory( sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build() } else { - resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters) + 
resolveHoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters) } case (_, _, _) => @@ -395,7 +382,6 @@ object DefaultSource { } private def resolveHoodieBootstrapRelation(sqlContext: SQLContext, - globPaths: Seq[StoragePath], userSchema: Option[StructType], metaClient: HoodieTableMetaClient, parameters: Map[String, String]): BaseRelation = { @@ -404,20 +390,18 @@ object DefaultSource { val isSchemaEvolutionEnabledOnRead = HoodieSparkConfUtils.getConfigValue(parameters, sqlContext.sparkSession.sessionState.conf, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - if (!enableFileIndex || isSchemaEvolutionEnabledOnRead - || globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) { - HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false")) + if (!enableFileIndex || isSchemaEvolutionEnabledOnRead || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) { + HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false")) } else { - HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters).toHadoopFsRelation + HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters).toHadoopFsRelation } } private def resolveBaseFileOnlyRelation(sqlContext: SQLContext, - globPaths: Seq[StoragePath], userSchema: Option[StructType], metaClient: HoodieTableMetaClient, optParams: Map[String, String]): BaseRelation = { - val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths) + val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema) // NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of // [[BaseFileOnlyRelation]] to function correctly. 
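The DefaultSource change above is the user-facing piece of this series: a load whose path contains a glob pattern, or that sets the now-deprecated hoodie.datasource.read.paths option, fails fast with a HoodieException instead of being expanded through PathUtils. The Scala sketch below is illustrative and not part of the patch; the table location, partition column name, and partition value are placeholders modeled on the test fixtures updated in patch 4/5, which replace globbed loads with a base-path load plus an ordinary filter that the Hudi file index turns into partition pruning.

import org.apache.spark.sql.SparkSession

object GlobFreeReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("glob-free-read").getOrCreate()
    val basePath = "/tmp/hudi_trips_cow" // placeholder table location

    // Previously possible, rejected by DefaultSource after this patch:
    //   spark.read.format("hudi").load(basePath + "/2020/01/10/*")
    //   spark.read.format("hudi").option("hoodie.datasource.read.paths", filePaths).load()

    // Glob-free equivalent: load the table base path and prune partitions with a
    // regular column filter; the partition column name depends on the table schema.
    val df = spark.read.format("hudi")
      .load(basePath)
      .filter("partition = '2020/01/10'")

    df.show(false)
    spark.stop()
  }
}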
This is necessary to maintain performance parity w/ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 2ee6c6c92fb5b..1b12d87731adb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -23,7 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField import org.apache.hudi.client.utils.SparkInternalSchemaConverter -import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig, HoodieMetadataConfig} +import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord} @@ -33,7 +33,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineLayout} import org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.common.util.{ConfigUtils, StringUtils} +import org.apache.hudi.common.util.ConfigUtils import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY @@ -55,7 +55,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapred.JobConf import org.apache.spark.SerializableWritable -import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession, SQLContext} @@ -412,18 +411,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] - protected def listLatestFileSlices(globPaths: Seq[StoragePath], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = { + protected def listLatestFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = { queryTimestamp match { case Some(ts) => specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t)) - val partitionDirs = if (globPaths.isEmpty) { - fileIndex.listFiles(partitionFilters, dataFilters) - } else { - val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths) - inMemoryFileIndex.listFiles(partitionFilters, dataFilters) - } - + val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters) val fsView = new HoodieTableFileSystemView( metaClient, timeline, sparkAdapter.getSparkPartitionedFileUtils.toFileStatuses(partitionDirs) .map(fileStatus => HadoopFSUtils.convertToStoragePathInfo(fileStatus)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala index 3b5e51da33aa4..60ae8b7224780 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala @@ -54,11 +54,10 @@ case class HoodieBootstrapMORSplit(dataFile: PartitionedFile, skeletonFile: Opti */ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext, private val userSchema: Option[StructType], - private val globPaths: Seq[StoragePath], override val metaClient: HoodieTableMetaClient, override val optParams: Map[String, String], private val prunedDataSchema: Option[StructType] = None) - extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, + extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient, optParams, prunedDataSchema) { override type Relation = HoodieBootstrapMORRelation @@ -69,12 +68,8 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext, override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging protected override def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = { - if (globPaths.isEmpty) { - fileIndex.listFileSlices(HoodieFileIndex. - convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)).values.flatten.toSeq - } else { - listLatestFileSlices(globPaths, partitionFilters, dataFilters) - } + fileIndex.listFileSlices(HoodieFileIndex. + convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)).values.flatten.toSeq } protected override def createFileSplit(fileSlice: FileSlice, dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index e027a671e52a4..99d43508d7f21 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -44,11 +44,10 @@ case class HoodieBootstrapSplit(dataFile: PartitionedFile, case class HoodieBootstrapRelation(override val sqlContext: SQLContext, private val userSchema: Option[StructType], - private val globPaths: Seq[StoragePath], override val metaClient: HoodieTableMetaClient, override val optParams: Map[String, String], private val prunedDataSchema: Option[StructType] = None) - extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, optParams, prunedDataSchema) { + extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient, optParams, prunedDataSchema) { override type Relation = HoodieBootstrapRelation @@ -88,7 +87,6 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext, */ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext, private val userSchema: Option[StructType], - private val globPaths: Seq[StoragePath], override val metaClient: HoodieTableMetaClient, override val optParams: Map[String, String], private val prunedDataSchema: Option[StructType] = None) @@ -101,7 +99,7 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext, override lazy val mandatoryFields: Seq[String] = Seq.empty protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = { - listLatestFileSlices(globPaths, partitionFilters, 
dataFilters) + listLatestFileSlices(partitionFilters, dataFilters) } protected def createFileSplit(fileSlice: FileSlice, dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala index 45e91863b49ce..10eb1cfd272e3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala @@ -50,7 +50,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext, override val metaClient: HoodieTableMetaClient, private val userSchema: Option[StructType], private val prunedDataSchema: Option[StructType] = None) - extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema) + extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, userSchema, prunedDataSchema) with HoodieIncrementalRelationV1Trait with MergeOnReadIncrementalRelation { override type Relation = MergeOnReadIncrementalRelationV1 @@ -105,7 +105,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext, List() } else { val fileSlices = if (fullTableScan) { - listLatestFileSlices(Seq(), partitionFilters, dataFilters) + listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime @@ -128,7 +128,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext, List() } else { val fileSlices = if (fullTableScan) { - listLatestFileSlices(Seq(), partitionFilters, dataFilters) + listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala index f104f2cfc0f3b..fb30b247093f8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala @@ -53,7 +53,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, private val userSchema: Option[StructType], private val prunedDataSchema: Option[StructType] = None, override val rangeType: RangeType = RangeType.OPEN_CLOSED) - extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema) + extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, userSchema, prunedDataSchema) with HoodieIncrementalRelationV2Trait with MergeOnReadIncrementalRelation { override type Relation = MergeOnReadIncrementalRelationV2 @@ -101,7 +101,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, List() } else { val fileSlices = if (fullTableScan) { - listLatestFileSlices(Seq(), partitionFilters, dataFilters) + listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime @@ -124,7 +124,7 @@ case class 
MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, List() } else { val fileSlices = if (fullTableScan) { - listLatestFileSlices(Seq(), partitionFilters, dataFilters) + listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 8bd50881e8fe1..64e1262ba6e19 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -42,10 +42,9 @@ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], case class MergeOnReadSnapshotRelation(override val sqlContext: SQLContext, override val optParams: Map[String, String], override val metaClient: HoodieTableMetaClient, - private val globPaths: Seq[StoragePath], private val userSchema: Option[StructType], private val prunedDataSchema: Option[StructType] = None) - extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, globPaths, userSchema, prunedDataSchema) { + extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, userSchema, prunedDataSchema) { override type Relation = MergeOnReadSnapshotRelation @@ -68,7 +67,6 @@ case class MergeOnReadSnapshotRelation(override val sqlContext: SQLContext, abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, optParams: Map[String, String], metaClient: HoodieTableMetaClient, - globPaths: Seq[StoragePath], userSchema: Option[StructType], prunedDataSchema: Option[StructType]) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) { @@ -133,14 +131,8 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { val convertedPartitionFilters = HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) - - if (globPaths.isEmpty) { - val fileSlices = fileIndex.filterFileSlices(dataFilters, convertedPartitionFilters).flatMap(s => s._2) - buildSplits(fileSlices) - } else { - val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters) - buildSplits(fileSlices) - } + val fileSlices = fileIndex.filterFileSlices(dataFilters, convertedPartitionFilters).flatMap(s => s._2) + buildSplits(fileSlices) } protected def buildSplits(fileSlices: Seq[FileSlice]): List[HoodieMergeOnReadFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala deleted file mode 100644 index 722cd74408f5e..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.execution.datasources - -import org.apache.hudi.SparkAdapterSupport -import org.apache.hudi.storage.StoragePath - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.HoodieHadoopFSUtils -import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression} -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.types.StructType - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -class HoodieInMemoryFileIndex(sparkSession: SparkSession, - rootPathsSpecified: Seq[Path], - parameters: Map[String, String], - userSpecifiedSchema: Option[StructType], - fileStatusCache: FileStatusCache = NoopCache) - extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) - with SparkAdapterSupport { - - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is non-partitioned, - * this will return a single partition with no partition values - * - * NOTE: This method replicates the one it overrides, however it uses custom method - * that accepts files starting with "." 
- */ - override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { - sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory( - InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil - } else { - prunePartitions(partitionFilters, partitionSpec()).map { - case PartitionPath(values, path) => - val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { - case Some(existingDir) => - // Directory has children files in it, return them - existingDir.filter(f => isDataPath(f.getPath)) - - case None => - // Directory does not exist, or has no children files - Nil - } - sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(values, files) - } - } - logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) - selectedPartitions - } - - private def isDataPath(path: Path): Boolean = { - val name = path.getName - !(name.startsWith("_") && !name.contains("=")) - } - - private def prunePartitions( - predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionPath] = { - val PartitionSpec(partitionColumns, partitions) = partitionSpec - val partitionColumnNames = partitionColumns.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = sparkAdapter.createInterpretedPredicate(predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }) - - val selected = partitions.filter { - case PartitionPath(values, _) => boundPredicate.eval(values) - } - logInfo { - val total = partitions.length - val selectedSize = selected.length - val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 - s"Selected $selectedSize partitions out of $total, " + - s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions." - } - - selected - } else { - partitions - } - } - - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery threshold. - * - * This is publicly visible for testing. - * - * NOTE: This method replicates the one it overrides, however it uses custom method to run parallel - * listing that accepts files starting with "." 
- */ - override def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val startTime = System.nanoTime() - val output = mutable.LinkedHashSet[FileStatus]() - val pathsToFetch = mutable.ArrayBuffer[Path]() - for (path <- paths) { - fileStatusCache.getLeafFiles(path) match { - case Some(files) => - HiveCatalogMetrics.incrementFileCacheHits(files.length) - output ++= files - case None => - pathsToFetch += path - } - () // for some reasons scalac 2.12 needs this; return type doesn't matter - } - val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) - val discovered = bulkListLeafFiles(sparkSession, pathsToFetch, filter, hadoopConf) - - discovered.foreach { case (path, leafFiles) => - HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) - fileStatusCache.putLeafFiles(path, leafFiles.toArray) - output ++= leafFiles - } - - logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf files" + - s" for ${paths.length} paths.") - - output - } - - protected def bulkListLeafFiles(sparkSession: SparkSession, paths: ArrayBuffer[Path], filter: PathFilter, hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = { - HoodieHadoopFSUtils.parallelListLeafFiles( - sc = sparkSession.sparkContext, - paths = paths.toSeq, - hadoopConf = hadoopConf, - filter = new PathFilterWrapper(filter), - ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles, - // NOTE: We're disabling fetching Block Info to speed up file listing - ignoreLocality = true, - parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold, - parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism) - } -} - -object HoodieInMemoryFileIndex { - def create(sparkSession: SparkSession, globbedPaths: Seq[StoragePath]): HoodieInMemoryFileIndex = { - val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - new HoodieInMemoryFileIndex(sparkSession, globbedPaths.map(e => new Path(e.toUri)), Map(), Option.empty, fileStatusCache) - } -} - -private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable { - override def accept(path: Path): Boolean = { - (filter == null || filter.accept(path)) && !HoodieHadoopFSUtils.shouldFilterOutPathName(path.getName) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestPathUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestPathUtils.scala deleted file mode 100644 index 39119d988fa7c..0000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestPathUtils.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.util - -import org.apache.hudi.common.testutils.HoodieTestUtils -import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath} - -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.io.TempDir - -import java.io.File -import java.nio.file.Paths - -class TestPathUtils { - - @Test - def testGlobPaths(@TempDir tempDir: File): Unit = { - val folders: Seq[StoragePath] = Seq( - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder1").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder2").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, ".hoodie").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata").toUri) - ) - - val files: Seq[StoragePath] = Seq( - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata", "file5").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata", "file6").toUri) - ) - - folders.foreach(folder => new File(folder.toUri).mkdir()) - files.foreach(file => new File(file.toUri).createNewFile()) - - val storage = HoodieStorageUtils.getStorage(tempDir.getAbsolutePath, HoodieTestUtils.getDefaultStorageConf) - var paths = Seq(tempDir.getAbsolutePath + "/*") - var globbedPaths = PathUtils.checkAndGlobPathIfNecessary(paths, storage) - assertEquals(folders.filterNot(entry => entry.toString.contains(".hoodie")) - .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) - - paths = Seq(tempDir.getAbsolutePath + "/*/*") - globbedPaths = PathUtils.checkAndGlobPathIfNecessary(paths, storage) - assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie")) - .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) - - paths = Seq(tempDir.getAbsolutePath + "/folder1/*") - globbedPaths = PathUtils.checkAndGlobPathIfNecessary(paths, storage) - assertEquals(Seq(files(0), files(1)).sortWith(_.toString < _.toString), - globbedPaths.sortWith(_.toString < _.toString)) - - paths = Seq(tempDir.getAbsolutePath + "/folder2/*") - globbedPaths = PathUtils.checkAndGlobPathIfNecessary(paths, storage) - assertEquals(Seq(files(2), files(3)).sortWith(_.toString < _.toString), - globbedPaths.sortWith(_.toString < _.toString)) - - paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*") - globbedPaths = PathUtils.checkAndGlobPathIfNecessary(paths, storage) - assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie")) - .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) - } - - -} From f0ff74df675f8be6a18a9621e33bd90a9af72a2a Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 8 Oct 2025 13:00:47 -0400 Subject: [PATCH 2/5] remove in memory file index test --- .../TestHoodieInMemoryFileIndex.scala | 61 ------------------- 1 file changed, 61 deletions(-) delete mode 100644 hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala deleted file mode 100644 index 8bca57fa0856d..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.execution.datasources - -import org.apache.hudi.storage.StoragePath -import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.io.TempDir - -import java.io.File -import java.nio.file.Paths - -class TestHoodieInMemoryFileIndex { - - @Test - def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = { - val spark = SparkSession.builder - .config(getSparkConfForTest("Hoodie Datasource test")) - .getOrCreate - - val folders: Seq[StoragePath] = Seq( - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder1").toUri), - new StoragePath(Paths.get(tempDir.getAbsolutePath, "folder2").toUri) - ) - - val files: Seq[Path] = Seq( - new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri) - ) - - folders.foreach(folder => new File(folder.toUri).mkdir()) - files.foreach(file => new File(file.toUri).createNewFile()) - - val index = HoodieInMemoryFileIndex.create(spark, Seq(folders(0), folders(1))) - val indexedFilePaths = index.allFiles().map(fs => fs.getPath) - assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString)) - spark.stop() - } - -} From 46d9e7266ed1fc2331a1d0c68db8679fd4110246 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 8 Oct 2025 13:14:06 -0400 Subject: [PATCH 3/5] try fixing build --- .../main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala | 1 - .../scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala | 3 +-- .../scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala | 3 +-- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index aa03be2872e2e..9cdebd88b85e1 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -170,7 +170,6 @@ trait SparkAdapter extends Serializable { def createRelation(sqlContext: SQLContext, metaClient: HoodieTableMetaClient, schema: Schema, - globPaths: Array[StoragePath], parameters: java.util.Map[String, String]): BaseRelation /** diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index a284cb6108510..66f894394d9a2 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -104,10 +104,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { override def createRelation(sqlContext: SQLContext, metaClient: HoodieTableMetaClient, schema: Schema, - globPaths: Array[StoragePath], parameters: java.util.Map[String, String]): BaseRelation = { val dataSchema = Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull - DefaultSource.createRelation(sqlContext, metaClient, dataSchema, globPaths, parameters.asScala.toMap) + DefaultSource.createRelation(sqlContext, metaClient, dataSchema, parameters.asScala.toMap) } override def convertStorageLevelToString(level: StorageLevel): String diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala index e22607d8a416c..2bfa141a35db9 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala @@ -106,10 +106,9 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging { override def createRelation(sqlContext: SQLContext, metaClient: HoodieTableMetaClient, schema: Schema, - globPaths: Array[StoragePath], parameters: java.util.Map[String, String]): BaseRelation = { val dataSchema = Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull - DefaultSource.createRelation(sqlContext, metaClient, dataSchema, globPaths, parameters.asScala.toMap) + DefaultSource.createRelation(sqlContext, metaClient, dataSchema, parameters.asScala.toMap) } override def convertStorageLevelToString(level: StorageLevel): String From dd865faef08e386da2bbe480a943ec32f7730102 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 8 Oct 2025 14:43:33 -0400 Subject: [PATCH 4/5] fix some tests --- .../org/apache/hudi/DataSourceOptions.scala | 1 + .../org/apache/hudi/TestHoodieFileIndex.scala | 1 - .../hudi/functional/CommonOptionUtils.scala | 9 +- .../hudi/functional/TestCOWDataSource.scala | 148 ++++----------- .../hudi/functional/TestMORDataSource.scala | 173 ++---------------- 5 files changed, 55 insertions(+), 277 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 03131bb094f27..e72d5a993fcab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -89,6 +89,7 @@ object DataSourceReadOptions { @Deprecated val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_NAME + @Deprecated val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty .key("hoodie.file.index.enable") .defaultValue(true) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 750c94325d38a..fa79272acb165 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -79,7 +79,6 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS ) var queryOpts = Map( - DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> "true", DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala index 8510a2028ca21..7b7259beb97f6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala @@ -42,15 +42,12 @@ object CommonOptionUtils { val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key -> classOf[DefaultSparkRecordMerger].getName) def getWriterReaderOpts(recordType: HoodieRecordType, - opt: Map[String, String] = commonOpts, - enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): + opt: Map[String, String] = commonOpts): (Map[String, String], Map[String, String]) = { - val fileIndexOpt: Map[String, String] = - Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> enableFileIndex.toString) recordType match { - case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts ++ fileIndexOpt) - case _ => (opt, fileIndexOpt) + case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts) + case _ => (opt, Map.empty) } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 8738030189395..6df048c6878b5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -615,10 +615,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup * For COW table, test the snapshot query mode and incremental query mode. 
*/ @ParameterizedTest - @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK")) - def testPrunePartitionForTimestampBasedKeyGenerator(enableFileIndex: Boolean, - recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) + @CsvSource(Array("AVRO", "SPARK")) + def testPrunePartitionForTimestampBasedKeyGenerator(recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType) val options = CommonOptionUtils.commonOpts ++ Map( "hoodie.compact.inline" -> "false", @@ -650,8 +649,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup val commit2CompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) // snapshot query - val pathForReader = getPathForReader(basePath, !enableFileIndex, 3) - val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(pathForReader) + val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(basePath) assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) @@ -938,55 +936,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals("replacecommit", commits(1)) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testReadPathsOnCopyOnWriteTable(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) - - val records1 = dataGen.generateInsertsContainsAllPartitions("001", 20) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala.toSeq, 2)) - inputDF1.write.format("org.apache.hudi") - .options(writeOpts) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Append) - .save(basePath) - val metaClient = createMetaClient(spark, basePath) - - val instantTime = metaClient.getActiveTimeline.filterCompletedInstants().getInstantsAsStream.findFirst().get().requestedTime - - val record1FilePaths = storage.listDirectEntries(new StoragePath(basePath, dataGen.getPartitionPaths.head)) - .asScala - .filter(!_.getPath.getName.contains("hoodie_partition_metadata")) - .filter(_.getPath.getName.endsWith("parquet")) - .map(_.getPath.toString) - .mkString(",") - - val records2 = dataGen.generateInsertsContainsAllPartitions("002", 20) - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala.toSeq, 2)) - inputDF2.write.format("org.apache.hudi") - .options(writeOpts) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Append) - .save(basePath) - - val inputDF3 = spark.read.options(readOpts).json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala.toSeq, 2)) - inputDF3.write.format("org.apache.hudi") - .options(writeOpts) - // Use bulk insert here to make sure the files have different file groups. 
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Append) - .save(basePath) - - val hudiReadPathDF = spark.read.format("org.apache.hudi") - .options(readOpts) - .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), instantTime) - .option(DataSourceReadOptions.READ_PATHS.key, record1FilePaths) - .load() - - val expectedCount = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head) - assertEquals(expectedCount, hudiReadPathDF.count()) - } - @Test def testOverWriteTableModeUseReplaceAction(): Unit = { val (writeOpts, readOpts) = getWriterReaderOpts() @@ -1223,6 +1172,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } } + // TODO: Should this be deleted? @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testSparkPartitionByWithCustomKeyGeneratorWithGlobbing(recordType: HoodieRecordType): Unit = { @@ -1309,7 +1259,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup @ParameterizedTest @CsvSource(value = Array("6", "8")) def testPartitionPruningForTimestampBasedKeyGenerator(tableVersion: Int): Unit = { - var (writeOpts, readOpts) = getWriterReaderOptsLessPartitionPath(HoodieRecordType.AVRO, enableFileIndex = true) + var (writeOpts, readOpts) = getWriterReaderOptsLessPartitionPath(HoodieRecordType.AVRO) writeOpts = writeOpts + (HoodieTableConfig.VERSION.key() -> tableVersion.toString, HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString) val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, writeOpts) @@ -1450,7 +1400,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup partitionEncode: Boolean, isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) + val (writeOpts, readOpts) = getWriterReaderOpts(recordType) val N = 20 // Test query with partition prune if URL_ENCODE_PARTITIONING has enable @@ -1466,12 +1416,12 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup val commitCompletionTime1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15") - val pathForReader = getPathForReader(basePath, !enableFileIndex, if (partitionEncode) 1 else 3) + // query the partition by filter val count1 = spark.read.format("hudi") .options(readOpts) .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(pathForReader) + .load(basePath) .filter("partition = '2016/03/15'") .count() assertEquals(countIn20160315, count1) @@ -1591,14 +1541,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } @ParameterizedTest - @CsvSource(Array( - "true, true, AVRO", "true, false, AVRO", "true, true, SPARK", "true, false, SPARK", - "false, true, AVRO", "false, false, AVRO", "false, true, SPARK", "false, false, SPARK" - )) - def testPartitionColumnsProperHandling(enableFileIndex: Boolean, - useGlobbing: Boolean, - recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) + @CsvSource(Array("AVRO", "SPARK")) + def testPartitionColumnsProperHandling(recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType) val _spark = 
spark import _spark.implicits._ @@ -1622,13 +1567,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup .mode(org.apache.spark.sql.SaveMode.Append) .save(basePath) - // NOTE: We're testing here that both paths are appropriately handling - // partition values, regardless of whether we're reading the table - // t/h a globbed path or not - val pathForReader = getPathForReader(basePath, useGlobbing || !enableFileIndex, 3) - // Case #1: Partition columns are read from the data file - val firstDF = spark.read.format("hudi").options(readOpts).load(pathForReader) + val firstDF = spark.read.format("hudi").options(readOpts).load(basePath) assert(firstDF.count() == 2) @@ -1643,27 +1583,22 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup ) // Case #2: Partition columns are extracted from the partition path - // - // NOTE: This case is only relevant when globbing is NOT used, since when globbing is used Spark - // won't be able to infer partitioning properly - if (!useGlobbing && enableFileIndex) { - val secondDF = spark.read.format("hudi") - .options(readOpts) - .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true") - .load(pathForReader) - - assert(secondDF.count() == 2) - - // data_date is the partition field. Persist to the parquet file using the origin values, and read it. - assertEquals( - Seq("2018/09/23", "2018/09/24"), - secondDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq - ) - assertEquals( - Seq("2018/09/23", "2018/09/24"), - secondDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq - ) - } + val secondDF = spark.read.format("hudi") + .options(readOpts) + .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true") + .load(basePath) + + assert(secondDF.count() == 2) + + // data_date is the partition field. Persist to the parquet file using the origin values, and read it. 
+ assertEquals( + Seq("2018/09/23", "2018/09/24"), + secondDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq + ) + assertEquals( + Seq("2018/09/23", "2018/09/24"), + secondDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq + ) } @Test @@ -1826,36 +1761,21 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO, - opt: Map[String, String] = CommonOptionUtils.commonOpts, - enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): + opt: Map[String, String] = CommonOptionUtils.commonOpts): (Map[String, String], Map[String, String]) = { - val fileIndexOpt: Map[String, String] = - Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> enableFileIndex.toString) - recordType match { - case HoodieRecordType.SPARK => (opt ++ CommonOptionUtils.sparkOpts, CommonOptionUtils.sparkOpts ++ fileIndexOpt) - case _ => (opt, fileIndexOpt) + case HoodieRecordType.SPARK => (opt ++ CommonOptionUtils.sparkOpts, CommonOptionUtils.sparkOpts) + case _ => (opt, Map.empty) } } def getWriterReaderOptsLessPartitionPath(recordType: HoodieRecordType, - opt: Map[String, String] = CommonOptionUtils.commonOpts, - enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): + opt: Map[String, String] = CommonOptionUtils.commonOpts): (Map[String, String], Map[String, String]) = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, opt, enableFileIndex) + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, opt) (writeOpts.-(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), readOpts) } - def getPathForReader(basePath: String, useGlobbing: Boolean, partitionPathLevel: Int): String = { - if (useGlobbing) { - // When explicitly using globbing or not using HoodieFileIndex, we fall back to the old way - // of reading Hudi table with globbed path - basePath + "/*" * (partitionPathLevel + 1) - } else { - basePath - } - } - @Test def testHiveStyleDelete(): Unit = { val columns = Seq("id", "precombine", "partition") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 39179e966222b..e68d0cd2ef125 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,7 +17,7 @@ package org.apache.hudi.functional -import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, ScalaAssertionSupport, SparkDatasetMixin} +import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, ScalaAssertionSupport, SparkDatasetMixin} import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.client.SparkRDDWriteClient @@ -441,7 +441,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin val hudiSnapshotDF6 = spark.read.format("org.apache.hudi") .options(readOpts) .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - .load(basePath + "/2020/01/10/*") + 
.load(basePath).where("partition = '2020/01/10'") assertEquals(102, hudiSnapshotDF6.count()) // validate incremental queries only for table version 8 // 1.0 reader (table version 8) supports incremental query reads using completion time @@ -990,123 +990,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin assertEquals(partitionCounts("2021/03/03"), count7) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testReadPathsForMergeOnReadTable(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) - - // Paths only baseFiles - val records1 = dataGen.generateInserts("001", 100) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala.toSeq, 2)) - inputDF1.write.format("org.apache.hudi") - .options(writeOpts) - .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000")) - val baseFilePath = storage.listDirectEntries(new StoragePath(basePath, dataGen.getPartitionPaths.head)) - .asScala - .filter(_.getPath.getName.endsWith("parquet")) - .map(_.getPath.toString) - .mkString(",") - val records2 = dataGen.generateUniqueDeleteRecords("002", 100) - val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala.toSeq, 2)) - inputDF2.write.format("org.apache.hudi") - .options(writeOpts) - .mode(SaveMode.Append) - .save(basePath) - val hudiReadPathDF1 = spark.read.options(readOpts).format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - .option(DataSourceReadOptions.READ_PATHS.key, baseFilePath) - .load() - - val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head) - assertEquals(expectedCount1, hudiReadPathDF1.count()) - - // Paths Contains both baseFile and log files - val logFilePath = storage.listDirectEntries(new StoragePath(basePath, dataGen.getPartitionPaths.head)) - .asScala - .filter(_.getPath.getName.contains("log")) - .map(_.getPath.toString) - .mkString(",") - - val readPaths = baseFilePath + "," + logFilePath - val hudiReadPathDF2 = spark.read.format("org.apache.hudi") - .options(readOpts) - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - .option(DataSourceReadOptions.READ_PATHS.key, readPaths) - .load() - - assertEquals(0, hudiReadPathDF2.count()) - } - - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testReadPathsForOnlyLogFiles(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) - // enable column stats - val hudiOpts = writeOpts ++ Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", - HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key() -> "fare,city_to_state,rider") - - initMetaClient(HoodieTableType.MERGE_ON_READ) - val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala.toSeq, 
2)) - inputDF1.write.format("hudi") - .options(hudiOpts) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - // Use InMemoryIndex to generate log only mor table. - .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString) - .mode(SaveMode.Overwrite) - .save(basePath) - // There should no base file in the file list. - assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) - - val logFilePath = storage.listDirectEntries(new StoragePath(basePath, dataGen.getPartitionPaths.head)) - .asScala - .filter(_.getPath.getName.contains("log")) - .map(_.getPath.toString) - .mkString(",") - - val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20) - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala.toSeq, 2)) - inputDF2.write.format("hudi") - .options(hudiOpts) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - // Use InMemoryIndex to generate log only mor table. - .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString) - .mode(SaveMode.Append) - .save(basePath) - // There should no base file in the file list. - assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) - - val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head) - - val hudiReadPathDF = spark.read.options(readOpts).format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - .option(DataSourceReadOptions.READ_PATHS.key, logFilePath) - .load() - - assertEquals(expectedCount1, hudiReadPathDF.count()) - - if (recordType == HoodieRecordType.SPARK) { - metaClient = HoodieTableMetaClient.reload(metaClient) - val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build() - val columnStatsIndex = new ColumnStatsIndexSupport(spark, inputDF1.schema, metadataConfig, metaClient) - columnStatsIndex.loadTransposed(Seq("fare","city_to_state", "rider"), shouldReadInMemory = true) { emptyTransposedColStatsDF => - assertTrue(!emptyTransposedColStatsDF.columns.contains("fare")) - assertTrue(!emptyTransposedColStatsDF.columns.contains("city_to_state")) - // rider is a simple string field, so it should have a min/max value as well as nullCount - assertTrue(emptyTransposedColStatsDF.filter("rider_minValue IS NOT NULL").count() > 0) - assertTrue(emptyTransposedColStatsDF.filter("rider_maxValue IS NOT NULL").count() > 0) - assertTrue(emptyTransposedColStatsDF.filter("rider_nullCount IS NOT NULL").count() > 0) - } - } - } - @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testReadLogOnlyMergeOnReadTable(recordType: HoodieRecordType): Unit = { @@ -1125,8 +1008,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin .save(basePath) // There should no base file in the file list. assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) - // Test read logs only mor table with glob paths. - assertEquals(20, spark.read.format("hudi").load(basePath).count()) // Test read log only mor table. 
assertEquals(20, spark.read.format("hudi").options(readOpts).load(basePath).count()) } @@ -1267,10 +1148,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin * For MOR table, test all the three query modes. */ @ParameterizedTest - @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK")) - def testPrunePartitionForTimestampBasedKeyGenerator(enableFileIndex: Boolean, - recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) + @CsvSource(Array("AVRO", "SPARK")) + def testPrunePartitionForTimestampBasedKeyGenerator(recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType) val options = commonOpts ++ Map( "hoodie.compact.inline" -> "false", @@ -1311,7 +1191,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin val commit3Time = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime val commit3CompletionTime = metaClient.getActiveTimeline.lastInstant().get().getCompletionTime - val pathForROQuery = getPathForROQuery(basePath, !enableFileIndex, 3) // snapshot query val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(basePath) assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit1Time'").count, 50) @@ -1325,7 +1204,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin val readOptimizedQueryRes = spark.read.format("hudi") .options(readOpts) .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(pathForROQuery) + .load(basePath) assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) @@ -1360,9 +1239,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin * * The read-optimized query should read `fg1_dc1.parquet` only in this case. 
*/ - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(enableFileIndex: Boolean): Unit = { + @Test + def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = { val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table") val orderingFields = "col3" val recordKeyField = "key" @@ -1378,9 +1256,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin HoodieWriteConfig.TBL_NAME.key -> tableName, "hoodie.insert.shuffle.parallelism" -> "1", "hoodie.upsert.shuffle.parallelism" -> "1") - val pathForROQuery = getPathForROQuery(tablePath, !enableFileIndex, 0) - val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, options, enableFileIndex) + val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, options) // First batch with all inserts // Deltacommit1 (DC1, completed), writing file group 1 (fg1) @@ -1448,7 +1325,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin .option( DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(pathForROQuery) + .load(tablePath) // The base file in the first file slice, i.e., fg1_dc1.parquet, should be read only assertEquals(10, roDf.count()) @@ -1458,9 +1335,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin } @ParameterizedTest - @CsvSource(value = Array("true,6", "true,8", "false,6", "false,8")) -// @ValueSource(booleans = Array(true, false)) - def testSnapshotQueryAfterInflightDeltaCommit(enableFileIndex: Boolean, tableVersion: Int): Unit = { + @ValueSource(ints = Array(6, 8)) + def testSnapshotQueryAfterInflightDeltaCommit(tableVersion: Int): Unit = { val (tableName, tablePath) = ("hoodie_mor_snapshot_read_test_table", s"${basePath}_mor_test_table") val orderingFields = "col3" val recordKeyField = "key" @@ -1476,9 +1352,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin HoodieWriteConfig.TBL_NAME.key -> tableName, "hoodie.insert.shuffle.parallelism" -> "1", "hoodie.upsert.shuffle.parallelism" -> "1") - val pathForQuery = getPathForROQuery(tablePath, !enableFileIndex, 0) - var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, options, enableFileIndex) + var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, options) writeOpts = writeOpts ++ Map( HoodieTableConfig.VERSION.key() -> tableVersion.toString, HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString) @@ -1501,7 +1376,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin // Snapshot query on MOR val snapshotDf = spark.read.format("org.apache.hudi") .options(readOpts) - .load(pathForQuery) + .load(tablePath) // Delete last completed instant metaClient = createMetaClient(spark, tablePath) @@ -1577,25 +1452,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin } def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO, - opt: Map[String, String] = commonOpts, - enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): + opt: Map[String, String] = commonOpts): (Map[String, String], Map[String, String]) = { - val fileIndexOpt: Map[String, String] = - Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> enableFileIndex.toString) - recordType match { - case HoodieRecordType.SPARK => (opt ++ 
sparkOpts, sparkOpts ++ fileIndexOpt) - case _ => (opt, fileIndexOpt) - } - } - - def getPathForROQuery(basePath: String, useGlobbing: Boolean, partitionPathLevel: Int): String = { - if (useGlobbing) { - // When explicitly using globbing or not using HoodieFileIndex, we fall back to the old way - // of reading Hudi table with globbed path - basePath + "/*" * (partitionPathLevel + 1) - } else { - basePath + case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts) + case _ => (opt, Map.empty) } } From 693e5a187c43afe94fe93a37c92ead929a3fd99a Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 8 Oct 2025 14:50:10 -0400 Subject: [PATCH 5/5] fix TestDataSourceForBootstrap.scala --- .../TestDataSourceForBootstrap.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 7d5ca083107ef..7b73453986992 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -135,7 +135,7 @@ class TestDataSourceForBootstrap { assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001"))) // Read bootstrapped table and verify count using glob path - val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + val hoodieROViewDF1 = spark.read.format("hudi").load(basePath) assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList()) // Read bootstrapped table and verify count using Hudi file index @@ -160,7 +160,7 @@ class TestDataSourceForBootstrap { assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) // Read table after upsert and verify count using glob path - val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*") + val hoodieROViewDF3 = spark.read.format("hudi").load(basePath) assertEquals(numRecords, hoodieROViewDF3.count()) assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count()) // Read with base path using Hudi file index @@ -218,7 +218,7 @@ class TestDataSourceForBootstrap { } // Read bootstrapped table and verify count - val hoodieROViewDF1 = spark.read.options(readOpts).format("hudi").load(basePath + "/*") + val hoodieROViewDF1 = spark.read.options(readOpts).format("hudi").load(basePath) assertEquals(expectedDF.collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList()) // Read bootstrapped table and verify count using Hudi file index @@ -247,7 +247,7 @@ class TestDataSourceForBootstrap { assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) // Read table after upsert and verify count using glob path - val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*") + val hoodieROViewDF3 = spark.read.format("hudi").load(basePath) assertEquals(numRecords, hoodieROViewDF3.count()) assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count()) // Read table after upsert and verify count using Hudi file index @@ -284,7 +284,7 @@ class TestDataSourceForBootstrap { val commitCompletionTime1 = DataSourceTestUtils.latestCommitCompletionTime(fs, basePath) // Read bootstrapped table and verify count using glob path - val 
hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + val hoodieROViewDF1 = spark.read.format("hudi").load(basePath) assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList()) // Read with base path using Hudi file index @@ -303,7 +303,7 @@ class TestDataSourceForBootstrap { // Read table after upsert and verify the updated value assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) - val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*") + val hoodieROViewDF2 = spark.read.format("hudi").load(basePath) hoodieROViewDF2.collect() assertEquals(updatedVerificationVal, hoodieROViewDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) @@ -324,7 +324,7 @@ class TestDataSourceForBootstrap { assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) // Read table after upsert and verify count using glob paths - val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*") + val hoodieROViewDF3 = spark.read.format("hudi").load(basePath) assertEquals(numRecords, hoodieROViewDF3.count()) assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count()) // Read table after upsert and verify count using Hudi file index @@ -386,7 +386,7 @@ class TestDataSourceForBootstrap { val hoodieROViewDF2 = spark.read.format("hudi") .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(basePath + "/*") + .load(basePath) assertEquals(numRecords, hoodieROViewDF2.count()) // Test query without "*" for MOR READ_OPTIMIZED @@ -454,7 +454,7 @@ class TestDataSourceForBootstrap { val hoodieROViewDF2 = spark.read.format("hudi") .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(basePath + "/*") + .load(basePath) assertEquals(numRecords, hoodieROViewDF2.count()) assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) // Test query without "*" for MOR READ_OPTIMIZED @@ -494,7 +494,7 @@ class TestDataSourceForBootstrap { val hoodieROViewDF1 = spark.read.format("hudi") .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(basePath + "/*") + .load(basePath) assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList()) @@ -588,7 +588,7 @@ class TestDataSourceForBootstrap { // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi") - .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath + "/*") + .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath) assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList()) val hoodieROViewDFWithBasePath = spark.read.format("hudi") @@ -613,7 +613,7 @@ class TestDataSourceForBootstrap { // Read table after upsert and verify count val hoodieROViewDF2 = spark.read.format("hudi") - .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath + "/*") + .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath) assertEquals(numRecords, hoodieROViewDF2.count()) assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())