[PM-11753] Listening to vaultUnlock state on mutableCiphers, folders, collections and send state flow (#4214)

This commit is contained in:
aj-rosado 2024-11-08 18:33:49 +00:00 committed by GitHub
parent b6dfc3d17b
commit 54d3b34876
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 151 additions and 8 deletions

View file

@ -1,12 +1,14 @@
package com.x8bit.bitwarden.data.platform.repository.util package com.x8bit.bitwarden.data.platform.repository.util
import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson
import com.x8bit.bitwarden.data.vault.repository.model.VaultUnlockData
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
@ -31,3 +33,35 @@ fun <T, R> MutableStateFlow<T>.observeWhenSubscribedAndLoggedIn(
.flatMapLatest { activeUserId -> .flatMapLatest { activeUserId ->
activeUserId?.let(observer) ?: flow { awaitCancellation() } activeUserId?.let(observer) ?: flow { awaitCancellation() }
} }
/**
* Invokes the [observer] callback whenever the user is logged in, the active changes,
* the vault for the user changes and there are subscribers to the [MutableStateFlow].
* The flow from all previous calls to the `observer`
* is canceled whenever the `observer` is re-invoked, there is no active user (logged-out),
* there are no subscribers to the [MutableStateFlow] or the vault is not unlocked.
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> MutableStateFlow<T>.observeWhenSubscribedAndUnlocked(
userStateFlow: Flow<UserStateJson?>,
vaultUnlockFlow: Flow<List<VaultUnlockData>>,
observer: (activeUserId: String) -> Flow<R>,
): Flow<R> =
combine(
this.subscriptionCount.map { it > 0 }.distinctUntilChanged(),
userStateFlow.map { it?.activeUserId }.distinctUntilChanged(),
userStateFlow
.map { it?.activeUserId }
.distinctUntilChanged()
.filterNotNull()
.flatMapLatest { activeUserId ->
vaultUnlockFlow
.map { it.any { it.userId == activeUserId } }
.distinctUntilChanged()
},
) { isSubscribed, activeUserId, isUnlocked ->
activeUserId.takeIf { isSubscribed && isUnlocked }
}
.flatMapLatest { activeUserId ->
activeUserId?.let(observer) ?: flow { awaitCancellation() }
}

View file

@ -36,6 +36,7 @@ import com.x8bit.bitwarden.data.platform.repository.util.combineDataStates
import com.x8bit.bitwarden.data.platform.repository.util.map import com.x8bit.bitwarden.data.platform.repository.util.map
import com.x8bit.bitwarden.data.platform.repository.util.mapNullable import com.x8bit.bitwarden.data.platform.repository.util.mapNullable
import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndLoggedIn import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndLoggedIn
import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndUnlocked
import com.x8bit.bitwarden.data.platform.repository.util.updateToPendingOrLoading import com.x8bit.bitwarden.data.platform.repository.util.updateToPendingOrLoading
import com.x8bit.bitwarden.data.platform.util.asFailure import com.x8bit.bitwarden.data.platform.util.asFailure
import com.x8bit.bitwarden.data.platform.util.asSuccess import com.x8bit.bitwarden.data.platform.util.asSuccess
@ -222,7 +223,13 @@ class VaultRepositoryImpl(
// Cancel any ongoing sync request and clear the vault data in memory every time // Cancel any ongoing sync request and clear the vault data in memory every time
// the user switches or the vault is locked for the active user. // the user switches or the vault is locked for the active user.
merge( merge(
authDiskSource.userSwitchingChangesFlow, authDiskSource
.userSwitchingChangesFlow
.onEach {
// DomainState is not part of the locked data but should still be cleared
// when the user changes
mutableDomainsStateFlow.update { DataState.Loading }
},
vaultLockManager vaultLockManager
.vaultUnlockDataStateFlow .vaultUnlockDataStateFlow
.filter { vaultUnlockDataList -> .filter { vaultUnlockDataList ->
@ -238,31 +245,46 @@ class VaultRepositoryImpl(
// Setup ciphers MutableStateFlow // Setup ciphers MutableStateFlow
mutableCiphersStateFlow mutableCiphersStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> .observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskCiphers(activeUserId) observeVaultDiskCiphers(activeUserId)
} }
.launchIn(unconfinedScope) .launchIn(unconfinedScope)
// Setup domains MutableStateFlow // Setup domains MutableStateFlow
mutableDomainsStateFlow mutableDomainsStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> .observeWhenSubscribedAndLoggedIn(
userStateFlow = authDiskSource.userStateFlow,
) { activeUserId ->
observeVaultDiskDomains(activeUserId) observeVaultDiskDomains(activeUserId)
} }
.launchIn(unconfinedScope) .launchIn(unconfinedScope)
// Setup folders MutableStateFlow // Setup folders MutableStateFlow
mutableFoldersStateFlow mutableFoldersStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> .observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskFolders(activeUserId) observeVaultDiskFolders(activeUserId)
} }
.launchIn(unconfinedScope) .launchIn(unconfinedScope)
// Setup collections MutableStateFlow // Setup collections MutableStateFlow
mutableCollectionsStateFlow mutableCollectionsStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> .observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskCollections(activeUserId) observeVaultDiskCollections(activeUserId)
} }
.launchIn(unconfinedScope) .launchIn(unconfinedScope)
// Setup sends MutableStateFlow // Setup sends MutableStateFlow
mutableSendDataStateFlow mutableSendDataStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId -> .observeWhenSubscribedAndUnlocked(
authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskSends(activeUserId) observeVaultDiskSends(activeUserId)
} }
.launchIn(unconfinedScope) .launchIn(unconfinedScope)
@ -305,7 +327,6 @@ class VaultRepositoryImpl(
private fun clearUnlockedData() { private fun clearUnlockedData() {
mutableCiphersStateFlow.update { DataState.Loading } mutableCiphersStateFlow.update { DataState.Loading }
mutableDomainsStateFlow.update { DataState.Loading }
mutableFoldersStateFlow.update { DataState.Loading } mutableFoldersStateFlow.update { DataState.Loading }
mutableCollectionsStateFlow.update { DataState.Loading } mutableCollectionsStateFlow.update { DataState.Loading }
mutableSendDataStateFlow.update { DataState.Loading } mutableSendDataStateFlow.update { DataState.Loading }

View file

@ -2,6 +2,7 @@ package com.x8bit.bitwarden.data.platform.repository.util
import app.cash.turbine.test import app.cash.turbine.test
import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson
import com.x8bit.bitwarden.data.vault.repository.model.VaultUnlockData
import io.mockk.every import io.mockk.every
import io.mockk.mockk import io.mockk.mockk
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
@ -50,6 +51,74 @@ class StateFlowExtensionsTest {
assertEquals(0, awaitItem()) assertEquals(0, awaitItem())
assertEquals(1, awaitItem()) assertEquals(1, awaitItem())
job.cancel()
// Job is canceled, we should have no more subscribers
assertEquals(0, awaitItem())
}
}
@Suppress("MaxLineLength")
@Test
fun `observeWhenSubscribedAndUnlocked should observe the given flow depending on the state of the source user and vault unlock flow`() =
runTest {
val userStateFlow = MutableStateFlow<UserStateJson?>(null)
val vaultUnlockFlow = MutableStateFlow<List<VaultUnlockData>>(emptyList())
val observerStateFlow = MutableStateFlow("")
val sourceMutableStateFlow = MutableStateFlow(Unit)
assertEquals(0, observerStateFlow.subscriptionCount.value)
sourceMutableStateFlow
.observeWhenSubscribedAndUnlocked(
userStateFlow = userStateFlow,
vaultUnlockFlow = vaultUnlockFlow,
observer = { observerStateFlow },
)
.launchIn(backgroundScope)
observerStateFlow.subscriptionCount.test {
// No subscriber to start
assertEquals(0, awaitItem())
userStateFlow.value = mockk<UserStateJson> {
every { activeUserId } returns "user_id_1234"
}
// Still none, since no one has subscribed to the testMutableStateFlow
expectNoEvents()
vaultUnlockFlow.value = listOf(
VaultUnlockData(
userId = "user_id_1234",
status = VaultUnlockData.Status.UNLOCKED,
),
)
// Still none, since no one has subscribed to the testMutableStateFlow
expectNoEvents()
val job = sourceMutableStateFlow.launchIn(backgroundScope)
// Now we subscribe to the observer flow since have a active user and a listener
assertEquals(1, awaitItem())
userStateFlow.value = mockk<UserStateJson> {
every { activeUserId } returns "user_id_4321"
}
// The user changed, so we clear the previous observer but then resubscribe
// with the new user ID
assertEquals(0, awaitItem())
assertEquals(1, awaitItem())
vaultUnlockFlow.value = listOf(
VaultUnlockData(
userId = "user_id_4321",
status = VaultUnlockData.Status.UNLOCKED,
),
)
// The VaultUnlockData changed, so we clear the previous observer but then resubscribe
// with the new data
assertEquals(0, awaitItem())
assertEquals(1, awaitItem())
job.cancel() job.cancel()
// Job is canceled, we should have no more subscribers // Job is canceled, we should have no more subscribers
assertEquals(0, awaitItem()) assertEquals(0, awaitItem())

View file

@ -344,8 +344,15 @@ class VaultRepositoryTest {
DataState.Loaded(createMockDomainsData(number = 1)), DataState.Loaded(createMockDomainsData(number = 1)),
domainsStateFlow.awaitItem(), domainsStateFlow.awaitItem(),
) )
setVaultToUnlocked(userId = userId) setVaultToUnlocked(userId = userId)
ciphersFlow.tryEmit(listOf(createMockCipher(number = 1)))
collectionsFlow.tryEmit(listOf(createMockCollection(number = 1)))
foldersFlow.tryEmit(listOf(createMockFolder(number = 1)))
sendsFlow.tryEmit(listOf(createMockSend(number = 1)))
domainsFlow.tryEmit(createMockDomains(number = 1))
assertEquals( assertEquals(
DataState.Loaded(listOf(createMockCipherView(number = 1))), DataState.Loaded(listOf(createMockCipherView(number = 1))),
ciphersStateFlow.awaitItem(), ciphersStateFlow.awaitItem(),
@ -487,7 +494,7 @@ class VaultRepositoryTest {
assertEquals(DataState.Loading, collectionsStateFlow.awaitItem()) assertEquals(DataState.Loading, collectionsStateFlow.awaitItem())
assertEquals(DataState.Loading, foldersStateFlow.awaitItem()) assertEquals(DataState.Loading, foldersStateFlow.awaitItem())
assertEquals(DataState.Loading, sendsStateFlow.awaitItem()) assertEquals(DataState.Loading, sendsStateFlow.awaitItem())
assertEquals(DataState.Loading, domainsStateFlow.awaitItem()) domainsStateFlow.expectNoEvents()
} }
} }
@ -1807,6 +1814,9 @@ class VaultRepositoryTest {
settingsDiskSource.getLastSyncTime(userId = userId) settingsDiskSource.getLastSyncTime(userId = userId)
} returns clock.instant() } returns clock.instant()
mutableVaultStateFlow.update {
listOf(VaultUnlockData(userId, VaultUnlockData.Status.UNLOCKED))
}
fakeAuthDiskSource.userState = MOCK_USER_STATE fakeAuthDiskSource.userState = MOCK_USER_STATE
setupEmptyDecryptionResults() setupEmptyDecryptionResults()
setupVaultDiskSourceFlows( setupVaultDiskSourceFlows(
@ -1963,6 +1973,7 @@ class VaultRepositoryTest {
expectNoEvents() expectNoEvents()
setVaultToUnlocked(userId = MOCK_USER_STATE.activeUserId) setVaultToUnlocked(userId = MOCK_USER_STATE.activeUserId)
sendsFlow.tryEmit(emptyList())
assertEquals(DataState.Loaded<SendView?>(null), awaitItem()) assertEquals(DataState.Loaded<SendView?>(null), awaitItem())
sendsFlow.tryEmit(listOf(createMockSend(number = sendId))) sendsFlow.tryEmit(listOf(createMockSend(number = sendId)))
assertEquals(DataState.Loaded<SendView?>(sendView), awaitItem()) assertEquals(DataState.Loaded<SendView?>(sendView), awaitItem())
@ -4596,6 +4607,14 @@ class VaultRepositoryTest {
*/ */
private fun setVaultToUnlocked(userId: String) { private fun setVaultToUnlocked(userId: String) {
mutableUnlockedUserIdsStateFlow.update { it + userId } mutableUnlockedUserIdsStateFlow.update { it + userId }
mutableVaultStateFlow.tryEmit(
listOf(
VaultUnlockData(
userId,
VaultUnlockData.Status.UNLOCKED,
),
),
)
} }
/** /**