-
Notifications
You must be signed in to change notification settings - Fork 2
[MS-1052] CoDownSync Phase 2 - Handle diffs with CommCare #1294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,23 +13,40 @@ import com.simprints.infra.config.store.LastCallingPackageStore | |
| import com.simprints.infra.events.event.cosync.CoSyncEnrolmentRecordCreationEventDeserializer | ||
| import com.simprints.infra.events.event.cosync.CoSyncEnrolmentRecordEvents | ||
| import com.simprints.infra.events.event.domain.models.subject.EnrolmentRecordCreationEvent | ||
| import com.simprints.infra.events.event.domain.models.subject.EnrolmentRecordDeletionEvent | ||
| import com.simprints.infra.events.event.domain.models.subject.EnrolmentRecordEvent | ||
| import com.simprints.infra.eventsync.event.commcare.cache.CommCareSyncCache | ||
| import com.simprints.infra.eventsync.event.commcare.cache.SyncedCaseEntity | ||
| import com.simprints.infra.eventsync.status.down.domain.CommCareEventSyncResult | ||
| import com.simprints.infra.logging.LoggingConstants.CrashReportTag.COMMCARE_SYNC | ||
| import com.simprints.infra.logging.Simber | ||
| import com.simprints.libsimprints.Constants.SIMPRINTS_COSYNC_SUBJECT_ACTIONS | ||
| import dagger.hilt.android.qualifiers.ApplicationContext | ||
| import kotlinx.coroutines.flow.Flow | ||
| import kotlinx.coroutines.flow.flow | ||
| import java.text.SimpleDateFormat | ||
| import java.util.Locale | ||
| import java.util.concurrent.CopyOnWriteArrayList | ||
| import javax.inject.Inject | ||
|
|
||
| internal class CommCareEventDataSource @Inject constructor( | ||
| private val jsonHelper: JsonHelper, | ||
| private val commCareSyncCache: CommCareSyncCache, | ||
| private val lastCallingPackageStore: LastCallingPackageStore, | ||
| @ApplicationContext private val context: Context, | ||
| ) { | ||
|
|
||
| private val pendingSyncedCases = CopyOnWriteArrayList<SyncedCaseEntity>() | ||
|
|
||
| // Pre-created date formatters to avoid repeated instantiation during sync | ||
| private val commCareDateFormats = listOf( | ||
| SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US), // Standard Date.toString() format | ||
| SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.US) // Numeric timezone fallback | ||
| ) | ||
| fun getEvents(): CommCareEventSyncResult { | ||
| pendingSyncedCases.clear() // Clear any leftover state from previous syncs | ||
| val totalCount = count() | ||
| val eventFlow = loadEnrolmentRecordCreationEvents() | ||
| val eventFlow = loadDataFromCommCare() | ||
|
|
||
| return CommCareEventSyncResult( | ||
| totalCount = totalCount, | ||
|
|
@@ -45,58 +62,122 @@ internal class CommCareEventDataSource @Inject constructor( | |
| return count | ||
| } | ||
|
|
||
| private fun loadEnrolmentRecordCreationEvents(): Flow<EnrolmentRecordCreationEvent> = flow { | ||
| private fun loadDataFromCommCare(): Flow<EnrolmentRecordEvent> = flow { | ||
| try { | ||
| // First collect all case IDs in a list | ||
| Simber.i("Start listing caseIds", tag = COMMCARE_SYNC) | ||
| val caseIds = mutableListOf<String>() | ||
| Simber.i("Start listing caseIds for CommCare sync", tag = COMMCARE_SYNC) | ||
|
|
||
| val casesToParse = mutableListOf<SyncedCaseEntity>() | ||
| val caseIdsPresentInCommCare = mutableSetOf<String>() | ||
| // Fetch all previously synced cases with their details (including lastSyncedTimestamp) | ||
| val previouslySyncedCasesMap = commCareSyncCache.getAllSyncedCases() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that you only need the caseId and lastModified values, so it could be more memory efficient to use `Map<String, Long> instead. You can filter by key later when removing cases.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes but then you need to pass the full |
||
| .associateBy { it.caseId } | ||
|
|
||
| context.contentResolver | ||
| .query(getCaseMetadataUri(), arrayOf(COLUMN_CASE_ID), null, null, null) | ||
| .query(getCaseMetadataUri(), arrayOf(COLUMN_CASE_ID, COLUMN_LAST_MODIFIED), null, null, null) | ||
| ?.use { cursor -> | ||
| while (cursor.moveToNext()) { | ||
| cursor.getString(cursor.getColumnIndexOrThrow(COLUMN_CASE_ID))?.let { caseId -> | ||
| caseIds.add(caseId) | ||
| val caseId = cursor.getString(cursor.getColumnIndexOrThrow(COLUMN_CASE_ID)) | ||
| if (caseId.isNullOrEmpty()) { | ||
| continue // Skip empty case IDs | ||
| } | ||
| caseIdsPresentInCommCare.add(caseId) | ||
|
|
||
| val commCareLastModifiedString = cursor.getString(cursor.getColumnIndexOrThrow(COLUMN_LAST_MODIFIED)) | ||
| val commCareLastModifiedTime = parseCommCareDateToMillis(commCareLastModifiedString) | ||
|
|
||
| val cachedCase = previouslySyncedCasesMap[caseId] | ||
| if (cachedCase != null) { | ||
| // Case was synced before, check its specific lastSyncedTimestamp | ||
| if (commCareLastModifiedTime > 0L && commCareLastModifiedTime <= cachedCase.lastSyncedTimestamp) { | ||
| Simber.d( | ||
| "Skipping caseId $caseId: CommCare lastModified ($commCareLastModifiedTime) is not newer than lastSyncedTimestamp (${cachedCase.lastSyncedTimestamp})", | ||
| tag = COMMCARE_SYNC | ||
| ) | ||
| continue // Skip cases not modified since last sync | ||
| } | ||
| } | ||
|
|
||
| casesToParse.add(SyncedCaseEntity(caseId, "", commCareLastModifiedTime)) | ||
| } | ||
| } | ||
| Simber.i("Finished listing caseIds", tag = COMMCARE_SYNC) | ||
| Simber.i("Finished listing caseIds. ${casesToParse.size} cases to parse.", tag = COMMCARE_SYNC) | ||
|
|
||
| // Process case IDs in batches to avoid large pauses | ||
| val batchSize = BATCH_SIZE // Adjust based on performance testing | ||
| caseIds.chunked(batchSize).forEach { batch -> | ||
| batch.forEach { caseId -> | ||
| loadEnrolmentRecordCreationEvents(caseId).collect { emit(it) } | ||
| casesToParse.chunked(batchSize).forEach { batch -> | ||
| batch.forEach { case -> | ||
| loadEnrolmentRecordCreationEvents(case).collect { emit(it) } | ||
| } | ||
| } | ||
|
|
||
| // If no cases were found in CommCare, it's most likely that CommCare is logged out. | ||
| if (caseIdsPresentInCommCare.isNotEmpty()) { | ||
| val casesToRemove = previouslySyncedCasesMap.values.filterNot { (it.caseId in caseIdsPresentInCommCare) } | ||
|
BurningAXE marked this conversation as resolved.
|
||
| Simber.i("Generating deletion events for ${casesToRemove.size} cases no longer in CommCare.", tag = COMMCARE_SYNC) | ||
| casesToRemove.forEach { case -> | ||
| generateEnrolmentRecordDeletionEvent(case).collect { emit(it) } | ||
| } | ||
| } | ||
| } catch (e: Exception) { | ||
| Simber.e("Error while querying CommCare", e) | ||
| throw e | ||
| Simber.e("Error during CommCare data loading", e, tag = COMMCARE_SYNC) | ||
| throw e // Rethrow to let the sync worker handle the failure | ||
| } | ||
| } | ||
|
|
||
| /* Generates deletion events for enrolment records that were previously synced but are no longer present in CommCare. | ||
| * This is called when a case is not found in the latest sync. | ||
| */ | ||
| private fun generateEnrolmentRecordDeletionEvent(case: SyncedCaseEntity): Flow<EnrolmentRecordDeletionEvent> = flow { | ||
| if (case.simprintsId.isEmpty()) { | ||
| Simber.d("Skipping deletion event for caseId ${case.caseId} with empty simprintsId", tag = COMMCARE_SYNC) | ||
| // Directly remove the case from the cache if it has no simprintsId | ||
| commCareSyncCache.removeSyncedCase(case.caseId) | ||
| return@flow | ||
| } | ||
|
|
||
| Simber.d("Generating deletion event for caseId ${case.caseId} with simprintsId ${case.simprintsId}", tag = COMMCARE_SYNC) | ||
| pendingSyncedCases.add(case) | ||
|
BurningAXE marked this conversation as resolved.
|
||
| emit(EnrolmentRecordDeletionEvent( | ||
| subjectId = case.simprintsId, | ||
| projectId = "", // Only subjectId is required for deletion events | ||
| moduleId = "", | ||
| attendantId = "", | ||
| )) | ||
| } | ||
|
|
||
| private fun loadEnrolmentRecordCreationEvents(caseId: String): Flow<EnrolmentRecordCreationEvent> = flow { | ||
| private fun loadEnrolmentRecordCreationEvents(case: SyncedCaseEntity): Flow<EnrolmentRecordCreationEvent> = flow { | ||
| // Access Case Data Listing for the caseId | ||
| val caseDataUri = getCaseDataUri().buildUpon().appendPath(caseId).build() | ||
| val caseDataUri = getCaseDataUri().buildUpon().appendPath(case.caseId).build() | ||
|
|
||
| val cursor = context.contentResolver | ||
| .query(caseDataUri, null, null, null, null) | ||
| Simber.d("Cursor for caseId $caseId: $cursor", tag = COMMCARE_SYNC) | ||
| if (cursor != null) { | ||
| cursor.use { caseDataCursor -> | ||
| val subjectActions = getSubjectActionsValue(caseDataCursor) | ||
| Simber.d(subjectActions) | ||
| val coSyncEnrolmentRecordEvents = parseRecordEvents(subjectActions) | ||
|
|
||
| if (coSyncEnrolmentRecordEvents == null) { | ||
| Simber.d("No valid enrolment records found for caseId ${case.caseId}.", tag = COMMCARE_SYNC) | ||
| // Add the case to the cache with an empty simprintsId so that we don't try to sync it again until updated | ||
| commCareSyncCache.addSyncedCase(case) | ||
| Simber.d("Added case ${case.caseId} with empty simprintsId to CommCareSyncCache", tag = COMMCARE_SYNC) | ||
| return@flow | ||
| } | ||
|
|
||
| coSyncEnrolmentRecordEvents | ||
| ?.events | ||
| ?.filterIsInstance<EnrolmentRecordCreationEvent>() | ||
| ?.forEach { emit(it) } | ||
| .events | ||
| .filterIsInstance<EnrolmentRecordCreationEvent>() | ||
| .forEach { event -> | ||
| pendingSyncedCases.add(case.copy(simprintsId = event.payload.subjectId)) | ||
| emit(event) | ||
| } | ||
| } | ||
| } else { | ||
| // If listing returned the caseId but the cursor is null, most likely CommCare | ||
| // logged out in the middle of sync. Throw an exception to retry the worker | ||
| // instead of thinking sync is complete (and possibly deleting unsynced subjects). | ||
| throw IllegalStateException("Cursor for caseId $caseId is null") | ||
| throw IllegalStateException("Cursor for caseId ${case.caseId} is null") | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -130,6 +211,57 @@ internal class CommCareEventDataSource @Inject constructor( | |
|
|
||
| private fun getCaseDataUri() = "content://${lastCallingPackageStore.lastCallingPackageName}.case/casedb/data".toUri() | ||
|
|
||
| private fun parseCommCareDateToMillis(dateString: String): Long { | ||
| for (format in commCareDateFormats) { | ||
| try { | ||
| return format.parse(dateString)?.time ?: 0L | ||
| } catch (e: Exception) { | ||
| Simber.e("Error parsing date: $dateString", e, tag = COMMCARE_SYNC) | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| Simber.w("All date parsing attempts failed for: $dateString", tag = COMMCARE_SYNC) | ||
| return 0L | ||
| } | ||
|
|
||
| /** | ||
| * This function is called after all events have been processed. | ||
| * It updates the CommCareSyncCache with the latest case IDs and their corresponding Simprints IDs. | ||
| */ | ||
| suspend fun onEventsProcessed(events: List<EnrolmentRecordEvent>) { | ||
| val creationSubjectIds = mutableSetOf<String>() | ||
| val deletionSubjectIds = mutableSetOf<String>() | ||
|
|
||
| events.forEach { event -> | ||
| when (event) { | ||
| is EnrolmentRecordCreationEvent -> creationSubjectIds.add(event.payload.subjectId) | ||
| is EnrolmentRecordDeletionEvent -> deletionSubjectIds.add(event.payload.subjectId) | ||
| else -> { /* Ignore other event types */ } | ||
| } | ||
| } | ||
|
|
||
| val pendingCasesToRemove = mutableListOf<SyncedCaseEntity>() | ||
|
|
||
| pendingSyncedCases.forEach { case -> | ||
| when (case.simprintsId) { | ||
| in creationSubjectIds -> { | ||
| commCareSyncCache.addSyncedCase(case) | ||
| Simber.d("Added case ${case.caseId} with simprintsId ${case.simprintsId} to CommCareSyncCache", tag = COMMCARE_SYNC) | ||
| pendingCasesToRemove.add(case) | ||
| } | ||
| in deletionSubjectIds -> { | ||
| commCareSyncCache.removeSyncedCase(case.caseId) | ||
| Simber.d("Removed case ${case.caseId} with simprintsId ${case.simprintsId} from CommCareSyncCache", tag = COMMCARE_SYNC) | ||
| pendingCasesToRemove.add(case) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Remove processed cases from pendingSyncedCases | ||
| pendingSyncedCases.removeAll(pendingCasesToRemove) | ||
| } | ||
|
|
||
| private val coSyncSerializationModule = SimpleModule().apply { | ||
| addSerializer( | ||
| TokenizableString::class.java, | ||
|
|
@@ -147,8 +279,9 @@ internal class CommCareEventDataSource @Inject constructor( | |
|
|
||
| companion object { | ||
| internal const val COLUMN_CASE_ID = "case_id" | ||
| internal const val COLUMN_LAST_MODIFIED = "last_modified" | ||
| internal const val COLUMN_DATUM_ID = "datum_id" | ||
| internal const val COLUMN_VALUE = "value" | ||
| private const val BATCH_SIZE = 20 | ||
| internal const val BATCH_SIZE = 20 | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| package com.simprints.infra.eventsync.event.commcare.cache | ||
|
|
||
| import com.simprints.infra.logging.Simber | ||
| import javax.inject.Inject | ||
| import javax.inject.Singleton | ||
|
|
||
| @Singleton | ||
| class CommCareSyncCache @Inject constructor( | ||
| private val commCareSyncDao: CommCareSyncDao, | ||
| ) { | ||
|
|
||
| suspend fun addSyncedCase(case: SyncedCaseEntity) = commCareSyncDao.insert(case).also { | ||
| Simber.d("Added/Updated case: ${case.caseId} -> ${case.simprintsId} with timestamp ${case.lastSyncedTimestamp} in CommCareSyncCache (DB)") | ||
| } | ||
|
|
||
| suspend fun getSimprintsId(caseId: String): String? = commCareSyncDao.getByCaseId(caseId).also { entity -> | ||
| Simber.d("Retrieved simprintsId for case: $caseId -> ${entity?.simprintsId} from CommCareSyncCache (DB)") | ||
| }?.simprintsId | ||
|
|
||
| suspend fun removeSyncedCase(caseId: String) = commCareSyncDao.deleteByCaseId(caseId).also { | ||
| Simber.d("Removed case: $caseId from CommCareSyncCache (DB)") | ||
| } | ||
|
|
||
| suspend fun getAllSyncedCases(): List<SyncedCaseEntity> = commCareSyncDao.getAll().also { allEntries -> | ||
| Simber.d("Retrieved all ${allEntries.size} case entities from CommCareSyncCache (DB)") | ||
| } | ||
|
|
||
| suspend fun clearAllSyncedCases() = commCareSyncDao.clearAll().also { | ||
| Simber.d("Cleared all cases from CommCareSyncCache (DB)") | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| package com.simprints.infra.eventsync.event.commcare.cache | ||
|
|
||
| import androidx.room.Dao | ||
| import androidx.room.Insert | ||
| import androidx.room.OnConflictStrategy | ||
| import androidx.room.Query | ||
|
|
||
| @Dao | ||
| interface CommCareSyncDao { | ||
|
|
||
| @Insert(onConflict = OnConflictStrategy.REPLACE) | ||
| suspend fun insert(syncedCase: SyncedCaseEntity) | ||
|
|
||
| @Query("SELECT * FROM synced_commcare_cases WHERE caseId = :caseId") | ||
| suspend fun getByCaseId(caseId: String): SyncedCaseEntity? | ||
|
|
||
| @Query("SELECT * FROM synced_commcare_cases") | ||
| suspend fun getAll(): List<SyncedCaseEntity> | ||
|
|
||
| @Query("DELETE FROM synced_commcare_cases WHERE caseId = :caseId") | ||
| suspend fun deleteByCaseId(caseId: String) | ||
|
|
||
| @Query("DELETE FROM synced_commcare_cases") | ||
| suspend fun clearAll() | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.