diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 515424db63567..1c165f331346b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1652,14 +1652,14 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
 }
 
 object OffsetAndLimit {
-  def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
+  def unapply(p: GlobalLimit): Option[(Int, Int, Int, LogicalPlan)] = {
     p match {
       // Optimizer pushes local limit through offset, so we need to match the plan this way.
       case GlobalLimit(IntegerLiteral(globalLimit),
              Offset(IntegerLiteral(offset),
                LocalLimit(IntegerLiteral(localLimit), child)))
           if globalLimit + offset == localLimit =>
-        Some((offset, globalLimit, child))
+        Some((offset, globalLimit, localLimit, child))
       case _ => None
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f76bc911bef8f..3158e34576d3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -94,9 +94,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // plan, instead of planning limit and offset separately.
       case LimitAndOffset(limit, offset, child) =>
         CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
-      case OffsetAndLimit(offset, limit, child) =>
+      case OffsetAndLimit(offset, _, localLimit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-        CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset)
+        // 'a + b' equals 'localLimit'.
+        CollectLimitExec(limit = localLimit, child = planLater(child), offset = offset)
       case Limit(IntegerLiteral(limit), child) =>
         CollectLimitExec(limit = limit, child = planLater(child))
       case logical.Offset(IntegerLiteral(offset), child) =>
@@ -121,14 +122,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         Some(TakeOrderedAndProjectExec(
           limit, order, projectList, planLater(child), offset))
       // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-      case OffsetAndLimit(offset, limit, Sort(order, true, child, _))
-          if offset + limit < conf.topKSortFallbackThreshold =>
-        Some(TakeOrderedAndProjectExec(
-          offset + limit, order, child.output, planLater(child), offset))
-      case OffsetAndLimit(offset, limit, Project(projectList, Sort(order, true, child, _)))
-          if offset + limit < conf.topKSortFallbackThreshold =>
-        Some(TakeOrderedAndProjectExec(
-          offset + limit, order, projectList, planLater(child), offset))
+      // 'a + b' equals 'localLimit'.
+      case OffsetAndLimit(offset, _, localLimit, Sort(order, true, child, _))
+          if localLimit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(localLimit, order, child.output, planLater(child), offset))
+      case OffsetAndLimit(offset, _, localLimit, Project(projectList, Sort(order, true, child, _)))
+          if localLimit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(localLimit, order, projectList, planLater(child), offset))
       case Limit(IntegerLiteral(limit), Sort(order, true, child, _))
           if limit < conf.topKSortFallbackThreshold =>
         Some(TakeOrderedAndProjectExec(
@@ -1037,10 +1037,11 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case LimitAndOffset(limit, offset, child) =>
         GlobalLimitExec(limit,
           LocalLimitExec(limit, planLater(child)), offset) :: Nil
-      case OffsetAndLimit(offset, limit, child) =>
+      case OffsetAndLimit(offset, _, localLimit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-        GlobalLimitExec(offset + limit,
-          LocalLimitExec(offset + limit, planLater(child)), offset) :: Nil
+        // 'a + b' equals 'localLimit'.
+        GlobalLimitExec(localLimit,
+          LocalLimitExec(localLimit, planLater(child)), offset) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimit(IntegerLiteral(limit), child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 31a98e1ff96cb..acb07b685cd95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -730,9 +730,10 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
           // Keep the OFFSET operator if we can't remove LIMIT operator.
          offset
        }
-    case globalLimit @ OffsetAndLimit(offset, limit, child) =>
+    case globalLimit @ OffsetAndLimit(offset, limit, localLimit, child) =>
       // For `df.offset(n).limit(m)`, we can push down `limit(m + n)` first.
-      val (newChild, canRemoveLimit) = pushDownLimit(child, limit + offset)
+      // 'm + n' equals 'localLimit'.
+      val (newChild, canRemoveLimit) = pushDownLimit(child, localLimit)
       if (canRemoveLimit) {
         // Try to push down OFFSET only if the LIMIT operator has been pushed and can be removed.
         val isPushed = pushDownOffset(newChild, offset)
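The diff narrows every call site from recomputing `offset + limit` to reusing the `localLimit` the pattern already matched; the extractor's guard `globalLimit + offset == localLimit` guarantees the two are equal. Below is a minimal, self-contained sketch of that idea (toy `Plan` nodes and an `eval` interpreter, not Spark's actual Catalyst classes; `OffsetAndLimitDemo` and the sample data are illustrative) showing the plan shape the optimizer produces for `df.offset(a).limit(b)` and why returning `localLimit` from `unapply` is safe:

```scala
object OffsetAndLimitDemo extends App {
  // Toy logical-plan nodes standing in for Spark's Catalyst operators.
  sealed trait Plan
  case class Leaf(rows: Vector[Int]) extends Plan
  case class LocalLimit(limit: Int, child: Plan) extends Plan
  case class Offset(offset: Int, child: Plan) extends Plan
  case class GlobalLimit(limit: Int, child: Plan) extends Plan

  // Mirrors the patched extractor: exposes localLimit alongside offset and globalLimit.
  object OffsetAndLimit {
    def unapply(p: Plan): Option[(Int, Int, Int, Plan)] = p match {
      case GlobalLimit(global, Offset(off, LocalLimit(local, child)))
          if global + off == local => // the guard that makes localLimit == offset + globalLimit
        Some((off, global, local, child))
      case _ => None
    }
  }

  // Tiny single-partition interpreter, only to check the plan's semantics.
  def eval(p: Plan): Vector[Int] = p match {
    case Leaf(rows)            => rows
    case LocalLimit(n, child)  => eval(child).take(n)
    case Offset(n, child)      => eval(child).drop(n)
    case GlobalLimit(n, child) => eval(child).take(n)
  }

  // df.offset(2).limit(3): the optimizer pushes LocalLimit(2 + 3) below Offset(2),
  // leaving GlobalLimit(3) on top, which is exactly the shape the extractor matches.
  val plan = GlobalLimit(3, Offset(2, LocalLimit(5, Leaf((1 to 10).toVector))))

  plan match {
    case OffsetAndLimit(offset, globalLimit, localLimit, _) =>
      // No call site needs to recompute offset + globalLimit:
      assert(localLimit == offset + globalLimit)
      println(s"offset=$offset, globalLimit=$globalLimit, localLimit=$localLimit")
  }

  println(eval(plan)) // Vector(3, 4, 5): skip 2 rows, then take 3
}
```

Handing `localLimit` out through the extractor keeps the addition in one place, the pattern's guard, so call sites cannot drift out of sync with the plan shape they matched.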