Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BasePredicate, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -248,12 +248,24 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// the whole table
if (haveProperPartitionValues(partitionPaths.toSeq) && partitionSchema.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate(predicate.transform {
val transformedPredicate = predicate.transform {
case a: AttributeReference =>
val index = partitionSchema.indexWhere(a.name == _.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})

}
val boundPredicate: BasePredicate = try {
// Try using 1-arg constructor via reflection
val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.InterpretedPredicate")
val ctor = clazz.getConstructor(classOf[Expression])
ctor.newInstance(transformedPredicate).asInstanceOf[BasePredicate]
} catch {
case _: NoSuchMethodException | _: IllegalArgumentException =>
// Fallback: Try using 2-arg constructor
val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.InterpretedPredicate")
val ctor = clazz.getConstructor(classOf[Expression], classOf[Boolean])
ctor.newInstance(transformedPredicate, java.lang.Boolean.FALSE)
.asInstanceOf[BasePredicate]
Comment on lines +266 to +267
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add docs on what the second argument represents?

}
val prunedPartitionPaths = partitionPaths.filter {
partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
}.toSeq
Expand Down
Loading