diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md index b5950c669988..f8cdf2fd860b 100644 --- a/docs/content/flink/sql-write.md +++ b/docs/content/flink/sql-write.md @@ -261,11 +261,13 @@ CREATE TABLE my_partitioned_table ( 'partition.time-interval'='1 d', 'partition.idle-time-to-done'='15 m', 'partition.mark-done-action'='done-partition' - -- You can also customize a PartitionMarkDoneAction to mark the partition completed. - -- 'partition.mark-done-action'='done-partition,custom', - -- 'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction' ); ``` + +You can also customize a PartitionMarkDoneAction to mark the partition completed. +- partition.mark-done-action: custom +- partition.mark-done-action.custom.class: The partition mark done class for implement PartitionMarkDoneAction interface (e.g. org.apache.paimon.CustomPartitionMarkDoneAction). + Define a class CustomPartitionMarkDoneAction to implement the PartitionMarkDoneAction interface. ```java package org.apache.paimon; @@ -282,6 +284,28 @@ public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction { } ``` +Paimon also support http-report partition mark done action, this action will report the partition to the remote http server. +- partition.mark-done-action: http-report +- partition.mark-done-action.http.url : Action will report the partition to the remote http server. +- partition.mark-done-action.http.timeout : Http client connection timeout and default is 5s. +- partition.mark-done-action.http.params : Http client request params in the request body json. + +Http Post request body : +```json +{ + "table": "table fullName", + "path": "table location path", + "partition": "mark done partition", + "params" : "custom params" +} +``` +Http Response body : +```json +{ + "result": "success" +} +``` + 1. Firstly, you need to define the time parser of the partition and the time interval between partitions in order to determine when the partition can be properly marked done. 2. Secondly, you need to define idle-time, which determines how long it takes for the partition to have no new data, diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 1e7aa59ed869..d650670bf824 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -663,7 +663,7 @@
partition.mark-done-action
"success-file" String - Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.
1. 'success-file': add '_success' file to directory.
2. 'done-partition': add 'xxx.done' partition to metastore.
3. 'mark-event': mark partition event to metastore.
4. 'custom': use policy class to create a mark-partition policy.
Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'. + Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.
1. 'success-file': add '_success' file to directory.
2. 'done-partition': add 'xxx.done' partition to metastore.
3. 'mark-event': mark partition event to metastore.
4. 'http-report': report partition mark done to remote http server.
5. 'custom': use policy class to create a mark-partition policy.
Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.
partition.mark-done-action.custom.class
@@ -671,6 +671,24 @@ String The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action. + +
partition.mark-done-action.http.params
+ (none) + String + Http client request parameters will be written to the request body, this can only be used by http-report partition mark done action. + + +
partition.mark-done-action.http.timeout
+ 5 s + Duration + Http client connection timeout, this can only be used by http-report partition mark done action. + + +
partition.mark-done-action.http.url
+ (none) + String + Mark done action will reports the partition to the remote http server, this can only be used by http-report partition mark done action. +
partition.timestamp-formatter
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 93bc23a41f58..afe4c50208f7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -49,6 +49,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -1220,7 +1221,10 @@ public class CoreOptions implements Serializable { .text("3. 'mark-event': mark partition event to metastore.") .linebreak() .text( - "4. 'custom': use policy class to create a mark-partition policy.") + "4. 'http-report': report partition mark done to remote http server.") + .linebreak() + .text( + "5. 'custom': use policy class to create a mark-partition policy.") .linebreak() .text( "Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.") @@ -1234,6 +1238,27 @@ public class CoreOptions implements Serializable { "The partition mark done class for implement" + " PartitionMarkDoneAction interface. Only work in custom mark-done-action."); + public static final ConfigOption PARTITION_MARK_DONE_ACTION_URL = + key("partition.mark-done-action.http.url") + .stringType() + .noDefaultValue() + .withDescription( + "Mark done action will reports the partition to the remote http server, this can only be used by http-report partition mark done action."); + + public static final ConfigOption PARTITION_MARK_DONE_ACTION_TIMEOUT = + key("partition.mark-done-action.http.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(5)) + .withDescription( + "Http client connection timeout, this can only be used by http-report partition mark done action."); + + public static final ConfigOption PARTITION_MARK_DONE_ACTION_PARAMS = + key("partition.mark-done-action.http.params") + .stringType() + .noDefaultValue() + .withDescription( + "Http client request parameters will be written to the request body, this can only be used by http-report partition mark done action."); + public static final ConfigOption METASTORE_PARTITIONED_TABLE = key("metastore.partitioned-table") .booleanType() @@ -2262,10 +2287,28 @@ public String partitionTimestampPattern() { return options.get(PARTITION_TIMESTAMP_PATTERN); } + public String httpReportMarkDoneActionUrl() { + return options.get(PARTITION_MARK_DONE_ACTION_URL); + } + + public Duration httpReportMarkDoneActionTimeout() { + return options.get(PARTITION_MARK_DONE_ACTION_TIMEOUT); + } + + public String httpReportMarkDoneActionParams() { + return options.get(PARTITION_MARK_DONE_ACTION_PARAMS); + } + public String partitionMarkDoneCustomClass() { return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS); } + public Set partitionMarkDoneActions() { + return Arrays.stream(options.get(PARTITION_MARK_DONE_ACTION).split(",")) + .map(x -> PartitionMarkDoneAction.valueOf(x.replace('-', '_').toUpperCase())) + .collect(Collectors.toCollection(HashSet::new)); + } + public String consumerId() { String consumerId = options.get(CONSUMER_ID); if (consumerId != null && consumerId.isEmpty()) { @@ -3163,4 +3206,24 @@ public enum MaterializedTableRefreshStatus { ACTIVATED, SUSPENDED } + + /** Partition mark done actions. */ + public enum PartitionMarkDoneAction { + SUCCESS_FILE("success-file"), + DONE_PARTITION("done-partition"), + MARK_EVENT("mark-event"), + HTTP_REPORT("http-report"), + CUSTOM("custom"); + + private final String value; + + PartitionMarkDoneAction(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java new file mode 100644 index 000000000000..39c17406b339 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition.actions; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializationFeature; + +import okhttp3.Dispatcher; +import okhttp3.Headers; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.SynchronousQueue; + +import static okhttp3.ConnectionSpec.CLEARTEXT; +import static okhttp3.ConnectionSpec.COMPATIBLE_TLS; +import static okhttp3.ConnectionSpec.MODERN_TLS; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; + +/** Report partition submission information to remote http server. */ +public class HttpReportMarkDoneAction implements PartitionMarkDoneAction { + + private final OkHttpClient client; + private final String url; + private final ObjectMapper mapper; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); + + private final FileStoreTable fileStoreTable; + + private final String params; + + private static final String RESPONSE_SUCCESS = "SUCCESS"; + + private static final String THREAD_NAME = "PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD"; + + public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) { + + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()), + String.format( + "Parameter %s must be non-empty when you use `http-report` partition mark done action.", + PARTITION_MARK_DONE_ACTION_URL.key())); + + this.fileStoreTable = fileStoreTable; + this.params = options.httpReportMarkDoneActionParams(); + this.url = options.httpReportMarkDoneActionUrl(); + this.mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + + OkHttpClient.Builder builder = + new OkHttpClient.Builder() + .dispatcher( + new Dispatcher( + createCachedThreadPool( + 1, THREAD_NAME, new SynchronousQueue<>()))) + .retryOnConnectionFailure(true) + .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT)) + .connectTimeout(options.httpReportMarkDoneActionTimeout()) + .readTimeout(options.httpReportMarkDoneActionTimeout()); + + this.client = builder.build(); + } + + @Override + public void markDone(String partition) throws Exception { + HttpReportMarkDoneResponse response = + post( + new HttpReportMarkDoneRequest( + params, + fileStoreTable.fullName(), + fileStoreTable.location().toString(), + partition), + Collections.emptyMap()); + Preconditions.checkState( + reportIsSuccess(response), + String.format( + "The http-report action's response attribute `result` should be 'SUCCESS' but is '%s'.", + response.getResult())); + } + + private boolean reportIsSuccess(HttpReportMarkDoneResponse response) { + return response != null && RESPONSE_SUCCESS.equalsIgnoreCase(response.getResult()); + } + + @Override + public void close() throws IOException { + try { + client.dispatcher().cancelAll(); + client.connectionPool().evictAll(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** RestRequest only for HttpReportMarkDoneAction. */ + @JsonIgnoreProperties(ignoreUnknown = true) + @VisibleForTesting + public static class HttpReportMarkDoneRequest { + + private static final String MARK_DONE_PARTITION = "partition"; + private static final String TABLE = "table"; + private static final String PATH = "path"; + private static final String PARAMS = "params"; + + @JsonProperty(MARK_DONE_PARTITION) + private final String partition; + + @JsonProperty(TABLE) + private final String table; + + @JsonProperty(PATH) + private final String path; + + @JsonProperty(PARAMS) + private final String params; + + @JsonCreator + public HttpReportMarkDoneRequest( + @JsonProperty(PARAMS) String params, + @JsonProperty(TABLE) String table, + @JsonProperty(PATH) String path, + @JsonProperty(MARK_DONE_PARTITION) String partition) { + this.params = params; + this.table = table; + this.path = path; + this.partition = partition; + } + + @JsonGetter(MARK_DONE_PARTITION) + public String getPartition() { + return partition; + } + + @JsonGetter(TABLE) + public String getTable() { + return table; + } + + @JsonGetter(PATH) + public String getPath() { + return path; + } + + @JsonGetter(PARAMS) + public String getParams() { + return params; + } + } + + /** Response only for HttpReportMarkDoneAction. */ + @JsonIgnoreProperties(ignoreUnknown = true) + @VisibleForTesting + public static class HttpReportMarkDoneResponse { + private static final String RESULT = "result"; + + @JsonProperty(RESULT) + private final String result; + + public HttpReportMarkDoneResponse(@JsonProperty(RESULT) String result) { + this.result = result; + } + + @JsonGetter(RESULT) + public String getResult() { + return result; + } + } + + public HttpReportMarkDoneResponse post( + HttpReportMarkDoneRequest body, Map headers) throws IOException { + RequestBody requestBody = RequestBody.create(mapper.writeValueAsBytes(body), MEDIA_TYPE); + Request request = + new Request.Builder() + .url(url) + .post(requestBody) + .headers(Headers.of(headers)) + .build(); + try (Response response = client.newCall(request).execute()) { + String responseBodyStr = response.body() != null ? response.body().string() : null; + if (!response.isSuccessful() || StringUtils.isNullOrWhitespaceOnly(responseBodyStr)) { + throw new HttpReportMarkDoneException( + response.isSuccessful() + ? "ResponseBody is null or empty." + : String.format( + "Response is not successful, response is %s", response)); + } + return mapper.readValue(responseBodyStr, HttpReportMarkDoneResponse.class); + } catch (HttpReportMarkDoneException e) { + throw e; + } catch (Exception e) { + throw new HttpReportMarkDoneException(e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java new file mode 100644 index 000000000000..56d177575472 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition.actions; + +/** Exception for {@link HttpReportMarkDoneAction}. */ +public class HttpReportMarkDoneException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public HttpReportMarkDoneException(Throwable e) { + super("Http request exception.", e); + } + + public HttpReportMarkDoneException(String msg) { + super(msg); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java index f5259f22054a..ee12fce528ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java @@ -24,32 +24,27 @@ import org.apache.paimon.utils.StringUtils; import java.io.Closeable; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Action to mark partitions done. */ public interface PartitionMarkDoneAction extends Closeable { - String SUCCESS_FILE = "success-file"; - String DONE_PARTITION = "done-partition"; - String MARK_EVENT = "mark-event"; - String CUSTOM = "custom"; - void markDone(String partition) throws Exception; static List createActions( ClassLoader cl, FileStoreTable fileStoreTable, CoreOptions options) { - return Arrays.stream(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).split(",")) + return options.partitionMarkDoneActions().stream() .map( action -> { - switch (action.toLowerCase()) { + switch (action) { case SUCCESS_FILE: return new SuccessFileMarkDoneAction( fileStoreTable.fileIO(), fileStoreTable.location()); @@ -59,10 +54,12 @@ static List createActions( case MARK_EVENT: return new MarkPartitionDoneEventAction( createPartitionHandler(fileStoreTable, options)); + case HTTP_REPORT: + return new HttpReportMarkDoneAction(fileStoreTable, options); case CUSTOM: return generateCustomMarkDoneAction(cl, options); default: - throw new UnsupportedOperationException(action); + throw new UnsupportedOperationException(action.toString()); } }) .collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java index 2862e5ef02ed..bbfee103c7b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java @@ -22,6 +22,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.rest.exceptions.RESTException; import org.apache.paimon.rest.responses.ErrorResponse; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -81,7 +82,11 @@ void setErrorHandler(ErrorHandler errorHandler) { public T get( String path, Class responseType, Map headers) { Request request = - new Request.Builder().url(uri + path).get().headers(Headers.of(headers)).build(); + new Request.Builder() + .url(getRequestUrl(path)) + .get() + .headers(Headers.of(headers)) + .build(); return exec(request, responseType); } @@ -92,7 +97,7 @@ public T post( RequestBody requestBody = buildRequestBody(body); Request request = new Request.Builder() - .url(uri + path) + .url(getRequestUrl(path)) .post(requestBody) .headers(Headers.of(headers)) .build(); @@ -105,7 +110,11 @@ public T post( @Override public T delete(String path, Map headers) { Request request = - new Request.Builder().url(uri + path).delete().headers(Headers.of(headers)).build(); + new Request.Builder() + .url(getRequestUrl(path)) + .delete() + .headers(Headers.of(headers)) + .build(); return exec(request, null); } @@ -116,7 +125,7 @@ public T delete( RequestBody requestBody = buildRequestBody(body); Request request = new Request.Builder() - .url(uri + path) + .url(getRequestUrl(path)) .delete(requestBody) .headers(Headers.of(headers)) .build(); @@ -169,6 +178,10 @@ private RequestBody buildRequestBody(RESTRequest body) throws JsonProcessingExce return RequestBody.create(OBJECT_MAPPER.writeValueAsBytes(body), MEDIA_TYPE); } + private String getRequestUrl(String path) { + return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path; + } + private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions) { BlockingQueue workQueue = new SynchronousQueue<>(); ExecutorService executorService = diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java index 54e7d3a68eee..161dbaf3bb50 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java @@ -24,10 +24,6 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.ErrorResponseResourceType; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,29 +41,28 @@ public class HttpClientTest { private static final String MOCK_PATH = "/v1/api/mock"; private static final String TOKEN = "token"; - private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); - private MockWebServer mockWebServer; + private TestHttpWebServer server; private HttpClient httpClient; private ErrorHandler errorHandler; private MockRESTData mockResponseData; private String mockResponseDataStr; - private ErrorResponse errorResponse; private String errorResponseStr; private Map headers; @Before - public void setUp() throws IOException { - mockWebServer = new MockWebServer(); - mockWebServer.start(); - String baseUrl = mockWebServer.url("").toString(); + public void setUp() throws Exception { + server = new TestHttpWebServer(MOCK_PATH); + server.start(); errorHandler = DefaultErrorHandler.getInstance(); HttpClientOptions httpClientOptions = - new HttpClientOptions(baseUrl, Duration.ofSeconds(3), Duration.ofSeconds(3), 1); + new HttpClientOptions( + server.getBaseUrl(), Duration.ofSeconds(3), Duration.ofSeconds(3), 1); mockResponseData = new MockRESTData(MOCK_PATH); - mockResponseDataStr = OBJECT_MAPPER.writeValueAsString(mockResponseData); - errorResponse = new ErrorResponse(ErrorResponseResourceType.DATABASE, "test", "test", 400); - errorResponseStr = OBJECT_MAPPER.writeValueAsString(errorResponse); + mockResponseDataStr = server.createResponseBody(mockResponseData); + errorResponseStr = + server.createResponseBody( + new ErrorResponse(ErrorResponseResourceType.DATABASE, "test", "test", 400)); httpClient = new HttpClient(httpClientOptions); httpClient.setErrorHandler(errorHandler); CredentialsProvider credentialsProvider = new BearTokenCredentialsProvider(TOKEN); @@ -76,19 +71,19 @@ public void setUp() throws IOException { @After public void tearDown() throws IOException { - mockWebServer.shutdown(); + server.stop(); } @Test public void testGetSuccess() { - mockHttpCallWithCode(mockResponseDataStr, 200); + server.enqueueResponse(mockResponseDataStr, 200); MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class, headers); assertEquals(mockResponseData.data(), response.data()); } @Test public void testGetFail() { - mockHttpCallWithCode(errorResponseStr, 400); + server.enqueueResponse(errorResponseStr, 400); assertThrows( BadRequestException.class, () -> httpClient.get(MOCK_PATH, MockRESTData.class, headers)); @@ -96,7 +91,7 @@ public void testGetFail() { @Test public void testPostSuccess() { - mockHttpCallWithCode(mockResponseDataStr, 200); + server.enqueueResponse(mockResponseDataStr, 200); MockRESTData response = httpClient.post(MOCK_PATH, mockResponseData, MockRESTData.class, headers); assertEquals(mockResponseData.data(), response.data()); @@ -104,7 +99,7 @@ public void testPostSuccess() { @Test public void testPostFail() { - mockHttpCallWithCode(errorResponseStr, 400); + server.enqueueResponse(errorResponseStr, 400); assertThrows( BadRequestException.class, () -> httpClient.post(MOCK_PATH, mockResponseData, ErrorResponse.class, headers)); @@ -112,25 +107,13 @@ public void testPostFail() { @Test public void testDeleteSuccess() { - mockHttpCallWithCode(mockResponseDataStr, 200); + server.enqueueResponse(mockResponseDataStr, 200); assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH, headers)); } @Test public void testDeleteFail() { - mockHttpCallWithCode(errorResponseStr, 400); + server.enqueueResponse(errorResponseStr, 400); assertThrows(BadRequestException.class, () -> httpClient.delete(MOCK_PATH, headers)); } - - private void mockHttpCallWithCode(String body, Integer code) { - MockResponse mockResponseObj = generateMockResponse(body, code); - mockWebServer.enqueue(mockResponseObj); - } - - private MockResponse generateMockResponse(String data, Integer code) { - return new MockResponse() - .setResponseCode(code) - .setBody(data) - .addHeader("Content-Type", "application/json"); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java new file mode 100644 index 000000000000..c56e18f977c4 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** Mock a http web service locally. */ +public class TestHttpWebServer { + + private MockWebServer mockWebServer; + private final ObjectMapper objectMapper = RESTObjectMapper.create(); + private String baseUrl; + private final String path; + + public TestHttpWebServer(String path) { + this.path = path; + } + + public void start() throws Exception { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + baseUrl = mockWebServer.url(path).toString(); + } + + public void stop() throws IOException { + mockWebServer.shutdown(); + } + + public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception { + return mockWebServer.takeRequest(timeout, unit); + } + + public void enqueueResponse(String body, Integer code) { + MockResponse mockResponseObj = generateMockResponse(body, code); + enqueueResponse(mockResponseObj); + } + + public void enqueueResponse(MockResponse mockResponseObj) { + mockWebServer.enqueue(mockResponseObj); + } + + public void enqueueResponse(RESTResponse response, Integer code) + throws JsonProcessingException { + enqueueResponse(createResponseBody(response), code); + } + + public String getBaseUrl() { + return baseUrl; + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + + public MockResponse generateMockResponse(String data, Integer code) { + return new MockResponse() + .setResponseCode(code) + .setBody(data) + .addHeader("Content-Type", "application/json"); + } + + public String createResponseBody(RESTResponse response) throws JsonProcessingException { + return objectMapper.writeValueAsString(response); + } + + public T readRequestBody(String body, Class requestType) throws JsonProcessingException { + return objectMapper.readValue(body, requestType); + } + + public T readResponseBody(String body, Class responseType) + throws JsonProcessingException { + return objectMapper.readValue(body, responseType); + } +} diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 84d4622b02b8..a02b6afb6e8c 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -99,6 +99,14 @@ under the License. test + + org.apache.paimon + paimon-core + ${project.version} + test-jar + test + + org.apache.flink flink-connector-test-utils diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index 63edb8bb4c98..0a29aebf22bf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -22,8 +22,9 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction; import org.apache.paimon.fs.Path; -import org.apache.paimon.partition.actions.PartitionMarkDoneAction; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction; import org.apache.paimon.partition.file.SuccessFile; +import org.apache.paimon.rest.TestHttpWebServer; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataType; @@ -31,6 +32,9 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; + +import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -39,10 +43,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.HTTP_REPORT; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.SUCCESS_FILE; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for {@link MarkPartitionDoneAction}. */ @@ -158,10 +167,8 @@ public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String @MethodSource("testArguments") public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception { - Map options = new HashMap<>(); - options.put( - PARTITION_MARK_DONE_ACTION.key(), - PartitionMarkDoneAction.SUCCESS_FILE + "," + PartitionMarkDoneAction.CUSTOM); + Map options = new HashMap<>(2); + options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + CUSTOM); options.put( PARTITION_MARK_DONE_CUSTOM_CLASS.key(), MockCustomPartitionMarkDoneAction.class.getName()); @@ -213,6 +220,96 @@ public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) thr .containsExactlyInAnyOrder("partKey0=0/partKey1=1/", "partKey0=1/partKey1=0/"); } + @ParameterizedTest + @MethodSource("testArguments") + public void testHttpReportPartitionMarkDoneAction(boolean hasPk, String invoker) + throws Exception { + + TestHttpWebServer server = new TestHttpWebServer(""); + server.start(); + try { + Map options = new HashMap<>(); + options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + HTTP_REPORT); + options.put(PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl()); + + FileStoreTable table = prepareTable(hasPk, options); + + String expectResponse = "{\"result\":\"success\"}"; + server.enqueueResponse(expectResponse, 200); + server.enqueueResponse(expectResponse, 200); + + switch (invoker) { + case "action": + createAction( + MarkPartitionDoneAction.class, + "mark_partition_done", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--partition", + "partKey0=0,partKey1=1", + "--partition", + "partKey0=1,partKey1=0") + .run(); + break; + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS"); + SuccessFile successFile1 = SuccessFile.safelyFromPath(table.fileIO(), successPath1); + assertThat(successFile1).isNotNull(); + + Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS"); + SuccessFile successFile2 = SuccessFile.safelyFromPath(table.fileIO(), successPath2); + assertThat(successFile2).isNotNull(); + + RecordedRequest recordedRequest = server.takeRequest(10, TimeUnit.SECONDS); + RecordedRequest recordedRequest2 = server.takeRequest(10, TimeUnit.SECONDS); + + assertRequest(server, table, recordedRequest, "partKey0=0/partKey1=1/"); + assertRequest(server, table, recordedRequest2, "partKey0=1/partKey1=0/"); + + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + server.stop(); + } + } + + public static void assertRequest( + TestHttpWebServer server, + FileStoreTable table, + RecordedRequest recordedRequest, + String exceptPartition) + throws JsonProcessingException { + String requestBody = recordedRequest.getBody().readUtf8(); + HttpReportMarkDoneAction.HttpReportMarkDoneRequest request = + server.readRequestBody( + requestBody, HttpReportMarkDoneAction.HttpReportMarkDoneRequest.class); + + assertThat( + request.getPath().equals(table.location().toString()) + && request.getPartition().equals(exceptPartition) + && request.getTable().equals(table.fullName())) + .isTrue(); + } + private FileStoreTable prepareTable(boolean hasPk) throws Exception { return prepareTable(hasPk, Collections.emptyMap()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java index d1599b6c57ea..73ba630b638c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java @@ -32,8 +32,8 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM; import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits; -import static org.apache.paimon.partition.actions.PartitionMarkDoneAction.CUSTOM; import static org.assertj.core.api.Assertions.assertThat; /** Test for custom PartitionMarkDoneAction. */ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java new file mode 100644 index 000000000000..b79597cb810f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.FileIOFinder; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneRequest; +import org.apache.paimon.partition.actions.HttpReportMarkDoneException; +import org.apache.paimon.rest.TestHttpWebServer; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; + +import okhttp3.mockwebserver.RecordedRequest; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_PARAMS; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_TIMEOUT; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; +import static org.apache.paimon.utils.InternalRowUtilsTest.ROW_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case fo {@link HttpReportMarkDoneAction}. */ +public class HttpReportMarkDoneActionTest { + + private static TestHttpWebServer server; + + private static final String partition = "partition"; + private static String params = "key1=value1,key2=value2"; + private static final String successResponse = "{\"result\":\"success\"}"; + private static final String failedResponse = "{\"result\":\"failed\"}"; + private static FileStoreTable fileStoreTable; + @Rule public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void startServer() throws Exception { + server = new TestHttpWebServer(""); + server.start(); + fileStoreTable = createFileStoreTable(); + } + + @After + public void stopServer() throws Exception { + server.stop(); + } + + @Test + public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { + HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction(); + + server.enqueueResponse(successResponse, 200); + + httpReportMarkDoneAction.markDone(partition); + RecordedRequest request = server.takeRequest(10, TimeUnit.SECONDS); + assertRequest(request); + + String expectedResponse2 = "{\"unknow\" :\"unknow\", \"result\" :\"success\"}"; + server.enqueueResponse(expectedResponse2, 200); + + httpReportMarkDoneAction.markDone(partition); + RecordedRequest request2 = server.takeRequest(10, TimeUnit.SECONDS); + assertRequest(request2); + + // test params is null. + params = null; + HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction(); + server.enqueueResponse(successResponse, 200); + httpReportMarkDoneAction3.markDone(partition); + RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS); + assertRequest(request3); + } + + @Test + public void testHttpReportMarkDoneActionFailedResponse() throws Exception { + HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction(); + + // status failed. + server.enqueueResponse(failedResponse, 200); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The http-report action's response attribute `result` should be 'SUCCESS' but is 'failed'."); + + // Illegal response body. + String unExpectResponse = "{\"unknow\" :\"unknow\"}"; + server.enqueueResponse(unExpectResponse, 200); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The http-report action's response attribute `result` should be 'SUCCESS' but is 'null'."); + + // empty response. + server.enqueueResponse("", 200); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(HttpReportMarkDoneException.class) + .hasMessageContaining("ResponseBody is null or empty."); + + // 400. + server.enqueueResponse(successResponse, 400); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(HttpReportMarkDoneException.class) + .hasMessageContaining("Response is not successful"); + } + + public static void assertRequest(RecordedRequest recordedRequest) + throws JsonProcessingException { + String requestBody = recordedRequest.getBody().readUtf8(); + HttpReportMarkDoneRequest request = + server.readRequestBody(requestBody, HttpReportMarkDoneRequest.class); + + assertThat( + request.getPath().equals(fileStoreTable.location().toString()) + && request.getPartition().equals(partition) + && request.getTable().equals(fileStoreTable.fullName()) + && (params == null || params.equals(request.getParams()))) + .isTrue(); + } + + public static CoreOptions createCoreOptions() { + HashMap httpOptions = new HashMap<>(); + httpOptions.put(PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl()); + httpOptions.put(PARTITION_MARK_DONE_ACTION_TIMEOUT.key(), "2 s"); + if (params != null) { + httpOptions.put(PARTITION_MARK_DONE_ACTION_PARAMS.key(), params); + } + return new CoreOptions(httpOptions); + } + + public HttpReportMarkDoneAction createHttpReportMarkDoneAction() { + return new HttpReportMarkDoneAction(fileStoreTable, createCoreOptions()); + } + + public FileStoreTable createFileStoreTable() throws Exception { + org.apache.paimon.fs.Path tablePath = + new org.apache.paimon.fs.Path(folder.newFolder().toURI().toString()); + Options options = new Options(); + options.set(CoreOptions.PATH, tablePath.toString()); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath), + new Schema( + ROW_TYPE.getFields(), + Collections.emptyList(), + Collections.emptyList(), + options.toMap(), + "")); + return FileStoreTableFactory.create( + FileIOFinder.find(tablePath), + tablePath, + tableSchema, + options, + CatalogEnvironment.empty()); + } +}