diff --git a/app/src/main/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensions.kt b/app/src/main/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensions.kt index e2f2a1d5e..e5447a385 100644 --- a/app/src/main/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensions.kt +++ b/app/src/main/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensions.kt @@ -1,12 +1,14 @@ package com.x8bit.bitwarden.data.platform.repository.util import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson +import com.x8bit.bitwarden.data.vault.repository.model.VaultUnlockData import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map @@ -31,3 +33,35 @@ fun MutableStateFlow.observeWhenSubscribedAndLoggedIn( .flatMapLatest { activeUserId -> activeUserId?.let(observer) ?: flow { awaitCancellation() } } + +/** + * Invokes the [observer] callback whenever the user is logged in, the active changes, + * the vault for the user changes and there are subscribers to the [MutableStateFlow]. + * The flow from all previous calls to the `observer` + * is canceled whenever the `observer` is re-invoked, there is no active user (logged-out), + * there are no subscribers to the [MutableStateFlow] or the vault is not unlocked. + */ +@OptIn(ExperimentalCoroutinesApi::class) +fun MutableStateFlow.observeWhenSubscribedAndUnlocked( + userStateFlow: Flow, + vaultUnlockFlow: Flow>, + observer: (activeUserId: String) -> Flow, +): Flow = + combine( + this.subscriptionCount.map { it > 0 }.distinctUntilChanged(), + userStateFlow.map { it?.activeUserId }.distinctUntilChanged(), + userStateFlow + .map { it?.activeUserId } + .distinctUntilChanged() + .filterNotNull() + .flatMapLatest { activeUserId -> + vaultUnlockFlow + .map { it.any { it.userId == activeUserId } } + .distinctUntilChanged() + }, + ) { isSubscribed, activeUserId, isUnlocked -> + activeUserId.takeIf { isSubscribed && isUnlocked } + } + .flatMapLatest { activeUserId -> + activeUserId?.let(observer) ?: flow { awaitCancellation() } + } diff --git a/app/src/main/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryImpl.kt b/app/src/main/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryImpl.kt index edc6d259d..12ac59c11 100644 --- a/app/src/main/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryImpl.kt +++ b/app/src/main/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryImpl.kt @@ -36,6 +36,7 @@ import com.x8bit.bitwarden.data.platform.repository.util.combineDataStates import com.x8bit.bitwarden.data.platform.repository.util.map import com.x8bit.bitwarden.data.platform.repository.util.mapNullable import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndLoggedIn +import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndUnlocked import com.x8bit.bitwarden.data.platform.repository.util.updateToPendingOrLoading import com.x8bit.bitwarden.data.platform.util.asFailure import com.x8bit.bitwarden.data.platform.util.asSuccess @@ -222,7 +223,13 @@ class VaultRepositoryImpl( // Cancel any ongoing sync request and clear the vault data in memory every time // the user switches or the vault is locked for the active user. merge( - authDiskSource.userSwitchingChangesFlow, + authDiskSource + .userSwitchingChangesFlow + .onEach { + // DomainState is not part of the locked data but should still be cleared + // when the user changes + mutableDomainsStateFlow.update { DataState.Loading } + }, vaultLockManager .vaultUnlockDataStateFlow .filter { vaultUnlockDataList -> @@ -238,31 +245,46 @@ class VaultRepositoryImpl( // Setup ciphers MutableStateFlow mutableCiphersStateFlow - .observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> + .observeWhenSubscribedAndUnlocked( + userStateFlow = authDiskSource.userStateFlow, + vaultUnlockFlow = vaultUnlockDataStateFlow, + ) { activeUserId -> observeVaultDiskCiphers(activeUserId) } .launchIn(unconfinedScope) + // Setup domains MutableStateFlow mutableDomainsStateFlow - .observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> + .observeWhenSubscribedAndLoggedIn( + userStateFlow = authDiskSource.userStateFlow, + ) { activeUserId -> observeVaultDiskDomains(activeUserId) } .launchIn(unconfinedScope) // Setup folders MutableStateFlow mutableFoldersStateFlow - .observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> + .observeWhenSubscribedAndUnlocked( + userStateFlow = authDiskSource.userStateFlow, + vaultUnlockFlow = vaultUnlockDataStateFlow, + ) { activeUserId -> observeVaultDiskFolders(activeUserId) } .launchIn(unconfinedScope) // Setup collections MutableStateFlow mutableCollectionsStateFlow - .observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> + .observeWhenSubscribedAndUnlocked( + userStateFlow = authDiskSource.userStateFlow, + vaultUnlockFlow = vaultUnlockDataStateFlow, + ) { activeUserId -> observeVaultDiskCollections(activeUserId) } .launchIn(unconfinedScope) // Setup sends MutableStateFlow mutableSendDataStateFlow - .observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> + .observeWhenSubscribedAndUnlocked( + authDiskSource.userStateFlow, + vaultUnlockFlow = vaultUnlockDataStateFlow, + ) { activeUserId -> observeVaultDiskSends(activeUserId) } .launchIn(unconfinedScope) @@ -305,7 +327,6 @@ class VaultRepositoryImpl( private fun clearUnlockedData() { mutableCiphersStateFlow.update { DataState.Loading } - mutableDomainsStateFlow.update { DataState.Loading } mutableFoldersStateFlow.update { DataState.Loading } mutableCollectionsStateFlow.update { DataState.Loading } mutableSendDataStateFlow.update { DataState.Loading } diff --git a/app/src/test/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensionsTest.kt b/app/src/test/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensionsTest.kt index 0f851b3b0..cef05ad73 100644 --- a/app/src/test/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensionsTest.kt +++ b/app/src/test/java/com/x8bit/bitwarden/data/platform/repository/util/StateFlowExtensionsTest.kt @@ -2,6 +2,7 @@ package com.x8bit.bitwarden.data.platform.repository.util import app.cash.turbine.test import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson +import com.x8bit.bitwarden.data.vault.repository.model.VaultUnlockData import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.flow.MutableStateFlow @@ -50,6 +51,74 @@ class StateFlowExtensionsTest { assertEquals(0, awaitItem()) assertEquals(1, awaitItem()) + job.cancel() + // Job is canceled, we should have no more subscribers + assertEquals(0, awaitItem()) + } + } + + @Suppress("MaxLineLength") + @Test + fun `observeWhenSubscribedAndUnlocked should observe the given flow depending on the state of the source user and vault unlock flow`() = + runTest { + val userStateFlow = MutableStateFlow(null) + val vaultUnlockFlow = MutableStateFlow>(emptyList()) + val observerStateFlow = MutableStateFlow("") + val sourceMutableStateFlow = MutableStateFlow(Unit) + + assertEquals(0, observerStateFlow.subscriptionCount.value) + sourceMutableStateFlow + .observeWhenSubscribedAndUnlocked( + userStateFlow = userStateFlow, + vaultUnlockFlow = vaultUnlockFlow, + observer = { observerStateFlow }, + ) + .launchIn(backgroundScope) + + observerStateFlow.subscriptionCount.test { + // No subscriber to start + assertEquals(0, awaitItem()) + + userStateFlow.value = mockk { + every { activeUserId } returns "user_id_1234" + } + // Still none, since no one has subscribed to the testMutableStateFlow + expectNoEvents() + + vaultUnlockFlow.value = listOf( + VaultUnlockData( + userId = "user_id_1234", + status = VaultUnlockData.Status.UNLOCKED, + ), + ) + + // Still none, since no one has subscribed to the testMutableStateFlow + expectNoEvents() + + val job = sourceMutableStateFlow.launchIn(backgroundScope) + // Now we subscribe to the observer flow since have a active user and a listener + assertEquals(1, awaitItem()) + + userStateFlow.value = mockk { + every { activeUserId } returns "user_id_4321" + } + // The user changed, so we clear the previous observer but then resubscribe + // with the new user ID + assertEquals(0, awaitItem()) + assertEquals(1, awaitItem()) + + vaultUnlockFlow.value = listOf( + VaultUnlockData( + userId = "user_id_4321", + status = VaultUnlockData.Status.UNLOCKED, + ), + ) + + // The VaultUnlockData changed, so we clear the previous observer but then resubscribe + // with the new data + assertEquals(0, awaitItem()) + assertEquals(1, awaitItem()) + job.cancel() // Job is canceled, we should have no more subscribers assertEquals(0, awaitItem()) diff --git a/app/src/test/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryTest.kt b/app/src/test/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryTest.kt index 159ed4dd2..8c9213a92 100644 --- a/app/src/test/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryTest.kt +++ b/app/src/test/java/com/x8bit/bitwarden/data/vault/repository/VaultRepositoryTest.kt @@ -344,8 +344,15 @@ class VaultRepositoryTest { DataState.Loaded(createMockDomainsData(number = 1)), domainsStateFlow.awaitItem(), ) + setVaultToUnlocked(userId = userId) + ciphersFlow.tryEmit(listOf(createMockCipher(number = 1))) + collectionsFlow.tryEmit(listOf(createMockCollection(number = 1))) + foldersFlow.tryEmit(listOf(createMockFolder(number = 1))) + sendsFlow.tryEmit(listOf(createMockSend(number = 1))) + domainsFlow.tryEmit(createMockDomains(number = 1)) + assertEquals( DataState.Loaded(listOf(createMockCipherView(number = 1))), ciphersStateFlow.awaitItem(), @@ -487,7 +494,7 @@ class VaultRepositoryTest { assertEquals(DataState.Loading, collectionsStateFlow.awaitItem()) assertEquals(DataState.Loading, foldersStateFlow.awaitItem()) assertEquals(DataState.Loading, sendsStateFlow.awaitItem()) - assertEquals(DataState.Loading, domainsStateFlow.awaitItem()) + domainsStateFlow.expectNoEvents() } } @@ -1807,6 +1814,9 @@ class VaultRepositoryTest { settingsDiskSource.getLastSyncTime(userId = userId) } returns clock.instant() + mutableVaultStateFlow.update { + listOf(VaultUnlockData(userId, VaultUnlockData.Status.UNLOCKED)) + } fakeAuthDiskSource.userState = MOCK_USER_STATE setupEmptyDecryptionResults() setupVaultDiskSourceFlows( @@ -1963,6 +1973,7 @@ class VaultRepositoryTest { expectNoEvents() setVaultToUnlocked(userId = MOCK_USER_STATE.activeUserId) + sendsFlow.tryEmit(emptyList()) assertEquals(DataState.Loaded(null), awaitItem()) sendsFlow.tryEmit(listOf(createMockSend(number = sendId))) assertEquals(DataState.Loaded(sendView), awaitItem()) @@ -4596,6 +4607,14 @@ class VaultRepositoryTest { */ private fun setVaultToUnlocked(userId: String) { mutableUnlockedUserIdsStateFlow.update { it + userId } + mutableVaultStateFlow.tryEmit( + listOf( + VaultUnlockData( + userId, + VaultUnlockData.Status.UNLOCKED, + ), + ), + ) } /**