Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
30d3b6e
expect the table in-out function to produce a tuple mapping vector
Tishj Mar 7, 2023
69606cc
move some logic to separate functions to reduce the size and nesting
Tishj Mar 7, 2023
caf58ac
fixed some bugs, 'unnest' now works with the new path
Tishj Mar 7, 2023
b030a44
add the ability for in-out table functions to produce a in-out row ma…
Tishj Mar 8, 2023
3fa49ff
optimization: directly use the column as the selection vector + small…
Tishj Mar 8, 2023
c0dee98
fix tidy-check
Tishj Mar 8, 2023
6bab159
implement 'in_out_mapping' for summary + optimize the implementation …
Tishj Mar 8, 2023
b1b4848
revert change - we can't know the input types beforehand
Tishj Mar 8, 2023
c667408
fixed issue with the mapping vector -> selection vector, also changed…
Tishj Mar 9, 2023
9cf1230
Merge branch 'master' into scalar_table_function
Tishj Mar 10, 2023
3e1bd83
add lateraljoin + unnest benchmark
Tishj Mar 14, 2023
077829d
apply feedback
Tishj Mar 14, 2023
dfe4683
Merge branch 'master' into scalar_table_function
Tishj Mar 14, 2023
737a074
reformat
Tishj Mar 16, 2023
92aac78
Merge branch 'master' into scalar_table_function
Tishj Apr 7, 2023
03a12fc
Merge branch 'master' into scalar_table_function
Tishj Apr 25, 2023
eaeaab6
Merge branch 'master' into scalar_table_function
Tishj May 16, 2023
d793369
Merge branch 'master' into scalar_table_function
Tishj Aug 3, 2023
849434a
small tidy issues
Tishj Aug 3, 2023
fc509dd
remove code that can not be tested for now
Tishj Aug 4, 2023
2e38141
add test with summary to get coverage for ExecuteWithoutMapping
Tishj Aug 4, 2023
aa45c6c
add [filter] to the tests contributing to coverage, to cover Physical…
Tishj Aug 4, 2023
a53aa40
update coverage
Tishj Aug 4, 2023
f75b9c3
Merge branch 'master' into scalar_table_function
Tishj Aug 14, 2023
8fcacce
this breaks verification, removing it for now
Tishj Aug 14, 2023
2d4fc8a
Merge remote-tracking branch 'upstream/feature' into scalar_table_fun…
Tishj Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions benchmark/micro/join/lateraljoin_unnest.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# name: benchmark/micro/join/lateraljoin_unnest.benchmark
# description: Perform a LATERAL join using UNNEST
# group: [join]

name LATERAL join + UNNEST
group join
storage persistent

load
CREATE TABLE lists AS SELECT ARRAY[i, NULL, i + 1, i + 2, NULL] l FROM generate_series(0, 999999, 1) t(i);
checkpoint;

run
SELECT SUM(k) FROM lists, UNNEST(l) t(k)

result I
1500001500000
1 change: 1 addition & 0 deletions scripts/coverage_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ build/coverage/test/unittest "[coverage]"
build/coverage/test/unittest "[intraquery]"
build/coverage/test/unittest "[interquery]"
build/coverage/test/unittest "[detailed_profiler]"
build/coverage/test/unittest "[filter]"
build/coverage/test/unittest test/sql/tpch/tpch_sf01.test_slow
python3 -m pytest --shell-binary build/coverage/duckdb tools/shell/tests/

Expand Down
112 changes: 103 additions & 9 deletions src/execution/operator/projection/physical_tableinout_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@ class TableInOutLocalState : public OperatorState {
idx_t row_index;
bool new_row;
DataChunk input_chunk;
unique_ptr<DataChunk> intermediate_chunk;

public:
//! Lazily build (and afterwards refresh) the chunk that is handed to the in-out function:
//! the first 'base_column_count' columns of 'output', plus one extra UINTEGER mapping column
//! that records which input row produced each output row.
DataChunk &GetIntermediateChunk(DataChunk &output, idx_t base_column_count) {
	if (!intermediate_chunk) {
		// First call: set up the layout [output columns ..., mapping column]
		auto output_types = output.GetTypes();
		vector<LogicalType> types;
		types.reserve(base_column_count + 1);
		for (idx_t col = 0; col < base_column_count; col++) {
			types.push_back(output_types[col]);
		}
		types.emplace_back(LogicalType::UINTEGER);
		intermediate_chunk = make_uniq<DataChunk>();
		// Initialize as empty: only the mapping column owns its data,
		// the base columns will reference 'output' below
		intermediate_chunk->InitializeEmpty(types);
		intermediate_chunk->data[base_column_count].Initialize();
	}
	// Re-point the base columns at the current output chunk
	for (idx_t col = 0; col < base_column_count; col++) {
		intermediate_chunk->data[col].Reference(output.data[col]);
	}
	intermediate_chunk->SetCardinality(output.size());
	return *intermediate_chunk;
}
};

class TableInOutGlobalState : public GlobalOperatorState {
Expand Down Expand Up @@ -38,7 +63,11 @@ unique_ptr<OperatorState> PhysicalTableInOutFunction::GetOperatorState(Execution
result->local_state = function.init_local(context, input, gstate.global_state.get());
}
if (!projected_input.empty()) {
result->input_chunk.Initialize(context.client, children[0]->types);
if (!function.in_out_mapping) {
// If we have to project columns, and the function doesn't provide a mapping from output row -> input row
// then we have to execute tuple-at-a-time
result->input_chunk.Initialize(context.client, children[0]->types);
}
}
return std::move(result);
}
Expand All @@ -52,15 +81,65 @@ unique_ptr<GlobalOperatorState> PhysicalTableInOutFunction::GetGlobalOperatorSta
return std::move(result);
}

OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate_p, OperatorState &state_p) const {
auto &gstate = gstate_p.Cast<TableInOutGlobalState>();
auto &state = state_p.Cast<TableInOutLocalState>();
TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get());
if (projected_input.empty()) {
// straightforward case - no need to project input
return function.in_out_function(context, data, input, chunk);
//! Populate the projected input columns of 'out' when the in-out mapping is a constant vector,
//! i.e. every tuple in this output chunk came from the same input tuple.
//! 'map_idx' is both the index of the mapping column in 'intermediate' and the index of the
//! first projected column in 'out' (projected columns follow the function's own output columns).
void PhysicalTableInOutFunction::AddProjectedColumnsFromConstantMapping(idx_t map_idx, DataChunk &input,
                                                                       DataChunk &intermediate, DataChunk &out) const {
	auto &mapping_column = intermediate.data[map_idx];
	D_ASSERT(mapping_column.GetVectorType() == VectorType::CONSTANT_VECTOR);

	// The vector is asserted constant above, so read it through the constant accessor
	// (clearer intent than FlatVector::GetData, which merely happens to share the layout)
	auto mapping_data = ConstantVector::GetData<sel_t>(mapping_column);
	// NOTE(review): assumes the constant mapping value is never NULL — the producers
	// (e.g. unnest) always write a valid row index; confirm for new in-out functions
	auto source_row = mapping_data[0];

	// Reference every projected column as a constant of the single source row,
	// avoiding a selection vector altogether
	for (idx_t project_idx = 0; project_idx < projected_input.size(); project_idx++) {
		auto source_idx = projected_input[project_idx];
		D_ASSERT(source_idx < input.data.size());
		auto target_idx = map_idx + project_idx;

		auto &target_column = out.data[target_idx];
		auto &source_column = input.data[source_idx];

		ConstantVector::Reference(target_column, source_column, source_row, input.size());
	}
}

//! Execute the in-out function through an intermediate chunk that carries one extra
//! mapping column, registering the relation between input tuple -> output tuple(s).
OperatorResultType PhysicalTableInOutFunction::ExecuteWithMapping(ExecutionContext &context, DataChunk &input,
                                                                  DataChunk &chunk, TableInOutLocalState &state,
                                                                  TableFunctionInput &data) const {
	// The output chunk holds the function's own columns followed by the projected input columns
	const auto function_column_count = chunk.ColumnCount() - projected_input.size();

	auto &intermediate_chunk = state.GetIntermediateChunk(chunk, function_column_count);

	// Tell the function we expect it to write the in-out mapping for rowids
	data.add_in_out_mapping = true;
	auto result = function.in_out_function(context, data, input, intermediate_chunk);
	chunk.SetCardinality(intermediate_chunk.size());

	// Forward the function's output columns into the final chunk
	for (idx_t col = 0; col < function_column_count; col++) {
		chunk.data[col].Reference(intermediate_chunk.data[col]);
	}

	auto &mapping_column = intermediate_chunk.data[function_column_count];
	if (mapping_column.GetVectorType() != VectorType::CONSTANT_VECTOR) {
		// Only constant mappings (entire chunk produced by one input row) are handled so far
		throw NotImplementedException(
		    "Executing Table in-out functions with a non-constant mapping is not supported yet");
	}
	// Constant mapping: we can avoid creating a selection vector altogether
	AddProjectedColumnsFromConstantMapping(function_column_count, input, intermediate_chunk, chunk);

	return result;
}

OperatorResultType PhysicalTableInOutFunction::ExecuteWithoutMapping(ExecutionContext &context, DataChunk &input,
DataChunk &chunk, TableInOutGlobalState &gstate,
TableInOutLocalState &state,
TableFunctionInput &data) const {
// when project_input is set we execute the input function row-by-row
if (state.new_row) {
if (state.row_index >= input.size()) {
Expand Down Expand Up @@ -100,6 +179,20 @@ OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context
return OperatorResultType::HAVE_MORE_OUTPUT;
}

//! Dispatch execution of the table in-out function: direct execution when no input
//! columns need projecting, otherwise the mapping-based or tuple-at-a-time path.
OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                                       GlobalOperatorState &gstate_p, OperatorState &state_p) const {
	auto &state = state_p.Cast<TableInOutLocalState>();
	auto &gstate = gstate_p.Cast<TableInOutGlobalState>();
	TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get());
	if (projected_input.empty()) {
		// Straightforward case - the function writes the output chunk directly
		return function.in_out_function(context, data, input, chunk);
	}
	if (!function.in_out_mapping) {
		// No output-row -> input-row mapping available: fall back to row-by-row execution
		return ExecuteWithoutMapping(context, input, chunk, gstate, state, data);
	}
	return ExecuteWithMapping(context, input, chunk, state, data);
}

OperatorFinalizeResultType PhysicalTableInOutFunction::FinalExecute(ExecutionContext &context, DataChunk &chunk,
GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
Expand All @@ -108,6 +201,7 @@ OperatorFinalizeResultType PhysicalTableInOutFunction::FinalExecute(ExecutionCon
if (!projected_input.empty()) {
throw InternalException("FinalExecute not supported for project_input");
}
D_ASSERT(RequiresFinalExecute());
TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get());
return function.in_out_function_final(context, data, chunk);
}
Expand Down
98 changes: 55 additions & 43 deletions src/execution/operator/projection/physical_unnest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,55 @@ unique_ptr<OperatorState> PhysicalUnnest::GetState(ExecutionContext &context,
return make_uniq<UnnestOperatorState>(context.client, select_list);
}

//! Unnest each list column of the current input row into the corresponding result column
//! of 'chunk', writing at most 'this_chunk_len' values per column starting at 'col_offset'.
static void UnnestLists(UnnestOperatorState &state, DataChunk &chunk, idx_t col_offset, idx_t this_chunk_len) {
	const idx_t list_column_count = state.list_data.ColumnCount();
	for (idx_t list_idx = 0; list_idx < list_column_count; list_idx++) {

		auto &target = chunk.data[list_idx + col_offset];

		if (state.list_data.data[list_idx].GetType() == LogicalType::SQLNULL) {
			// UNNEST(NULL): no rows are produced at all
			chunk.SetCardinality(0);
			break;
		}

		auto &unified = state.list_vector_data[list_idx];
		auto row_idx = unified.sel->get_index(state.current_row);

		if (!unified.validity.RowIsValid(row_idx)) {
			// a NULL list entry unnests to all-NULL values
			UnnestNull(0, this_chunk_len, target);
			continue;
		}

		auto entries = UnifiedVectorFormat::GetData<list_entry_t>(unified);
		auto entry = entries[row_idx];

		// emit the remaining elements of this list entry, capped at this_chunk_len
		idx_t emitted = 0;
		if (state.list_position < entry.length) {
			emitted = MinValue<idx_t>(this_chunk_len, entry.length - state.list_position);

			auto &list_vector = state.list_data.data[list_idx];
			auto &child_vector = ListVector::GetEntry(list_vector);
			auto child_size = ListVector::GetListSize(list_vector);
			auto &child_data = state.list_child_data[list_idx];

			auto start_offset = entry.offset + state.list_position;
			UnnestVector(child_data, child_vector, child_size, start_offset, start_offset + emitted, target);
		}

		// lists shorter than the longest list in the chunk are padded with NULLs
		if (emitted != this_chunk_len) {
			UnnestNull(emitted, this_chunk_len, target);
		}
	}
}

OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
OperatorState &state_p,
const vector<unique_ptr<Expression>> &select_list,
bool include_input) {
bool include_input, bool add_in_out_mapping) {

auto &state = state_p.Cast<UnnestOperatorState>();

Expand Down Expand Up @@ -290,7 +335,7 @@ OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, Da
// if we include other projection input columns, e.g. SELECT 1, UNNEST([1, 2]);, then
// we need to add them as a constant vector to the resulting chunk
// FIXME: emit multiple unnested rows. Currently, we never emit a chunk containing multiple unnested input rows,
// so setting a constant vector for the value at state.current_row is fine
// so setting a constant vector for the value at state.current_row is fine
idx_t col_offset = 0;
if (include_input) {
for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
Expand All @@ -299,51 +344,18 @@ OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, Da
col_offset = input.ColumnCount();
}

// unnest the lists
for (idx_t col_idx = 0; col_idx < state.list_data.ColumnCount(); col_idx++) {

auto &result_vector = chunk.data[col_idx + col_offset];

if (state.list_data.data[col_idx].GetType() == LogicalType::SQLNULL) {
// UNNEST(NULL)
chunk.SetCardinality(0);
break;
}

auto &vector_data = state.list_vector_data[col_idx];
auto current_idx = vector_data.sel->get_index(state.current_row);

if (!vector_data.validity.RowIsValid(current_idx)) {
UnnestNull(0, this_chunk_len, result_vector);
continue;
}

auto list_data = UnifiedVectorFormat::GetData<list_entry_t>(vector_data);
auto list_entry = list_data[current_idx];

idx_t list_count = 0;
if (state.list_position < list_entry.length) {
// there are still list_count elements to unnest
list_count = MinValue<idx_t>(this_chunk_len, list_entry.length - state.list_position);
UnnestLists(state, chunk, col_offset, this_chunk_len);

auto &list_vector = state.list_data.data[col_idx];
auto &child_vector = ListVector::GetEntry(list_vector);
auto list_size = ListVector::GetListSize(list_vector);
auto &child_vector_data = state.list_child_data[col_idx];

auto base_offset = list_entry.offset + state.list_position;
UnnestVector(child_vector_data, child_vector, list_size, base_offset, base_offset + list_count,
result_vector);
}
chunk.Verify();

// fill the rest with NULLs
if (list_count != this_chunk_len) {
UnnestNull(list_count, this_chunk_len, result_vector);
}
// Register which input tuple produced the input tuples
if (add_in_out_mapping) {
auto &relation_vec = chunk.data[state.list_data.ColumnCount() + col_offset];
auto relation_data = FlatVector::GetData<uint32_t>(relation_vec);
relation_data[0] = state.current_row;
relation_vec.SetVectorType(VectorType::CONSTANT_VECTOR);
}

chunk.Verify();

state.list_position += this_chunk_len;
if (state.list_position == state.longest_list_length) {
state.current_row++;
Expand Down
1 change: 1 addition & 0 deletions src/function/table/summary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static OperatorResultType SummaryFunction(ExecutionContext &context, TableFuncti
for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
output.data[col_idx + 1].Reference(input.data[col_idx]);
}

return OperatorResultType::NEED_MORE_INPUT;
}

Expand Down
4 changes: 3 additions & 1 deletion src/function/table/unnest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ static OperatorResultType UnnestFunction(ExecutionContext &context, TableFunctio
DataChunk &output) {
auto &state = data_p.global_state->Cast<UnnestGlobalState>();
auto &lstate = data_p.local_state->Cast<UnnestLocalState>();
return PhysicalUnnest::ExecuteInternal(context, input, output, *lstate.operator_state, state.select_list, false);
return PhysicalUnnest::ExecuteInternal(context, input, output, *lstate.operator_state, state.select_list, false,
data_p.add_in_out_mapping);
}

//! Register the built-in "unnest" table in-out function
void UnnestTableFunction::RegisterFunction(BuiltinFunctions &set) {
	TableFunction fun("unnest", {LogicalTypeId::TABLE}, nullptr, UnnestBind, UnnestInit, UnnestLocalInit);
	fun.in_out_function = UnnestFunction;
	// unnest can report which input row produced each output row (in-out mapping)
	fun.in_out_mapping = true;
	set.AddFunction(fun);
}

Expand Down
2 changes: 1 addition & 1 deletion src/function/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ TableFunction::TableFunction()
init_local(nullptr), function(nullptr), in_out_function(nullptr), statistics(nullptr), dependency(nullptr),
cardinality(nullptr), pushdown_complex_filter(nullptr), to_string(nullptr), table_scan_progress(nullptr),
get_batch_index(nullptr), get_batch_info(nullptr), serialize(nullptr), deserialize(nullptr),
projection_pushdown(false), filter_pushdown(false), filter_prune(false) {
projection_pushdown(false), filter_pushdown(false), filter_prune(false), in_out_mapping(false) {
}

bool TableFunction::Equal(const TableFunction &rhs) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

namespace duckdb {

class TableInOutLocalState;
class TableInOutGlobalState;

//! PhysicalWindow implements window functions
class PhysicalTableInOutFunction : public PhysicalOperator {
public:
static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::INOUT_FUNCTION;
Expand All @@ -39,6 +43,15 @@ class PhysicalTableInOutFunction : public PhysicalOperator {
return function.in_out_function_final;
}

private:
OperatorResultType ExecuteWithMapping(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
TableInOutLocalState &state, TableFunctionInput &data) const;
OperatorResultType ExecuteWithoutMapping(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
TableInOutGlobalState &gstate, TableInOutLocalState &state,
TableFunctionInput &data) const;
void AddProjectedColumnsFromConstantMapping(idx_t map_idx, DataChunk &input, DataChunk &intermediate,
DataChunk &out) const;

private:
//! The table function
TableFunction function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PhysicalUnnest : public PhysicalOperator {
//! not set, then the UNNEST behaves as a table function and only emits the unnested data.
static OperatorResultType ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
OperatorState &state, const vector<unique_ptr<Expression>> &select_list,
bool include_input = true);
bool include_input = true, bool add_in_out_mapping = false);
};

} // namespace duckdb
6 changes: 6 additions & 0 deletions src/include/duckdb/function/table_function.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ struct TableFunctionInput {
optional_ptr<const FunctionData> bind_data;
optional_ptr<LocalTableFunctionState> local_state;
optional_ptr<GlobalTableFunctionState> global_state;
//! Whether or not the current execution should write the in-out mapping vector
//! for table in-out functions
bool add_in_out_mapping = false;
};

enum ScanType { TABLE, PARQUET };
Expand Down Expand Up @@ -275,6 +278,9 @@ class TableFunction : public SimpleNamedParameterFunction {
//! Whether or not the table function can immediately prune out filter columns that are unused in the remainder of
//! the query plan, e.g., "SELECT i FROM tbl WHERE j = 42;" - j does not need to leave the table function at all
bool filter_prune;
//! Whether or not the table function produces an extra column containing the mapping from output row to
//! the input row that produced it.
bool in_out_mapping = false;
//! Additional function info, passed to the bind
shared_ptr<TableFunctionInfo> function_info;

Expand Down
Loading