diff --git a/benchmark/micro/join/lateraljoin_unnest.benchmark b/benchmark/micro/join/lateraljoin_unnest.benchmark new file mode 100644 index 000000000000..cdcc9556914e --- /dev/null +++ b/benchmark/micro/join/lateraljoin_unnest.benchmark @@ -0,0 +1,17 @@ +# name: benchmark/micro/join/lateraljoin_unnest.benchmark +# description: Perform a LATERAL join using UNNEST +# group: [join] + +name LATERAL join + UNNEST +group join +storage persistent + +load +CREATE TABLE lists AS SELECT ARRAY[i, NULL, i + 1, i + 2, NULL] l FROM generate_series(0, 999999, 1) t(i); +checkpoint; + +run +SELECT SUM(k) FROM lists, UNNEST(l) t(k) + +result I +1500001500000 diff --git a/scripts/coverage_check.sh b/scripts/coverage_check.sh index 0ca7afa85ad8..dc0d042b4b8d 100755 --- a/scripts/coverage_check.sh +++ b/scripts/coverage_check.sh @@ -16,6 +16,7 @@ build/coverage/test/unittest "[coverage]" build/coverage/test/unittest "[intraquery]" build/coverage/test/unittest "[interquery]" build/coverage/test/unittest "[detailed_profiler]" +build/coverage/test/unittest "[filter]" build/coverage/test/unittest test/sql/tpch/tpch_sf01.test_slow python3 -m pytest --shell-binary build/coverage/duckdb tools/shell/tests/ diff --git a/src/execution/operator/projection/physical_tableinout_function.cpp b/src/execution/operator/projection/physical_tableinout_function.cpp index d32094f7e2e0..1b9b8c9cc59a 100644 --- a/src/execution/operator/projection/physical_tableinout_function.cpp +++ b/src/execution/operator/projection/physical_tableinout_function.cpp @@ -11,6 +11,31 @@ class TableInOutLocalState : public OperatorState { idx_t row_index; bool new_row; DataChunk input_chunk; + unique_ptr<DataChunk> intermediate_chunk; + +public: + DataChunk &GetIntermediateChunk(DataChunk &output, idx_t base_column_count) { + if (!intermediate_chunk) { + intermediate_chunk = make_uniq<DataChunk>(); + // Create an empty DataChunk that has room for the input + the mapping vector + auto chunk_types = output.GetTypes(); + vector<LogicalType> intermediate_types; + 
intermediate_types.reserve(base_column_count + 1); + intermediate_types.insert(intermediate_types.end(), chunk_types.begin(), + chunk_types.begin() + base_column_count); + intermediate_types.emplace_back(LogicalType::UINTEGER); + // We initialize this as empty + intermediate_chunk->InitializeEmpty(intermediate_types); + // And only allocate for our mapping vector + intermediate_chunk->data[base_column_count].Initialize(); + } + // Initialize our output chunk + for (idx_t i = 0; i < base_column_count; i++) { + intermediate_chunk->data[i].Reference(output.data[i]); + } + intermediate_chunk->SetCardinality(output.size()); + return *intermediate_chunk; + } }; class TableInOutGlobalState : public GlobalOperatorState { @@ -38,7 +63,11 @@ unique_ptr PhysicalTableInOutFunction::GetOperatorState(Execution result->local_state = function.init_local(context, input, gstate.global_state.get()); } if (!projected_input.empty()) { - result->input_chunk.Initialize(context.client, children[0]->types); + if (!function.in_out_mapping) { + // If we have to project columns, and the function doesn't provide a mapping from output row -> input row + // then we have to execute tuple-at-a-time + result->input_chunk.Initialize(context.client, children[0]->types); + } } return std::move(result); } @@ -52,15 +81,65 @@ unique_ptr PhysicalTableInOutFunction::GetGlobalOperatorSta return std::move(result); } -OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, - GlobalOperatorState &gstate_p, OperatorState &state_p) const { - auto &gstate = gstate_p.Cast(); - auto &state = state_p.Cast(); - TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get()); - if (projected_input.empty()) { - // straightforward case - no need to project input - return function.in_out_function(context, data, input, chunk); +void PhysicalTableInOutFunction::AddProjectedColumnsFromConstantMapping(idx_t map_idx, DataChunk &input, 
+ DataChunk &intermediate, DataChunk &out) const { + auto &mapping_column = intermediate.data[map_idx]; + D_ASSERT(mapping_column.GetVectorType() == VectorType::CONSTANT_VECTOR); + + auto mapping_data = FlatVector::GetData<sel_t>(mapping_column); + + // Add the projected columns, and apply the selection vector + for (idx_t project_idx = 0; project_idx < projected_input.size(); project_idx++) { + auto source_idx = projected_input[project_idx]; + D_ASSERT(source_idx < input.data.size()); + auto target_idx = map_idx + project_idx; + + auto &target_column = out.data[target_idx]; + auto &source_column = input.data[source_idx]; + + ConstantVector::Reference(target_column, source_column, mapping_data[0], input.size()); } +} + +OperatorResultType PhysicalTableInOutFunction::ExecuteWithMapping(ExecutionContext &context, DataChunk &input, + DataChunk &chunk, TableInOutLocalState &state, + TableFunctionInput &data) const { + // Create a duplicate of 'chunk' that contains one extra column + // this column is used to register the relation between input tuple -> output tuple(s) + const auto base_columns = chunk.ColumnCount() - projected_input.size(); + + auto &intermediate_chunk = state.GetIntermediateChunk(chunk, base_columns); + + // Let the function know that we expect it to write an in-out mapping for rowids + data.add_in_out_mapping = true; + auto result = function.in_out_function(context, data, input, intermediate_chunk); + chunk.SetCardinality(intermediate_chunk.size()); + + // Move the result into the output chunk + for (idx_t i = 0; i < base_columns; i++) { + chunk.data[i].Reference(intermediate_chunk.data[i]); + } + + auto &mapping_column = intermediate_chunk.data[base_columns]; + switch (mapping_column.GetVectorType()) { + case VectorType::CONSTANT_VECTOR: { + // We can avoid creating a selection vector altogether + AddProjectedColumnsFromConstantMapping(base_columns, input, intermediate_chunk, chunk); + break; + } + default: { + throw NotImplementedException( + "Executing 
Table in-out functions with a non-constant mapping is not supported yet"); + } + } + + return result; +} + +OperatorResultType PhysicalTableInOutFunction::ExecuteWithoutMapping(ExecutionContext &context, DataChunk &input, + DataChunk &chunk, TableInOutGlobalState &gstate, + TableInOutLocalState &state, + TableFunctionInput &data) const { // when project_input is set we execute the input function row-by-row if (state.new_row) { if (state.row_index >= input.size()) { @@ -100,6 +179,20 @@ OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context return OperatorResultType::HAVE_MORE_OUTPUT; } +OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, + GlobalOperatorState &gstate_p, OperatorState &state_p) const { + auto &gstate = gstate_p.Cast<TableInOutGlobalState>(); + auto &state = state_p.Cast<TableInOutLocalState>(); + TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get()); + if (projected_input.empty()) { + return function.in_out_function(context, data, input, chunk); + } + if (function.in_out_mapping) { + return ExecuteWithMapping(context, input, chunk, state, data); + } + return ExecuteWithoutMapping(context, input, chunk, gstate, state, data); +} + OperatorFinalizeResultType PhysicalTableInOutFunction::FinalExecute(ExecutionContext &context, DataChunk &chunk, GlobalOperatorState &gstate_p, OperatorState &state_p) const { @@ -108,6 +201,7 @@ OperatorFinalizeResultType PhysicalTableInOutFunction::FinalExecute(ExecutionCon if (!projected_input.empty()) { throw InternalException("FinalExecute not supported for project_input"); } + D_ASSERT(RequiresFinalExecute()); TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get()); return function.in_out_function_final(context, data, chunk); } diff --git a/src/execution/operator/projection/physical_unnest.cpp b/src/execution/operator/projection/physical_unnest.cpp index d5632f80e36f..29f370a13cb4 100644 --- 
a/src/execution/operator/projection/physical_unnest.cpp +++ b/src/execution/operator/projection/physical_unnest.cpp @@ -251,10 +251,55 @@ unique_ptr<OperatorState> PhysicalUnnest::GetState(ExecutionContext &context, return make_uniq<UnnestOperatorState>(context.client, select_list); } +static void UnnestLists(UnnestOperatorState &state, DataChunk &chunk, idx_t col_offset, idx_t this_chunk_len) { + // unnest the lists + for (idx_t col_idx = 0; col_idx < state.list_data.ColumnCount(); col_idx++) { + + auto &result_vector = chunk.data[col_idx + col_offset]; + + if (state.list_data.data[col_idx].GetType() == LogicalType::SQLNULL) { + // UNNEST(NULL) + chunk.SetCardinality(0); + break; + } + + auto &vector_data = state.list_vector_data[col_idx]; + auto current_idx = vector_data.sel->get_index(state.current_row); + + if (!vector_data.validity.RowIsValid(current_idx)) { + UnnestNull(0, this_chunk_len, result_vector); + continue; + } + + auto list_data = UnifiedVectorFormat::GetData<list_entry_t>(vector_data); + auto list_entry = list_data[current_idx]; + + idx_t list_count = 0; + if (state.list_position < list_entry.length) { + // there are still list_count elements to unnest + list_count = MinValue<idx_t>(this_chunk_len, list_entry.length - state.list_position); + + auto &list_vector = state.list_data.data[col_idx]; + auto &child_vector = ListVector::GetEntry(list_vector); + auto list_size = ListVector::GetListSize(list_vector); + auto &child_vector_data = state.list_child_data[col_idx]; + + auto base_offset = list_entry.offset + state.list_position; + UnnestVector(child_vector_data, child_vector, list_size, base_offset, base_offset + list_count, + result_vector); + } + + // fill the rest with NULLs + if (list_count != this_chunk_len) { + UnnestNull(list_count, this_chunk_len, result_vector); + } + } +} + OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, OperatorState &state_p, const vector<unique_ptr<Expression>> &select_list, - bool include_input) { + bool include_input, bool 
add_in_out_mapping) { auto &state = state_p.Cast(); @@ -290,7 +335,7 @@ OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, Da // if we include other projection input columns, e.g. SELECT 1, UNNEST([1, 2]);, then // we need to add them as a constant vector to the resulting chunk // FIXME: emit multiple unnested rows. Currently, we never emit a chunk containing multiple unnested input rows, - // so setting a constant vector for the value at state.current_row is fine + // so setting a constant vector for the value at state.current_row is fine idx_t col_offset = 0; if (include_input) { for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) { @@ -299,51 +344,18 @@ OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, Da col_offset = input.ColumnCount(); } - // unnest the lists - for (idx_t col_idx = 0; col_idx < state.list_data.ColumnCount(); col_idx++) { - - auto &result_vector = chunk.data[col_idx + col_offset]; - - if (state.list_data.data[col_idx].GetType() == LogicalType::SQLNULL) { - // UNNEST(NULL) - chunk.SetCardinality(0); - break; - } - - auto &vector_data = state.list_vector_data[col_idx]; - auto current_idx = vector_data.sel->get_index(state.current_row); - - if (!vector_data.validity.RowIsValid(current_idx)) { - UnnestNull(0, this_chunk_len, result_vector); - continue; - } - - auto list_data = UnifiedVectorFormat::GetData(vector_data); - auto list_entry = list_data[current_idx]; - - idx_t list_count = 0; - if (state.list_position < list_entry.length) { - // there are still list_count elements to unnest - list_count = MinValue(this_chunk_len, list_entry.length - state.list_position); + UnnestLists(state, chunk, col_offset, this_chunk_len); - auto &list_vector = state.list_data.data[col_idx]; - auto &child_vector = ListVector::GetEntry(list_vector); - auto list_size = ListVector::GetListSize(list_vector); - auto &child_vector_data = state.list_child_data[col_idx]; - - auto base_offset = 
list_entry.offset + state.list_position; - UnnestVector(child_vector_data, child_vector, list_size, base_offset, base_offset + list_count, - result_vector); - } + chunk.Verify(); - // fill the rest with NULLs - if (list_count != this_chunk_len) { - UnnestNull(list_count, this_chunk_len, result_vector); - } + // Register which input tuple produced the output tuples + if (add_in_out_mapping) { + auto &relation_vec = chunk.data[state.list_data.ColumnCount() + col_offset]; + auto relation_data = FlatVector::GetData<sel_t>(relation_vec); + relation_data[0] = state.current_row; + relation_vec.SetVectorType(VectorType::CONSTANT_VECTOR); } - chunk.Verify(); - state.list_position += this_chunk_len; if (state.list_position == state.longest_list_length) { state.current_row++; diff --git a/src/function/table/summary.cpp b/src/function/table/summary.cpp index d6c4615e4251..517cc047f8ab 100644 --- a/src/function/table/summary.cpp +++ b/src/function/table/summary.cpp @@ -40,6 +40,7 @@ static OperatorResultType SummaryFunction(ExecutionContext &context, TableFuncti for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) { output.data[col_idx + 1].Reference(input.data[col_idx]); } + return OperatorResultType::NEED_MORE_INPUT; } diff --git a/src/function/table/unnest.cpp b/src/function/table/unnest.cpp index 15f3950893bf..1673739de3cb 100644 --- a/src/function/table/unnest.cpp +++ b/src/function/table/unnest.cpp @@ -74,12 +74,14 @@ static OperatorResultType UnnestFunction(ExecutionContext &context, TableFunctio DataChunk &output) { auto &state = data_p.global_state->Cast(); auto &lstate = data_p.local_state->Cast(); - return PhysicalUnnest::ExecuteInternal(context, input, output, *lstate.operator_state, state.select_list, false); + return PhysicalUnnest::ExecuteInternal(context, input, output, *lstate.operator_state, state.select_list, false, + data_p.add_in_out_mapping); } void UnnestTableFunction::RegisterFunction(BuiltinFunctions &set) { TableFunction unnest_function("unnest", 
{LogicalTypeId::TABLE}, nullptr, UnnestBind, UnnestInit, UnnestLocalInit); unnest_function.in_out_function = UnnestFunction; + unnest_function.in_out_mapping = true; set.AddFunction(unnest_function); } diff --git a/src/function/table_function.cpp b/src/function/table_function.cpp index 4fcf8d82f91b..6b268c8e0414 100644 --- a/src/function/table_function.cpp +++ b/src/function/table_function.cpp @@ -32,7 +32,7 @@ TableFunction::TableFunction() init_local(nullptr), function(nullptr), in_out_function(nullptr), statistics(nullptr), dependency(nullptr), cardinality(nullptr), pushdown_complex_filter(nullptr), to_string(nullptr), table_scan_progress(nullptr), get_batch_index(nullptr), get_batch_info(nullptr), serialize(nullptr), deserialize(nullptr), - projection_pushdown(false), filter_pushdown(false), filter_prune(false) { + projection_pushdown(false), filter_pushdown(false), filter_prune(false), in_out_mapping(false) { } bool TableFunction::Equal(const TableFunction &rhs) const { diff --git a/src/include/duckdb/execution/operator/projection/physical_tableinout_function.hpp b/src/include/duckdb/execution/operator/projection/physical_tableinout_function.hpp index 659e5cc4a404..7692e9afe9e2 100644 --- a/src/include/duckdb/execution/operator/projection/physical_tableinout_function.hpp +++ b/src/include/duckdb/execution/operator/projection/physical_tableinout_function.hpp @@ -14,6 +14,10 @@ namespace duckdb { +class TableInOutLocalState; +class TableInOutGlobalState; + +//! 
PhysicalWindow implements window functions class PhysicalTableInOutFunction : public PhysicalOperator { public: static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::INOUT_FUNCTION; @@ -39,6 +43,15 @@ class PhysicalTableInOutFunction : public PhysicalOperator { return function.in_out_function_final; } +private: + OperatorResultType ExecuteWithMapping(ExecutionContext &context, DataChunk &input, DataChunk &chunk, + TableInOutLocalState &state, TableFunctionInput &data) const; + OperatorResultType ExecuteWithoutMapping(ExecutionContext &context, DataChunk &input, DataChunk &chunk, + TableInOutGlobalState &gstate, TableInOutLocalState &state, + TableFunctionInput &data) const; + void AddProjectedColumnsFromConstantMapping(idx_t map_idx, DataChunk &input, DataChunk &intermediate, + DataChunk &out) const; + private: //! The table function TableFunction function; diff --git a/src/include/duckdb/execution/operator/projection/physical_unnest.hpp b/src/include/duckdb/execution/operator/projection/physical_unnest.hpp index 194001353bbf..7072cb3c33c4 100644 --- a/src/include/duckdb/execution/operator/projection/physical_unnest.hpp +++ b/src/include/duckdb/execution/operator/projection/physical_unnest.hpp @@ -43,7 +43,7 @@ class PhysicalUnnest : public PhysicalOperator { //! not set, then the UNNEST behaves as a table function and only emits the unnested data. 
static OperatorResultType ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, OperatorState &state, const vector> &select_list, - bool include_input = true); + bool include_input = true, bool add_in_out_mapping = false); }; } // namespace duckdb diff --git a/src/include/duckdb/function/table_function.hpp b/src/include/duckdb/function/table_function.hpp index ef537304b2e1..eed0fb634971 100644 --- a/src/include/duckdb/function/table_function.hpp +++ b/src/include/duckdb/function/table_function.hpp @@ -131,6 +131,9 @@ struct TableFunctionInput { optional_ptr bind_data; optional_ptr local_state; optional_ptr global_state; + //! Whether or not the current execution should write the in-out mapping vector + //! for table in-out functions + bool add_in_out_mapping = false; }; enum ScanType { TABLE, PARQUET }; @@ -275,6 +278,9 @@ class TableFunction : public SimpleNamedParameterFunction { //! Whether or not the table function can immediately prune out filter columns that are unused in the remainder of //! the query plan, e.g., "SELECT i FROM tbl WHERE j = 42;" - j does not need to leave the table function at all bool filter_prune; + //! Whether or not the table function produces an extra column containing the mapping from output row to + //! the input row that produced it. + bool in_out_mapping = false; //! Additional function info, passed to the bind shared_ptr function_info; diff --git a/test/sql/function/table/test_projected_without_mapping.test b/test/sql/function/table/test_projected_without_mapping.test new file mode 100644 index 000000000000..9f6e2db5557a --- /dev/null +++ b/test/sql/function/table/test_projected_without_mapping.test @@ -0,0 +1,16 @@ +# name: test/sql/function/table/test_projected_without_mapping.test +# group: [table] + +query II +SELECT * FROM ( + VALUES + ([1, 2, 3]), + ([4, 5, 6]) +) t(l), SUMMARY(l) t2(k) ORDER BY k +---- +[1, 2, 3] 1 +[1, 2, 3] 2 +[1, 2, 3] 3 +[4, 5, 6] 4 +[4, 5, 6] 5 +[4, 5, 6] 6