Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,33 @@

package io.getstream.chat.android.offline.extensions

import io.getstream.log.StreamLog
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal suspend fun CoroutineScope.launchWithMutex(mutex: Mutex, block: suspend () -> Unit) = launch {
mutex.withLock { block() }
private val logger by lazy { StreamLog.getLogger("Chat:CoroutineScope") }

/**
* Launches a coroutine that executes the given [block] within a mutex lock.
*
* Used to protect against concurrent writes to a single database table.
*
* This extension function ensures thread-safe execution by acquiring the provided [mutex]
* before executing the [block].
*
* Any [Exception] thrown during execution is caught and logged. We assume such exceptions
* can happen only in the edge-case where the database is corrupted/closed, so we can safely ignore them.
*
* @param mutex The [Mutex] to acquire before executing the block.
* @param block The suspend function to execute within the mutex lock.
* @return A [kotlinx.coroutines.Job] representing the launched coroutine.
*/
internal fun CoroutineScope.launchWithMutex(mutex: Mutex, block: suspend () -> Unit) = launch {
try {
mutex.withLock { block() }
} catch (e: IllegalStateException) {
logger.e(e) { "Exception in launchWithMutex: ${e.message}" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ public class StreamOfflinePluginFactory @JvmOverloads constructor(
override fun createRepositoryFactory(user: User): RepositoryFactory {
logger.d { "[createRepositoryFactory] user.id: '${user.id}'" }
return DatabaseRepositoryFactory(
database = createDatabase(appContext, user),
database = {
// Lazily fetch the current instance of ChatDatabase
ChatDatabase.getDatabase(appContext, user.id)
},
currentUser = user,
scope = ChatClient.instance().inheritScope { SupervisorJob(it) },
ignoredChannelTypes = ignoredChannelTypes,
Expand Down Expand Up @@ -226,11 +229,4 @@ public class StreamOfflinePluginFactory @JvmOverloads constructor(
draftMessageListener = draftMessageListener,
)
}

private fun createDatabase(
context: Context,
user: User,
): ChatDatabase {
return ChatDatabase.getDatabase(context, user.id)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import androidx.room.Room
import androidx.room.RoomDatabase
import androidx.room.TypeConverters
import androidx.sqlite.db.SupportSQLiteDatabase
import androidx.sqlite.db.SupportSQLiteOpenHelper
import androidx.sqlite.db.framework.FrameworkSQLiteOpenHelperFactory
import io.getstream.chat.android.offline.repository.database.converter.internal.AnswerConverter
import io.getstream.chat.android.offline.repository.database.converter.internal.DateConverter
import io.getstream.chat.android.offline.repository.database.converter.internal.ExtraDataConverter
Expand Down Expand Up @@ -69,6 +71,8 @@ import io.getstream.chat.android.offline.repository.domain.threads.internal.Thre
import io.getstream.chat.android.offline.repository.domain.threads.internal.ThreadOrderEntity
import io.getstream.chat.android.offline.repository.domain.user.internal.UserDao
import io.getstream.chat.android.offline.repository.domain.user.internal.UserEntity
import io.getstream.log.TaggedLogger
import io.getstream.log.taggedLogger

@Database(
entities = [
Expand Down Expand Up @@ -134,23 +138,108 @@ internal abstract class ChatDatabase : RoomDatabase() {
fun getDatabase(context: Context, userId: String): ChatDatabase {
if (!INSTANCES.containsKey(userId)) {
synchronized(this) {
val db = Room.databaseBuilder(
context.applicationContext,
ChatDatabase::class.java,
"stream_chat_database_$userId",
).fallbackToDestructiveMigration()
.addCallback(
object : Callback() {
override fun onOpen(db: SupportSQLiteDatabase) {
db.execSQL("PRAGMA synchronous = 1")
}
},
)
.build()
val db = createDb(context, userId)
INSTANCES[userId] = db
}
}
return INSTANCES[userId] ?: error("DB not created")
}

private fun createDb(context: Context, userId: String): ChatDatabase {
synchronized(this) {
val db = Room.databaseBuilder(
context.applicationContext,
ChatDatabase::class.java,
"stream_chat_database_$userId",
)
.fallbackToDestructiveMigration()
.addCallback(
object : Callback() {
override fun onOpen(db: SupportSQLiteDatabase) {
db.execSQL("PRAGMA synchronous = 1")
}
},
)
.openHelperFactory(
StreamSQLiteOpenHelperFactory(onCorrupted = {
synchronized(this) {
// Re-instantiate the DB if corrupted
INSTANCES.remove(userId)
INSTANCES[userId] = createDb(context, userId)
}
}),
)
.build()
return db
}
}
}
}

/**
* A [SupportSQLiteOpenHelper.Factory] binding the [StreamSQLiteCallback] to the [ChatDatabase] instance.
*
* @param delegate The original [SupportSQLiteOpenHelper.Factory] to delegate all operations to.
* @param onCorrupted Callback invoked when the database is corrupted.
*/
private class StreamSQLiteOpenHelperFactory(
private val delegate: SupportSQLiteOpenHelper.Factory = FrameworkSQLiteOpenHelperFactory(),
private val onCorrupted: (SupportSQLiteDatabase) -> Unit,
) : SupportSQLiteOpenHelper.Factory {

override fun create(configuration: SupportSQLiteOpenHelper.Configuration): SupportSQLiteOpenHelper {
val callback = StreamSQLiteCallback(configuration.callback, onCorrupted)
val newConfig = SupportSQLiteOpenHelper.Configuration.builder(configuration.context)
.name(configuration.name)
.callback(callback)
.noBackupDirectory(configuration.useNoBackupDirectory)
.allowDataLossOnRecovery(configuration.allowDataLossOnRecovery)
.build()
return delegate.create(newConfig)
}
}

/**
* A [SupportSQLiteOpenHelper.Callback] informing about the database corruption event.
*
* @param delegate The original [SupportSQLiteOpenHelper.Callback] to delegate all operations to.
* @param onCorrupted Callback invoked when the database is corrupted.
*/
private class StreamSQLiteCallback(
private val delegate: SupportSQLiteOpenHelper.Callback,
private val onCorrupted: (SupportSQLiteDatabase) -> Unit,
) : SupportSQLiteOpenHelper.Callback(delegate.version) {

private val logger: TaggedLogger by taggedLogger("Chat:StreamSQLiteCallback")

override fun onCreate(db: SupportSQLiteDatabase) {
delegate.onCreate(db)
logger.d { "onCreate called for DB" }
}

override fun onOpen(db: SupportSQLiteDatabase) {
delegate.onOpen(db)
logger.d { "onOpen called for DB" }
}

override fun onUpgrade(db: SupportSQLiteDatabase, oldVersion: Int, newVersion: Int) {
delegate.onUpgrade(db, oldVersion, newVersion)
logger.d { "onUpgrade called for DB" }
}

override fun onDowngrade(db: SupportSQLiteDatabase, oldVersion: Int, newVersion: Int) {
delegate.onDowngrade(db, oldVersion, newVersion)
logger.d { "onDowngrade called for DB" }
}

override fun onConfigure(db: SupportSQLiteDatabase) {
delegate.onConfigure(db)
logger.d { "onConfigure called for DB" }
}

override fun onCorruption(db: SupportSQLiteDatabase) {
delegate.onCorruption(db)
this.onCorrupted(db)
logger.d { "onCorruption called for DB" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2014-2022 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.chat.android.offline.repository.domain.channel.internal

import io.getstream.chat.android.models.SyncStatus
import io.getstream.chat.android.offline.repository.database.internal.ChatDatabase
import java.util.Date

/**
* A [ChannelDao] implementation which lazily retrieves the original [ChannelDao] from the currently active
* [ChatDatabase] instance. The [ChatDatabase] instance can change in runtime if it becomes corrupted
* and is manually recreated.
*
* @param getDatabase Method retrieving the current instance of [ChatDatabase].
*/
internal class RecoverableChannelDao(private val getDatabase: () -> ChatDatabase) : ChannelDao {

private val delegate: ChannelDao
get() = getDatabase().channelStateDao()

override suspend fun insert(channelEntity: ChannelEntity) {

Check warning on line 35 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Rb&open=AZqiHUJYZUAEphefh6Rb&pullRequest=6014
delegate.insert(channelEntity)
}

override suspend fun insertMany(channelEntities: List<ChannelEntity>) {

Check warning on line 39 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Rc&open=AZqiHUJYZUAEphefh6Rc&pullRequest=6014
delegate.insertMany(channelEntities)
}

override suspend fun selectAllCids(): List<String> {

Check warning on line 43 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Rd&open=AZqiHUJYZUAEphefh6Rd&pullRequest=6014
return delegate.selectAllCids()
}

override suspend fun selectCidsBySyncNeeded(syncStatus: SyncStatus, limit: Int): List<String> {
return delegate.selectCidsBySyncNeeded(syncStatus, limit)
}

override suspend fun selectSyncNeeded(syncStatus: SyncStatus, limit: Int): List<ChannelEntity> {
return delegate.selectSyncNeeded(syncStatus, limit)
}

override suspend fun select(cids: List<String>): List<ChannelEntity> {
return delegate.select(cids)
}

override suspend fun select(cid: String?): ChannelEntity? {

Check warning on line 59 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Rh&open=AZqiHUJYZUAEphefh6Rh&pullRequest=6014
return delegate.select(cid)
}

override suspend fun delete(cid: String) {

Check warning on line 63 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Ri&open=AZqiHUJYZUAEphefh6Ri&pullRequest=6014
delegate.delete(cid)
}

override suspend fun setDeletedAt(cid: String, deletedAt: Date) {

Check warning on line 67 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Rj&open=AZqiHUJYZUAEphefh6Rj&pullRequest=6014
delegate.setDeletedAt(cid, deletedAt)
}

override suspend fun setHidden(cid: String, hidden: Boolean, hideMessagesBefore: Date) {

Check warning on line 71 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channel/internal/RecoverableChannelDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJYZUAEphefh6Rk&open=AZqiHUJYZUAEphefh6Rk&pullRequest=6014
delegate.setHidden(cid, hidden, hideMessagesBefore)
}

override suspend fun setHidden(cid: String, hidden: Boolean) {
delegate.setHidden(cid, hidden)
}

override suspend fun deleteAll() {
delegate.deleteAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.getstream.chat.android.offline.repository.domain.channelconfig.intern

import io.getstream.chat.android.client.persistance.repository.ChannelConfigRepository
import io.getstream.chat.android.models.ChannelConfig
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.Collections

/**
Expand All @@ -28,6 +30,7 @@ internal class DatabaseChannelConfigRepository(
private val channelConfigDao: ChannelConfigDao,
) : ChannelConfigRepository {
private val channelConfigs: MutableMap<String, ChannelConfig> = Collections.synchronizedMap(mutableMapOf())
private val mutex = Mutex()

/**
* Caches in memory data from DB.
Expand All @@ -52,18 +55,24 @@ internal class DatabaseChannelConfigRepository(
channelConfigs += configs.associateBy(ChannelConfig::type)

// insert into room db
channelConfigDao.insert(configs.map(ChannelConfig::toEntity))
mutex.withLock {
channelConfigDao.insert(configs.map(ChannelConfig::toEntity))
}
}

/**
* Writes [ChannelConfig]
*/
override suspend fun insertChannelConfig(config: ChannelConfig) {
channelConfigs += config.type to config
channelConfigDao.insert(config.toEntity())
mutex.withLock {
channelConfigDao.insert(config.toEntity())
}
}

override suspend fun clear() {
channelConfigDao.deleteAll()
mutex.withLock {
channelConfigDao.deleteAll()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2014-2022 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.chat.android.offline.repository.domain.channelconfig.internal

import io.getstream.chat.android.offline.repository.database.internal.ChatDatabase

/**
* A [ChannelConfigDao] implementation which lazily retrieves the original [ChannelConfigDao] from the currently active
* [ChatDatabase] instance. The [ChatDatabase] instance can change in runtime if it becomes corrupted
* and is manually recreated.
*
* @param getDatabase Method retrieving the current instance of [ChatDatabase].
*/
internal class RecoverableChannelConfigDao(private val getDatabase: () -> ChatDatabase) : ChannelConfigDao {

private val delegate: ChannelConfigDao
get() = getDatabase().channelConfigDao()

override suspend fun insert(channelConfigEntities: List<ChannelConfigEntity>) {
delegate.insert(channelConfigEntities)
}

override suspend fun insert(channelConfigEntity: ChannelConfigEntity) {
delegate.insert(channelConfigEntity)
}

override suspend fun insertConfig(channelConfigInnerEntity: ChannelConfigInnerEntity) {
delegate.insertConfig(channelConfigInnerEntity)
}

override suspend fun insertConfigs(channelConfigInnerEntities: List<ChannelConfigInnerEntity>) {
delegate.insertConfigs(channelConfigInnerEntities)
}

override suspend fun insertCommands(commands: List<CommandInnerEntity>) {

Check warning on line 49 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channelconfig/internal/RecoverableChannelConfigDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJ9ZUAEphefh6R4&open=AZqiHUJ9ZUAEphefh6R4&pullRequest=6014
delegate.insertCommands(commands)
}

override suspend fun selectAll(): List<ChannelConfigEntity> {

Check warning on line 53 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channelconfig/internal/RecoverableChannelConfigDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJ9ZUAEphefh6R5&open=AZqiHUJ9ZUAEphefh6R5&pullRequest=6014
return delegate.selectAll()
}

override suspend fun deleteCommands() {

Check warning on line 57 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channelconfig/internal/RecoverableChannelConfigDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJ9ZUAEphefh6R6&open=AZqiHUJ9ZUAEphefh6R6&pullRequest=6014
delegate.deleteCommands()
}

override suspend fun deleteConfigs() {
delegate.deleteConfigs()
}

override suspend fun deleteAll() {

Check warning on line 65 in stream-chat-android-offline/src/main/java/io/getstream/chat/android/offline/repository/domain/channelconfig/internal/RecoverableChannelConfigDao.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZqiHUJ9ZUAEphefh6R8&open=AZqiHUJ9ZUAEphefh6R8&pullRequest=6014
delegate.deleteAll()
}
}
Loading
Loading