Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1652,14 +1652,14 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
}

object OffsetAndLimit {
def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
def unapply(p: GlobalLimit): Option[(Int, Int, Int, LogicalPlan)] = {
p match {
// Optimizer pushes local limit through offset, so we need to match the plan this way.
case GlobalLimit(IntegerLiteral(globalLimit),
Offset(IntegerLiteral(offset),
LocalLimit(IntegerLiteral(localLimit), child)))
if globalLimit + offset == localLimit =>
Some((offset, globalLimit, child))
Some((offset, globalLimit, localLimit, child))
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// plan, instead of planning limit and offset separately.
case LimitAndOffset(limit, offset, child) =>
CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
case OffsetAndLimit(offset, limit, child) =>
case OffsetAndLimit(offset, _, localLimit, child) =>
// 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset)
// 'a + b' is equals to 'localLimit'.
CollectLimitExec(limit = localLimit, child = planLater(child), offset = offset)
case Limit(IntegerLiteral(limit), child) =>
CollectLimitExec(limit = limit, child = planLater(child))
case logical.Offset(IntegerLiteral(offset), child) =>
Expand All @@ -121,14 +122,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Some(TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child), offset))
// 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
case OffsetAndLimit(offset, limit, Sort(order, true, child, _))
if offset + limit < conf.topKSortFallbackThreshold =>
Some(TakeOrderedAndProjectExec(
offset + limit, order, child.output, planLater(child), offset))
case OffsetAndLimit(offset, limit, Project(projectList, Sort(order, true, child, _)))
if offset + limit < conf.topKSortFallbackThreshold =>
Some(TakeOrderedAndProjectExec(
offset + limit, order, projectList, planLater(child), offset))
// 'a + b' is equals to 'localLimit'.
case OffsetAndLimit(offset, _, localLimit, Sort(order, true, child, _))
if localLimit < conf.topKSortFallbackThreshold =>
Some(TakeOrderedAndProjectExec(localLimit, order, child.output, planLater(child), offset))
case OffsetAndLimit(offset, _, localLimit, Project(projectList, Sort(order, true, child, _)))
if localLimit < conf.topKSortFallbackThreshold =>
Some(TakeOrderedAndProjectExec(localLimit, order, projectList, planLater(child), offset))
case Limit(IntegerLiteral(limit), Sort(order, true, child, _))
if limit < conf.topKSortFallbackThreshold =>
Some(TakeOrderedAndProjectExec(
Expand Down Expand Up @@ -1037,10 +1037,11 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case LimitAndOffset(limit, offset, child) =>
GlobalLimitExec(limit,
LocalLimitExec(limit, planLater(child)), offset) :: Nil
case OffsetAndLimit(offset, limit, child) =>
case OffsetAndLimit(offset, _, localLimit, child) =>
// 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
GlobalLimitExec(offset + limit,
LocalLimitExec(offset + limit, planLater(child)), offset) :: Nil
// 'a + b' is equals to 'localLimit'.
GlobalLimitExec(localLimit,
LocalLimitExec(localLimit, planLater(child)), offset) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,10 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// Keep the OFFSET operator if we can't remove LIMIT operator.
offset
}
case globalLimit @ OffsetAndLimit(offset, limit, child) =>
case globalLimit @ OffsetAndLimit(offset, limit, localLimit, child) =>
// For `df.offset(n).limit(m)`, we can push down `limit(m + n)` first.
val (newChild, canRemoveLimit) = pushDownLimit(child, limit + offset)
// 'm + n' is equals to 'localLimit'.
val (newChild, canRemoveLimit) = pushDownLimit(child, localLimit)
if (canRemoveLimit) {
// Try to push down OFFSET only if the LIMIT operator has been pushed and can be removed.
val isPushed = pushDownOffset(newChild, offset)
Expand Down