diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index d84d0339499b..622e57f8bab2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -41,7 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BasePredicate, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
 import org.apache.spark.sql.internal.SQLConf
@@ -248,12 +248,24 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     // the whole table
     if (haveProperPartitionValues(partitionPaths.toSeq) && partitionSchema.nonEmpty) {
       val predicate = partitionPruningPredicates.reduce(expressions.And)
-      val boundPredicate = InterpretedPredicate(predicate.transform {
+      val transformedPredicate = predicate.transform {
         case a: AttributeReference =>
           val index = partitionSchema.indexWhere(a.name == _.name)
           BoundReference(index, partitionSchema(index).dataType, nullable = true)
-      })
-
+      }
+      val boundPredicate: BasePredicate = try {
+        // Try the 1-arg constructor via reflection
+        val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.InterpretedPredicate")
+        val ctor = clazz.getConstructor(classOf[Expression])
+        ctor.newInstance(transformedPredicate).asInstanceOf[BasePredicate]
+      } catch {
+        case _: NoSuchMethodException | _: IllegalArgumentException =>
+          // Fall back to the 2-arg constructor present in certain Spark runtimes
+          val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.InterpretedPredicate")
+          val ctor = clazz.getConstructor(classOf[Expression], classOf[Boolean])
+          ctor.newInstance(transformedPredicate, java.lang.Boolean.FALSE)
+            .asInstanceOf[BasePredicate]
+      }
       val prunedPartitionPaths = partitionPaths.filter { partitionPath =>
         boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
       }.toSeq
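
Note: the second hunk above resolves the predicate constructor reflectively because the InterpretedPredicate signature compiled against may not match the one present on the Spark runtime actually on the classpath, per the patch's own comments. The general pattern is: look up a constructor by its parameter types and, on NoSuchMethodException, retry with the alternate arity. The standalone sketch below illustrates only that pattern; Widget, ConstructorFallbackSketch, and newWidget are hypothetical names, not part of Spark or Hudi, and the patch itself looks the class up with Class.forName on its fully-qualified name instead of classOf.

// Hypothetical stand-in for a class whose constructor arity differs across
// library versions (in the patch, Spark's InterpretedPredicate).
class Widget(val expr: String, val codegenEnabled: Boolean) {
  def this(expr: String) = this(expr, false)
}

object ConstructorFallbackSketch {
  // Prefer the 1-arg constructor; if only the 2-arg signature exists in the
  // running library, fall back to it with a default second argument.
  def newWidget(expr: String): Widget =
    try {
      val ctor = classOf[Widget].getConstructor(classOf[String])
      ctor.newInstance(expr)
    } catch {
      case _: NoSuchMethodException =>
        val ctor = classOf[Widget].getConstructor(classOf[String], classOf[Boolean])
        ctor.newInstance(expr, java.lang.Boolean.FALSE)
    }

  def main(args: Array[String]): Unit =
    println(newWidget("a = 1").expr) // prints "a = 1"
}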