Skip to content

Commit 7dc1061

Browse files
committed
refine code
1 parent e988f5d commit 7dc1061

File tree

7 files changed

+101
-101
lines changed

7 files changed

+101
-101
lines changed

src/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
100100
let mut complete = false;
101101

102102
//config-parsing and pre-connection setup
103-
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(args.tcp_port_pool.to_string(), args.tcp6_port_pool.to_string());
104-
let mut udp_port_pool = udp::receiver::UdpPortPool::new(args.udp_port_pool.to_string(), args.udp6_port_pool.to_string());
103+
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(&args.tcp_port_pool, &args.tcp6_port_pool);
104+
let mut udp_port_pool = udp::receiver::UdpPortPool::new(&args.udp_port_pool, &args.udp6_port_pool);
105105

106106
let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
107107

src/protocol/communication.rs

+8-10
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
4242
let serialised_message = serde_json::to_vec(message)?;
4343

4444
log::debug!(
45-
"sending message of length {}, {:?}, to {}...",
45+
"sending message to {}, length {}, {:?}...",
46+
stream.peer_addr()?,
4647
serialised_message.len(),
4748
message,
48-
stream.peer_addr()?
4949
);
5050
let mut output_buffer = vec![0_u8; serialised_message.len() + 2];
5151
output_buffer[..2].copy_from_slice(&(serialised_message.len() as u16).to_be_bytes());
@@ -71,21 +71,19 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
7171
}
7272
}
7373
}
74-
Err(Box::new(simple_error::simple_error!(
75-
"timed out while attempting to send status-message to {}",
76-
stream.peer_addr()?
77-
)))
74+
let err = simple_error::simple_error!("timed out while attempting to send status-message to {}", stream.peer_addr()?);
75+
Err(Box::new(err))
7876
}
7977

8078
/// receives the length-count of a pending message over a client-server communications stream
81-
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
79+
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
8280
stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout");
8381

8482
let mut length_bytes_read = 0;
8583
let mut length_spec: [u8; 2] = [0; 2];
8684
while alive_check() {
8785
//waiting to find out how long the next message is
88-
results_handler()?; //send any outstanding results between cycles
86+
handler()?; //send any outstanding results between cycles
8987

9088
let size = match stream.read(&mut length_spec[length_bytes_read..]) {
9189
Ok(size) => size,
@@ -128,7 +126,7 @@ fn receive_payload(
128126
stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout");
129127

130128
let mut bytes_read = 0;
131-
let mut buffer = vec![0_u8; length.into()];
129+
let mut buffer = vec![0_u8; length as usize];
132130
while alive_check() {
133131
//waiting to receive the payload
134132
results_handler()?; //send any outstanding results between cycles
@@ -156,7 +154,7 @@ fn receive_payload(
156154
if bytes_read == length as usize {
157155
match serde_json::from_slice(&buffer) {
158156
Ok(v) => {
159-
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
157+
log::debug!("received message from {}: {:?}", stream.peer_addr()?, v);
160158
return Ok(v);
161159
}
162160
Err(e) => {

src/protocol/results.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub trait IntervalResult {
5858
fn to_string(&self, bit: bool) -> String;
5959
}
6060

61-
pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send>;
61+
pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send + 'static>;
6262

6363
pub struct ClientDoneResult {
6464
pub stream_idx: u8,
@@ -477,7 +477,7 @@ impl IntervalResult for UdpSendResult {
477477
}
478478
}
479479

480-
pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<Box<dyn IntervalResult>> {
480+
pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<IntervalResultBox> {
481481
match value.get("family") {
482482
Some(f) => match f.as_str() {
483483
Some(family) => match family {

src/server.rs

+49-52
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
use std::io;
2222
use std::net::{Shutdown, SocketAddr};
2323
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
24-
use std::sync::mpsc::channel;
25-
use std::sync::{Arc, Mutex};
24+
use std::sync::{mpsc, Arc, Mutex};
2625
use std::thread;
2726
use std::time::Duration;
2827

@@ -31,7 +30,7 @@ use std::net::{TcpListener, TcpStream};
3130
use crate::args::Args;
3231
use crate::protocol::communication::{receive, send};
3332
use crate::protocol::messaging::{prepare_connect, prepare_connect_ready};
34-
use crate::protocol::results::ServerDoneResult;
33+
use crate::protocol::results::{IntervalResultBox, ServerDoneResult};
3534
use crate::stream::{tcp, udp, TestStream};
3635
use crate::BoxResult;
3736

@@ -56,10 +55,7 @@ fn handle_client(
5655
let mut parallel_streams: Vec<Arc<Mutex<(dyn TestStream + Sync + Send)>>> = Vec::new();
5756
let mut parallel_streams_joinhandles = Vec::new();
5857

59-
let (results_tx, results_rx): (
60-
std::sync::mpsc::Sender<crate::protocol::results::IntervalResultBox>,
61-
std::sync::mpsc::Receiver<crate::protocol::results::IntervalResultBox>,
62-
) = channel();
58+
let (results_tx, results_rx) = mpsc::channel::<IntervalResultBox>();
6359

6460
//a closure used to pass results from stream-handlers to the client-communication stream
6561
let mut forwarding_send_stream = stream.try_clone()?;
@@ -295,12 +291,12 @@ impl Drop for ClientThreadMonitor {
295291
pub fn serve(args: &Args) -> BoxResult<()> {
296292
//config-parsing and pre-connection setup
297293
let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new(
298-
args.tcp_port_pool.to_string(),
299-
args.tcp6_port_pool.to_string(),
294+
&args.tcp_port_pool,
295+
&args.tcp6_port_pool,
300296
)));
301297
let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new(
302-
args.udp_port_pool.to_string(),
303-
args.udp6_port_pool.to_string(),
298+
&args.udp_port_pool,
299+
&args.udp6_port_pool,
304300
)));
305301

306302
let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
@@ -317,53 +313,54 @@ pub fn serve(args: &Args) -> BoxResult<()> {
317313
log::info!("server listening on {}", listener.local_addr()?);
318314

319315
while is_alive() {
320-
match listener.accept() {
321-
Ok((mut stream, address)) => {
322-
log::info!("connection from {}", address);
323-
324-
stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");
325-
326-
#[cfg(unix)]
327-
{
328-
use crate::protocol::communication::KEEPALIVE_DURATION;
329-
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
330-
let raw_socket = socket2::SockRef::from(&stream);
331-
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
332-
}
333-
334-
let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
335-
if client_limit > 0 && client_count > client_limit {
336-
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
337-
stream.shutdown(Shutdown::Both).unwrap_or_default();
338-
CLIENTS.fetch_sub(1, Ordering::Relaxed);
339-
} else {
340-
let c_cam = cpu_affinity_manager.clone();
341-
let c_tcp_port_pool = tcp_port_pool.clone();
342-
let c_udp_port_pool = udp_port_pool.clone();
343-
let thread_builder = thread::Builder::new().name(address.to_string());
344-
thread_builder.spawn(move || {
345-
//ensure the client is accounted-for even if the handler panics
346-
let _client_thread_monitor = ClientThreadMonitor {
347-
client_address: address.to_string(),
348-
};
349-
350-
match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
351-
Ok(_) => (),
352-
Err(e) => log::error!("error in client-handler: {}", e),
353-
}
354-
355-
//in the event of panic, this will happen when the stream is dropped
356-
stream.shutdown(Shutdown::Both).unwrap_or_default();
357-
})?;
358-
}
359-
}
316+
let (mut stream, address) = match listener.accept() {
317+
Ok((stream, address)) => (stream, address),
360318
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
361-
//no pending clients
319+
// no pending clients
362320
thread::sleep(POLL_TIMEOUT);
321+
continue;
363322
}
364323
Err(e) => {
365324
return Err(Box::new(e));
366325
}
326+
};
327+
328+
log::info!("connection from {}", address);
329+
330+
stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");
331+
332+
#[cfg(unix)]
333+
{
334+
use crate::protocol::communication::KEEPALIVE_DURATION;
335+
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
336+
let raw_socket = socket2::SockRef::from(&stream);
337+
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
338+
}
339+
340+
let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
341+
if client_limit > 0 && client_count > client_limit {
342+
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
343+
stream.shutdown(Shutdown::Both).unwrap_or_default();
344+
CLIENTS.fetch_sub(1, Ordering::Relaxed);
345+
} else {
346+
let c_cam = cpu_affinity_manager.clone();
347+
let c_tcp_port_pool = tcp_port_pool.clone();
348+
let c_udp_port_pool = udp_port_pool.clone();
349+
let thread_builder = thread::Builder::new().name(address.to_string());
350+
thread_builder.spawn(move || {
351+
// ensure the client is accounted-for even if the handler panics
352+
let _client_thread_monitor = ClientThreadMonitor {
353+
client_address: address.to_string(),
354+
};
355+
356+
match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
357+
Ok(_) => (),
358+
Err(e) => log::error!("error in client-handler: {}", e),
359+
}
360+
361+
//in the event of panic, this will happen when the stream is dropped
362+
stream.shutdown(Shutdown::Both).unwrap_or_default();
363+
})?;
367364
}
368365
}
369366

src/stream/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
pub mod tcp;
2222
pub mod udp;
2323

24-
use crate::BoxResult;
24+
use crate::{protocol::results::IntervalResultBox, BoxResult};
2525

2626
pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
2727

@@ -30,7 +30,7 @@ pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
3030
/// INTERVAL while gathering data.
3131
pub trait TestStream {
3232
/// gather data; returns None when the test is over
33-
fn run_interval(&mut self) -> Option<BoxResult<crate::protocol::results::IntervalResultBox>>;
33+
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>>;
3434
/// return the port associated with the test-stream; this may vary over the test's lifetime
3535
fn get_port(&self) -> BoxResult<u16>;
3636
/// returns the index of the test, used to match client and server data
@@ -39,7 +39,7 @@ pub trait TestStream {
3939
fn stop(&mut self);
4040
}
4141

42-
fn parse_port_spec(port_spec: String) -> Vec<u16> {
42+
fn parse_port_spec(port_spec: &str) -> Vec<u16> {
4343
let mut ports = Vec::<u16>::new();
4444
if !port_spec.is_empty() {
4545
for range in port_spec.split(',') {

0 commit comments

Comments
 (0)