Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ int new_listen_socket2(int &fd, address_t &addr) {
setnonblocking(fd);
set_buf_size(fd, socket_buf_size);

mylog(log_debug, "local_listen_fd=%d\n", fd);
mylog(log_debug, "[%s]local_listen_fd=%d\n", addr.get_str(), fd);

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const int default_mtu = 1250;
// const u32_t timer_interval=400;
////const u32_t conv_timeout=180000;
// const u32_t conv_timeout=40000;//for test
const u32_t conv_timeout = 180000;
// const u32_t conv_timeout = 180000;
const int max_conv_num = 10000;
const int max_conn_num = 200;

Expand Down Expand Up @@ -143,7 +143,7 @@ const u32_t client_conn_timeout = 10000;
const u32_t client_conn_uplink_timeout = client_conn_timeout + 2000;

// const uint32_t server_conn_timeout=conv_timeout+60000;//this should be 60s+ longer than conv_timeout,so that conv_manager can destruct convs gradually,to avoid latency glicth
const u32_t server_conn_timeout = conv_timeout + 20000; // for test
// const u32_t server_conn_timeout = conv_timeout + 20000; // for test

extern int about_to_exit;

Expand Down
8 changes: 7 additions & 1 deletion connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
const int disable_conn_clear = 0; // a raw connection is called conn.

int report_interval = 0;
u32_t server_conn_timeout_s = 60; // default connection timeout in seconds
u32_t conv_timeout_s = 30; // default conversation timeout in seconds

void server_clear_function(u64_t u64) // used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
// so we have to close the fd when conv expires
Expand Down Expand Up @@ -114,7 +116,7 @@ int conn_manager_t::clear_inactive0() {
if (it->second->conv_manager.s.get_size() > 0) {
// mylog(log_info,"[%s:%d]size %d \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first),(int)it->second->conv_manager.get_size());
it++;
} else if (current_time < it->second->last_active_time + server_conn_timeout) {
} else if (current_time < it->second->last_active_time + server_conn_timeout_s * 1000) {
it++;
} else {
address_t tmp_addr = it->first; // avoid making get_str() const;
Expand All @@ -128,3 +130,7 @@ int conn_manager_t::clear_inactive0() {
clear_it = it;
return 0;
}

bool conn_manager_t::has_active_connections() {
return !mp.empty();
}
5 changes: 4 additions & 1 deletion connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#define CONNECTION_H_

extern int disable_anti_replay;
extern unsigned int server_conn_timeout_s;
extern unsigned int conv_timeout_s;

#include "connection.h"
#include "common.h"
Expand Down Expand Up @@ -146,7 +148,7 @@ struct conv_manager_t // manage the udp connections
u32_t conv;
my_time_t ts = lru.peek_back(conv);

if (current_time - ts < conv_timeout) break;
if (current_time - ts < conv_timeout_s * 1000) break;

erase_conv(conv);
if (info == 0) {
Expand Down Expand Up @@ -318,6 +320,7 @@ struct conn_manager_t // manager for connections. for client,we dont need conn_
int erase(unordered_map<address_t, conn_info_t *>::iterator erase_it);
int clear_inactive();
int clear_inactive0();
bool has_active_connections();
};

extern conn_manager_t conn_manager;
Expand Down
9 changes: 9 additions & 0 deletions examples/udp_speeder.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[Unit]
Description=UDP Speeder Service
After=network.target

[Service]
ExecStart=/home/mkarpets/repos/UDPspeeder/speederv2 -s -l0.0.0.0:4096 -r127.0.0.1:7777 -f20:10 --log-level 4 --conv-timeout 10 --conn-timeout 20 --shutdown

[Install]
WantedBy=multi-user.target
9 changes: 9 additions & 0 deletions examples/udp_speeder.socket
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[Unit]
Description=UDP Speeder Socket

[Socket]
ListenDatagram=0.0.0.0:4096
FreeBind=true

[Install]
WantedBy=sockets.target
3 changes: 3 additions & 0 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ static void print_help() {
printf(" --disable-obscure <number> disable obscure, to save a bit bandwidth and cpu\n");
printf(" --disable-checksum <number> disable checksum to save a bit bandwdith and cpu\n");
// printf(" --disable-xor <number> disable xor\n");
printf(" --conn-timeout <number> connection timeout in seconds, default: 60\n");
printf(" --conv-timeout <number> conversation timeout in seconds, default: 30\n");
printf(" --shutdown shut down after all connections are disconnected\n");

printf("developer options:\n");
printf(" --fifo <string> use a fifo(named pipe) for sending commands to the running program, so that you\n");
Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export STAGING_DIR=/tmp/ #just for supress warning of staging_dir not define
# targets for nativei (non-cross) compile
all:git_version
rm -f ${NAME}
${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -ggdb -static -O2
${cc_local} -DSYSTEMD_SOCKET_ACTIVATION -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -lsystemd -ggdb -O2

freebsd:git_version
rm -f ${NAME}
Expand Down
15 changes: 15 additions & 0 deletions misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ int mssfix = default_mtu;
int manual_set_tun = 0;
int persist_tun = 0;

bool shutdown_if_all_disconnected = false;

char rs_par_str[rs_str_len] = "20:10";

int from_normal_to_fec(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay) {
Expand Down Expand Up @@ -225,6 +227,7 @@ int print_parameter() {
jitter_min / 1000, jitter_max / 1000, output_interval_min / 1000, output_interval_max / 1000, g_fec_par.timeout / 1000, g_fec_par.mtu, g_fec_par.queue_len, g_fec_par.mode);
mylog(log_info, "fec_str=%s\n", rs_par_str);
mylog(log_info, "fec_inner_parameter=%s\n", g_fec_par.rs_to_str());
mylog(log_info, "conv_timeout=%d conn_timeout=%d\n", conv_timeout_s, server_conn_timeout_s);
return 0;
}
int handle_command(char *s) {
Expand Down Expand Up @@ -582,6 +585,9 @@ void process_arg(int argc, char *argv[]) {
{"persist-tun", no_argument, 0, 1},
{"manual-set-tun", no_argument, 0, 1},
{"interval", required_argument, 0, 'i'},
{"conn-timeout", required_argument, 0, 1},
{"conv-timeout", required_argument, 0, 1},
{"shutdown", no_argument, 0, 1},
{NULL, 0, 0, 0}};
int option_index = 0;
assert(g_fec_par.rs_from_str(rs_par_str) == 0);
Expand Down Expand Up @@ -842,6 +848,15 @@ void process_arg(int argc, char *argv[]) {
} else if (strcmp(long_options[option_index].name, "mssfix") == 0) {
sscanf(optarg, "%d", &mssfix);
mylog(log_warn, "mssfix=%d\n", mssfix);
} else if (strcmp(long_options[option_index].name, "conn-timeout") == 0) {
sscanf(optarg, "%u", &server_conn_timeout_s);
mylog(log_warn, "conn_timeout=%d\n", server_conn_timeout_s);
} else if (strcmp(long_options[option_index].name, "conv-timeout") == 0) {
sscanf(optarg, "%u", &conv_timeout_s);
mylog(log_warn, "conv_timeout=%d\n", conv_timeout_s);
} else if (strcmp(long_options[option_index].name, "shutdown") == 0) {
shutdown_if_all_disconnected = true;
mylog(log_warn, "shutdown enabled\n");
} else {
mylog(log_fatal, "unknown option\n");
myexit(-1);
Expand Down
2 changes: 2 additions & 0 deletions misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ extern int mssfix;
extern int manual_set_tun;
extern int persist_tun;

extern bool shutdown_if_all_disconnected;

int from_normal_to_fec(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay);
int from_fec_to_normal(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay);

Expand Down
62 changes: 62 additions & 0 deletions test/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
Testing UDP speeder
---

### Build
```
export UDP_SPEEDER_REPO=<path to UPDspeeder repo>
make -C ${UDP_SPEEDER_REPO}
```

### Simple test
```
# Start server
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--speederv2-path ${UDP_SPEEDER_REPO}/speederv2 \
--mode server \
--extra-args "--conn-timeout 20 --conv-timeout 10" \
--log-level info

# Start client
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--speederv2-path ${UDP_SPEEDER_REPO}/UDPspeeder/speederv2 \
--mode client \
--log-level info
```

### Test with systemd-socket-activate
```
# Start server with --socket-activate
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--speederv2-path ${UDP_SPEEDER_REPO}/speederv2 \
--mode server \
--socket-activate \
--extra-args "--conn-timeout 20 --conv-timeout 10" \
--log-level info

#Start client
```

### Test with real systemd units
```
# Add unit files
sudo ln -s ${UDP_SPEEDER_REPO}/test/udp_speeder.socket /etc/systemd/system/
sudo ln -s ${UDP_SPEEDER_REPO}/test/udp_speeder.service /etc/systemd/system/

# Enable them to systemd
sudo systemctl daemon-reload
sudo systemctl enable udp_speeder.socket
sudo systemctl enable udp_speeder.socket

# Observe status and logs
sudo systemctl status udp_speeder.socket
journalctl -f -u udp_speeder.socket -u udp_speeder.service

# Start test server without speederv2 (will be started by udp_speeder.socket)
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--mode server \
--no-udpspeeder \
--log-level info

# Start client
```

133 changes: 133 additions & 0 deletions test/udp_speeder_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import socket
import subprocess
import sys
import threading
import signal
import time
import argparse

LOG_LEVEL_MAP = {
"never": 0,
"fatal": 1,
"error": 2,
"warn": 3,
"info": 4,
"debug": 5,
"trace": 6
}

class UDPspeederTest:
def __init__(self, speederv2_path, log_level="info", socket_activate=False, extra_udpspeeder_args="", no_udpspeeder=False):
self.speederv2_path = speederv2_path
self.log_level = log_level
self.socket_activate = socket_activate
self.extra_udpspeeder_args = extra_udpspeeder_args
self.no_udpspeeder = no_udpspeeder

def run_speederv2(self, mode, local_port, remote_ip, remote_port):
log_level_num = LOG_LEVEL_MAP.get(self.log_level, 4) # Default to "info" if log_level is not found
cmd = f"{self.speederv2_path} {mode} -l0.0.0.0:{local_port} -r{remote_ip}:{remote_port} -f20:10 --log-level {log_level_num} {self.extra_udpspeeder_args}"
if self.socket_activate:
cmd = f"systemd-socket-activate -l0.0.0.0:{local_port} -d {cmd}"
print(f"UPDspeeder command: {cmd}")

process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process

def start_udpspeeder_server(self, local_port, remote_port):
return self.run_speederv2("-s", local_port, "127.0.0.1", remote_port)

def start_udpspeeder_client(self, local_port, remote_ip, remote_port):
return self.run_speederv2("-c", local_port, remote_ip, remote_port)

def server(self, udp_speeder_port, server_port):
# Start UDPspeeder server if not disabled
if not self.no_udpspeeder:
udpspeeder_process = self.start_udpspeeder_server(udp_speeder_port, server_port)
threading.Thread(target=self.log_udpspeeder_output, args=(udpspeeder_process,)).start()

# Start UDP server
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", server_port))
print(f"Server listening on port {server_port}")

def cleanup(signum, frame):
print("Shutting down server...")
sock.close()
if not self.no_udpspeeder:
udpspeeder_process.terminate()
sys.exit(0)

signal.signal(signal.SIGTERM, cleanup)

while True:
data, addr = sock.recvfrom(1024)
print(f"Received message: {data.decode()} from {addr}")
reply = f"Server reply to {data.decode()}"
sock.sendto(reply.encode(), addr)
print(f"Sent reply: {reply} ==============================================================")

def client(self, us_client_port, us_server_port):
# Start UDPspeeder client if not disabled
if not self.no_udpspeeder:
udpspeeder_process = self.start_udpspeeder_client(us_client_port, "127.0.0.1", us_server_port)
threading.Thread(target=self.log_udpspeeder_output, args=(udpspeeder_process,)).start()

# Start UDP client
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ("127.0.0.1", us_client_port)
message = "Hello, Server!"

def cleanup(signum, frame):
print("Shutting down client...")
sock.close()
if not self.no_udpspeeder:
udpspeeder_process.terminate()
sys.exit(0)

signal.signal(signal.SIGTERM, cleanup)
sock.settimeout(1.0)

while True:
print(f"Sending message: {message}")
sent = sock.sendto(message.encode(), server_address)

try:
data, server = sock.recvfrom(1024)
except socket.timeout:
print("Request timed out")
continue

print(f"Received reply: {data.decode()} ==============================================================")
time.sleep(1)

sock.close()

def log_udpspeeder_output(self, process):
for line in process.stdout:
print(line.decode(), end='')

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="UDPspeeder test script")
parser.add_argument("--mode", choices=["server", "client"], help="Mode to run: 'server' or 'client'")
parser.add_argument("--us-server-port", type=int, default=4096, help="UDPspeeder server port")
parser.add_argument("--us-client-port", type=int, default=3333, help="UDPspeeder client port")
parser.add_argument("--server-port", type=int, default=7777, help="Server port behind UDPspeeder")
parser.add_argument("--log-level", choices=LOG_LEVEL_MAP.keys(), default="info", nargs="?", help="Log level: 'never', 'fatal', 'error', 'warn', 'info', 'debug', 'trace'")
parser.add_argument("--socket-activate", action="store_true", help="Test systemd socket activation")
parser.add_argument("--extra-args", help="Extra arguments to pass to UDPspeeder")
parser.add_argument("--no-udpspeeder", action="store_true", help="Do not start UDPspeeder")
parser.add_argument("--speederv2-path", default="speederv2", help="Path to the speederv2 binary")


args = parser.parse_args()

udpspeeder_test = UDPspeederTest(args.speederv2_path, args.log_level, args.socket_activate, args.extra_args, args.no_udpspeeder)

if args.mode == "server":
udpspeeder_test.server(args.us_server_port, args.server_port)
elif args.mode == "client":
udpspeeder_test.client(args.us_client_port, args.us_server_port)
else:
print("Invalid mode. Use 'server' or 'client'.")
sys.exit(1)
Loading