Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
6191b1d
colexec/group: add SpillableData, SpillManager, MemorySpillManager, S…
reusee Aug 28, 2025
00b844c
colexec/group: add Group.SpillManager, SpillThreshold
reusee Aug 29, 2025
8ca6f24
colexec/group: add spill states to container
reusee Aug 29, 2025
0fb7988
colexec/group: implement spill
reusee Aug 29, 2025
5bd4966
colexec/group: add spill tests
reusee Aug 29, 2025
8ed5298
refinements
reusee Sep 3, 2025
3497546
fix
reusee Sep 8, 2025
596573c
fix restoreAndMergeSpilledAggregators
reusee Sep 10, 2025
287f35d
fix memory leak in spillPartialResults
reusee Sep 10, 2025
ed80477
fix memory leak in spill tests
reusee Sep 10, 2025
be89bff
update
reusee Sep 10, 2025
cde158f
fixes
reusee Sep 12, 2025
c0d1b56
fixes
reusee Sep 12, 2025
d097b46
fixes
reusee Sep 12, 2025
3e29024
fixes
reusee Sep 12, 2025
607e33d
update
reusee Sep 13, 2025
d7b123e
update
reusee Sep 14, 2025
a2d7998
fix init agg state
reusee Sep 17, 2025
8d14197
fix *GroupResultBuffer.IsEmpty
reusee Sep 19, 2025
2665f25
fix
reusee Sep 19, 2025
32c4411
fix chunk size in restore
reusee Sep 19, 2025
6ba1c34
minor clean-ups
reusee Sep 19, 2025
f24427f
fixes
reusee Sep 19, 2025
1b5e83b
fix
reusee Sep 23, 2025
7dd84d1
colexec/group: add lock to memory spill manager
reusee Sep 23, 2025
3b880d1
colexec/group: fix spill trigger
reusee Sep 23, 2025
0d985f1
add test for spill memory
reusee Sep 23, 2025
870e807
fix TestSpill
reusee Sep 23, 2025
3feb7dd
fix cleanup
reusee Sep 23, 2025
305de84
remove tests
reusee Sep 24, 2025
fcc5c22
fixes
reusee Sep 24, 2025
5fd108c
add testspill package
reusee Sep 24, 2025
86ae60e
add testspill package
reusee Sep 24, 2025
283ec50
do not use sonic
reusee Sep 24, 2025
59221db
Revert "do not use sonic"
reusee Sep 24, 2025
3f015ab
update
reusee Sep 25, 2025
306f1c5
testspill: more groups
reusee Sep 25, 2025
e89b3c3
fix
reusee Sep 25, 2025
82bf088
fix error handling
reusee Sep 25, 2025
076d868
add logs for spill
reusee Sep 26, 2025
93f446d
Revert "add logs for spill"
reusee Sep 26, 2025
3e2d1c2
more logs for spill
reusee Sep 26, 2025
fbaf61e
more logs for spill
reusee Sep 26, 2025
6bd9e6b
more logs for spill
reusee Sep 26, 2025
0afe4ff
more logs for spill
reusee Sep 26, 2025
1043ec6
add logs for spill
reusee Sep 26, 2025
11a1a07
fix test
reusee Sep 28, 2025
a5b32de
fix test
reusee Sep 28, 2025
e93565c
enable debug log
reusee Sep 28, 2025
bcd50ce
more logs
reusee Sep 28, 2025
c3b5d6b
fix sql for test
reusee Sep 28, 2025
d3ac292
more spill tests
reusee Sep 28, 2025
feafc25
fix agg state serialize
reusee Sep 30, 2025
c61f5ee
fix agg state deserialize
reusee Sep 30, 2025
46d68c1
use logutil.Info
reusee Sep 30, 2025
c2aabd9
update
reusee Sep 30, 2025
1d48563
clean up logs
reusee Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions pkg/sql/colexec/group/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func (group *Group) Prepare(proc *process.Process) (err error) {
if err = group.prepareGroup(proc); err != nil {
return err
}

if group.SpillManager == nil {
group.SpillManager = NewMemorySpillManager()
}
if group.SpillThreshold <= 0 {
group.SpillThreshold = 256 * 1024 //TODO configurable
}

return group.PrepareProjection(proc)
}

Expand Down Expand Up @@ -188,6 +196,12 @@ func (group *Group) callToGetFinalResult(proc *process.Process) (*batch.Batch, e

for {
if group.ctr.state == vm.Eval {
if len(group.ctr.spilledStates) > 0 {
if err := group.mergeSpilledResults(proc); err != nil {
return nil, err
}
}

if group.ctr.result1.IsEmpty() {
group.ctr.state = vm.End
return nil, nil
Expand All @@ -203,7 +217,7 @@ func (group *Group) callToGetFinalResult(proc *process.Process) (*batch.Batch, e
group.ctr.state = vm.Eval

if group.ctr.isDataSourceEmpty() && len(group.Exprs) == 0 {
if err = group.generateInitialResult1WithoutGroupBy(proc); err != nil {
if err := group.generateInitialResult1WithoutGroupBy(proc); err != nil {
return nil, err
}
group.ctr.result1.ToPopped[0].SetRowCount(1)
Expand All @@ -215,6 +229,16 @@ func (group *Group) callToGetFinalResult(proc *process.Process) (*batch.Batch, e
}

group.ctr.dataSourceIsEmpty = false

// Check if we need to spill before processing this batch
group.updateMemoryUsage(proc)
if group.shouldSpill() {
if err := group.spillPartialResults(proc); err != nil {
return nil, err
}
continue
}

if err = group.consumeBatchToGetFinalResult(proc, res); err != nil {
return nil, err
}
Expand Down Expand Up @@ -245,7 +269,6 @@ func (group *Group) consumeBatchToGetFinalResult(

switch group.ctr.mtyp {
case H0:
// without group by.
if group.ctr.result1.IsEmpty() {
if err := group.generateInitialResult1WithoutGroupBy(proc); err != nil {
return err
Expand All @@ -260,7 +283,6 @@ func (group *Group) consumeBatchToGetFinalResult(
}

default:
// with group by.
if group.ctr.result1.IsEmpty() {
err := group.ctr.hr.BuildHashTable(false, group.ctr.mtyp == HStr, group.ctr.keyNullable, group.PreAllocSize)
if err != nil {
Expand Down Expand Up @@ -315,6 +337,16 @@ func (group *Group) consumeBatchToGetFinalResult(
}
}

// Update memory usage after processing the batch
group.updateMemoryUsage(proc)

// Check if we need to spill after processing this batch
if group.shouldSpill() {
if err := group.spillPartialResults(proc); err != nil {
return err
}
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/group/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (h *hackAggExecToTest) GetOptResult() aggexec.SplitResult {
return nil
}

func (h *hackAggExecToTest) Size() int64 {
return 0
}

func (h *hackAggExecToTest) GroupGrow(more int) error {
h.groupNumber += more
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/group/execctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type GroupResultBuffer struct {
}

func (buf *GroupResultBuffer) IsEmpty() bool {
return cap(buf.ToPopped) == 0
return len(buf.ToPopped) == 0
}

func (buf *GroupResultBuffer) InitOnlyAgg(chunkSize int, aggList []aggexec.AggFuncExec) {
Expand Down
Loading
Loading