Skip to content

Commit 38f7b36

Browse files
committed
feat: add transport tag
1 parent 24e4d9d commit 38f7b36

File tree

3 files changed

+57
-5
lines changed

3 files changed

+57
-5
lines changed

volo-thrift/src/transport/pingpong/client.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ use std::{io, marker::PhantomData};
22

33
use motore::service::{Service, UnaryService};
44
use pilota::thrift::TransportException;
5-
use volo::net::{conn::ConnExt, dial::MakeTransport, Address};
5+
use volo::{
6+
net::{conn::ConnExt, dial::MakeTransport, shm::TransportEndpoint, Address},
7+
FastStr,
8+
};
69

710
use crate::{
811
codec::MakeCodec,
912
context::ClientContext,
1013
protocol::TMessageType,
1114
transport::{
1215
pingpong::thrift_transport::ThriftTransport,
13-
pool::{Config, PooledMakeTransport, Ver},
16+
pool::{Config, PooledMakeTransport, Transport, Ver},
1417
},
1518
EntryMessage, ThriftMessage,
1619
};
@@ -128,6 +131,11 @@ where
128131
.call((target, shmipc_target.clone(), Ver::PingPong))
129132
.await?;
130133
cx.stats.record_make_transport_end_at();
134+
if let Transport::UnPooled(_) = transport {
135+
cx.rpc_info
136+
.caller_mut()
137+
.set_transport(volo::net::shm::Transport(FastStr::new("shmipc")))
138+
}
131139
let resp = transport.send(cx, req, oneway).await;
132140
if let Ok(None) = resp {
133141
if !oneway {

volo-thrift/src/transport/pingpong/server.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ use pilota::thrift::ThriftException;
99
use tokio::sync::futures::Notified;
1010
use tracing::*;
1111
use volo::{
12-
net::{shm::ShmExt, Address},
13-
volo_unreachable,
12+
net::{
13+
shm::{ShmExt, TransportEndpoint},
14+
Address,
15+
},
16+
volo_unreachable, FastStr,
1417
};
1518

1619
use crate::{
@@ -53,7 +56,16 @@ pub async fn serve<Svc, Req, Resp, E, D, SP>(
5356
cache.pop().unwrap_or_default()
5457
});
5558
if let Some(peer_addr) = &peer_addr {
56-
cx.rpc_info.caller_mut().set_address(peer_addr.clone());
59+
if stream.is_none() {
60+
cx.rpc_info.caller_mut().set_address(peer_addr.clone());
61+
} else {
62+
cx.rpc_info
63+
.caller_mut()
64+
.set_transport(volo::net::shm::Transport(FastStr::new("shmipc")));
65+
cx.rpc_info
66+
.caller_mut()
67+
.set_shmipc_address(peer_addr.clone());
68+
}
5769
}
5870

5971
let msg = tokio::select! {

volo/src/net/shm.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,35 @@
1+
use crate::context::Endpoint;
2+
3+
crate::new_type! {
4+
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
5+
pub struct Transport(pub faststr::FastStr);
6+
}
7+
8+
pub trait TransportEndpoint {
9+
fn get_transport(&self) -> Option<Transport>;
10+
fn has_transport(&self) -> bool;
11+
fn set_transport(&mut self, transport: Transport);
12+
}
13+
14+
impl TransportEndpoint for Endpoint {
15+
#[inline]
16+
fn get_transport(&self) -> Option<Transport> {
17+
self.get_faststr::<Transport>()
18+
.cloned()
19+
.map(Transport::from)
20+
}
21+
22+
#[inline]
23+
fn has_transport(&self) -> bool {
24+
self.contains_faststr::<Transport>()
25+
}
26+
27+
#[inline]
28+
fn set_transport(&mut self, transport: Transport) {
29+
self.insert_faststr::<Transport>(transport.0);
30+
}
31+
}
32+
133
#[async_trait::async_trait]
234
pub trait ShmExt: Send + Sync {
335
fn release_read_and_reuse(&self) {}

0 commit comments

Comments
 (0)