@@ -3,38 +3,33 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
33use fuso:: {
44 cli,
55 client:: port_forward:: {
6- MuxTransmitterConnector , PortForwarder , Protocol , TransmitterConnector ,
6+ Linker , MuxTransmitterConnector , PortForwarder , Protocol , TransmitterConnector ,
77 } ,
88 config:: {
99 client:: {
10- Config , FinalTarget , Host , ServerAddr , Service , WithBridgeService , WithForwardService ,
10+ Config , FinalTarget , Host , Service , WithBridgeService , WithForwardService ,
1111 WithProxyService ,
1212 } ,
13- Compress , Crypto , Expose , Stateful ,
13+ Expose , Stateful ,
1414 } ,
1515 core:: {
1616 accepter:: AccepterExt ,
17- connector:: {
18- AbstractConnector , Connector , ConnectorWithFn , EncryptedAndCompressedConnector ,
19- MultiConnector ,
20- } ,
17+ connector:: { Connector , EncryptedAndCompressedConnector , MultiConnector } ,
2118 net:: { KcpConnector , TcpListener , TcpStream } ,
2219 protocol:: { AsyncPacketRead , AsyncPacketSend } ,
2320 rpc:: { Decoder , Encoder } ,
2421 stream:: {
25- compress:: CompressedStream ,
2622 fragment:: Fragment ,
2723 handshake:: { Handshake , MuxConfig } ,
2824 UseCompress , UseCrypto ,
2925 } ,
30- transfer:: { AbstractTransmitter , TransmitterExt } ,
26+ transfer:: TransmitterExt ,
3127 AbstractStream ,
3228 } ,
3329 error,
3430 runner:: { FnRunnable , NamedRunnable , Rise , ServiceRunner } ,
3531 runtime:: tokio:: { TokioRuntime , UdpWithTokioRuntime } ,
3632} ;
37- use serde:: de;
3833
3934macro_rules! unsupport_protocol {
4035 ( ) => {
@@ -86,6 +81,7 @@ macro_rules! try_connect {
8681fn main ( ) {
8782 env_logger:: Builder :: new ( )
8883 . filter_level ( log:: LevelFilter :: Debug )
84+ . filter_module ( "kcp_rust" , log:: LevelFilter :: Trace )
8985 . init ( ) ;
9086
9187 match cli:: client:: parse ( ) {
@@ -233,96 +229,10 @@ async fn enter_port_forward_service_main(
233229
234230 loop {
235231 let ( linker, target) = forwarder. accept ( ) . await ?;
236-
237232 tokio:: spawn ( async move {
238- log:: debug!( "target {:?}" , target) ;
239-
240- match target {
241- FinalTarget :: Udp { addr, port } => { }
242- FinalTarget :: Shell { path, args } => {
243- let mut builder = fuso:: pty:: builder ( path) ;
244-
245- builder. args ( & args) ;
246-
247- let pty = builder. build ( ) . unwrap ( ) ;
248-
249- if let Ok ( a) = linker. link ( Protocol :: Tcp ) . await {
250- a. transfer ( pty) . await . unwrap ( ) ;
251- } ;
252- }
253- FinalTarget :: Dynamic => {
254- let transmitter = linker. link ( Protocol :: Tcp ) . await . unwrap ( ) ;
255-
256- let udp = Arc :: new ( tokio:: net:: UdpSocket :: bind ( "0.0.0.0:0" ) . await . unwrap ( ) ) ;
257-
258- let ( writer, mut reader) = transmitter. split ( ) ;
259-
260- loop {
261- let pkt = match reader. recv_packet ( ) . await {
262- Err ( _) => break ,
263- Ok ( data) => data,
264- } ;
265-
266- let pkt: Fragment = pkt. decode ( ) . unwrap ( ) ;
267-
268- let udp = udp. clone ( ) ;
269- let mut writer = writer. clone ( ) ;
270-
271- match pkt {
272- Fragment :: UdpForward {
273- saddr,
274- daddr,
275- dport,
276- data,
277- } => {
278- let target = format ! ( "{}:{dport}" , daddr. to_string( ) ) ;
279- log:: debug!( "udp forward {saddr} -> {target}" ) ;
280- udp. send_to ( & data, target) . await . unwrap ( ) ;
281- let mut buf = [ 0u8 ; 1400 ] ;
282- let ( n, addr) = udp. recv_from ( & mut buf) . await . unwrap ( ) ;
283-
284- writer
285- . send_packet (
286- & Fragment :: Udp ( saddr, buf[ ..n] . to_vec ( ) ) . encode ( ) . unwrap ( ) ,
287- )
288- . await
289- . unwrap ( ) ;
290- }
291- _ => { }
292- }
293- }
294- }
295- FinalTarget :: Tcp { addr, port } => {
296- let result = addr
297- . try_connect ( port, |host, port| async move {
298- match host {
299- Host :: Ip ( ip) => {
300- TcpStream :: connect ( SocketAddr :: new ( * ip, port) ) . await
301- }
302- Host :: Domain ( domain) => {
303- TcpStream :: connect ( format ! ( "{domain}:{port}" ) ) . await
304- }
305- }
306- } )
307- . await ;
308-
309- match result {
310- Ok ( stream) => match linker. link ( Protocol :: Tcp ) . await {
311- Ok ( transmitter) => {
312- transmitter. transfer ( stream) . await ;
313- }
314- Err ( e) => {
315- log:: debug!( "{:?}" , e) ;
316- }
317- } ,
318- Err ( e) => {
319- if let Err ( e) = linker. cancel ( e) . await {
320- log:: warn!( "{:?}" , e) ;
321- } ;
322- }
323- }
324- }
325- }
233+ if let Err ( e) = do_forward ( linker, target) . await {
234+ log:: warn!( "forward: {e:?}" )
235+ } ;
326236 } ) ;
327237 }
328238}
@@ -404,3 +314,80 @@ async fn enter_bridge_service_main(
404314 } ) ;
405315 }
406316}
317+
318+ async fn do_forward ( linker : Linker , target : FinalTarget ) -> error:: Result < ( ) > {
319+ match target {
320+ FinalTarget :: Udp { .. } => { }
321+ FinalTarget :: Shell { path, args } => {
322+ let mut builder = fuso:: pty:: builder ( path) ;
323+
324+ builder. args ( & args) ;
325+
326+ match builder. build ( ) {
327+ Err ( e) => linker. cancel ( e) . await ?,
328+ Ok ( pty) => {
329+ linker. link ( Protocol :: Tcp ) . await ?. transfer ( pty) . await ?;
330+ }
331+ }
332+ }
333+ FinalTarget :: Tcp { addr, port } => {
334+ let result = addr
335+ . try_connect ( port, |host, port| async move {
336+ log:: debug!( "connect to {host}:{port}" ) ;
337+ TcpStream :: connect ( format ! ( "{host}:{port}" ) ) . await
338+ } )
339+ . await ;
340+
341+ match result {
342+ Err ( e) => linker. cancel ( e) . await ?,
343+ Ok ( stream) => {
344+ linker. link ( Protocol :: Kcp ) . await ?. transfer ( stream) . await ?;
345+ }
346+ }
347+ }
348+ FinalTarget :: Dynamic => {
349+ let transmitter = linker. link ( Protocol :: Tcp ) . await ?;
350+
351+ let udp = Arc :: new ( tokio:: net:: UdpSocket :: bind ( "0.0.0.0:0" ) . await ?) ;
352+
353+ let ( writer, mut reader) = transmitter. split ( ) ;
354+
355+ loop {
356+ let pkt = reader. recv_packet ( ) . await ?;
357+
358+ let pkt: Fragment = pkt. decode ( ) ?;
359+
360+ let udp = udp. clone ( ) ;
361+
362+ let mut writer = writer. clone ( ) ;
363+
364+ match pkt {
365+ Fragment :: UdpForward {
366+ saddr,
367+ daddr,
368+ dport,
369+ data,
370+ } => {
371+ let target = format ! ( "{}:{dport}" , daddr. to_string( ) ) ;
372+ log:: debug!( "udp forward {saddr} -> {target}" ) ;
373+
374+ udp. send_to ( & data, target) . await ?;
375+
376+ let mut buf = [ 0u8 ; 1400 ] ;
377+
378+ let _ =
379+ tokio:: time:: timeout ( std:: time:: Duration :: from_secs ( 2 ) , async move {
380+ let ( n, _) = udp. recv_from ( & mut buf) . await . unwrap ( ) ;
381+
382+ let pkt = Fragment :: Udp ( saddr, buf[ ..n] . to_vec ( ) ) . encode ( ) . unwrap ( ) ;
383+ writer. send_packet ( & pkt) . await . unwrap ( ) ;
384+ } )
385+ . await ;
386+ }
387+ _ => { }
388+ }
389+ }
390+ }
391+ }
392+ Ok ( ( ) )
393+ }
0 commit comments