From ac41365ab2a1cb825c5888e6c8a5665c8f97df33 Mon Sep 17 00:00:00 2001
From: beliefer
Date: Thu, 11 Sep 2025 15:46:46 +0800
Subject: [PATCH 1/2] [SPARK-53551][SQL] Improve OffsetAndLimit by avoiding duplicate evaluation

---
 .../plans/logical/basicLogicalOperators.scala |  4 ++--
 .../spark/sql/execution/SparkStrategies.scala | 24 +++++++++----------
 .../v2/V2ScanRelationPushDown.scala           |  4 ++--
 3 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 515424db63567..1c165f331346b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1652,14 +1652,14 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
 }
 
 object OffsetAndLimit {
-  def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
+  def unapply(p: GlobalLimit): Option[(Int, Int, Int, LogicalPlan)] = {
     p match {
       // Optimizer pushes local limit through offset, so we need to match the plan this way.
       case GlobalLimit(IntegerLiteral(globalLimit),
              Offset(IntegerLiteral(offset),
                LocalLimit(IntegerLiteral(localLimit), child)))
           if globalLimit + offset == localLimit =>
-        Some((offset, globalLimit, child))
+        Some((offset, globalLimit, localLimit, child))
       case _ => None
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f76bc911bef8f..7a400604edeba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -94,9 +94,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // plan, instead of planning limit and offset separately.
       case LimitAndOffset(limit, offset, child) =>
         CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
-      case OffsetAndLimit(offset, limit, child) =>
+      case OffsetAndLimit(offset, _, localLimit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-        CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset)
+        CollectLimitExec(limit = localLimit, child = planLater(child), offset = offset)
       case Limit(IntegerLiteral(limit), child) =>
         CollectLimitExec(limit = limit, child = planLater(child))
       case logical.Offset(IntegerLiteral(offset), child) =>
@@ -121,14 +121,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         Some(TakeOrderedAndProjectExec(
           limit, order, projectList, planLater(child), offset))
       // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-      case OffsetAndLimit(offset, limit, Sort(order, true, child, _))
-          if offset + limit < conf.topKSortFallbackThreshold =>
-        Some(TakeOrderedAndProjectExec(
-          offset + limit, order, child.output, planLater(child), offset))
-      case OffsetAndLimit(offset, limit, Project(projectList, Sort(order, true, child, _)))
-          if offset + limit < conf.topKSortFallbackThreshold =>
-        Some(TakeOrderedAndProjectExec(
-          offset + limit, order, projectList, planLater(child), offset))
+      case OffsetAndLimit(offset, _, localLimit, Sort(order, true, child, _))
+          if localLimit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(localLimit, order, child.output, planLater(child), offset))
+      case OffsetAndLimit(offset, _, localLimit, Project(projectList, Sort(order, true, child, _)))
+          if localLimit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(localLimit, order, projectList, planLater(child), offset))
       case Limit(IntegerLiteral(limit), Sort(order, true, child, _))
           if limit < conf.topKSortFallbackThreshold =>
         Some(TakeOrderedAndProjectExec(
@@ -1037,10 +1035,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case LimitAndOffset(limit, offset, child) =>
         GlobalLimitExec(limit,
           LocalLimitExec(limit, planLater(child)), offset) :: Nil
-      case OffsetAndLimit(offset, limit, child) =>
+      case OffsetAndLimit(offset, _, localLimit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-        GlobalLimitExec(offset + limit,
-          LocalLimitExec(offset + limit, planLater(child)), offset) :: Nil
+        GlobalLimitExec(localLimit,
+          LocalLimitExec(localLimit, planLater(child)), offset) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimit(IntegerLiteral(limit), child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 31a98e1ff96cb..1ddffdbe2d37b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -730,9 +730,9 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
           // Keep the OFFSET operator if we can't remove LIMIT operator.
           offset
         }
-      case globalLimit @ OffsetAndLimit(offset, limit, child) =>
+      case globalLimit @ OffsetAndLimit(offset, limit, localLimit, child) =>
         // For `df.offset(n).limit(m)`, we can push down `limit(m + n)` first.
-        val (newChild, canRemoveLimit) = pushDownLimit(child, limit + offset)
+        val (newChild, canRemoveLimit) = pushDownLimit(child, localLimit)
         if (canRemoveLimit) {
           // Try to push down OFFSET only if the LIMIT operator has been pushed and can be removed.
          val isPushed = pushDownOffset(newChild, offset)

From c4da179a70bd1d0b04631ec6ead010e1039f76d2 Mon Sep 17 00:00:00 2001
From: beliefer
Date: Fri, 12 Sep 2025 11:57:21 +0800
Subject: [PATCH 2/2] Add comments

---
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3 +++
 .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala  | 1 +
 2 files changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7a400604edeba..3158e34576d3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -96,6 +96,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
       case OffsetAndLimit(offset, _, localLimit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+        // 'a + b' equals 'localLimit'.
         CollectLimitExec(limit = localLimit, child = planLater(child), offset = offset)
       case Limit(IntegerLiteral(limit), child) =>
         CollectLimitExec(limit = limit, child = planLater(child))
@@ -121,6 +122,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         Some(TakeOrderedAndProjectExec(
           limit, order, projectList, planLater(child), offset))
       // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+      // 'a + b' equals 'localLimit'.
       case OffsetAndLimit(offset, _, localLimit, Sort(order, true, child, _))
           if localLimit < conf.topKSortFallbackThreshold =>
         Some(TakeOrderedAndProjectExec(localLimit, order, child.output, planLater(child), offset))
@@ -1037,6 +1039,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           LocalLimitExec(limit, planLater(child)), offset) :: Nil
       case OffsetAndLimit(offset, _, localLimit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+        // 'a + b' equals 'localLimit'.
         GlobalLimitExec(localLimit,
           LocalLimitExec(localLimit, planLater(child)), offset) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 1ddffdbe2d37b..acb07b685cd95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -732,6 +732,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
         }
       case globalLimit @ OffsetAndLimit(offset, limit, localLimit, child) =>
         // For `df.offset(n).limit(m)`, we can push down `limit(m + n)` first.
+        // 'm + n' equals 'localLimit'.
        val (newChild, canRemoveLimit) = pushDownLimit(child, localLimit)
        if (canRemoveLimit) {
          // Try to push down OFFSET only if the LIMIT operator has been pushed and can be removed.
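
Reviewer note, not part of the patches above: a minimal, self-contained Scala sketch of the extractor change, using hypothetical stand-in case classes (Plan, Leaf, LocalLimit, Offset, GlobalLimit) rather than Spark's real logical operators. It illustrates why returning the already-matched local limit as an extra tuple element lets call sites reuse it instead of re-evaluating 'offset + limit'.

object OffsetAndLimitSketch {
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class LocalLimit(limit: Int, child: Plan) extends Plan
  case class Offset(offset: Int, child: Plan) extends Plan
  case class GlobalLimit(limit: Int, child: Plan) extends Plan

  object OffsetAndLimit {
    // Before the patch the result was Option[(Int, Int, Plan)], so every call
    // site recomputed `offset + globalLimit`; now the local limit that the
    // guard already matched rides along as the third tuple element.
    def unapply(p: Plan): Option[(Int, Int, Int, Plan)] = p match {
      // The optimizer pushes the local limit below the offset, hence this shape.
      case GlobalLimit(globalLimit, Offset(offset, LocalLimit(localLimit, child)))
          if globalLimit + offset == localLimit =>
        Some((offset, globalLimit, localLimit, child))
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    // df.offset(2).limit(3) lowers to GlobalLimit(3, Offset(2, LocalLimit(5, scan))).
    val plan = GlobalLimit(3, Offset(2, LocalLimit(5, Leaf("scan"))))
    plan match {
      case OffsetAndLimit(offset, globalLimit, localLimit, child) =>
        // localLimit (= offset + globalLimit) is reused directly; no re-addition.
        println(s"offset=$offset globalLimit=$globalLimit localLimit=$localLimit child=$child")
      case _ =>
        println("no match")
    }
  }
}

Running the sketch prints offset=2 globalLimit=3 localLimit=5 child=Leaf(scan), matching the guard's invariant that localLimit == globalLimit + offset.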