Skip to content

Commit 5ba34d9

Browse files
committed
PortableSpiller
1 parent d8a226a commit 5ba34d9

File tree

4 files changed

+178
-79
lines changed

4 files changed

+178
-79
lines changed

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ use crate::sessions::QueryAffect;
5050
use crate::sessions::QueryContext;
5151
use crate::sessions::Session;
5252
use crate::sessions::TableContext;
53-
use crate::spillers::Spiller;
53+
use crate::spillers::PortableSpiller;
54+
use crate::spillers::PortableSpillerRef;
5455
use crate::spillers::SpillerConfig;
5556
use crate::spillers::SpillerDiskConfig;
56-
use crate::spillers::SpillerRef;
5757
use crate::spillers::SpillerType;
5858

59+
type Sender = SizedChannelSender<PortableSpillerRef>;
60+
5961
pub struct ExecutionError;
6062

6163
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
@@ -120,7 +122,7 @@ impl ExecuteState {
120122

121123
pub struct ExecuteStarting {
122124
pub(crate) ctx: Arc<QueryContext>,
123-
pub(crate) sender: SizedChannelSender<SpillerRef>,
125+
pub(crate) sender: Sender,
124126
}
125127

126128
pub struct ExecuteRunning {
@@ -361,7 +363,7 @@ impl ExecuteState {
361363
sql: String,
362364
session: Arc<Session>,
363365
ctx: Arc<QueryContext>,
364-
mut block_sender: SizedChannelSender<SpillerRef>,
366+
mut block_sender: Sender,
365367
) -> Result<(), ExecutionError> {
366368
let make_error = || format!("failed to start query: {sql}");
367369

@@ -426,7 +428,7 @@ impl ExecuteState {
426428
interpreter: Arc<dyn Interpreter>,
427429
schema: DataSchemaRef,
428430
ctx: Arc<QueryContext>,
429-
mut block_sender: SizedChannelSender<SpillerRef>,
431+
mut block_sender: Sender,
430432
executor: Arc<Mutex<Executor>>,
431433
) -> Result<(), ExecutionError> {
432434
let make_error = || format!("failed to execute {}", interpreter.name());
@@ -471,7 +473,7 @@ impl ExecuteState {
471473
}
472474

473475
async fn send_data_block(
474-
sender: &mut SizedChannelSender<SpillerRef>,
476+
sender: &mut Sender,
475477
executor: &Arc<Mutex<Executor>>,
476478
block: DataBlock,
477479
) -> Result<()> {
@@ -484,10 +486,7 @@ impl ExecuteState {
484486
}
485487
}
486488

487-
fn apply_settings(
488-
ctx: &Arc<QueryContext>,
489-
block_sender: &mut SizedChannelSender<SpillerRef>,
490-
) -> Result<()> {
489+
fn apply_settings(ctx: &Arc<QueryContext>, block_sender: &mut Sender) -> Result<()> {
491490
let settings = ctx.get_settings();
492491

493492
let spiller = if settings.get_enable_result_set_spilling()? {
@@ -507,7 +506,7 @@ impl ExecuteState {
507506
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
508507
};
509508
let op = DataOperator::instance().spill_operator();
510-
Some(Spiller::create(ctx.clone(), op, config)?.into())
509+
Some(PortableSpiller::create(op, config)?)
511510
} else {
512511
None
513512
};

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ use crate::servers::http::v1::QueryStats;
7474
use crate::sessions::QueryAffect;
7575
use crate::sessions::Session;
7676
use crate::sessions::TableContext;
77-
use crate::spillers::SpillerRef;
77+
use crate::spillers::PortableSpillerRef;
7878

7979
fn default_as_true() -> bool {
8080
true
@@ -612,7 +612,8 @@ impl HttpQuery {
612612
})
613613
};
614614

615-
let (sender, receiver) = sized_spsc::<SpillerRef>(req.pagination.max_rows_in_buffer);
615+
let (sender, receiver) =
616+
sized_spsc::<PortableSpillerRef>(req.pagination.max_rows_in_buffer);
616617

617618
let executor = Arc::new(Mutex::new(Executor {
618619
query_id: query_id.clone(),

src/query/service/src/servers/http/v1/query/page_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_common_exception::Result;
2020
use super::blocks_serializer::BlocksSerializer;
2121
use crate::servers::http::v1::query::sized_spsc::SizedChannelReceiver;
2222
use crate::servers::http::v1::query::sized_spsc::Wait;
23-
use crate::spillers::SpillerRef;
23+
use crate::spillers::PortableSpillerRef;
2424

2525
#[derive(Clone)]
2626
pub struct Page {
@@ -38,13 +38,13 @@ pub struct PageManager {
3838
total_pages: usize,
3939
end: bool,
4040
last_page: Option<Page>,
41-
block_receiver: SizedChannelReceiver<SpillerRef>,
41+
block_receiver: SizedChannelReceiver<PortableSpillerRef>,
4242
}
4343

4444
impl PageManager {
4545
pub fn new(
4646
max_rows_per_page: usize,
47-
block_receiver: SizedChannelReceiver<SpillerRef>,
47+
block_receiver: SizedChannelReceiver<PortableSpillerRef>,
4848
) -> PageManager {
4949
PageManager {
5050
total_rows: 0,

0 commit comments

Comments
 (0)