Add helper method for observing a data flow when logged in and someone is subscribed (#392)

This commit is contained in:
David Perez 2023-12-14 14:08:57 -06:00 committed by Álison Fernandes
parent 65eb6ab5f8
commit a8005c15f1
5 changed files with 126 additions and 50 deletions

View file

@ -0,0 +1,33 @@
package com.x8bit.bitwarden.data.platform.repository.util
import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
/**
* Invokes the [observer] callback whenever the user is logged in, the active changes, and there
* are subscribers to the [MutableStateFlow]. The flow from all previous calls to the `observer`
* is canceled whenever the `observer` is re-invoked, there is no active user (logged-out), or
* there are no subscribers to the [MutableStateFlow].
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> MutableStateFlow<T>.observeWhenSubscribedAndLoggedIn(
userStateFlow: Flow<UserStateJson?>,
observer: (activeUserId: String) -> Flow<R>,
): Flow<R> =
combine(
this.subscriptionCount.map { it > 0 }.distinctUntilChanged(),
userStateFlow.map { it?.activeUserId }.distinctUntilChanged(),
) { isSubscribed, activeUserId ->
activeUserId.takeIf { isSubscribed }
}
.flatMapLatest { activeUserId ->
activeUserId?.let(observer) ?: flow { awaitCancellation() }
}

View file

@ -4,7 +4,9 @@ import com.bitwarden.core.PassphraseGeneratorRequest
import com.bitwarden.core.PasswordGeneratorRequest
import com.bitwarden.core.PasswordHistoryView
import com.x8bit.bitwarden.data.auth.datasource.disk.AuthDiskSource
import com.x8bit.bitwarden.data.platform.manager.dispatcher.DispatcherManager
import com.x8bit.bitwarden.data.platform.repository.model.LocalDataState
import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndLoggedIn
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.GeneratorDiskSource
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.PasswordHistoryDiskSource
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.entity.toPasswordHistory
@ -15,89 +17,60 @@ import com.x8bit.bitwarden.data.tools.generator.repository.model.GeneratedPasswo
import com.x8bit.bitwarden.data.tools.generator.repository.model.PasscodeGenerationOptions
import com.x8bit.bitwarden.data.vault.datasource.sdk.VaultSdkSource
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import javax.inject.Singleton
/**
* Default implementation of [GeneratorRepository].
*/
@OptIn(ExperimentalCoroutinesApi::class)
@Singleton
class GeneratorRepositoryImpl constructor(
class GeneratorRepositoryImpl(
private val generatorSdkSource: GeneratorSdkSource,
private val generatorDiskSource: GeneratorDiskSource,
private val authDiskSource: AuthDiskSource,
private val vaultSdkSource: VaultSdkSource,
private val passwordHistoryDiskSource: PasswordHistoryDiskSource,
dispatcherManager: DispatcherManager,
) : GeneratorRepository {
private val scope = CoroutineScope(Dispatchers.IO)
private val scope = CoroutineScope(dispatcherManager.io)
private val mutablePasswordHistoryStateFlow =
MutableStateFlow<LocalDataState<List<PasswordHistoryView>>>(LocalDataState.Loading)
override val passwordHistoryStateFlow: StateFlow<LocalDataState<List<PasswordHistoryView>>>
get() = mutablePasswordHistoryStateFlow.asStateFlow()
private var passwordHistoryJob: Job? = null
init {
mutablePasswordHistoryStateFlow
.subscriptionCount
.flatMapLatest { subscriberCount ->
if (subscriberCount > 0) {
authDiskSource
.userStateFlow
.map { it?.activeUserId }
.distinctUntilChanged()
} else {
flow { awaitCancellation() }
}
}
.onEach { activeUserId ->
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId ->
observePasswordHistoryForUser(activeUserId)
}
.launchIn(scope)
}
private fun observePasswordHistoryForUser(userId: String?) {
passwordHistoryJob?.cancel()
userId ?: return
mutablePasswordHistoryStateFlow.value = LocalDataState.Loading
passwordHistoryJob = passwordHistoryDiskSource
private fun observePasswordHistoryForUser(
userId: String,
): Flow<Result<List<PasswordHistoryView>>> =
passwordHistoryDiskSource
.getPasswordHistoriesForUser(userId)
.onStart { mutablePasswordHistoryStateFlow.value = LocalDataState.Loading }
.map { encryptedPasswordHistoryList ->
val passwordHistories =
encryptedPasswordHistoryList.map { it.toPasswordHistory() }
vaultSdkSource
.decryptPasswordHistoryList(passwordHistories)
val passwordHistories = encryptedPasswordHistoryList.map { it.toPasswordHistory() }
vaultSdkSource.decryptPasswordHistoryList(passwordHistories)
}
.onEach { encryptedPasswordHistoryListResult ->
encryptedPasswordHistoryListResult
.fold(
onSuccess = {
mutablePasswordHistoryStateFlow.value = LocalDataState.Loaded(it)
},
onFailure = {
mutablePasswordHistoryStateFlow.value = LocalDataState.Error(it)
},
)
mutablePasswordHistoryStateFlow.value = encryptedPasswordHistoryListResult.fold(
onSuccess = { LocalDataState.Loaded(it) },
onFailure = { LocalDataState.Error(it) },
)
}
.launchIn(scope)
}
override suspend fun generatePassword(
passwordGeneratorRequest: PasswordGeneratorRequest,

View file

@ -1,6 +1,7 @@
package com.x8bit.bitwarden.data.tools.generator.repository.di
import com.x8bit.bitwarden.data.auth.datasource.disk.AuthDiskSource
import com.x8bit.bitwarden.data.platform.manager.dispatcher.DispatcherManager
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.GeneratorDiskSource
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.PasswordHistoryDiskSource
import com.x8bit.bitwarden.data.tools.generator.datasource.sdk.GeneratorSdkSource
@ -20,6 +21,7 @@ import javax.inject.Singleton
@InstallIn(SingletonComponent::class)
object GeneratorRepositoryModule {
@Suppress("LongParameterList")
@Provides
@Singleton
fun provideGeneratorRepository(
@ -28,11 +30,13 @@ object GeneratorRepositoryModule {
authDiskSource: AuthDiskSource,
vaultSdkSource: VaultSdkSource,
passwordHistoryDiskSource: PasswordHistoryDiskSource,
dispatcherManager: DispatcherManager,
): GeneratorRepository = GeneratorRepositoryImpl(
generatorSdkSource = generatorSdkSource,
generatorDiskSource = generatorDiskSource,
authDiskSource = authDiskSource,
vaultSdkSource = vaultSdkSource,
passwordHistoryDiskSource = passwordHistoryDiskSource,
dispatcherManager = dispatcherManager,
)
}

View file

@ -0,0 +1,58 @@
package com.x8bit.bitwarden.data.platform.repository.util
import app.cash.turbine.test
import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class StateFlowExtensionsTest {
@Suppress("MaxLineLength")
@Test
fun `observeWhenSubscribedAndLoggedIn should observe the given flow depending on the state of the source and user flow`() =
runTest {
val userStateFlow = MutableStateFlow<UserStateJson?>(null)
val observerStateFlow = MutableStateFlow("")
val sourceMutableStateFlow = MutableStateFlow(Unit)
assertEquals(0, observerStateFlow.subscriptionCount.value)
sourceMutableStateFlow
.observeWhenSubscribedAndLoggedIn(
userStateFlow = userStateFlow,
observer = { observerStateFlow },
)
.launchIn(backgroundScope)
observerStateFlow.subscriptionCount.test {
// No subscriber to start
assertEquals(0, awaitItem())
userStateFlow.value = mockk<UserStateJson> {
every { activeUserId } returns "user_id_1234"
}
// Still none, since no one has subscribed to the testMutableStateFlow
expectNoEvents()
val job = sourceMutableStateFlow.launchIn(backgroundScope)
// Now we subscribe to the observer flow since have a active user and a listener
assertEquals(1, awaitItem())
userStateFlow.value = mockk<UserStateJson> {
every { activeUserId } returns "user_id_4321"
}
// The user changed, so we clear the previous observer but then resubscribe
// with the new user ID
assertEquals(0, awaitItem())
assertEquals(1, awaitItem())
job.cancel()
// Job is canceled, we should have no more subscribers
assertEquals(0, awaitItem())
}
}
}

View file

@ -14,10 +14,11 @@ import com.x8bit.bitwarden.data.auth.datasource.network.model.KdfTypeJson
import com.x8bit.bitwarden.data.auth.datasource.network.model.KeyConnectorUserDecryptionOptionsJson
import com.x8bit.bitwarden.data.auth.datasource.network.model.TrustedDeviceUserDecryptionOptionsJson
import com.x8bit.bitwarden.data.auth.datasource.network.model.UserDecryptionOptionsJson
import com.x8bit.bitwarden.data.platform.base.FakeDispatcherManager
import com.x8bit.bitwarden.data.platform.repository.model.LocalDataState
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.entity.PasswordHistoryEntity
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.GeneratorDiskSource
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.PasswordHistoryDiskSource
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.entity.PasswordHistoryEntity
import com.x8bit.bitwarden.data.tools.generator.datasource.disk.entity.toPasswordHistoryEntity
import com.x8bit.bitwarden.data.tools.generator.datasource.sdk.GeneratorSdkSource
import com.x8bit.bitwarden.data.tools.generator.repository.model.GeneratedPassphraseResult
@ -27,9 +28,11 @@ import com.x8bit.bitwarden.data.vault.datasource.sdk.VaultSdkSource
import io.mockk.clearMocks
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.assertEquals
@ -41,11 +44,16 @@ import java.time.Instant
class GeneratorRepositoryTest {
private val mutableUserStateFlow = MutableStateFlow<UserStateJson?>(null)
private val generatorSdkSource: GeneratorSdkSource = mockk()
private val generatorDiskSource: GeneratorDiskSource = mockk()
private val authDiskSource: AuthDiskSource = mockk()
private val authDiskSource: AuthDiskSource = mockk {
every { userStateFlow } returns mutableUserStateFlow
}
private val passwordHistoryDiskSource: PasswordHistoryDiskSource = mockk()
private val vaultSdkSource: VaultSdkSource = mockk()
private val dispatcherManager = FakeDispatcherManager()
private val repository = GeneratorRepositoryImpl(
generatorSdkSource = generatorSdkSource,
@ -53,6 +61,7 @@ class GeneratorRepositoryTest {
authDiskSource = authDiskSource,
passwordHistoryDiskSource = passwordHistoryDiskSource,
vaultSdkSource = vaultSdkSource,
dispatcherManager = dispatcherManager,
)
@BeforeEach
@ -290,8 +299,6 @@ class GeneratorRepositoryTest {
),
)
coEvery { authDiskSource.userStateFlow } returns flowOf(USER_STATE)
coEvery {
passwordHistoryDiskSource.getPasswordHistoriesForUser(USER_STATE.activeUserId)
} returns flowOf(encryptedPasswordHistoryEntities)
@ -304,6 +311,7 @@ class GeneratorRepositoryTest {
historyFlow.test {
assertEquals(LocalDataState.Loading, awaitItem())
mutableUserStateFlow.value = USER_STATE
assertEquals(LocalDataState.Loaded(decryptedPasswordHistoryList), awaitItem())
cancelAndIgnoreRemainingEvents()
}