Skip to content

Commit a22e270

Browse files
flearcForsworns
authored andcommitted
fix reader & searcher
1. fix find_offset_to_start(#146) 2. fix read_metrics_one_file_by_end_time(#145)
1 parent b1107f9 commit a22e270

File tree

3 files changed

+44
-43
lines changed

3 files changed

+44
-43
lines changed

sentinel-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ schemars = { version = "0.8.8", optional = true }
7070
apollo-client = { version = "0.7.5", optional = true }
7171
futures-util = { version = "0.3.29", optional = true }
7272
dirs = "5.0.1"
73+
byteorder = "1.5.0"
7374

7475
[target.'cfg(not(target_arch="wasm32"))'.dependencies]
7576
# cannot add "wasm-bindgen" feature to uuid,

sentinel-core/src/core/log/metric/reader.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -85,38 +85,36 @@ impl DefaultMetricLogReader {
8585
let end_sec = end_ms / 1000;
8686
let file = open_file_and_seek_to(filename, offset)?;
8787

88-
let mut buf_reader = BufReader::new(file);
88+
let buf_reader = BufReader::new(file);
8989
let mut items = Vec::with_capacity(1024);
90-
loop {
91-
let mut line = String::new();
92-
let count = buf_reader.read_line(&mut line)?;
93-
if count == 0 {
94-
return Ok((Vec::new(), true));
95-
}
96-
let item = base::MetricItem::from_string(&line);
97-
match item {
98-
Ok(item) => {
99-
let ts_sec = item.timestamp / 1000;
100-
// current_second should in [begin_sec, end_sec]
101-
if ts_sec < begin_sec || ts_sec > end_sec {
102-
return Ok((items, false));
103-
}
10490

105-
// empty resource name indicates "fetch all"
106-
if resource.is_empty() || resource == &item.resource {
107-
items.push(item);
108-
}
91+
let lines = buf_reader.lines();
10992

110-
if prev_size + items.len() >= MAX_ITEM_AMOUNT {
111-
return Ok((items, false));
112-
}
113-
}
93+
for line in lines {
94+
let line = line?;
95+
let item = match base::MetricItem::from_string(&line) {
96+
Ok(item) => item,
11497
Err(err) => {
115-
logging::error!("DefaultMetricLogReader::read_metrics_one_file_by_end_time: {:?} Failed to convert to MetricItem. Error: {:?}.", line,err);
98+
logging::error!("Failed to convert to MetricItem: {:?}", err);
11699
continue;
117100
}
101+
};
102+
103+
let ts_time = item.timestamp / 1000;
104+
if ts_time < begin_sec || ts_time > end_sec {
105+
return Ok((items, false)); // Outside time range
106+
}
107+
108+
if resource.is_empty() || resource == &item.resource {
109+
items.push(item);
110+
}
111+
112+
if prev_size + items.len() >= MAX_ITEM_AMOUNT {
113+
return Ok((items, false)); // Reached maximum item amount
118114
}
119115
}
116+
117+
Ok((items, true)) // End of file
120118
}
121119
}
122120

sentinel-core/src/core/log/metric/searcher.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use byteorder::{BigEndian, ReadBytesExt};
2+
13
use super::*;
24
use crate::{logging, Error, Result};
35
use std::fs::File;
4-
use std::io::{Read, Seek, SeekFrom};
6+
use std::io::{Cursor, Read, Seek, SeekFrom};
57
use std::sync::Mutex;
68
#[derive(Debug)]
79
pub struct FilePosition {
@@ -139,32 +141,32 @@ impl DefaultMetricSearcher {
139141
filename: &str,
140142
begin_time_ms: u64,
141143
last_pos: SeekFrom,
142-
) -> Result<u32> {
143-
let mut cached_pos = self.cached_pos.lock().unwrap();
144-
cached_pos.idx_filename = "".into();
145-
cached_pos.metric_filename = "".into();
146-
144+
) -> Result<u64> {
147145
let idx_filename = form_metric_idx_filename(filename);
148146
let begin_sec = begin_time_ms / 1000;
149147
let mut file = File::open(&idx_filename)?;
150148

151-
// Set position to the offset recorded in the idx file
152-
cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(last_pos)?);
153-
let mut sec: u64;
154-
loop {
155-
let mut buffer: [u8; 8] = [0; 8];
156-
file.read_exact(&mut buffer)?;
157-
sec = u64::from_be_bytes(buffer);
149+
let mut cached_pos = self.cached_pos.lock().unwrap();
150+
151+
// Seek to the last position
152+
file.seek(last_pos)?;
153+
154+
let mut index_data = Vec::new();
155+
file.read_to_end(&mut index_data)?;
156+
157+
let mut offset = 0;
158+
let mut sec = 0;
159+
160+
let mut reader = Cursor::new(index_data);
161+
while let Ok(sec_be) = ReadBytesExt::read_u64::<BigEndian>(&mut reader) {
162+
sec = sec_be;
163+
let offset_be = ReadBytesExt::read_u64::<BigEndian>(&mut reader)?;
164+
offset = offset_be;
158165
if sec >= begin_sec {
159166
break;
160167
}
161-
let mut buffer: [u8; 4] = [0; 4];
162-
file.read_exact(&mut buffer)?;
163-
cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(SeekFrom::Current(0))?);
164168
}
165-
let mut buffer: [u8; 4] = [0; 4];
166-
file.read_exact(&mut buffer)?;
167-
let offset = u32::from_be_bytes(buffer);
169+
168170
// Cache the idx filename and position
169171
cached_pos.metric_filename = filename.into();
170172
cached_pos.idx_filename = idx_filename.into();

0 commit comments

Comments
 (0)