Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion e2e_test/vector_search/vector_nearest.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,34 @@ select * from query_view order by distance;
statement ok
drop view query_view;

# test vector index lookup join on raw embedding column
# query T
# with input as (select '[3,2,1]'::vector(3) as embedding) select array(select row(id, text) from no_index_mv order by input.embedding <-> no_index_mv.embedding limit 2) as related_info from input;
# ----
# {"(1,first)","(2,second)"}

statement ok
create view query_view as with input as (select '[3,2,1]'::vector(3) as embedding) select array(select row(id, text) from items order by input.embedding <-> items.embedding limit 2) as related_info from input;

# ensure that vector index is used
query T
explain(verbose) select * from query_view;
----
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [vector_info] }
└─BatchVectorSearch { top_n: 2, distance_type: L2Sqr, index_name: "i", vector: '[3,2,1]':Vector(3), lookup_output: [("items.id", Int32), ("text", Varchar)], include_distance: false }
└─BatchValues { rows: [['[3,2,1]':Vector(3)]] }



query T
select * from query_view;
----
{"(1,first)","(2,second)"}

statement ok
drop view query_view;

statement ok
drop index i;

Expand Down Expand Up @@ -153,6 +181,33 @@ select * from query_view order by distance;
statement ok
drop view query_view;

# test vector index lookup join on functional embedding column
# query T
# with input as (select '[3,2,1]'::vector(3) as embedding) select array(select row(id, text) from no_index_mv order by input.embedding <-> get_embedding(no_index_mv.text) limit 2) as related_info from input;
# ----
# {"(1,first)","(2,second)"}

statement ok
create view query_view as with input as (select '[3,2,1]'::vector(3) as embedding) select array(select row(id, text) from items order by input.embedding <-> get_embedding(items.text) limit 2) as related_info from input;

# ensure that vector index is used
query T
explain(verbose) select * from query_view;
----
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [vector_info] }
└─BatchVectorSearch { top_n: 2, distance_type: L2Sqr, index_name: "i", vector: '[3,2,1]':Vector(3), lookup_output: [("items.id", Int32), ("text", Varchar)], include_distance: false }
└─BatchValues { rows: [['[3,2,1]':Vector(3)]] }


query T
select * from query_view;
----
{"(1,first)","(2,second)"}

statement ok
drop view query_view;

statement ok
drop index i;

Expand All @@ -162,7 +217,7 @@ drop materialized view no_index_mv;
statement ok
drop table items;

# test flat index
# test hnsw index
statement ok
create table items (id int primary key, extra string, text string, embedding vector(3)) append only;

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Schema {
}

for (a, b) in self.fields.iter().zip_eq_fast(other.fields.iter()) {
if a.data_type != b.data_type {
if !a.data_type.equals_datatype(&b.data_type) {
return false;
}
}
Expand Down
86 changes: 86 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/vector_search.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,92 @@
- create_table_and_function_index
sql: |
SELECT id, name FROM items order by openai_embedding('{"model": "model"}'::jsonb, text)::vector(3) <#> '[3,1,2]' limit 5;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- id: create_correlated_tables
sql: |
create table items (id int primary key, name string, embedding vector(3)) append only;
create table events (event_id int primary key, time timestamp, embedding vector(3));
expected_outputs: []
- before:
- create_correlated_tables
id: correlated_read_without_embedding
sql: |
select
event_id, array(
select row(id, name)
from items
order by events.embedding <=> items.embedding
limit 3
) as related_info,
time
from events;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- before:
- create_correlated_tables
id: correlated_read_with_embedding
sql: |
select
event_id, time, embedding, array(
select row(id, name)
from items
order by items.embedding <=> events.embedding
limit 3
)
as related_info from events;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- before:
- create_correlated_tables
id: correlated_read_with_distance
sql: |
select
event_id, array(
select row(id, distance, name)
from (select id, name, events.embedding <=> items.embedding as distance from items order by distance limit 3)
) as related_info,
time
from events;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- id: create_correlated_tables_with_column_value_index
sql: |
create table items (id int primary key, name string, embedding vector(3)) append only;
create table events (event_id int primary key, time timestamp, embedding vector(3));
create index i on items using flat (embedding) with (distance_type = 'l2');
expected_outputs: []
- before:
- create_correlated_tables_with_column_value_index
id: correlated_read_without_embedding
sql: |
select
event_id, array(
select row(name)
from (select name from items order by events.embedding <-> items.embedding limit 3)
) as related_info,
time
from events;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
- batch_plan
- before:
- create_correlated_tables_with_column_value_index
id: correlated_read_without_embedding
sql: |
select
event_id, array(
select row(id, name, distance)
from (select id, name, events.embedding <-> items.embedding as distance from items order by distance limit 3)
) as related_info,
time
from events;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_batch
Expand Down
123 changes: 123 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/vector_search.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,126 @@
└─BatchProjectSet { select_list: [Unnest($1)] }
└─BatchVectorSearch { top_n: 5, distance_type: InnerProduct, index_name: "vector_index", vector: query_vector, lookup_output: [("name", Varchar), ("items.id", Int32)], include_distance: true }
└─BatchValues { rows: [['[3,1,2]':Vector(3)]] }
- id: create_correlated_tables
sql: |
create table items (id int primary key, name string, embedding vector(3)) append only;
create table events (event_id int primary key, time timestamp, embedding vector(3));
- id: correlated_read_without_embedding
before:
- create_correlated_tables
sql: "select \n event_id, array(\n select row(id, name)\n from items\n order by events.embedding <=> items.embedding\n limit 3\n ) as related_info, \n time\nfrom events;\n"
logical_plan: |-
LogicalProject { exprs: [events.event_id, $expr3, events.time] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding, events._rw_timestamp] }
└─LogicalProject { exprs: [Coalesce(array_agg($expr1 order_by($expr2 ASC)), ARRAY[]:List(Struct(StructType { fields: [("f1", Int32), ("f2", Varchar)] }))) as $expr3] }
└─LogicalAgg { aggs: [array_agg($expr1 order_by($expr2 ASC))] }
└─LogicalTopN { order: [$expr2 ASC], limit: 3, offset: 0 }
└─LogicalProject { exprs: [Row(items.id, items.name) as $expr1, CosineDistance(CorrelatedInputRef { index: 2, correlated_id: 1 }, items.embedding) as $expr2] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [events.event_id, array, events.time] }
└─LogicalVectorSearchLookupJoin { distance_type: Cosine, top_n: 3, input_vector: events.embedding:Vector(3), lookup_vector: items.embedding, lookup_output_columns: [items.id:Int32, items.name:Varchar], include_distance: false }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
- id: correlated_read_with_embedding
before:
- create_correlated_tables
sql: "select \n event_id, time, embedding, array(\n select row(id, name)\n from items\n order by items.embedding <=> events.embedding \n limit 3\n )\nas related_info from events;\n"
logical_plan: |-
LogicalProject { exprs: [events.event_id, events.time, events.embedding, $expr3] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding, events._rw_timestamp] }
└─LogicalProject { exprs: [Coalesce(array_agg($expr1 order_by($expr2 ASC)), ARRAY[]:List(Struct(StructType { fields: [("f1", Int32), ("f2", Varchar)] }))) as $expr3] }
└─LogicalAgg { aggs: [array_agg($expr1 order_by($expr2 ASC))] }
└─LogicalTopN { order: [$expr2 ASC], limit: 3, offset: 0 }
└─LogicalProject { exprs: [Row(items.id, items.name) as $expr1, CosineDistance(items.embedding, CorrelatedInputRef { index: 2, correlated_id: 1 }) as $expr2] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [events.event_id, events.time, events.embedding, array] }
└─LogicalVectorSearchLookupJoin { distance_type: Cosine, top_n: 3, input_vector: events.embedding:Vector(3), lookup_vector: items.embedding, lookup_output_columns: [items.id:Int32, items.name:Varchar], include_distance: false }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
- id: correlated_read_with_distance
before:
- create_correlated_tables
sql: "select \n event_id, array(\n select row(id, distance, name)\n from (select id, name, events.embedding <=> items.embedding as distance from items order by distance limit 3)\n ) as related_info, \n time\nfrom events;\n"
logical_plan: |-
LogicalProject { exprs: [events.event_id, $expr3, events.time] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding, events._rw_timestamp] }
└─LogicalProject { exprs: [Coalesce(array_agg($expr2), ARRAY[]:List(Struct(StructType { fields: [("f1", Int32), ("f2", Float64), ("f3", Varchar)] }))) as $expr3] }
└─LogicalAgg { aggs: [array_agg($expr2)] }
└─LogicalProject { exprs: [Row(items.id, $expr1, items.name) as $expr2] }
└─LogicalTopN { order: [$expr1 ASC], limit: 3, offset: 0 }
└─LogicalProject { exprs: [items.id, items.name, CosineDistance(CorrelatedInputRef { index: 2, correlated_id: 1 }, items.embedding) as $expr1] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
optimized_logical_plan_for_batch: |-
LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(events.embedding, events.embedding), output: [events.event_id, $expr3, events.time] }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding] }
└─LogicalProject { exprs: [events.embedding, Coalesce(array_agg($expr2) filter(IsNotNull(1:Int32)), ARRAY[]:List(Struct(StructType { fields: [("f1", Int32), ("f2", Float64), ("f3", Varchar)] }))) as $expr3] }
└─LogicalAgg { group_key: [events.embedding], aggs: [array_agg($expr2) filter(IsNotNull(1:Int32))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(events.embedding, events.embedding), output: [events.embedding, $expr2, 1:Int32] }
├─LogicalAgg { group_key: [events.embedding], aggs: [] }
│ └─LogicalScan { table: events, columns: [events.embedding] }
└─LogicalProject { exprs: [events.embedding, Row(items.id, $expr1, items.name) as $expr2, 1:Int32] }
└─LogicalTopN { order: [$expr1 ASC], limit: 3, offset: 0, group_key: [events.embedding] }
└─LogicalProject { exprs: [events.embedding, items.id, items.name, CosineDistance(events.embedding, items.embedding) as $expr1] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalAgg { group_key: [events.embedding], aggs: [] }
│ └─LogicalScan { table: events, columns: [events.embedding] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding] }
- id: create_correlated_tables_with_column_value_index
sql: |
create table items (id int primary key, name string, embedding vector(3)) append only;
create table events (event_id int primary key, time timestamp, embedding vector(3));
create index i on items using flat (embedding) with (distance_type = 'l2');
- id: correlated_read_without_embedding
before:
- create_correlated_tables_with_column_value_index
sql: "select \n event_id, array(\n select row(name)\n from (select name from items order by events.embedding <-> items.embedding limit 3)\n ) as related_info, \n time\nfrom events;\n"
logical_plan: |-
LogicalProject { exprs: [events.event_id, $expr3, events.time] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding, events._rw_timestamp] }
└─LogicalProject { exprs: [Coalesce(array_agg($expr2), ARRAY[]:List(Struct(StructType { fields: [("f1", Varchar)] }))) as $expr3] }
└─LogicalAgg { aggs: [array_agg($expr2)] }
└─LogicalProject { exprs: [Row(items.name) as $expr2] }
└─LogicalProject { exprs: [items.name] }
└─LogicalTopN { order: [$expr1 ASC], limit: 3, offset: 0 }
└─LogicalProject { exprs: [items.name, L2Distance(CorrelatedInputRef { index: 2, correlated_id: 1 }, items.embedding) as $expr1] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [events.event_id, array, events.time] }
└─LogicalVectorSearchLookupJoin { distance_type: L2Sqr, top_n: 3, input_vector: events.embedding:Vector(3), lookup_vector: items.embedding, lookup_output_columns: [items.name:Varchar], include_distance: false }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [events.event_id, vector_info, events.time] }
└─BatchVectorSearch { schema: [events.event_id:Int32, events.time:Timestamp, events.embedding:Vector(3), vector_info:List(Struct(StructType { fields: [("name", Varchar)] }))], top_n: 3, distance_type: L2Sqr, index_name: "i", vector: events.embedding }
└─BatchScan { table: events, columns: [events.event_id, events.time, events.embedding], distribution: Single }
- id: correlated_read_without_embedding
before:
- create_correlated_tables_with_column_value_index
sql: "select \n event_id, array(\n select row(id, name, distance)\n from (select id, name, events.embedding <-> items.embedding as distance from items order by distance limit 3)\n ) as related_info, \n time\nfrom events;\n"
logical_plan: |-
LogicalProject { exprs: [events.event_id, $expr3, events.time] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding, events._rw_timestamp] }
└─LogicalProject { exprs: [Coalesce(array_agg($expr2), ARRAY[]:List(Struct(StructType { fields: [("f1", Int32), ("f2", Varchar), ("f3", Float64)] }))) as $expr3] }
└─LogicalAgg { aggs: [array_agg($expr2)] }
└─LogicalProject { exprs: [Row(items.id, items.name, $expr1) as $expr2] }
└─LogicalTopN { order: [$expr1 ASC], limit: 3, offset: 0 }
└─LogicalProject { exprs: [items.id, items.name, L2Distance(CorrelatedInputRef { index: 2, correlated_id: 1 }, items.embedding) as $expr1] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [events.event_id, array, events.time] }
└─LogicalVectorSearchLookupJoin { distance_type: L2Sqr, top_n: 3, input_vector: events.embedding:Vector(3), lookup_vector: items.embedding, lookup_output_columns: [items.id:Int32, items.name:Varchar], include_distance: true }
├─LogicalScan { table: events, columns: [events.event_id, events.time, events.embedding] }
└─LogicalScan { table: items, columns: [items.id, items.name, items.embedding, items._rw_timestamp] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [events.event_id, vector_info, events.time] }
└─BatchVectorSearch { schema: [events.event_id:Int32, events.time:Timestamp, events.embedding:Vector(3), vector_info:List(Struct(StructType { fields: [("id", Int32), ("name", Varchar), ("__distance", Float64)] }))], top_n: 3, distance_type: L2Sqr, index_name: "i", vector: events.embedding }
└─BatchScan { table: events, columns: [events.event_id, events.time, events.embedding], distribution: Single }
Loading
Loading