Skip to content

Conversation

jonathanc-n
Copy link
Contributor

@jonathanc-n jonathanc-n commented Sep 9, 2025

Which issue does this PR close?

Rationale for this change

Adds regular joins (left, right, full, inner) for PWMJ as they behave differently in the code path.

What changes are included in this PR?

Adds classic join + physical planner

Are these changes tested?

Yes SLT tests + unit tests

Follow up work to this pull request

  • Handling partitioned queries and multiple record batches (fuzz testing will be handled with this)
  • Simplify physical planning
  • Add more unit tests for different types (another pr as the LOC in this pr is getting a little daunting)

next would be to implement the existence joins

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Sep 9, 2025
@jonathanc-n jonathanc-n marked this pull request as draft September 9, 2025 04:03
@jonathanc-n
Copy link
Contributor Author

@2010YOUY01 Would you like to take a look at if this is how you wanted to split up the work? I just wanted to put this out today then i'll clean it up better this week. Only failing one external test currently.

let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
// cross join if there is no join conditions and no join filter set
Arc::new(CrossJoinExec::new(physical_left, physical_right))
} else if num_range_filters == 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to refactor this in another pull request, just a refactor but it should be quite simple to do. Just wanted to get this version in first.

statement ok
set datafusion.execution.batch_size = 8192;

# TODO: partitioned PWMJ execution
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently doesn't allow partitioned execution, this would make reviewing the tests a little messy as many of the partitioned single range queries would switch to PWMJ. Another follow up, will be tracked in #17427

@jonathanc-n jonathanc-n marked this pull request as ready for review September 9, 2025 17:59
@jonathanc-n
Copy link
Contributor Author

cc @2010YOUY01 @comphead this pr is now ready!

@jonathanc-n jonathanc-n changed the title POC: ClassicJoin for PWMJ feat: ClassicJoin for PWMJ Sep 9, 2025
@2010YOUY01
Copy link
Contributor

This is great! I have some suggestions for the planning part, and I'll review the execution part tomorrow.

Refactor the in-equality extracting logic

I suggest to move the inequality-extracting logic from physical_planner.rs into https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/extract_equijoin_predicate.rs

The reason is we'd better put similar code into a single place, instead of let it scatter to multiple places. ExtractEquijoinPredicate logical optimizer rule is extracting equality join predicates like t1.v1 = t2.v1, here we want to extract t1.v1 < t2.v1, their logic should be very similar.

To do this I think we need to extend the logical plan join node with extra ie predicate field (maybe we can define a new struct for IE predicate with (Expr, Op, Expr), and we can also use that in other places)

/// Join two logical plans on one or more join columns
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    ...
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,                                                                 
    /// In-equility clause expressed as pairs of (left, right) join expressions           <-- HERE
    pub ie_predicates: Vec<(Expr, IEOp, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    ...
}

To make it compatible for systems only use the LogicalPlan API, but not the physical plans, we can also provide a utility to move the IE predicates back to the filter:

Before: 
ie_predicates: [t1.v1 < t2.v1, t1.v2 < t2.v2]
filter: (t1.v3 + t2.v3) = 100

After:
ie_predicates: []
filter: ((t1.v3 + t2.v3) = 100) AND (t1.v1 < t2.v1) AND (t1.v2 < t2.v2)

Perhaps we can open a PR only for this IE predicates extracting task, and during the initial planning we can simply move the IE predicates back to the filter with the above mentioned utility.

Make it configurable to turn on/off PWMJ

I'll try to finish #17467 soon to make it easier, so let's put this on hold for now.

@comphead
Copy link
Contributor

Thanks @jonathanc-n and @2010YOUY01

#17467 definitely would be nice to have as PWMJ can start as optional experimental join, which would be separately documented, showing benefits and limitations for the end user. Actually the same happened for SMJ being experimental feature for quite some time.

Another great point to identify bottlenecks in performance is to absorb some knowledge from #17488 and keep the join more stable.

As optional feature it is pretty safe to go, again referring to SMJ there was a separate ticket which post launch checks to make sure it is safe to use like #9846

Let me know your thoughts?

@jonathanc-n
Copy link
Contributor Author

jonathanc-n commented Sep 11, 2025

Yes I think the experimental flag should be added first and we can do the equality extraction logic as a follow up. WDYT @2010YOUY01 Do you think you want to get #17467 before this one?

@2010YOUY01
Copy link
Contributor

Yes I think the experimental flag should be added first and we can do the equality extraction logic as a follow up. WDYT @2010YOUY01 Do you think you want to get #17467 before this one?

Yes, so let's do other work first. If I can't get #17467 done when this PR is ready, let's add enable_piecewise_merge_join option here -- I think we can agree on this configuration.

Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have gone over the exec.rs, and will continue with the stream implementation part soon.

ExecutionPlan, PlanProperties,
};
use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the best module comments I have seen.


/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates single range filter.
///
/// The physical planner will choose to evaluate this join when there is only one range predicate. This
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call it comparison filter/predicate instead? I think range filter usually refers to BETWEEN expr

};
use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};

/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates single range filter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also claim here: it evaluates filter with comparison more efficiently than NLJ

/// Examples:
/// - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
///
/// Since the join only support range predicates, equijoins are not supported in `PiecewiseMergeJoinExec`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think additional ANDed predicates should be done by additional filter handled inside PWMJ, as a follow-up task, instead of this piping another join method. Should we remove this paragraph?

///
/// ## Classic Joins (Inner, Full, Left, Right)
/// For classic joins we buffer the right side (buffered), and incrementally process the left side (streamed).
/// Every streamed batch is sorted so we can perform a sort merge algorithm. For the buffered side we want to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend to break this sort order rationale to two paragraphs:

  1. How is join order determined (like left asc, right desc). I think a better way to phrase the words is: it finds a order so that the operator iterates both left and right side from index 0 to end, when it finds the first match, it outputs the buffer side's current index to end index joining the current stream side index.
  2. How is join order enforced?
    • For stream side it should be read a maybe unordered batch, then sort it inside this operator
    • I don't know how is it done for buffer side, it's worth more explanation here. I vaguely remember it's declaring it in the requires_input_order in the plan property, and some optimizer rule will insert a SortExec automatically?

left_sort_exprs: LexOrdering,
/// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
/// Unsorted for mark joins
right_sort_exprs: LexOrdering,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps right_batch_required_order

/// Unsorted for mark joins
right_sort_exprs: LexOrdering,
/// Sort options of join columns used in sorting the stream and buffered execution plans
sort_options: SortOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it associated with right batch order? Let's make this variable name more descriptive


// If the join type is either RightSemi, RightAnti, or RightMark we will swap the inputs
// and sort ordering because we want the mark side to be the buffered side.
let (buffered, streamed, on_buffered, on_streamed, operator) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this logic outside to the planning and optimizing? Since DF is commonly doing join input reordering during planning and optimization, I think it can cause confusion (and become error-prone) to do an extra swapping inside the operator.


fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
// Existence joins don't need to be sorted on one side.
if is_right_existence_join(self.join_type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unimplemented!() for this branch, it would be better to let them show up in the diff in the follow-up PR

} else {
// Sort the right side in memory, so we do not need to enforce any sorting
vec![
Some(OrderingRequirements::from(self.left_sort_exprs.clone())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A question here for future clean-up: now we're storing the required input ordering property inside the executor, is it possible to move them into PlanProperties struct?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants