Skip to content

Commit 0ab411f

Browse files
committed
feat: add load_blance on volo
1 parent 83f989c commit 0ab411f

File tree

15 files changed

+1864
-62
lines changed

15 files changed

+1864
-62
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/Cargo.toml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ path = "src/grpc/compression/client.rs"
4747
[[bin]]
4848
name = "multiplex-grpc-server"
4949
path = "src/grpc/multiplex/server.rs"
50+
51+
# load balance example
52+
[[bin]]
53+
name = "loadbalance-example"
54+
path = "src/loadbalance_example.rs"
55+
5056
[[bin]]
5157
name = "multiplex-grpc-client"
5258
path = "src/grpc/multiplex/client.rs"
@@ -145,11 +151,11 @@ tokio = { workspace = true, features = ["full"] }
145151
tokio-stream.workspace = true
146152
tracing.workspace = true
147153
tracing-subscriber.workspace = true
148-
154+
volo = { path = "../volo" }
155+
async-broadcast.workspace = true
149156
pilota.workspace = true
150157
pilota-thrift-fieldmask.workspace = true
151158
pilota-thrift-reflect.workspace = true
152-
volo = { path = "../volo" }
153159
volo-grpc = { path = "../volo-grpc", features = ["grpc-web"] }
154160
volo-thrift = { path = "../volo-thrift", features = ["multiplex"] }
155161
volo-http = { path = "../volo-http", features = [
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use std::collections::HashMap;
2+
use std::net::SocketAddr;
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
use tokio::time::sleep;
6+
7+
use async_broadcast::Receiver;
8+
use volo::{
9+
context::Endpoint,
10+
discovery::{Change, Discover, Instance},
11+
loadbalance::{
12+
LoadBalance,
13+
error::LoadBalanceError,
14+
least_conn::LeastConnectionBalance,
15+
p2c::P2c,
16+
random::WeightedRandomBalance,
17+
response_time_weighted::ResponseTimeWeightedBalance,
18+
round_robin::{RoundRobinBalance, WeightedRoundRobinBalance},
19+
},
20+
net::Address,
21+
};
22+
23+
// Mock service discovery implementation
24+
struct MockDiscover {
25+
instances: Vec<Arc<Instance>>,
26+
}
27+
28+
impl MockDiscover {
29+
fn new(instances: Vec<Arc<Instance>>) -> Self {
30+
Self { instances }
31+
}
32+
}
33+
34+
impl Discover for MockDiscover {
35+
type Key = String;
36+
type Error = LoadBalanceError;
37+
38+
fn key(&self, _: &Endpoint) -> Self::Key {
39+
String::from("mock-discover")
40+
}
41+
42+
async fn discover(&self, _: &Endpoint) -> Result<Vec<Arc<Instance>>, Self::Error> {
43+
Ok(self.instances.clone())
44+
}
45+
46+
fn watch(&self, _: Option<&[String]>) -> Option<Receiver<Change<String>>> {
47+
None
48+
}
49+
}
50+
51+
#[tokio::main]
52+
async fn main() {
53+
// Create some test instances
54+
let instances = vec![
55+
Arc::new(Instance {
56+
address: Address::from(SocketAddr::from(([127, 0, 0, 1], 8080))),
57+
weight: 100,
58+
tags: HashMap::new(),
59+
}),
60+
Arc::new(Instance {
61+
address: Address::from(SocketAddr::from(([127, 0, 0, 1], 8081))),
62+
weight: 200,
63+
tags: HashMap::new(),
64+
}),
65+
Arc::new(Instance {
66+
address: Address::from(SocketAddr::from(([127, 0, 0, 1], 8082))),
67+
weight: 300,
68+
tags: HashMap::new(),
69+
}),
70+
];
71+
72+
let discover = MockDiscover::new(instances);
73+
let endpoint = Endpoint::new("test-service".into());
74+
75+
// Example 1: Round Robin Load Balancer
76+
println!("\n=== Round Robin Load Balancer ===");
77+
let round_robin = RoundRobinBalance::new();
78+
demonstrate_lb(&round_robin, &discover, &endpoint).await;
79+
80+
// Example 2: Weighted Round Robin Load Balancer
81+
println!("\n=== Weighted Round Robin Load Balancer ===");
82+
let weighted_round_robin = WeightedRoundRobinBalance::new(vec![]);
83+
demonstrate_lb(&weighted_round_robin, &discover, &endpoint).await;
84+
85+
// Example 3: P2C (Power of Two Choices) Load Balancer
86+
println!("\n=== P2C Load Balancer ===");
87+
let p2c = P2c::default();
88+
demonstrate_lb(&p2c, &discover, &endpoint).await;
89+
90+
// Example 4: Weighted Random Load Balancer
91+
println!("\n=== Weighted Random Load Balancer ===");
92+
let random = WeightedRandomBalance::new();
93+
demonstrate_lb(&random, &discover, &endpoint).await;
94+
95+
// Example 5: Least Connection Load Balancer
96+
println!("\n=== Least Connection Load Balancer ===");
97+
let least_conn = LeastConnectionBalance::new();
98+
demonstrate_lb(&least_conn, &discover, &endpoint).await;
99+
100+
// Example 6: Response Time Weighted Load Balancer
101+
println!("\n=== Response Time Weighted Load Balancer ===");
102+
let response_time = ResponseTimeWeightedBalance::new(100); // window size in items
103+
demonstrate_lb(&response_time, &discover, &endpoint).await;
104+
}
105+
106+
async fn demonstrate_lb<L, D>(lb: &L, discover: &D, endpoint: &Endpoint)
107+
where
108+
L: LoadBalance<D>,
109+
D: Discover,
110+
{
111+
match lb.get_picker(endpoint, discover).await {
112+
Ok(mut picker) => {
113+
// Demonstrate 10 picks
114+
for i in 0..10 {
115+
if let Some(addr) = picker.next() {
116+
println!("Pick {}: Selected instance: {}", i + 1, addr);
117+
}
118+
sleep(Duration::from_millis(100)).await;
119+
}
120+
}
121+
Err(e) => println!("Failed to get picker: {e:?}"),
122+
}
123+
}

volo/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ webpki-roots = { workspace = true, optional = true }
5858
tokio-rustls = { workspace = true, optional = true }
5959
native-tls = { workspace = true, optional = true }
6060
tokio-native-tls = { workspace = true, optional = true }
61+
parking_lot.workspace = true
62+
async-trait = "0.1.89"
6163

6264
[features]
6365
default = []

0 commit comments

Comments
 (0)