From 1301d6aaed543815cdca43c1ab3f190cc60d93c2 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Wed, 31 May 2023 20:17:47 +0900 Subject: [PATCH 1/4] wip --- common/src/util.rs | 12 +++++------- player/src/main.rs | 3 +++ receiver/Cargo.toml | 1 + receiver/src/bot.rs | 16 +++++++++++++++- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/common/src/util.rs b/common/src/util.rs index 4c54c64..9f48b30 100644 --- a/common/src/util.rs +++ b/common/src/util.rs @@ -80,17 +80,15 @@ pub async fn ctrl_c() { }; } + #[cfg(unix)] -pub async fn ctrl_c_and_pipe() { +pub async fn usr1() { use tokio::signal::unix::{signal, SignalKind}; - let others = ctrl_c(); - let mut pipe = signal(SignalKind::pipe()).unwrap(); - tokio::select! { - _ = others => {} - _ = pipe.recv() => {} - }; + let mut usr1 = signal(SignalKind::user()).unwrap(); + let _ = usr1.recv().await } + #[cfg(test)] mod tests { use super::*; diff --git a/player/src/main.rs b/player/src/main.rs index 1ce65b8..27b62dd 100644 --- a/player/src/main.rs +++ b/player/src/main.rs @@ -90,6 +90,9 @@ pub async fn main() -> Result<()> { // TODO: what can we do about that tracing::debug!("received ctrl-c or pipe"); }, + _ = util::usr1() => { + tracing::debug!("got SIGUSR1, shutting down") + }, }; tracing::debug!("exiting"); diff --git a/receiver/Cargo.toml b/receiver/Cargo.toml index 342439f..d2b2bf0 100644 --- a/receiver/Cargo.toml +++ b/receiver/Cargo.toml @@ -14,3 +14,4 @@ axum = "0.6.15" protocol = { path = "../protocol" } common = { path = "../common" } poise = "0.5.3" +nix = "0.26.2" diff --git a/receiver/src/bot.rs b/receiver/src/bot.rs index 2a84901..e1b5a50 100644 --- a/receiver/src/bot.rs +++ b/receiver/src/bot.rs @@ -20,10 +20,11 @@ pub struct BotOptions { discord_token: String, } -// User data, which is stored and accessible in all command invocations +#[derive(Debug)] struct Data { bot_options: BotOptions, creds_registry: Arc>, + currently_playing_pid: Arc>>, } type Error = anyhow::Error; type Context<'a> = poise::Context<'a, Data, Error>; @@ -63,6 +64,7 @@ pub async fn run_bot(opts: BotOptions, stream_registry: Arc, #[description = "Stream key"] key: Strin let input = Input::new(true, reader, Codec::Pcm, Container::Raw, None); + // TODO: send player a signal on stop, so it can shut down gracefully before it's Dropped + let mut call_handler = call_handler_lock.lock().await; call_handler.play_source(input); @@ -210,6 +214,16 @@ async fn stop(ctx: Context<'_>) -> Result<()> { } Some(g) => g, }; + + { + let mut pid = ctx.data().currently_playing_pid.read().unwrap(); + if let Some(pid) = pid.as_ref().take() { + tracing::debug!(?pid, "asking player to stop"); + let _ = nix::sys::signal::kill(Pid::from_raw(pid as i32), Signal::SIGUSR1); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + let voice_manager = songbird::get(ctx.serenity_context()).await.unwrap().clone(); let call_handler_lock = voice_manager.get(guild.id); if let Some(call_handler_lock) = call_handler_lock { From bc4a37297f34a1c18a53e0d6f0ffa633bdeade1a Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Wed, 31 May 2023 21:04:43 +0900 Subject: [PATCH 2/4] wip --- .cargo/config.toml | 3 +++ Cargo.lock | 34 ++++++++++++++++++++++++++++++++-- common/src/util.rs | 2 +- receiver/src/bot.rs | 11 +++++++++-- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index bff29e6..9c68c3a 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,5 @@ [build] rustflags = ["--cfg", "tokio_unstable"] + +[registries.crates-io] +protocol = "sparse" diff --git a/Cargo.lock b/Cargo.lock index 0d5e312..a2f1fd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1390,7 +1390,7 @@ dependencies = [ "if-addrs", "log", "multimap", - "nix", + "nix 0.23.2", "rand", "socket2", "thiserror", @@ -1659,6 +1659,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1730,7 +1739,21 @@ dependencies = [ "cc", "cfg-if", "libc", - "memoffset", + "memoffset 0.6.5", +] + +[[package]] +name = "nix" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset 0.7.1", + "pin-utils", + "static_assertions", ] [[package]] @@ -2170,6 +2193,7 @@ dependencies = [ "axum", "clap", "common", + "nix 0.26.2", "poise", "protocol", "serde_json", @@ -2656,6 +2680,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "streamcatcher" version = "1.0.1" diff --git a/common/src/util.rs b/common/src/util.rs index 9f48b30..f496958 100644 --- a/common/src/util.rs +++ b/common/src/util.rs @@ -84,7 +84,7 @@ pub async fn ctrl_c() { #[cfg(unix)] pub async fn usr1() { use tokio::signal::unix::{signal, SignalKind}; - let mut usr1 = signal(SignalKind::user()).unwrap(); + let mut usr1 = signal(SignalKind::user_defined1()).unwrap(); let _ = usr1.recv().await } diff --git a/receiver/src/bot.rs b/receiver/src/bot.rs index e1b5a50..a907e59 100644 --- a/receiver/src/bot.rs +++ b/receiver/src/bot.rs @@ -218,9 +218,16 @@ async fn stop(ctx: Context<'_>) -> Result<()> { { let mut pid = ctx.data().currently_playing_pid.read().unwrap(); if let Some(pid) = pid.as_ref().take() { + let pid = Pid::from_raw(pid as i32); tracing::debug!(?pid, "asking player to stop"); - let _ = nix::sys::signal::kill(Pid::from_raw(pid as i32), Signal::SIGUSR1); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let _ = nix::sys::signal::kill(pid, Signal::SIGUSR1); + + // wait for it to exit, or timeout + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, + _ = nix::sys::wait::waitpid(pid, None) => {}, + } + } } From e09e539f06af41fc7a1c99f5055ec4bd75de9cec Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Thu, 1 Jun 2023 10:40:37 +0900 Subject: [PATCH 3/4] test --- Cargo.lock | 14 +----- common/Cargo.toml | 1 + common/src/util.rs | 4 +- player/src/main.rs | 7 +-- receiver/Cargo.toml | 2 +- receiver/src/bot.rs | 116 ++++++++++++++++++++++++++++++++++++++------ 6 files changed, 108 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2f1fd6..bb44958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -482,6 +482,7 @@ dependencies = [ "anyhow", "atty", "console-subscriber", + "nix 0.26.2", "tokio", "tracing", "tracing-subscriber", @@ -1659,15 +1660,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "memoffset" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" -dependencies = [ - "autocfg", -] - [[package]] name = "mime" version = "0.3.17" @@ -1739,7 +1731,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "memoffset 0.6.5", + "memoffset", ] [[package]] @@ -1751,8 +1743,6 @@ dependencies = [ "bitflags", "cfg-if", "libc", - "memoffset 0.7.1", - "pin-utils", "static_assertions", ] diff --git a/common/Cargo.toml b/common/Cargo.toml index 13bc56b..20a49da 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" anyhow = { version = "1.0.70", features = ["backtrace"] } atty = "0.2.14" console-subscriber = "0.1.8" +nix = { version = "0.26.2", features = ["signal"], default-features = false } tokio = { version = "1.27.0", features = ["full", "tracing"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = [ diff --git a/common/src/util.rs b/common/src/util.rs index f496958..91d1e10 100644 --- a/common/src/util.rs +++ b/common/src/util.rs @@ -80,15 +80,13 @@ pub async fn ctrl_c() { }; } - #[cfg(unix)] pub async fn usr1() { use tokio::signal::unix::{signal, SignalKind}; let mut usr1 = signal(SignalKind::user_defined1()).unwrap(); - let _ = usr1.recv().await + let _ = usr1.recv().await; } - #[cfg(test)] mod tests { use super::*; diff --git a/player/src/main.rs b/player/src/main.rs index 27b62dd..12b9276 100644 --- a/player/src/main.rs +++ b/player/src/main.rs @@ -84,11 +84,8 @@ pub async fn main() -> Result<()> { _ = &mut spirc_task => { tracing::debug!("spirc task finished"); } - _ = util::ctrl_c_and_pipe() => { - // what happens is songbird sends SIGKILL(9) to the last child -- gstreamer. presumably then its stdin is closed, which means our stdout is closed -> SIGPIPE - // actually what happens is the Player fails to write to stoud and then calls std::process::exit(1) :( - // TODO: what can we do about that - tracing::debug!("received ctrl-c or pipe"); + _ = util::ctrl_c() => { + tracing::debug!("received ctrl-c"); }, _ = util::usr1() => { tracing::debug!("got SIGUSR1, shutting down") diff --git a/receiver/Cargo.toml b/receiver/Cargo.toml index d2b2bf0..9c8ce60 100644 --- a/receiver/Cargo.toml +++ b/receiver/Cargo.toml @@ -14,4 +14,4 @@ axum = "0.6.15" protocol = { path = "../protocol" } common = { path = "../common" } poise = "0.5.3" -nix = "0.26.2" +nix = { version = "0.26.2", default-features = false, features = ["signal"] } diff --git a/receiver/src/bot.rs b/receiver/src/bot.rs index a907e59..cfca0ff 100644 --- a/receiver/src/bot.rs +++ b/receiver/src/bot.rs @@ -1,9 +1,9 @@ use std::{ process::{Command, Stdio}, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, }; -use anyhow::Result; +use anyhow::{anyhow, Context as _, Result}; use clap::Parser; use poise::serenity_prelude::GatewayIntents; @@ -24,7 +24,7 @@ pub struct BotOptions { struct Data { bot_options: BotOptions, creds_registry: Arc>, - currently_playing_pid: Arc>>, + currently_playing_pid: Arc>>, } type Error = anyhow::Error; type Context<'a> = poise::Context<'a, Data, Error>; @@ -64,7 +64,7 @@ pub async fn run_bot(opts: BotOptions, stream_registry: Arc) -> Result<()> { }; { - let mut pid = ctx.data().currently_playing_pid.read().unwrap(); - if let Some(pid) = pid.as_ref().take() { - let pid = Pid::from_raw(pid as i32); - tracing::debug!(?pid, "asking player to stop"); - let _ = nix::sys::signal::kill(pid, Signal::SIGUSR1); - - // wait for it to exit, or timeout - tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, - _ = nix::sys::wait::waitpid(pid, None) => {}, + let pid = { + let mut pid_mu = ctx.data().currently_playing_pid.lock().unwrap(); + pid_mu.take() + }; + if let Some(pid) = pid { + if let Err(e) = kill_player(pid as _).await.context("killing player") { + tracing::error!(?e, "failed to kill player"); } - } } @@ -247,3 +243,93 @@ async fn stop(ctx: Context<'_>) -> Result<()> { async fn restart(_ctx: Context<'_>) -> Result<()> { std::process::exit(0); } + +#[derive(Debug, Clone, Copy)] +enum HowKilled { + Usr1, + Term, + Kill, +} +/// gracefully kill player by sending it SIGUSR1, waiting, then sending it SIGTERM +async fn kill_player(pid: u32) -> Result { + use nix::{sys::signal::Signal, unistd::Pid}; + + let pid = Pid::from_raw(pid as i32); + tracing::debug!(?pid, "asking player to stop"); + nix::sys::signal::kill(pid, Signal::SIGUSR1).context("sending usr1")?; + + // wait for it to exit, or timeout + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, + _ = async { tokio::task::spawn_blocking(move || nix::sys::wait::waitpid(pid, None).map_err(|e| anyhow!("error waiting: {:?}", e))).await? } => { + return Ok(HowKilled::Usr1); + }, + } + + tracing::warn!("player did not exit in time after USR1; sending TERM"); + nix::sys::signal::kill(pid, Signal::SIGTERM)?; + + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, + _ = async { tokio::task::spawn_blocking(move || nix::sys::wait::waitpid(pid, None).map_err(|e| anyhow!("error waiting: {:?}", e))).await? } => { + return Ok(HowKilled::Term); + }, + } + + tracing::warn!("player did not exit in time after TERM; sending KILL"); + nix::sys::signal::kill(pid, Signal::SIGKILL)?; + + Ok::<_, anyhow::Error>(HowKilled::Kill) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::AsyncBufReadExt; + + #[tokio::test] + async fn kill_player_works() -> Result<()> { + common::util::setup_logging()?; + + let sh = r#" +trap 'echo usr1' SIGUSR1; +trap 'echo term' SIGTERM; +echo setup +while true; do sleep 0.5; done; +"#; + let mut child = tokio::process::Command::new("bash") + .args(&["-c", sh]) + .stdout(Stdio::piped()) + .spawn()?; + let pid = child.id().unwrap(); + let out = child.stdout.take().unwrap(); + + let (setup_tx, setup_rx) = tokio::sync::oneshot::channel(); + + let output = tokio::task::spawn(async move { + let mut setup_tx = Some(setup_tx); + let mut out_str = String::new(); + let mut reader = tokio::io::BufReader::new(out).lines(); + while let Some(line) = reader.next_line().await.unwrap() { + if line.trim() == "setup" { + setup_tx.take().unwrap().send(()).unwrap(); + } + tracing::debug!("{}", line); + out_str.push_str(&format!("{}\n", line)); + } + out_str + }); + + // wait for child's handlers to be setup + setup_rx.await.unwrap(); + + tracing::debug!(?pid, "started player"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + kill_player(pid).await?; + + let out_str = output.await?; + assert_eq!(out_str, "setup\nusr1\nterm\n"); + + Ok(()) + } +} From 4cabdb0cebcb77851dff5ce7809d246374ce7ef1 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Thu, 1 Jun 2023 10:49:43 +0900 Subject: [PATCH 4/4] machete --- Cargo.lock | 2 -- common/Cargo.toml | 1 - forwarder/Cargo.toml | 7 +++---- player/Cargo.toml | 3 ++- receiver/Cargo.toml | 5 +++-- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb44958..33a8c97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -482,7 +482,6 @@ dependencies = [ "anyhow", "atty", "console-subscriber", - "nix 0.26.2", "tokio", "tracing", "tracing-subscriber", @@ -852,7 +851,6 @@ dependencies = [ "protocol", "rand", "reqwest", - "serde_json", "sha1", "tokio", "tracing", diff --git a/common/Cargo.toml b/common/Cargo.toml index 20a49da..13bc56b 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,7 +9,6 @@ edition = "2021" anyhow = { version = "1.0.70", features = ["backtrace"] } atty = "0.2.14" console-subscriber = "0.1.8" -nix = { version = "0.26.2", features = ["signal"], default-features = false } tokio = { version = "1.27.0", features = ["full", "tracing"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = [ diff --git a/forwarder/Cargo.toml b/forwarder/Cargo.toml index 3b5866d..c7cc810 100644 --- a/forwarder/Cargo.toml +++ b/forwarder/Cargo.toml @@ -6,6 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +protocol = { path = "../protocol" } +common = { path = "../common" } + anyhow = { version = "1.0.70", features = ["backtrace"] } clap = { version = "4.2.2", features = ["derive"] } futures-util = "0.3.28" @@ -15,11 +18,7 @@ reqwest = { version = "0.11.16", default_features = false, features = [ "rustls-tls", "json", ] } -serde_json = "1.0.96" sha1 = "0.10.5" tokio = { version = "1.27.0", features = ["full"] } tracing = "0.1.37" rand = "0.8.5" - -protocol = { path = "../protocol" } -common = { path = "../common" } diff --git a/player/Cargo.toml b/player/Cargo.toml index f589f3c..98271fb 100644 --- a/player/Cargo.toml +++ b/player/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] +common = { path = "../common" } + anyhow = { version = "1.0.70", features = ["backtrace"] } clap = { version = "4.2.2", features = ["derive", "env"] } hex = "0.4.3" @@ -13,4 +15,3 @@ sha1 = "0.10.5" tokio = { version = "1.27.0", features = ["full"] } tracing = "0.1.37" serde_json = "1.0.96" -common = { path = "../common" } diff --git a/receiver/Cargo.toml b/receiver/Cargo.toml index 9c8ce60..7099e06 100644 --- a/receiver/Cargo.toml +++ b/receiver/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] +protocol = { path = "../protocol" } +common = { path = "../common" } + songbird = { version = "0.3" } anyhow = { version = "1.0.70", features = ["backtrace"] } clap = { version = "4.2.2", features = ["derive", "env"] } @@ -11,7 +14,5 @@ tokio = { version = "1.27.0", features = ["full"] } tracing = "0.1.37" serde_json = "1.0.96" axum = "0.6.15" -protocol = { path = "../protocol" } -common = { path = "../common" } poise = "0.5.3" nix = { version = "0.26.2", default-features = false, features = ["signal"] }