Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/livekit_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,11 @@ export 'src/types/video_encoding.dart';
export 'src/types/video_parameters.dart';
export 'src/widgets/screen_select_dialog.dart';
export 'src/widgets/video_track_renderer.dart';
export 'src/token_source/token_source.dart';
export 'src/token_source/room_configuration.dart';
export 'src/token_source/literal.dart';
export 'src/token_source/endpoint.dart';
export 'src/token_source/custom.dart';
export 'src/token_source/caching.dart';
export 'src/token_source/sandbox.dart';
export 'src/token_source/jwt.dart';
114 changes: 114 additions & 0 deletions lib/src/support/completer_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:async';

/// A manager for Completer instances that provides safe completion and automatic lifecycle management.
///
/// Features:
/// - Safe completion (prevents double completion exceptions)
/// - Automatic timeout handling
/// - Clean state management and reusability
/// - Only exposes Future, not the Completer itself
/// - Thread-safe operations
class CompleterManager<T> {
Completer<T> _completer;
Timer? _timeoutTimer;
bool _isCompleted = false;

/// Creates a new CompleterManager with an active completer.
CompleterManager() : _completer = Completer<T>();

/// Gets the current future. Creates a new completer if previous one was completed.
Future<T> get future {
if (_isCompleted) {
_reset();
}
return _completer.future;
}

/// Whether the current completer is completed.
bool get isCompleted => _isCompleted;

/// Whether there's an active completer waiting for completion.
bool get isActive => !_isCompleted;

/// Completes the current completer with the given value.
/// Returns true if successfully completed, false if already completed.
bool complete([FutureOr<T>? value]) {
if (_isCompleted) {
return false;
}

_isCompleted = true;
_timeoutTimer?.cancel();
_timeoutTimer = null;

_completer.complete(value);
return true;
}

/// Completes the current completer with an error.
/// Returns true if successfully completed with error, false if already completed.
bool completeError(Object error, [StackTrace? stackTrace]) {
if (_isCompleted) {
return false;
}

_isCompleted = true;
_timeoutTimer?.cancel();
_timeoutTimer = null;

_completer.completeError(error, stackTrace);
return true;
}

/// Sets up a timeout for the current completer.
/// If the completer is not completed within the timeout, it will be completed with a TimeoutException.
void setTimer(Duration timeout, {String? timeoutReason}) {
if (_isCompleted) {
return;
}

_timeoutTimer?.cancel();
_timeoutTimer = Timer(timeout, () {
if (!_isCompleted) {
final reason = timeoutReason ?? 'Operation timed out after $timeout';
completeError(TimeoutException(reason, timeout));
}
});
}

/// Resets the manager, canceling any pending operations and preparing for reuse.
void reset() {
_reset();
}

void _reset() {
_timeoutTimer?.cancel();
_timeoutTimer = null;
_isCompleted = false;
_completer = Completer<T>();
}

/// Disposes the manager, canceling any pending operations.
void dispose() {
_timeoutTimer?.cancel();
_timeoutTimer = null;
if (!_isCompleted) {
_completer.completeError(StateError('CompleterManager disposed'));
_isCompleted = true;
}
}
}
176 changes: 176 additions & 0 deletions lib/src/token_source/caching.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:async';

import '../support/completer_manager.dart';
import 'jwt.dart';
import 'token_source.dart';

/// A validator function that determines if cached credentials are still valid.
///
/// The validator receives the original request options and cached response, and should
/// return `true` if the cached credentials are still valid for the given request.
///
/// The default validator checks JWT expiration using [isResponseExpired].
typedef TokenValidator = bool Function(TokenRequestOptions options, TokenSourceResponse response);

/// A tuple containing the request options and response that were cached.
class TokenStoreItem {
final TokenRequestOptions options;
final TokenSourceResponse response;

const TokenStoreItem({
required this.options,
required this.response,
});
}

/// Protocol for storing and retrieving cached token credentials.
///
/// Implement this abstract class to create custom storage solutions like
/// SharedPreferences or secure storage for token caching.
abstract class TokenStore {
/// Store credentials in the store.
///
/// This replaces any existing cached credentials with the new ones.
Future<void> store(TokenRequestOptions options, TokenSourceResponse response);

/// Retrieve the cached credentials.
///
/// Returns the cached credentials if found, null otherwise.
Future<TokenStoreItem?> retrieve();

/// Clear all stored credentials.
Future<void> clear();
}

/// A simple in-memory store implementation for token caching.
///
/// This store keeps credentials in memory and is lost when the app is terminated.
/// Suitable for development and testing.
class InMemoryTokenStore implements TokenStore {
TokenStoreItem? _cached;

@override
Future<void> store(TokenRequestOptions options, TokenSourceResponse response) async {
_cached = TokenStoreItem(options: options, response: response);
}

@override
Future<TokenStoreItem?> retrieve() async {
return _cached;
}

@override
Future<void> clear() async {
_cached = null;
}
}

/// Default validator that checks JWT expiration using [isResponseExpired].
bool _defaultValidator(TokenRequestOptions options, TokenSourceResponse response) {
return !isResponseExpired(response);
}

/// A token source that caches credentials from any [TokenSourceConfigurable] using a configurable store.
///
/// This wrapper improves performance by avoiding redundant token requests when credentials are still valid.
/// It automatically validates cached tokens and fetches new ones when needed.
///
/// The cache will refetch credentials when:
/// - The cached token has expired (validated via [TokenValidator])
/// - The request options have changed
/// - The cache has been explicitly invalidated via [invalidate]
class CachingTokenSource implements TokenSourceConfigurable {
final TokenSourceConfigurable _wrapped;
final TokenStore _store;
final TokenValidator _validator;
final Map<TokenRequestOptions, CompleterManager<TokenSourceResponse>> _inflightRequests = {};

/// Initialize a caching wrapper around any token source.
///
/// - Parameters:
/// - wrapped: The underlying token source to wrap and cache
/// - store: The store implementation to use for caching (defaults to in-memory store)
/// - validator: A function to determine if cached credentials are still valid (defaults to JWT expiration check)
CachingTokenSource(
this._wrapped, {
TokenStore? store,
TokenValidator? validator,
}) : _store = store ?? InMemoryTokenStore(),
_validator = validator ?? _defaultValidator;

@override
Future<TokenSourceResponse> fetch(TokenRequestOptions options) async {
final existingManager = _inflightRequests[options];
if (existingManager != null && existingManager.isActive) {
return existingManager.future;
}

final manager = existingManager ?? CompleterManager<TokenSourceResponse>();
_inflightRequests[options] = manager;
final resultFuture = manager.future;

try {
final cached = await _store.retrieve();
if (cached != null && cached.options == options && _validator(cached.options, cached.response)) {
manager.complete(cached.response);
return resultFuture;
}

final response = await _wrapped.fetch(options);
await _store.store(options, response);
manager.complete(response);
return resultFuture;
} catch (e, stackTrace) {
manager.completeError(e, stackTrace);
rethrow;
} finally {
_inflightRequests.remove(options);
}
}

/// Invalidate the cached credentials, forcing a fresh fetch on the next request.
Future<void> invalidate() async {
await _store.clear();
}

/// Get the cached credentials if one exists.
Future<TokenSourceResponse?> cachedResponse() async {
final cached = await _store.retrieve();
return cached?.response;
}
}

/// Extension to add caching capabilities to any [TokenSourceConfigurable].
extension CachedTokenSource on TokenSourceConfigurable {
/// Wraps this token source with caching capabilities.
///
/// The returned token source will reuse valid tokens and only fetch new ones when needed.
///
/// - Parameters:
/// - store: The store implementation to use for caching (defaults to in-memory store)
/// - validator: A function to determine if cached credentials are still valid (defaults to JWT expiration check)
/// - Returns: A caching token source that wraps this token source
TokenSourceConfigurable cached({
TokenStore? store,
TokenValidator? validator,
}) =>
CachingTokenSource(
this,
store: store,
validator: validator,
);
}
38 changes: 38 additions & 0 deletions lib/src/token_source/custom.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'token_source.dart';

/// Function signature for custom token generation logic.
typedef CustomTokenFunction = Future<TokenSourceResponse> Function(TokenRequestOptions options);

/// A custom token source that executes provided logic to fetch credentials.
///
/// This allows you to implement your own token fetching strategy with full control
/// over how credentials are generated or retrieved.
class CustomTokenSource implements TokenSourceConfigurable {
final CustomTokenFunction _function;

/// Initialize with a custom token generation function.
///
/// The [function] will be called whenever credentials need to be fetched,
/// receiving [TokenRequestOptions] and returning a [TokenSourceResponse].
CustomTokenSource(CustomTokenFunction function) : _function = function;

@override
Future<TokenSourceResponse> fetch([TokenRequestOptions? options]) async {
final requestOptions = options ?? const TokenRequestOptions();
return await _function(requestOptions);
}
}
Loading