@@ -26,6 +26,7 @@ use databend_common_base::base::tokio;
26
26
use databend_common_base:: base:: tokio:: sync:: Notify ;
27
27
use databend_common_base:: base:: WatchNotify ;
28
28
use databend_common_exception:: Result ;
29
+ use databend_common_expression:: BlockEntry ;
29
30
use databend_common_expression:: DataBlock ;
30
31
use databend_common_io:: prelude:: FormatSettings ;
31
32
use databend_common_pipeline_transforms:: traits:: DataBlockSpill ;
@@ -65,7 +66,17 @@ impl PageBuilder {
65
66
self . collector . append_block ( block) ;
66
67
}
67
68
68
- fn calculate_take_rows ( & self , block_rows : usize , memory_size : usize ) -> usize {
69
+ fn calculate_take_rows ( & self , block : & DataBlock ) -> usize {
70
+ let block_rows = block. num_rows ( ) ;
71
+ let memory_size = block
72
+ . columns ( )
73
+ . iter ( )
74
+ . map ( |entry| match entry {
75
+ BlockEntry :: Const ( scalar, _, n) => * n * scalar. as_ref ( ) . memory_size ( ) ,
76
+ BlockEntry :: Column ( column) => column. memory_size ( ) ,
77
+ } )
78
+ . sum :: < usize > ( ) ;
79
+
69
80
min (
70
81
self . remain_rows ,
71
82
if memory_size > self . remain_size {
@@ -157,7 +168,7 @@ impl SizedChannelBuffer {
157
168
return Err ( block. location . clone ( ) . unwrap ( ) ) ;
158
169
} ;
159
170
160
- let take_rows = builder. calculate_take_rows ( data. num_rows ( ) , data . memory_size ( ) ) ;
171
+ let take_rows = builder. calculate_take_rows ( & data) ;
161
172
if take_rows < data. num_rows ( ) {
162
173
builder. remain_rows = 0 ;
163
174
builder. collector . append_block ( block. slice ( take_rows) ) ;
@@ -303,7 +314,8 @@ impl<S> SizedChannelReceiver<S>
303
314
where S : DataBlockSpill
304
315
{
305
316
pub fn close ( & self ) {
306
- self . chan . stop_recv ( )
317
+ self . chan . stop_recv ( ) ;
318
+ self . chan . spiller . lock ( ) . unwrap ( ) . take ( ) ; // release session
307
319
}
308
320
309
321
#[ async_backtrace:: framed]
0 commit comments