Skip to content

Commit d8a226a

Browse files
committed
fix
1 parent 00df00b commit d8a226a

File tree

5 files changed

+28
-17
lines changed

5 files changed

+28
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ databend-storages-common-stage = { workspace = true }
117117
databend-storages-common-table-meta = { workspace = true }
118118
derive-visitor = { workspace = true }
119119
dyn-clone = { workspace = true }
120+
either = { workspace = true }
120121
enum-as-inner = { workspace = true }
121122
ethnum = { workspace = true }
122123
fastrace = { workspace = true }

src/query/service/src/servers/http/v1/query/sized_spsc.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_expression::DataBlock;
3131
use databend_common_io::prelude::FormatSettings;
3232
use databend_common_pipeline_transforms::traits::DataBlockSpill;
3333
use databend_common_pipeline_transforms::traits::Location;
34+
use either::Either;
3435
use log::debug;
3536
use log::info;
3637

@@ -160,12 +161,12 @@ impl SizedChannelBuffer {
160161
self.is_recv_stopped = true
161162
}
162163

163-
fn take_block_once(&mut self, builder: &mut PageBuilder) -> result::Result<(), Location> {
164+
fn take_block_once(&mut self, builder: &mut PageBuilder) -> Either<(), Location> {
164165
let Some(block) = self.values.front_mut() else {
165-
return Ok(());
166+
return Either::Left(());
166167
};
167168
let Some(data) = &block.data else {
168-
return Err(block.location.clone().unwrap());
169+
return Either::Right(block.location.clone().unwrap());
169170
};
170171

171172
let take_rows = builder.calculate_take_rows(data);
@@ -177,14 +178,16 @@ impl SizedChannelBuffer {
177178
self.values.pop_front();
178179
builder.append_full_block(data);
179180
}
180-
Ok(())
181+
Either::Left(())
181182
}
182183

183-
fn take_block(&mut self, builder: &mut PageBuilder) -> result::Result<bool, Location> {
184+
fn take_block(&mut self, builder: &mut PageBuilder) -> Either<bool, Location> {
184185
while builder.has_capacity() && !self.values.is_empty() {
185-
self.take_block_once(builder)?;
186+
if let Either::Right(location) = self.take_block_once(builder) {
187+
return Either::Right(location);
188+
}
186189
}
187-
Ok(!self.values.is_empty())
190+
Either::Left(!self.values.is_empty())
188191
}
189192

190193
fn restore_first(&mut self, location: &Location, data: DataBlock) {
@@ -219,8 +222,8 @@ where S: DataBlockSpill
219222
#[fastrace::trace(name = "SizedChannel::try_take_block")]
220223
async fn try_take_block(&self, builder: &mut PageBuilder) -> Result<bool> {
221224
let location = match self.inner.lock().unwrap().take_block(builder) {
222-
Err(location) => location.clone(),
223-
Ok(has_more) => {
225+
Either::Right(location) => location,
226+
Either::Left(has_more) => {
224227
return Ok(has_more);
225228
}
226229
};

src/query/service/src/test_kits/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ ADDRESS = 'https://databend.com';"
171171
self
172172
}
173173

174+
pub fn off_log(mut self) -> ConfigBuilder {
175+
self.conf.log.file.on = false;
176+
self
177+
}
178+
174179
pub fn build(self) -> InnerConfig {
175180
self.conf
176181
}

src/query/service/tests/it/servers/http/http_query_handlers.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -524,9 +524,9 @@ async fn test_client_compatible_query_id() -> Result<()> {
524524

525525
#[tokio::test(flavor = "current_thread")]
526526
async fn test_active_sessions() -> Result<()> {
527-
let max_active_sessions = 2;
528527
let conf = ConfigBuilder::create()
529-
.max_active_sessions(max_active_sessions)
528+
.max_active_sessions(2)
529+
.off_log()
530530
.build();
531531
let _fixture = TestFixture::setup_with_config(&conf).await?;
532532
let ep = create_endpoint()?;
@@ -542,6 +542,7 @@ async fn test_active_sessions() -> Result<()> {
542542
.into_iter()
543543
.map(|(_status, resp)| (resp.error.map(|e| e.message).unwrap_or_default()))
544544
.collect::<Vec<_>>();
545+
_fixture.keep_alive();
545546
results.sort();
546547
let msg = "[HTTP-QUERY] Failed to upgrade session: Current active sessions (2) has exceeded the max_active_sessions limit (2)";
547548
let expect = vec!["", "", msg];
@@ -1764,12 +1765,12 @@ async fn test_max_size_per_page() -> Result<()> {
17641765
let json = serde_json::json!({"sql": sql.to_string(), "pagination": {"wait_time_secs": wait_time_secs}});
17651766
let (_, reply, body) = TestHttpQueryRequest::new(json).fetch_begin().await?;
17661767
assert!(reply.error.is_none(), "{:?}", reply.error);
1767-
let len = body.len() as i32;
1768-
let target = 20_080_000;
1769-
assert!(len > target);
1770-
assert!(len < target + 2000);
1771-
assert_eq!(reply.data.len(), 10_000);
1772-
assert_eq!(reply.data[0].len(), 2);
1768+
let target = (10_usize * 1024 * 1024) as f64;
1769+
assert!(
1770+
(0.9..1.1).contains(&(body.len() as f64 / target)),
1771+
"body len {}",
1772+
body.len()
1773+
);
17731774
Ok(())
17741775
}
17751776

0 commit comments

Comments
 (0)