Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ internal class EventUpSyncTask @Inject constructor(
val project = projectWithConfig.project
val config = projectWithConfig.configuration
var lastOperation = operation.copy()
var count = 0
var isUsefulUpload = false

try {
Expand All @@ -95,8 +94,7 @@ internal class EventUpSyncTask @Inject constructor(
isUsefulUpload = it > 0
EventUpSyncRequestEvent.UpSyncContent(sessionCount = it)
},
).collect {
count = it
).collect { count ->
lastOperation = lastOperation.copy(
lastState = RUNNING,
lastSyncTime = timeHelper.now().ms,
Expand All @@ -112,8 +110,7 @@ internal class EventUpSyncTask @Inject constructor(
isUsefulUpload = it > 0
EventUpSyncRequestEvent.UpSyncContent(eventDownSyncCount = it)
},
).collect {
count = it
).collect { count ->
lastOperation = lastOperation.copy(
lastState = RUNNING,
lastSyncTime = timeHelper.now().ms,
Expand All @@ -131,8 +128,7 @@ internal class EventUpSyncTask @Inject constructor(
eventUpSyncCount = if (isUsefulUpload) it else 0,
)
},
).collect {
count = it
).collect { count ->
lastOperation = lastOperation.copy(
lastState = RUNNING,
lastSyncTime = timeHelper.now().ms,
Expand All @@ -144,7 +140,7 @@ internal class EventUpSyncTask @Inject constructor(
lastSyncTime = timeHelper.now().ms,
)

emitProgress(lastOperation, count)
emitProgress(lastOperation, 0)
} catch (t: Throwable) {
if (t is RemoteDbNotSignedInException) {
throw t
Expand All @@ -156,7 +152,7 @@ internal class EventUpSyncTask @Inject constructor(
lastSyncTime = timeHelper.now().ms,
)

emitProgress(lastOperation, count)
emitProgress(lastOperation, 0)
}
}

Expand Down Expand Up @@ -331,7 +327,7 @@ internal class EventUpSyncTask @Inject constructor(
else -> emptyList()
}

private suspend fun attemptInvalidEventUpload(
private fun attemptInvalidEventUpload(
projectId: String,
corruptedScopes: Set<EventScope>,
) = flow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ internal class EventUpSyncUploaderWorker @AssistedInject constructor(
val workerId = this@EventUpSyncUploaderWorker.id.toString()
var count = eventSyncCache.readProgress(workerId)
val max = eventRepository
.observeEventCount(null)
.observeEventCountInClosedScopes()
.firstOrNull() ?: 0

upSyncTask.upSync(upSyncScope.operation, getEventScope()).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ internal class EventUpSyncUploaderWorkerTest {

coEvery { eventSyncCache.readProgress(any()) } returns 0
every { authStore.signedInProjectId } returns PROJECT_ID
coEvery { eventRepository.observeEventCount(any()) } returns flowOf(12)
coEvery { eventRepository.observeEventCountInClosedScopes() } returns flowOf(12)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ interface EventRepository {

suspend fun observeEventCount(type: EventType?): Flow<Int>

suspend fun observeEventCountInClosedScopes(): Flow<Int>

suspend fun addOrUpdateEvent(
scope: EventScope,
event: Event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ internal open class EventRepositoryImpl @Inject constructor(
eventLocalDataSource.observeEventCount()
}

override suspend fun observeEventCountInClosedScopes(): Flow<Int> = eventLocalDataSource.observeEventCountInClosedScopes()

override suspend fun addOrUpdateEvent(
scope: EventScope,
event: Event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ internal open class EventLocalDataSource @Inject constructor(
eventDao.observeCountFromType(type = type)
}

suspend fun observeEventCountInClosedScopes(): Flow<Int> = useRoomFlow(readingDispatcher) {
eventDao.observeCountInClosedScopes()
}

suspend fun loadAllEvents(): Flow<Event> = useRoom(readingDispatcher) {
eventDao.loadAll().map { it.fromDbToDomain() }.asFlow()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ internal interface EventRoomDao {
@Query("select count(*) from DbEvent where type = :type")
fun observeCountFromType(type: EventType): Flow<Int>

@Query(
"""
select count(*) from DbEvent
left join DbEventScope on DbEvent.scopeId = DbEventScope.id
where DbEventScope.end_unixMs is not null
""",
)
fun observeCountInClosedScopes(): Flow<Int>

@Query("delete from DbEvent where scopeId = :scopeId")
suspend fun deleteAllFromScope(scopeId: String)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ internal class EventRepositoryImplTest {
coVerify(exactly = 1) { eventLocalDataSource.observeEventCount(any()) }
}

@Test
fun `should delegate observeEventCountInClosedScopes calls`() = runTest {
coEvery { eventLocalDataSource.observeEventCountInClosedScopes() } returns flowOf(7)

assertThat(eventRepo.observeEventCountInClosedScopes().firstOrNull()).isEqualTo(7)

coVerify { eventLocalDataSource.observeEventCountInClosedScopes() }
}

@Test
fun `insert event into event scope should update event fields`() = runTest {
val scope = createSessionScope(GUID1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ internal class EventLocalDataSourceTest {
coVerify { eventDao.observeCount() }
}

@Test
fun observeEventCountInClosedScopes() = runTest {
eventLocalDataSource.observeEventCountInClosedScopes().toList()

coVerify { eventDao.observeCountInClosedScopes() }
}

@Test
fun observeCountWithAProjectIdAndTypeQuery() = runTest {
eventLocalDataSource
Expand Down