Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions moq-relay-ietf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ edition = "2021"
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

[lib]
name = "moq_relay_ietf"
path = "src/lib.rs"

[[bin]]
name = "moq-relay-ietf"
path = "src/main.rs"

[dependencies]
moq-transport = { path = "../moq-transport", version = "0.11" }
moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" }
Expand All @@ -22,6 +30,7 @@ url = "2"
# Async stuff
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"

# Web server to serve the fingerprint
axum = { version = "0.7", features = ["tokio"] }
Expand Down
88 changes: 0 additions & 88 deletions moq-relay-ietf/src/api.rs

This file was deleted.

44 changes: 30 additions & 14 deletions moq-relay-ietf/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@ use moq_transport::{
session::{Announced, SessionError, Subscriber},
};

use crate::{Api, Locals, Producer};
use crate::control_plane::{ControlPlane, Origin};
use crate::{Locals, Producer};
use url::Url;

/// Consumer of tracks from a remote Publisher
#[derive(Clone)]
pub struct Consumer {
pub struct Consumer<CP: ControlPlane> {
remote: Subscriber,
locals: Locals,
api: Option<Api>,
forward: Option<Producer>, // Forward all announcements to this subscriber
control_plane: Option<CP>,
node_url: Option<Url>,
forward: Option<Producer<CP>>, // Forward all announcements to this subscriber
}

impl Consumer {
impl<CP: ControlPlane> Consumer<CP> {
pub fn new(
remote: Subscriber,
locals: Locals,
api: Option<Api>,
forward: Option<Producer>,
control_plane: Option<CP>,
node_url: Option<Url>,
forward: Option<Producer<CP>>,
) -> Self {
Self {
remote,
locals,
api,
control_plane,
node_url,
forward,
}
}
Expand Down Expand Up @@ -64,12 +69,23 @@ impl Consumer {
// Produce the tracks for this announce and return the reader
let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce();

// Start refreshing the API origin, if any
if let Some(api) = self.api.as_ref() {
let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?;
tasks.push(
async move { refresh.run().await.context("failed refreshing origin") }.boxed(),
);
// Start refreshing the control plane origin, if any
if let Some(control_plane) = self.control_plane.as_ref() {
if let Some(node_url) = &self.node_url {
let origin = Origin {
url: node_url.clone(),
};
let namespace = reader.namespace.to_utf8_path();

// Set the origin initially
control_plane.set_origin(&namespace, origin.clone()).await?;

// Create and spawn refresher task
let mut refresh = control_plane.create_refresher(namespace, origin);
tasks.push(
async move { refresh.run().await.context("failed refreshing origin") }.boxed(),
);
}
}

// Register the local tracks, unregister on drop
Expand Down
34 changes: 34 additions & 0 deletions moq-relay-ietf/src/control_plane.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use anyhow::Result;
use async_trait::async_trait;
use url::Url;

/// Origin information for routing
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Origin {
pub url: Url,
}

/// Trait for control plane operations that enable cross-relay routing and state sharing
#[async_trait]
pub trait ControlPlane: Send + Sync + Clone + 'static {
/// Get the origin URL for a given namespace
async fn get_origin(&self, namespace: &str) -> Result<Option<Origin>>;

/// Set/register the origin for a given namespace
async fn set_origin(&self, namespace: &str, origin: Origin) -> Result<()>;

/// Delete/unregister the origin for a given namespace
async fn delete_origin(&self, namespace: &str) -> Result<()>;

/// Create a refresher that periodically updates the origin registration
/// Returns a future that runs the refresh loop
fn create_refresher(&self, namespace: String, origin: Origin)
-> Box<dyn ControlPlaneRefresher>;
}

/// Trait for periodically refreshing origin registrations
#[async_trait]
pub trait ControlPlaneRefresher: Send + 'static {
/// Run the refresh loop (should run indefinitely until dropped)
async fn run(&mut self) -> Result<()>;
}
113 changes: 113 additions & 0 deletions moq-relay-ietf/src/control_plane_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use anyhow::Result;
use async_trait::async_trait;
use url::Url;

use crate::control_plane::{ControlPlane, ControlPlaneRefresher, Origin};

/// HTTP-based control plane implementation using moq-api
#[derive(Clone)]
pub struct HttpControlPlane {
client: moq_api::Client,
node: Url,
}

impl HttpControlPlane {
pub fn new(api_url: Url, node_url: Url) -> Self {
let client = moq_api::Client::new(api_url);
Self {
client,
node: node_url,
}
}

pub fn node_url(&self) -> &Url {
&self.node
}
}

#[async_trait]
impl ControlPlane for HttpControlPlane {
async fn get_origin(&self, namespace: &str) -> Result<Option<Origin>> {
match self.client.get_origin(namespace).await? {
Some(origin) => Ok(Some(Origin { url: origin.url })),
None => Ok(None),
}
}

async fn set_origin(&self, namespace: &str, origin: Origin) -> Result<()> {
let moq_origin = moq_api::Origin { url: origin.url };
self.client.set_origin(namespace, moq_origin).await?;
Ok(())
}

async fn delete_origin(&self, namespace: &str) -> Result<()> {
self.client.delete_origin(namespace).await?;
Ok(())
}

fn create_refresher(
&self,
namespace: String,
origin: Origin,
) -> Box<dyn ControlPlaneRefresher> {
Box::new(HttpRefresher::new(self.client.clone(), namespace, origin))
}
}

/// Periodically refreshes the origin registration via HTTP
pub struct HttpRefresher {
client: moq_api::Client,
namespace: String,
origin: Origin,
refresh: tokio::time::Interval,
}

impl HttpRefresher {
fn new(client: moq_api::Client, namespace: String, origin: Origin) -> Self {
// Refresh every 5 minutes
let duration = tokio::time::Duration::from_secs(300);
let mut refresh = tokio::time::interval(duration);
refresh.reset_after(duration); // skip the first tick

Self {
client,
namespace,
origin,
refresh,
}
}

async fn update(&self) -> Result<()> {
log::debug!(
"registering origin: namespace={} url={}",
self.namespace,
self.origin.url
);
let moq_origin = moq_api::Origin {
url: self.origin.url.clone(),
};
self.client.set_origin(&self.namespace, moq_origin).await?;
Ok(())
}
}

#[async_trait]
impl ControlPlaneRefresher for HttpRefresher {
async fn run(&mut self) -> Result<()> {
loop {
self.refresh.tick().await;
self.update().await?;
}
}
}

impl Drop for HttpRefresher {
fn drop(&mut self) {
let namespace = self.namespace.clone();
let client = self.client.clone();
log::debug!("removing origin: namespace={}", namespace);
tokio::spawn(async move {
let _ = client.delete_origin(&namespace).await;
});
}
}
Loading