Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions livekit/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ impl From<participant_info::Kind> for participant::ParticipantKind {
}
}

impl From<participant_info::State> for participant::ParticipantState {
fn from(value: participant_info::State) -> Self {
match value {
participant_info::State::Joining => participant::ParticipantState::Joining,
participant_info::State::Joined => participant::ParticipantState::Joined,
participant_info::State::Active => participant::ParticipantState::Active,
participant_info::State::Disconnected => participant::ParticipantState::Disconnected,
}
}
}

impl From<ChatMessage> for RoomChatMessage {
fn from(proto_msg: ChatMessage) -> Self {
RoomChatMessage {
Expand Down
26 changes: 25 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use self::{
};
pub use crate::rtc_engine::SimulateScenario;
use crate::{
participant::ConnectionQuality,
participant::{ParticipantState, ConnectionQuality},
prelude::*,
registered_audio_filter_plugins,
rtc_engine::{
Expand Down Expand Up @@ -86,7 +86,16 @@ pub enum RoomError {
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum RoomEvent {
/// Remote participant joined the room.
///
/// This event is fired immediately after a participant joins before
/// it is able to receive data messages. To send data messages in response
/// to a participant joining, respond to the [`Self::ParticipantActive`] event instead.
///
ParticipantConnected(RemoteParticipant),
/// Remote participant is active and ready to receive data messages.
ParticipantActive(RemoteParticipant),
/// Remote participant disconnected from the room.
ParticipantDisconnected(RemoteParticipant),
LocalTrackPublished {
publication: LocalTrackPublication,
Expand Down Expand Up @@ -495,6 +504,7 @@ impl Room {
let local_participant = LocalParticipant::new(
rtc_engine.clone(),
pi.kind().into(),
pi.state().into(),
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
Expand Down Expand Up @@ -639,6 +649,7 @@ impl Room {
let pi = pi.clone();
inner.create_participant(
pi.kind().into(),
pi.state().into(),
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
Expand Down Expand Up @@ -989,14 +1000,20 @@ impl RoomSession {
// disconnected
}
} else if let Some(remote_participant) = remote_participant {
let already_active = remote_participant.state() == ParticipantState::Active;
remote_participant.update_info(pi.clone());
if !already_active && remote_participant.state() == ParticipantState::Active {
self.dispatcher
.dispatch(&RoomEvent::ParticipantActive(remote_participant.clone()));
}
participants.push(Participant::Remote(remote_participant));
} else {
// Create a new participant
let remote_participant = {
let pi = pi.clone();
self.create_participant(
pi.kind().into(),
pi.state().into(),
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
Expand All @@ -1008,6 +1025,11 @@ impl RoomSession {
self.dispatcher
.dispatch(&RoomEvent::ParticipantConnected(remote_participant.clone()));

if remote_participant.state() == ParticipantState::Active {
// Already active, also emit active event
self.dispatcher
.dispatch(&RoomEvent::ParticipantActive(remote_participant.clone()));
}
remote_participant.update_info(pi.clone()); // Add tracks
}
}
Expand Down Expand Up @@ -1577,6 +1599,7 @@ impl RoomSession {
fn create_participant(
self: &Arc<Self>,
kind: ParticipantKind,
state: ParticipantState,
sid: ParticipantSid,
identity: ParticipantIdentity,
name: String,
Expand All @@ -1586,6 +1609,7 @@ impl RoomSession {
let participant = RemoteParticipant::new(
self.rtc_engine.clone(),
kind,
state,
sid.clone(),
identity.clone(),
name,
Expand Down
15 changes: 13 additions & 2 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use std::{
time::Duration,
};

use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantTrackPermission};
use super::{
ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState,
ParticipantTrackPermission,
};
use crate::{
data_stream::{
ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions,
Expand Down Expand Up @@ -98,6 +101,7 @@ impl Debug for LocalParticipant {
.field("sid", &self.sid())
.field("identity", &self.identity())
.field("name", &self.name())
.field("state", &self.state())
.finish()
}
}
Expand All @@ -106,6 +110,7 @@ impl LocalParticipant {
pub(crate) fn new(
rtc_engine: Arc<RtcEngine>,
kind: ParticipantKind,
state: ParticipantState,
sid: ParticipantSid,
identity: ParticipantIdentity,
name: String,
Expand All @@ -114,7 +119,9 @@ impl LocalParticipant {
encryption_type: EncryptionType,
) -> Self {
Self {
inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind),
inner: super::new_inner(
rtc_engine, sid, identity, name, metadata, attributes, kind, state,
),
local: Arc::new(LocalInfo {
events: LocalEvents::default(),
encryption_type,
Expand Down Expand Up @@ -691,6 +698,10 @@ impl LocalParticipant {
self.inner.info.read().name.clone()
}

pub fn state(&self) -> ParticipantState {
self.inner.info.read().state
}

pub fn metadata(&self) -> String {
self.inner.info.read().metadata.clone()
}
Expand Down
13 changes: 13 additions & 0 deletions livekit/src/room/participant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ pub enum ParticipantKind {
Agent,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ParticipantState {
Joining,
Joined,
Active,
Disconnected,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DisconnectReason {
UnknownReason,
Expand Down Expand Up @@ -84,6 +92,7 @@ impl Participant {
pub fn audio_level(self: &Self) -> f32;
pub fn connection_quality(self: &Self) -> ConnectionQuality;
pub fn kind(self: &Self) -> ParticipantKind;
pub fn state(self: &Self) -> ParticipantState;
pub fn disconnect_reason(self: &Self) -> DisconnectReason;
pub fn is_encrypted(self: &Self) -> bool;

Expand Down Expand Up @@ -116,6 +125,7 @@ struct ParticipantInfo {
pub audio_level: f32,
pub connection_quality: ConnectionQuality,
pub kind: ParticipantKind,
pub state: ParticipantState,
pub disconnect_reason: DisconnectReason,
}

Expand Down Expand Up @@ -160,6 +170,7 @@ pub(super) fn new_inner(
metadata: String,
attributes: HashMap<String, String>,
kind: ParticipantKind,
state: ParticipantState,
) -> Arc<ParticipantInner> {
Arc::new(ParticipantInner {
rtc_engine,
Expand All @@ -170,6 +181,7 @@ pub(super) fn new_inner(
metadata,
attributes,
kind,
state,
speaking: false,
audio_level: 0.0,
connection_quality: ConnectionQuality::Excellent,
Expand All @@ -190,6 +202,7 @@ pub(super) fn update_info(
let mut info = inner.info.write();
info.disconnect_reason = new_info.disconnect_reason().into();
info.kind = new_info.kind().into();
info.state = new_info.state().into();
info.sid = new_info.sid.try_into().unwrap();
info.identity = new_info.identity.into();

Expand Down
12 changes: 10 additions & 2 deletions livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use livekit_protocol as proto;
use livekit_runtime::timeout;
use parking_lot::Mutex;

use super::{ConnectionQuality, ParticipantInner, ParticipantKind, TrackKind};
use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState, TrackKind};
use crate::{
prelude::*,
rtc_engine::RtcEngine,
Expand Down Expand Up @@ -67,6 +67,7 @@ impl Debug for RemoteParticipant {
.field("sid", &self.sid())
.field("identity", &self.identity())
.field("name", &self.name())
.field("state", &self.state())
.finish()
}
}
Expand All @@ -75,6 +76,7 @@ impl RemoteParticipant {
pub(crate) fn new(
rtc_engine: Arc<RtcEngine>,
kind: ParticipantKind,
state: ParticipantState,
sid: ParticipantSid,
identity: ParticipantIdentity,
name: String,
Expand All @@ -83,7 +85,9 @@ impl RemoteParticipant {
auto_subscribe: bool,
) -> Self {
Self {
inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind),
inner: super::new_inner(
rtc_engine, sid, identity, name, metadata, attributes, kind, state,
),
remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }),
}
}
Expand Down Expand Up @@ -488,6 +492,10 @@ impl RemoteParticipant {
self.inner.info.read().name.clone()
}

pub fn state(&self) -> ParticipantState {
self.inner.info.read().state
}

pub fn metadata(&self) -> String {
self.inner.info.read().metadata.clone()
}
Expand Down