diff --git a/okhttp/README.md b/okhttp/README.md index c6a9c1a..78f5935 100644 --- a/okhttp/README.md +++ b/okhttp/README.md @@ -11,13 +11,15 @@ dependencies { } ``` -- [OkHttpClient.call()](src/main/java/com/linecorp/lich/okhttp/Call.kt) - A suspending -function to send an HTTP request and receive its response. +- [OkHttpClient.call()](src/main/java/com/linecorp/lich/okhttp/Call.kt) - A suspending function to +send an HTTP request and receive its response. +- [Response.saveBodyToFileAtomically()](src/main/java/com/linecorp/lich/okhttp/AtomicDownload.kt) - +Saves an HTTP response body to a file *atomically*. - [OkHttpClient.callWithCounting()](src/main/java/com/linecorp/lich/okhttp/CallWithCounting.kt) - Creates a `Flow` that executes an HTTP call with counting the number of bytes transferred in its request and response body. - [Response.saveToResourceWithSupportingResumption()](src/main/java/com/linecorp/lich/okhttp/ResumableDownload.kt) - -Performs a *resumable download* using the HTTP semantics defined in [RFC 7233](https://tools.ietf.org/html/rfc7233). +Performs a *resumable download* using the HTTP semantics defined in [RFC 9110, Section 14](https://www.rfc-editor.org/rfc/rfc9110.html#section-14). ## Simple HTTP call @@ -55,11 +57,36 @@ NOTE: You *don't* need to use `withContext(Dispatchers.IO) { ... }` in the above The `response` handler of the `call` function is always executed on OkHttp's background threads, and the caller thread is never blocked. -## File upload +## Download to file atomically -This is a sample code that sends the content of `fileToUpload` as an HTTP POST method. +This is a sample code that downloads the content of `url` using an HTTP GET method, and saves it to +`fileToSave`. This download is performed *atomically*. That is, `fileToSave` is updated if and only +if the entire response body has been successfully downloaded. ```kotlin -suspend fun performUpload(url: HttpUrl, fileToUpload: File) { +suspend fun performDownloadAtomically(url: HttpUrl, fileToSave: File): Boolean { + val request = Request.Builder().url(url).build() + return try { + okHttpClient.call(request) { response -> + if (response.code != StatusCode.OK) { + throw ResponseStatusException(response.code) + } + response.saveBodyToFileAtomically(fileToSave) + } + // At this point, `fileToSave` contains the complete response body downloaded. + true + } catch (e: IOException) { + println("Failed to download: $e") + // At this point, `fileToSave` is not modified at all. + false + } +} +``` + +## Upload with progress monitoring + +This is a sample code that uploads the content of `fileToUpload` with monitoring its progress. +```kotlin +suspend fun performUploadWithProgress(url: HttpUrl, fileToUpload: File) { val request = Request.Builder() .url(url) .post(fileToUpload.asRequestBody("application/octet-stream".toMediaType())) @@ -83,19 +110,17 @@ suspend fun performUpload(url: HttpUrl, fileToUpload: File) { } ``` -## File download +## Download with progress monitoring -This is a sample code that downloads the content of `url` using an HTTP GET method, and saves it to `fileToSave`. +This is a sample code that downloads the content of `url` to `fileToSave` with monitoring its progress. ```kotlin -suspend fun performDownload(url: HttpUrl, fileToSave: File) { +suspend fun performDownloadWithProgress(url: HttpUrl, fileToSave: File) { val request = Request.Builder().url(url).build() - okHttpClient.callWithCounting(request) { response -> + okHttpClient.callWithCounting(request) { response -> if (response.code != StatusCode.OK) { throw ResponseStatusException(response.code) } - fileToSave.sink().use { - checkNotNull(response.body).source().readAll(it) - } + response.saveBodyToFileAtomically(fileToSave) }.collect { state -> when (state) { is Uploading -> Unit @@ -113,9 +138,11 @@ suspend fun performDownload(url: HttpUrl, fileToSave: File) { ## Resumable download -This is a sample code that performs a resumable download using Range requests defined in RFC 7233. +This is similar to the example above, but if it fails in the middle of the download, `fileToSave` +will still contain what has been downloaded up to that point. Then, when it is run again, the +download will resume from the continuation. ```kotlin -suspend fun performResumableDownload(url: HttpUrl, fileToSave: File) { +suspend fun performResumableDownloadWithProgress(url: HttpUrl, fileToSave: File) { val resourceToSave = fileToSave.asWritableResource() val request = Request.Builder() .url(url) diff --git a/okhttp/src/main/java/com/linecorp/lich/okhttp/AtomicDownload.kt b/okhttp/src/main/java/com/linecorp/lich/okhttp/AtomicDownload.kt new file mode 100644 index 0000000..5709786 --- /dev/null +++ b/okhttp/src/main/java/com/linecorp/lich/okhttp/AtomicDownload.kt @@ -0,0 +1,82 @@ +/* + * Copyright 2022 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linecorp.lich.okhttp + +import java.io.File +import java.io.FileOutputStream +import java.io.IOException +import okhttp3.Response +import okio.sink + +/** + * Saves the response body to [fileToSave] **atomically**. + * + * [fileToSave] is updated if and only if the entire response body has been successfully downloaded. + * Otherwise, [fileToSave] will not be modified at all. + * + * This is a sample code that downloads a content of the given URL to a file. + * ``` + * suspend fun performDownloadAtomically(url: HttpUrl, fileToSave: File): Boolean { + * val request = Request.Builder().url(url).build() + * return try { + * okHttpClient.call(request) { response -> + * if (response.code != StatusCode.OK) { + * throw ResponseStatusException(response.code) + * } + * response.saveBodyToFileAtomically(fileToSave) + * } + * // At this point, `fileToSave` contains the complete response body downloaded. + * true + * } catch (e: IOException) { + * println("Failed to download: $e") + * // At this point, `fileToSave` is not modified at all. + * false + * } + * } + * ``` + * + * Implementation note: This function is inspired by `android.util.AtomicFile`, but does not depend + * on any Android API. Technically, this atomicity is implemented by writing the response body to + * a temporary file and then renaming it. The name of this temporary file is given by appending + * [suffixForTmpFile] to the path of [fileToSave]. + * + * @param fileToSave the file to save the downloaded content. + * @param suffixForTmpFile the suffix to be appended to the temporary file. + * @throws IOException if any I/O error occurs. In this case, [fileToSave] is not modified at all. + */ +fun Response.saveBodyToFileAtomically(fileToSave: File, suffixForTmpFile: String = ".new") { + if (suffixForTmpFile.isEmpty()) { + throw IllegalArgumentException("suffixForTmpFile must not be empty.") + } + + val tmpFile = File(fileToSave.path + suffixForTmpFile) + tmpFile.parentFile?.mkdirs() + val tmpFileOutputStream = FileOutputStream(tmpFile) + try { + tmpFileOutputStream.use { + val responseBody = body ?: throw IOException("This response has no body.") + responseBody.source().readAll(it.sink()) + it.flush() + it.fd.sync() + } + if (!tmpFile.renameTo(fileToSave)) { + throw IOException("Failed to rename the tmpFile to $fileToSave") + } + } catch (e: IOException) { + tmpFile.delete() + throw e + } +} diff --git a/okhttp/src/main/java/com/linecorp/lich/okhttp/CallWithCounting.kt b/okhttp/src/main/java/com/linecorp/lich/okhttp/CallWithCounting.kt index e815c11..d5f0ff5 100644 --- a/okhttp/src/main/java/com/linecorp/lich/okhttp/CallWithCounting.kt +++ b/okhttp/src/main/java/com/linecorp/lich/okhttp/CallWithCounting.kt @@ -47,7 +47,7 @@ import java.io.IOException * * This is a sample code that sends the content of `fileToUpload` as an HTTP POST method. * ``` - * suspend fun performUpload(url: HttpUrl, fileToUpload: File) { + * suspend fun performUploadWithProgress(url: HttpUrl, fileToUpload: File) { * val request = Request.Builder() * .url(url) * .post(fileToUpload.asRequestBody("application/octet-stream".toMediaType())) @@ -74,15 +74,13 @@ import java.io.IOException * This is a sample code that downloads the content of `url` using an HTTP GET method, and saves it * to `fileToSave`. * ``` - * suspend fun performDownload(url: HttpUrl, fileToSave: File) { + * suspend fun performDownloadWithProgress(url: HttpUrl, fileToSave: File) { * val request = Request.Builder().url(url).build() - * okHttpClient.callWithCounting(request) { response -> + * okHttpClient.callWithCounting(request) { response -> * if (response.code != StatusCode.OK) { * throw ResponseStatusException(response.code) * } - * fileToSave.sink().use { - * checkNotNull(response.body).source().readAll(it) - * } + * response.saveBodyToFileAtomically(fileToSave) * }.collect { state -> * when (state) { * is Uploading -> Unit @@ -109,6 +107,7 @@ import java.io.IOException * closed automatically after the function call. This function is called from a background thread of * OkHttp's thread pool. * @return A [Flow] that emits the progress of the HTTP call. + * @see Response.saveBodyToFileAtomically * @see Response.saveToResourceWithSupportingResumption */ fun OkHttpClient.callWithCounting( diff --git a/okhttp/src/main/java/com/linecorp/lich/okhttp/ContentRange.kt b/okhttp/src/main/java/com/linecorp/lich/okhttp/ContentRange.kt index 0e8602d..611c365 100644 --- a/okhttp/src/main/java/com/linecorp/lich/okhttp/ContentRange.kt +++ b/okhttp/src/main/java/com/linecorp/lich/okhttp/ContentRange.kt @@ -18,9 +18,9 @@ package com.linecorp.lich.okhttp import okhttp3.Response /** - * A *valid* value of a `Content-Range` header field. + * A *valid* (not "unsatisfied") value of a `Content-Range` header field. * - * Specification: [RFC 7233, section 4.2](https://tools.ietf.org/html/rfc7233#section-4.2) + * Specification: [RFC 9110, Section 14.4](https://www.rfc-editor.org/rfc/rfc9110.html#section-14.4) * * @property start the offset of the first byte in the content range. * @property endInclusive the offset of the last byte in the content range. @@ -44,14 +44,14 @@ class ContentRange(val start: Long, val endInclusive: Long, val totalLength: Lon * Returns the [ContentRange] of a `206 Partial Content` response transferring a single part. * If the response doesn't have a valid `Content-Range` header field, returns `null`. * - * Specification: [RFC 7233, section 4.1](https://tools.ietf.org/html/rfc7233#section-4.1) + * Specification: [RFC 9110, Section 14.4](https://www.rfc-editor.org/rfc/rfc9110.html#section-14.4) */ fun Response.mayGetSinglePartContentRange(): ContentRange? { // A single-part partial content response must have a Content-Range header field. val headerValue = header("Content-Range") ?: return null // Parse the Content-Range header value. - // cf. https://tools.ietf.org/html/rfc7233#section-4.2 + // cf. https://www.rfc-editor.org/rfc/rfc9110.html#section-14.4 val result = validRangeRegex.matchEntire(headerValue) ?: return null val start = result.groupValues[1].toLongOrNull()?.takeIf { it >= 0 } ?: return null val end = result.groupValues[2].toLongOrNull()?.takeIf { it >= start } ?: return null @@ -67,7 +67,7 @@ class ContentRange(val start: Long, val endInclusive: Long, val totalLength: Lon * a `416 Range Not Satisfiable` response. * If the response doesn't have an unsatisfied `Content-Range` header field, returns `null`. * - * Specification: [RFC 7233, section 4.4](https://tools.ietf.org/html/rfc7233#section-4.4) + * Specification: [RFC 9110, Section 14.4](https://www.rfc-editor.org/rfc/rfc9110.html#section-14.4) */ fun Response.mayGetTotalLengthOfUnsatisfiedRange(): Long? { // A "416 Range Not Satisfiable" response should have a Content-Range header field @@ -75,7 +75,7 @@ class ContentRange(val start: Long, val endInclusive: Long, val totalLength: Lon val headerValue = header("Content-Range") ?: return null // Parse the Content-Range header value. - // cf. https://tools.ietf.org/html/rfc7233#section-4.2 + // cf. https://www.rfc-editor.org/rfc/rfc9110.html#section-14.4 val result = unsatisfiedRangeRegex.matchEntire(headerValue) ?: return null return result.groupValues[1].toLongOrNull()?.takeIf { it >= 0 } } diff --git a/okhttp/src/main/java/com/linecorp/lich/okhttp/ResumableDownload.kt b/okhttp/src/main/java/com/linecorp/lich/okhttp/ResumableDownload.kt index 3e142f0..db081dd 100644 --- a/okhttp/src/main/java/com/linecorp/lich/okhttp/ResumableDownload.kt +++ b/okhttp/src/main/java/com/linecorp/lich/okhttp/ResumableDownload.kt @@ -24,12 +24,12 @@ import java.io.IOException /** * Saves the response body to the given [WritableResource] with handling a single-part partial - * response defined in [RFC 7233](https://tools.ietf.org/html/rfc7233). + * response defined in [RFC 9110, Section 14](https://www.rfc-editor.org/rfc/rfc9110.html#section-14). * * This is a sample code that performs a *resumable download* using Range requests defined in - * RFC 7233. + * RFC 9110, Section 14. * ``` - * suspend fun performResumableDownload(url: HttpUrl, fileToSave: File) { + * suspend fun performResumableDownloadWithProgress(url: HttpUrl, fileToSave: File) { * val resourceToSave = fileToSave.asWritableResource() * val request = Request.Builder() * .url(url) @@ -77,6 +77,7 @@ fun Response.saveToResourceWithSupportingResumption(resourceToSave: WritableReso isLastPart = true } StatusCode.PARTIAL_CONTENT -> { + // cf. https://www.rfc-editor.org/rfc/rfc9110.html#section-15.3.7 val contentRange = mayGetSinglePartContentRange() ?: throw InconsistentContentRangeException("No valid Content-Range header in the `Partial Content` response.") if (contentRange.start != resourceToSave.length) { @@ -87,6 +88,7 @@ fun Response.saveToResourceWithSupportingResumption(resourceToSave: WritableReso isLastPart = contentRange.isLastPart } StatusCode.RANGE_NOT_SATISFIABLE -> { + // cf. https://www.rfc-editor.org/rfc/rfc9110.html#section-15.5.17 val totalLength = mayGetTotalLengthOfUnsatisfiedRange() ?: throw InconsistentContentRangeException("No unsatisfied Content-Range header in the `Range Not Satisfiable` response.") if (totalLength != resourceToSave.length) { @@ -106,7 +108,7 @@ fun Response.saveToResourceWithSupportingResumption(resourceToSave: WritableReso /** * Sets a single-part `Range` header with the given [start] offset. * - * Specification: [RFC 7233, section 3.1](https://tools.ietf.org/html/rfc7233#section-3.1) + * Specification: [RFC 9110, Section 14.2](https://www.rfc-editor.org/rfc/rfc9110.html#section-14.2) */ fun Request.Builder.setRangeHeader(start: Long): Request.Builder { require(start >= 0) { "The start position must not be less than zero." } @@ -116,7 +118,7 @@ fun Request.Builder.setRangeHeader(start: Long): Request.Builder { /** * Sets a single-part `Range` header with the given [start] and [endInclusive] offset. * - * Specification: [RFC 7233, section 3.1](https://tools.ietf.org/html/rfc7233#section-3.1) + * Specification: [RFC 9110, Section 14.2](https://www.rfc-editor.org/rfc/rfc9110.html#section-14.2) */ fun Request.Builder.setRangeHeader(start: Long, endInclusive: Long): Request.Builder { require(start >= 0) { "The start position must not be less than zero." } @@ -127,7 +129,7 @@ fun Request.Builder.setRangeHeader(start: Long, endInclusive: Long): Request.Bui /** * Sets a single-part `Range` header with the given [suffixLength]. * - * Specification: [RFC 7233, section 3.1](https://tools.ietf.org/html/rfc7233#section-3.1) + * Specification: [RFC 9110, Section 14.2](https://www.rfc-editor.org/rfc/rfc9110.html#section-14.2) */ fun Request.Builder.setSuffixRangeHeader(suffixLength: Long): Request.Builder { require(suffixLength > 0) { "The suffix length must be greater than zero." } diff --git a/okhttp/src/test/java/com/linecorp/lich/okhttp/AtomicDownloadTest.kt b/okhttp/src/test/java/com/linecorp/lich/okhttp/AtomicDownloadTest.kt new file mode 100644 index 0000000..8a881bc --- /dev/null +++ b/okhttp/src/test/java/com/linecorp/lich/okhttp/AtomicDownloadTest.kt @@ -0,0 +1,150 @@ +/* + * Copyright 2022 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linecorp.lich.okhttp + +import java.io.File +import java.io.IOException +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.junit.After +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder + +class AtomicDownloadTest { + + @get:Rule + val tempFolder: TemporaryFolder = TemporaryFolder() + + private lateinit var server: MockWebServer + + private lateinit var okHttpClient: OkHttpClient + + @Before + fun setUp() { + server = MockWebServer() + okHttpClient = OkHttpClient.Builder() + .readTimeout(5, TimeUnit.SECONDS) + .build() + } + + @After + fun tearDown() { + server.shutdown() + } + + @Test + fun downloadToNewFile() = runBlocking { + val fileToSave = File(tempFolder.root, "new_file.txt") + assertFalse(fileToSave.exists()) + + server.enqueue(MockResponse().apply { + setBody("ABCDEF") + }) + server.start() + + performDownloadAtomically(fileToSave) + + assertEquals("ABCDEF", fileToSave.readText()) + assertFalse(File(fileToSave.path + ".new").exists()) + } + + @Test + fun downloadToExistingFile() = runBlocking { + val fileToSave = tempFolder.newFile().apply { + writeText("0123456789") + } + assertEquals("0123456789", fileToSave.readText()) + + server.enqueue(MockResponse().apply { + setBody("ABCDEF") + }) + server.start() + + performDownloadAtomically(fileToSave) + + assertEquals("ABCDEF", fileToSave.readText()) + assertFalse(File(fileToSave.path + ".new").exists()) + } + + @Test + fun failedDueToReadTimeout() = runBlocking { + val fileToSave = tempFolder.newFile().apply { + writeText("0123456789") + } + assertEquals("0123456789", fileToSave.readText()) + + server.enqueue(MockResponse().apply { + setBody("ABCDEF") + setBodyDelay(8, TimeUnit.SECONDS) + }) + server.start() + + assertFailsWith { + performDownloadAtomically(fileToSave) + } + + assertEquals("0123456789", fileToSave.readText()) + assertFalse(File(fileToSave.path + ".new").exists()) + } + + @Test + fun coroutineTimeout() = runBlocking { + val fileToSave = tempFolder.newFile().apply { + writeText("0123456789") + } + assertEquals("0123456789", fileToSave.readText()) + + server.enqueue(MockResponse().apply { + setBody("ABCDEF") + setBodyDelay(5, TimeUnit.SECONDS) + }) + server.start() + + assertFailsWith { + withTimeout(2000L) { + performDownloadAtomically(fileToSave) + } + } + delay(1000L) // To ensure the tmp file is deleted. + + assertEquals("0123456789", fileToSave.readText()) + assertFalse(File(fileToSave.path + ".new").exists()) + } + + private suspend fun performDownloadAtomically(fileToSave: File) { + val request = Request.Builder() + .url(server.url("/foo")) + .build() + okHttpClient.call(request) { response -> + if (response.code != StatusCode.OK) { + throw ResponseStatusException(response.code) + } + response.saveBodyToFileAtomically(fileToSave) + } + } +}