Skip to content

Commit 92b30c9

Browse files
drmingdrmerclaude
andcommitted
feat: migrate state-machine-api from databend workspace to standalone crate
- Add complete source code from databend/src/meta/state-machine-api - Update Cargo.toml to use correct crate name and standalone dependencies - All tests passing (22 tests) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 4100df4 commit 92b30c9

File tree

9 files changed

+1634
-0
lines changed

9 files changed

+1634
-0
lines changed

Cargo.lock

Lines changed: 857 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "state-machine-api"
3+
version = "0.1.0"
4+
authors = ["drmingdrmer <[email protected]>"]
5+
license = "Apache-2.0"
6+
publish = true
7+
edition = "2021"
8+
9+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
10+
11+
[dependencies]
12+
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.2.0" }
13+
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.7" }
14+
serde = { version = "1.0", features = ["derive"] }
15+
16+
[dev-dependencies]
17+
anyhow = "1.0"
18+
serde_json = "1.0"

src/expire_key.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! This mod defines a key space in state machine to store the index of keys with an expiration time.
16+
//!
17+
//! This secondary index is `(expire_time, seq) -> key`, as the key-value's primary index is `key -> (seq, expire_time, value)`.
18+
//! Because `seq` in meta-store is globally unique, it may be used to identify every update to every key.
19+
20+
use std::fmt::Display;
21+
use std::fmt::Formatter;
22+
use std::time::Duration;
23+
24+
use display_more::DisplayUnixTimeStampExt;
25+
use map_api::MapKey;
26+
27+
/// The identifier of the index for kv with expiration.
28+
///
29+
/// Encoding to `exp-/<timestamp>/<primary_key_seq>`.
30+
/// The encoded value is `<ver=1>null<string_key>`. The `null` is json string for empty meta
31+
#[derive(
32+
Default,
33+
Debug,
34+
Clone,
35+
Copy,
36+
serde::Serialize,
37+
serde::Deserialize,
38+
PartialEq,
39+
Eq,
40+
PartialOrd,
41+
Ord,
42+
)]
43+
pub struct ExpireKey {
44+
/// The time in millisecond when a key will be expired.
45+
pub time_ms: u64,
46+
47+
/// The `seq` of the value when the key is written.
48+
///
49+
/// The `seq` of value is globally unique in meta-store.
50+
pub seq: u64,
51+
}
52+
53+
impl MapKey for ExpireKey {
54+
type V = String;
55+
}
56+
57+
impl ExpireKey {
58+
pub fn new(time_ms: u64, seq: u64) -> Self {
59+
Self { time_ms, seq }
60+
}
61+
62+
/// Return true if the provided time in millisecond is expired.
63+
///
64+
/// NOTE that `time_ms` equal to `self.time_ms` is not considered expired.
65+
pub fn is_expired(&self, time_ms: u64) -> bool {
66+
time_ms > self.time_ms
67+
}
68+
}
69+
70+
impl Display for ExpireKey {
71+
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
72+
write!(
73+
f,
74+
"{}={}",
75+
Duration::from_millis(self.time_ms).display_unix_timestamp_short(),
76+
self.seq
77+
)
78+
}
79+
}
80+
81+
#[cfg(test)]
82+
mod tests {
83+
84+
use super::*;
85+
86+
#[test]
87+
fn test_expire_key_display() -> anyhow::Result<()> {
88+
let ms = 1666670258202;
89+
let k = ExpireKey::new(ms, 1000);
90+
assert_eq!("2022-10-25T03:57:38.202=1000", format!("{}", k));
91+
92+
Ok(())
93+
}
94+
}

src/expire_value.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use map_api::Marked;
16+
use map_api::SeqMarked;
17+
18+
/// The value of an expiration index is the record key.
19+
#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
20+
pub struct ExpireValue {
21+
#[serde(skip_serializing_if = "is_zero")]
22+
#[serde(default)]
23+
pub seq: u64,
24+
pub key: String,
25+
}
26+
27+
fn is_zero(v: &u64) -> bool {
28+
*v == 0
29+
}
30+
31+
impl ExpireValue {
32+
pub fn new(key: impl ToString, seq: u64) -> Self {
33+
Self {
34+
key: key.to_string(),
35+
seq,
36+
}
37+
}
38+
/// Convert internally used expire-index value `Marked<String>` to externally used type `ExpireValue`.
39+
///
40+
/// `Marked<String>` is the value of an expire-index in the state machine.
41+
/// `ExpireValue.seq` equals to the seq of the str-map record,
42+
/// i.e., when an expire-index is inserted, the seq does not increase.
43+
pub fn from_marked(seq_marked: SeqMarked<String>) -> Option<Self> {
44+
let (seq, mm) = seq_marked.into_parts();
45+
46+
match mm {
47+
Marked::TombStone => None,
48+
Marked::Normal(s) => Some(ExpireValue::new(s, seq)),
49+
}
50+
}
51+
}
52+
53+
impl From<ExpireValue> for SeqMarked<String> {
54+
fn from(value: ExpireValue) -> Self {
55+
SeqMarked::new_normal(value.seq, value.key)
56+
}
57+
}
58+
59+
#[cfg(test)]
60+
mod tests {
61+
use super::*;
62+
63+
#[test]
64+
fn test_expire_value_serde() -> anyhow::Result<()> {
65+
{
66+
let v = ExpireValue {
67+
seq: 0,
68+
key: "a".to_string(),
69+
};
70+
let s = serde_json::to_string(&v)?;
71+
let want = r#"{"key":"a"}"#;
72+
assert_eq!(want, s);
73+
74+
let got = serde_json::from_str::<ExpireValue>(want)?;
75+
assert_eq!(v, got);
76+
}
77+
78+
{
79+
let v = ExpireValue {
80+
seq: 5,
81+
key: "a".to_string(),
82+
};
83+
let s = serde_json::to_string(&v)?;
84+
let want = r#"{"seq":5,"key":"a"}"#;
85+
assert_eq!(want, s);
86+
87+
let got = serde_json::from_str::<ExpireValue>(want)?;
88+
assert_eq!(v, got);
89+
}
90+
91+
Ok(())
92+
}
93+
94+
#[test]
95+
fn test_from_seq_marked() {
96+
// Test normal case - should return Some(ExpireValue)
97+
{
98+
let seq = 42;
99+
let key = "test_key".to_string();
100+
let marked = Marked::Normal(key.clone());
101+
let seq_marked = SeqMarked::new(seq, marked);
102+
103+
let result = ExpireValue::from_marked(seq_marked);
104+
105+
assert!(result.is_some());
106+
let expire_value = result.unwrap();
107+
assert_eq!(expire_value.seq, seq);
108+
assert_eq!(expire_value.key, key);
109+
}
110+
111+
// Test tombstone case - should return None
112+
{
113+
let seq = 100;
114+
let marked = Marked::TombStone;
115+
let seq_marked = SeqMarked::new(seq, marked);
116+
117+
let result = ExpireValue::from_marked(seq_marked);
118+
119+
assert!(result.is_none());
120+
}
121+
122+
// Test with zero seq
123+
{
124+
let seq = 0;
125+
let key = "zero_seq_key".to_string();
126+
let marked = Marked::Normal(key.clone());
127+
let seq_marked = SeqMarked::new(seq, marked);
128+
129+
let result = ExpireValue::from_marked(seq_marked);
130+
131+
assert!(result.is_some());
132+
let expire_value = result.unwrap();
133+
assert_eq!(expire_value.seq, seq);
134+
assert_eq!(expire_value.key, key);
135+
}
136+
}
137+
138+
// Test From<ExpireValue> for Marked<String>
139+
#[test]
140+
fn test_from_expire_value_for_marked() -> anyhow::Result<()> {
141+
let m = SeqMarked::new_normal(1, "2".to_string());
142+
let s = ExpireValue::new("2", 1);
143+
assert_eq!(m, s.into());
144+
145+
Ok(())
146+
}
147+
148+
// Test From<Marked<String>> for Option<ExpireValue>
149+
#[test]
150+
fn test_from_marked_for_option_expire_value() -> anyhow::Result<()> {
151+
let m = SeqMarked::new_normal(1, "2".to_string());
152+
let s: Option<ExpireValue> = Some(ExpireValue::new("2".to_string(), 1));
153+
assert_eq!(s, ExpireValue::from_marked(m));
154+
155+
let m = SeqMarked::new_tombstone(1);
156+
let s: Option<ExpireValue> = None;
157+
assert_eq!(s, ExpireValue::from_marked(m));
158+
159+
Ok(())
160+
}
161+
}

0 commit comments

Comments
 (0)