This file was deleted.

@@ -170,7 +170,6 @@ trait SparkAdapter extends Serializable {
def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
schema: Schema,
globPaths: Array[StoragePath],
parameters: java.util.Map[String, String]): BaseRelation

/**
@@ -51,7 +51,6 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val userSchema: Option[StructType],
private val globPaths: Seq[StoragePath],
private val prunedDataSchema: Option[StructType] = None)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema)
with SparkAdapterSupport {
@@ -110,7 +109,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
val fileSlices = listLatestFileSlices(partitionFilters, dataFilters)
val fileSplits = fileSlices.flatMap { fileSlice =>
// TODO fix, currently assuming parquet as underlying format
val pathInfo: StoragePathInfo = fileSlice.getBaseFile.get.getPathInfo
@@ -139,7 +138,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
def toHadoopFsRelation: HadoopFsRelation = {
val enableFileIndex = HoodieSparkConfUtils.getConfigValue(optParams, sparkSession.sessionState.conf,
ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
if (enableFileIndex && globPaths.isEmpty) {
if (enableFileIndex) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
@@ -79,6 +79,7 @@ object DataSourceReadOptions {
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = HoodieReaderConfig.REALTIME_PAYLOAD_COMBINE
val REALTIME_MERGE: ConfigProperty[String] = HoodieReaderConfig.MERGE_TYPE

@Deprecated
val READ_PATHS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
@@ -88,6 +89,7 @@
@Deprecated
val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_NAME

@Deprecated
val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.file.index.enable")
.defaultValue(true)
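
Both options above are marked @Deprecated; the DefaultSource changes later in this diff go further and reject explicit read paths (and glob patterns) with a HoodieException. A minimal sketch of the read pattern this leaves in place, illustrative only: `spark`, `basePath`, and the partition column name are assumptions, not part of this diff.

// Load by table base path; the Hudi file index lists files and prunes
// partitions from pushed-down filters, so no glob or read-paths option is needed.
val df = spark.read.format("hudi").load(basePath)
df.filter("region = 'us-west'").show()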
@@ -35,7 +35,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.io.storage.HoodieSparkIOFactory
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.util.{PathUtils, SparkConfigUtils}
import org.apache.hudi.util.{SparkConfigUtils}

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -107,27 +107,15 @@ class DefaultSource extends RelationProvider
val storage = HoodieStorageUtils.getStorage(
allPaths.head, HadoopFSUtils.getStorageConf(sqlContext.sparkContext.hadoopConfiguration))

val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
PathUtils.checkAndGlobPathIfNecessary(allPaths, storage)
} else {
Seq.empty
if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
throw new HoodieException("Glob paths are not supported for read paths as of Hudi 1.2.0")
}

// Add default options for unspecified read options keys.
val parameters = (if (globPaths.nonEmpty) {
Map(
"glob.paths" -> globPaths.mkString(",")
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams)
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams)

// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
DataSourceUtils.getTablePath(storage, globPaths.asJava)
} else {
DataSourceUtils.getTablePath(storage, Seq(new StoragePath(path.get)).asJava)
}
val tablePath = DataSourceUtils.getTablePath(storage, Seq(new StoragePath(path.get)).asJava)
log.info("Obtained hudi table path: " + tablePath)

val metaClient = HoodieTableMetaClient.builder().setMetaserverConfig(parameters.toMap.asJava)
@@ -141,7 +129,7 @@ class DefaultSource extends RelationProvider
parameters
}

DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, options.toMap)
DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap)
}
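
For orientation, the guard added above changes what callers see at read time. A small illustrative sketch follows; `spark` and `basePath` are assumed, and the message is the one thrown above.

// Now rejected, regardless of query type: glob patterns in the load path
// (or any value in hoodie.datasource.read.paths) throw
// HoodieException("Glob paths are not supported for read paths as of Hudi 1.2.0").
//   spark.read.format("hudi").load(s"$basePath/*/*/*")

// Supported: pass the table base path and filter on partition columns instead.
val snapshotDf = spark.read.format("hudi").load(basePath)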

/**
@@ -279,7 +267,6 @@ object DefaultSource {
def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
schema: StructType,
globPaths: Seq[StoragePath],
parameters: Map[String, String]): BaseRelation = {
val tableType = metaClient.getTableType
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
@@ -309,7 +296,7 @@

lazy val enableFileGroupReader = SparkConfigUtils
.getStringWithAltKeys(parameters, HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean &&
!metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
!metaClient.isMetadataTable
lazy val tableVersion = if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
} else {
@@ -340,7 +327,7 @@
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
} else {
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
resolveBaseFileOnlyRelation(sqlContext, userSchema, metaClient, parameters)
}
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
(hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
@@ -357,15 +344,15 @@
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
} else {
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, userSchema)
}

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
if (enableFileGroupReader) {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
} else {
HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
HoodieBootstrapMORRelation(sqlContext, userSchema, metaClient, parameters)
}

case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
@@ -383,7 +370,7 @@
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
} else {
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
resolveHoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters)
}

case (_, _, _) =>
@@ -395,7 +382,6 @@
}

private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
globPaths: Seq[StoragePath],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
parameters: Map[String, String]): BaseRelation = {
@@ -404,20 +390,18 @@
val isSchemaEvolutionEnabledOnRead = HoodieSparkConfUtils.getConfigValue(parameters,
sqlContext.sparkSession.sessionState.conf, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
|| globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
if (!enableFileIndex || isSchemaEvolutionEnabledOnRead || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
} else {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters).toHadoopFsRelation
HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters).toHadoopFsRelation
}
}

private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[StoragePath],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
optParams: Map[String, String]): BaseRelation = {
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema)

// NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
// [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig, HoodieMetadataConfig}
import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
@@ -33,7 +33,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineLayout}
import org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
@@ -55,7 +55,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession, SQLContext}
@@ -412,18 +411,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]

protected def listLatestFileSlices(globPaths: Seq[StoragePath], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
protected def listLatestFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
queryTimestamp match {
case Some(ts) =>
specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t))

val partitionDirs = if (globPaths.isEmpty) {
fileIndex.listFiles(partitionFilters, dataFilters)
} else {
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}

val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
val fsView = new HoodieTableFileSystemView(
metaClient, timeline, sparkAdapter.getSparkPartitionedFileUtils.toFileStatuses(partitionDirs)
.map(fileStatus => HadoopFSUtils.convertToStoragePathInfo(fileStatus))
@@ -54,11 +54,10 @@ case class HoodieBootstrapMORSplit(dataFile: PartitionedFile, skeletonFile: Opti
*/
case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
private val userSchema: Option[StructType],
private val globPaths: Seq[StoragePath],
override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val prunedDataSchema: Option[StructType] = None)
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient,
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient,
optParams, prunedDataSchema) {

override type Relation = HoodieBootstrapMORRelation
@@ -69,12 +68,8 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging

protected override def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
if (globPaths.isEmpty) {
fileIndex.listFileSlices(HoodieFileIndex.
convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)).values.flatten.toSeq
} else {
listLatestFileSlices(globPaths, partitionFilters, dataFilters)
}
fileIndex.listFileSlices(HoodieFileIndex.
convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)).values.flatten.toSeq
}

protected override def createFileSplit(fileSlice: FileSlice, dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = {