Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.simprints.infra.network.exceptions.SyncCloudIntegrationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.produce
import okhttp3.ResponseBody
import retrofit2.Response
import java.io.ByteArrayInputStream
import java.io.InputStream
Expand Down Expand Up @@ -65,19 +64,19 @@ internal class EventRemoteDataSource @Inject constructor(
}

suspend fun getEvents(
requestId: String,
query: ApiRemoteEventQuery,
scope: CoroutineScope,
): EventDownSyncResult {
return try {
val response = takeStreaming(query)
val response = takeStreaming(requestId, query)
val eventCount = getEventCountFromHeader(response)

val streaming = response.body()?.byteStream() ?: ByteArrayInputStream(byteArrayOf())
Simber.tag("SYNC").d("[EVENT_REMOTE_SOURCE] Stream taken")

EventDownSyncResult(
totalCount = eventCount.exactCount,
requestId = getRequestId(response),
status = response.code(),
eventStream = scope.produce(capacity = CHANNEL_CAPACITY_FOR_PROPAGATION) {
parseStreamAndEmitEvents(streaming, this)
Expand Down Expand Up @@ -118,9 +117,10 @@ internal class EventRemoteDataSource @Inject constructor(
}
}

private suspend fun takeStreaming(query: ApiRemoteEventQuery) =
private suspend fun takeStreaming(requestId: String, query: ApiRemoteEventQuery) =
executeCall { eventsRemoteInterface ->
eventsRemoteInterface.downloadEvents(
requestId = requestId,
projectId = query.projectId,
moduleId = query.moduleId,
attendantId = query.userId,
Expand All @@ -131,22 +131,20 @@ internal class EventRemoteDataSource @Inject constructor(
}

suspend fun post(
requestId: String,
projectId: String,
body: ApiUploadEventsBody,
acceptInvalidEvents: Boolean = true,
): EventUpSyncResult {
val response = executeCall { remoteInterface ->
remoteInterface.uploadEvents(projectId, acceptInvalidEvents, body)
remoteInterface.uploadEvents(requestId, projectId, acceptInvalidEvents, body)
}

return EventUpSyncResult(
requestId = getRequestId(response),
status = response.code(),
)
}

fun getRequestId(response: Response<*>) = response.headers()[REQUEST_ID_HEADER].orEmpty()

private suspend fun <T> executeCall(block: suspend (EventRemoteInterface) -> T): T =
getEventsApiClient().executeCall { block(it) }

Expand All @@ -159,7 +157,6 @@ internal class EventRemoteDataSource @Inject constructor(
private const val TOO_MANY_REQUEST_STATUS = 429

private const val COUNT_HEADER = "x-event-count"
internal const val REQUEST_ID_HEADER = "x-request-id"
private const val IS_COUNT_HEADER_LOWER_BOUND = "x-event-count-is-lower-bound"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,28 @@ internal interface EventRemoteInterface : SimRemoteInterface {
@Query("l_attendantId") attendantId: String?,
@Query("l_subjectId") subjectId: String?,
@Query("l_mode") modes: List<ApiModes>,
@Query("lastEventId") lastEventId: String?
@Query("lastEventId") lastEventId: String?,
): Response<Void>

@Headers("Content-Encoding: gzip")
@POST("projects/{projectId}/events")
suspend fun uploadEvents(
@Header("X-Request-ID") requestId: String,
@Path("projectId") projectId: String,
@Query("acceptInvalidEvents") acceptInvalidEvents: Boolean = true,
@Body body: ApiUploadEventsBody
@Body body: ApiUploadEventsBody,
): Response<ResponseBody>

@Streaming
@GET("projects/{projectId}/events")
suspend fun downloadEvents(
@Header("X-Request-ID") requestId: String,
@Path("projectId") projectId: String,
@Query("l_moduleId") moduleId: String?,
@Query("l_attendantId") attendantId: String?,
@Query("l_subjectId") subjectId: String?,
@Query("l_mode") modes: List<ApiModes>,
@Query("lastEventId") lastEventId: String?
@Query("lastEventId") lastEventId: String?,
): Response<ResponseBody>

@Headers("Content-Encoding: gzip")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import kotlinx.coroutines.channels.ReceiveChannel

data class EventDownSyncResult(
val totalCount: Int?,
val requestId: String,
val status: Int,
val eventStream: ReceiveChannel<EnrolmentRecordEvent>
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.simprints.infra.eventsync.status.up.domain

data class EventUpSyncResult(
val requestId: String,
val status: Int,
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import java.util.UUID
import javax.inject.Inject

internal class EventDownSyncTask @Inject constructor(
Expand All @@ -58,11 +59,13 @@ internal class EventDownSyncTask @Inject constructor(
val requestStartTime = timeHelper.now()

var firstEventTimestamp: Timestamp? = null
val requestId = UUID.randomUUID().toString()
var result: EventDownSyncResult? = null
var errorType: String? = null

try {
result = eventRemoteDataSource.getEvents(
requestId,
operation.queryEvent.fromDomainToApi(),
scope
)
Expand Down Expand Up @@ -118,7 +121,7 @@ internal class EventDownSyncTask @Inject constructor(
EventDownSyncRequestEvent(
createdAt = requestStartTime,
endedAt = timeHelper.now(),
requestId = result?.requestId.orEmpty(),
requestId = requestId,
query = operation.queryEvent.let { query ->
EventDownSyncRequestEvent.QueryParameters(
query.moduleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import retrofit2.HttpException
import java.util.UUID
import javax.inject.Inject

internal class EventUpSyncTask @Inject constructor(
Expand Down Expand Up @@ -167,7 +168,8 @@ internal class EventUpSyncTask @Inject constructor(
eventFilter: (Map<EventScope, List<Event>?>) -> Map<EventScope, List<Event>?> = { it },
createUpSyncContentContent: (Int) -> EventUpSyncRequestEvent.UpSyncContent,
) = flow {
Simber.tag(SYNC_LOG_TAG).d("Uploading event scope - $eventScopeTypeToUpload in batches of $batchSize")
Simber.tag(SYNC_LOG_TAG)
.d("Uploading event scope - $eventScopeTypeToUpload in batches of $batchSize")
val sessionScopes = getClosedScopesForType(eventScopeTypeToUpload)

// Re-emitting the number of uploaded corrupted events
Expand All @@ -183,29 +185,35 @@ internal class EventUpSyncTask @Inject constructor(
val uploadedScopes = mutableListOf<String>()

scopesToUpload.chunked(batchSize.coerceAtLeast(1)).forEach { scopes ->
val requestId = UUID.randomUUID().toString()

val requestStartTime = timeHelper.now()
try {
val result = eventRemoteDataSource.post(
requestId,
projectId,
scopes.asApiUploadEventsBody(eventScopeTypeToUpload)
)
addRequestEvent(
requestId = requestId,
eventScope = eventScope,
startTime = requestStartTime,
result = result,
content = createUpSyncContentContent(scopes.size),
)
uploadedScopes.addAll(scopes.map { it.id })
} catch (ex: Exception) {
handleFailedRequest(ex, eventScope, requestStartTime)
handleFailedRequest(requestId, ex, eventScope, requestStartTime)
}
}

Simber.tag(SYNC_LOG_TAG).d("Deleting ${uploadedScopes.size} session scopes")
eventRepository.deleteEventScopes(uploadedScopes)
}

private fun List<ApiEventScope>.asApiUploadEventsBody(eventScopeTypeToUpload: EventScopeType) = when(eventScopeTypeToUpload) {
private fun List<ApiEventScope>.asApiUploadEventsBody(
eventScopeTypeToUpload: EventScopeType,
) = when (eventScopeTypeToUpload) {
EventScopeType.SESSION -> ApiUploadEventsBody(sessions = this)
EventScopeType.DOWN_SYNC -> ApiUploadEventsBody(eventDownSyncs = this)
EventScopeType.UP_SYNC -> ApiUploadEventsBody(eventUpSyncs = this)
Expand Down Expand Up @@ -238,6 +246,7 @@ internal class EventUpSyncTask @Inject constructor(
}

private suspend fun addRequestEvent(
requestId: String,
eventScope: EventScope,
startTime: Timestamp,
result: EventUpSyncResult,
Expand All @@ -249,7 +258,7 @@ internal class EventUpSyncTask @Inject constructor(
EventUpSyncRequestEvent(
createdAt = startTime,
endedAt = timeHelper.now(),
requestId = result.requestId,
requestId = requestId,
content = content,
responseStatus = result.status,
)
Expand All @@ -258,6 +267,7 @@ internal class EventUpSyncTask @Inject constructor(
}

private suspend fun handleFailedRequest(
requestId: String,
ex: Exception,
eventScope: EventScope,
requestStartTime: Timestamp,
Expand All @@ -267,13 +277,9 @@ internal class EventUpSyncTask @Inject constructor(
is NetworkConnectionException -> Simber.i(ex)
is HttpException -> {
Simber.i(ex)
result = ex.response()?.let {
EventUpSyncResult(
eventRemoteDataSource.getRequestId(it),
it.code()
)
}
result = ex.response()?.let { EventUpSyncResult(it.code()) }
}

is RemoteDbNotSignedInException -> throw ex

else -> {
Expand All @@ -287,7 +293,7 @@ internal class EventUpSyncTask @Inject constructor(
EventUpSyncRequestEvent(
createdAt = requestStartTime,
endedAt = timeHelper.now(),
requestId = result?.requestId.orEmpty(),
requestId = requestId,
responseStatus = result?.status,
errorType = ex.toString(),
)
Expand Down
Loading