diff --git a/livekit/src/proto.rs b/livekit/src/proto.rs index ee178b040..146724ed5 100644 --- a/livekit/src/proto.rs +++ b/livekit/src/proto.rs @@ -158,6 +158,17 @@ impl From for participant::ParticipantKind { } } +impl From for participant::ParticipantState { + fn from(value: participant_info::State) -> Self { + match value { + participant_info::State::Joining => participant::ParticipantState::Joining, + participant_info::State::Joined => participant::ParticipantState::Joined, + participant_info::State::Active => participant::ParticipantState::Active, + participant_info::State::Disconnected => participant::ParticipantState::Disconnected, + } + } +} + impl From for RoomChatMessage { fn from(proto_msg: ChatMessage) -> Self { RoomChatMessage { diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index aebe35a56..47f14aae4 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -45,7 +45,7 @@ pub use self::{ }; pub use crate::rtc_engine::SimulateScenario; use crate::{ - participant::ConnectionQuality, + participant::{ParticipantState, ConnectionQuality}, prelude::*, registered_audio_filter_plugins, rtc_engine::{ @@ -86,7 +86,16 @@ pub enum RoomError { #[derive(Clone, Debug)] #[non_exhaustive] pub enum RoomEvent { + /// Remote participant joined the room. + /// + /// This event is fired immediately after a participant joins before + /// it is able to receive data messages. To send data messages in response + /// to a participant joining, respond to the [`Self::ParticipantActive`] event instead. + /// ParticipantConnected(RemoteParticipant), + /// Remote participant is active and ready to receive data messages. + ParticipantActive(RemoteParticipant), + /// Remote participant disconnected from the room. ParticipantDisconnected(RemoteParticipant), LocalTrackPublished { publication: LocalTrackPublication, @@ -495,6 +504,7 @@ impl Room { let local_participant = LocalParticipant::new( rtc_engine.clone(), pi.kind().into(), + pi.state().into(), pi.sid.try_into().unwrap(), pi.identity.into(), pi.name, @@ -639,6 +649,7 @@ impl Room { let pi = pi.clone(); inner.create_participant( pi.kind().into(), + pi.state().into(), pi.sid.try_into().unwrap(), pi.identity.into(), pi.name, @@ -989,7 +1000,12 @@ impl RoomSession { // disconnected } } else if let Some(remote_participant) = remote_participant { + let already_active = remote_participant.state() == ParticipantState::Active; remote_participant.update_info(pi.clone()); + if !already_active && remote_participant.state() == ParticipantState::Active { + self.dispatcher + .dispatch(&RoomEvent::ParticipantActive(remote_participant.clone())); + } participants.push(Participant::Remote(remote_participant)); } else { // Create a new participant @@ -997,6 +1013,7 @@ impl RoomSession { let pi = pi.clone(); self.create_participant( pi.kind().into(), + pi.state().into(), pi.sid.try_into().unwrap(), pi.identity.into(), pi.name, @@ -1008,6 +1025,11 @@ impl RoomSession { self.dispatcher .dispatch(&RoomEvent::ParticipantConnected(remote_participant.clone())); + if remote_participant.state() == ParticipantState::Active { + // Already active, also emit active event + self.dispatcher + .dispatch(&RoomEvent::ParticipantActive(remote_participant.clone())); + } remote_participant.update_info(pi.clone()); // Add tracks } } @@ -1577,6 +1599,7 @@ impl RoomSession { fn create_participant( self: &Arc, kind: ParticipantKind, + state: ParticipantState, sid: ParticipantSid, identity: ParticipantIdentity, name: String, @@ -1586,6 +1609,7 @@ impl RoomSession { let participant = RemoteParticipant::new( self.rtc_engine.clone(), kind, + state, sid.clone(), identity.clone(), name, diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index a9a2320fd..21de731b3 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -22,7 +22,10 @@ use std::{ time::Duration, }; -use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantTrackPermission}; +use super::{ + ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState, + ParticipantTrackPermission, +}; use crate::{ data_stream::{ ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions, @@ -98,6 +101,7 @@ impl Debug for LocalParticipant { .field("sid", &self.sid()) .field("identity", &self.identity()) .field("name", &self.name()) + .field("state", &self.state()) .finish() } } @@ -106,6 +110,7 @@ impl LocalParticipant { pub(crate) fn new( rtc_engine: Arc, kind: ParticipantKind, + state: ParticipantState, sid: ParticipantSid, identity: ParticipantIdentity, name: String, @@ -114,7 +119,9 @@ impl LocalParticipant { encryption_type: EncryptionType, ) -> Self { Self { - inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind), + inner: super::new_inner( + rtc_engine, sid, identity, name, metadata, attributes, kind, state, + ), local: Arc::new(LocalInfo { events: LocalEvents::default(), encryption_type, @@ -691,6 +698,10 @@ impl LocalParticipant { self.inner.info.read().name.clone() } + pub fn state(&self) -> ParticipantState { + self.inner.info.read().state + } + pub fn metadata(&self) -> String { self.inner.info.read().metadata.clone() } diff --git a/livekit/src/room/participant/mod.rs b/livekit/src/room/participant/mod.rs index cbe687776..e56df5bb9 100644 --- a/livekit/src/room/participant/mod.rs +++ b/livekit/src/room/participant/mod.rs @@ -46,6 +46,14 @@ pub enum ParticipantKind { Agent, } +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum ParticipantState { + Joining, + Joined, + Active, + Disconnected, +} + #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum DisconnectReason { UnknownReason, @@ -84,6 +92,7 @@ impl Participant { pub fn audio_level(self: &Self) -> f32; pub fn connection_quality(self: &Self) -> ConnectionQuality; pub fn kind(self: &Self) -> ParticipantKind; + pub fn state(self: &Self) -> ParticipantState; pub fn disconnect_reason(self: &Self) -> DisconnectReason; pub fn is_encrypted(self: &Self) -> bool; @@ -116,6 +125,7 @@ struct ParticipantInfo { pub audio_level: f32, pub connection_quality: ConnectionQuality, pub kind: ParticipantKind, + pub state: ParticipantState, pub disconnect_reason: DisconnectReason, } @@ -160,6 +170,7 @@ pub(super) fn new_inner( metadata: String, attributes: HashMap, kind: ParticipantKind, + state: ParticipantState, ) -> Arc { Arc::new(ParticipantInner { rtc_engine, @@ -170,6 +181,7 @@ pub(super) fn new_inner( metadata, attributes, kind, + state, speaking: false, audio_level: 0.0, connection_quality: ConnectionQuality::Excellent, @@ -190,6 +202,7 @@ pub(super) fn update_info( let mut info = inner.info.write(); info.disconnect_reason = new_info.disconnect_reason().into(); info.kind = new_info.kind().into(); + info.state = new_info.state().into(); info.sid = new_info.sid.try_into().unwrap(); info.identity = new_info.identity.into(); diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index a4e1df2ae..ef812bdc3 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -24,7 +24,7 @@ use livekit_protocol as proto; use livekit_runtime::timeout; use parking_lot::Mutex; -use super::{ConnectionQuality, ParticipantInner, ParticipantKind, TrackKind}; +use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState, TrackKind}; use crate::{ prelude::*, rtc_engine::RtcEngine, @@ -67,6 +67,7 @@ impl Debug for RemoteParticipant { .field("sid", &self.sid()) .field("identity", &self.identity()) .field("name", &self.name()) + .field("state", &self.state()) .finish() } } @@ -75,6 +76,7 @@ impl RemoteParticipant { pub(crate) fn new( rtc_engine: Arc, kind: ParticipantKind, + state: ParticipantState, sid: ParticipantSid, identity: ParticipantIdentity, name: String, @@ -83,7 +85,9 @@ impl RemoteParticipant { auto_subscribe: bool, ) -> Self { Self { - inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind), + inner: super::new_inner( + rtc_engine, sid, identity, name, metadata, attributes, kind, state, + ), remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }), } } @@ -488,6 +492,10 @@ impl RemoteParticipant { self.inner.info.read().name.clone() } + pub fn state(&self) -> ParticipantState { + self.inner.info.read().state + } + pub fn metadata(&self) -> String { self.inner.info.read().metadata.clone() }