From b3c2a05c381c756606df747a601414caaea26496 Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Fri, 3 Jan 2025 11:09:32 +0800 Subject: [PATCH 1/6] [core] Add a http-report action to reporting partition done to remote servers. --- docs/content/flink/sql-write.md | 28 ++- .../generated/core_configuration.html | 14 +- .../java/org/apache/paimon/CoreOptions.java | 27 ++- .../actions/HttpReportMarkDoneAction.java | 161 +++++++++++++++++ .../actions/PartitionMarkDoneAction.java | 4 + .../org/apache/paimon/rest/HttpClient.java | 23 ++- .../apache/paimon/rest/TestHttpWebServer.java | 99 +++++++++++ paimon-flink/paimon-flink-common/pom.xml | 8 + .../action/MarkPartitionDoneActionITCase.java | 102 +++++++++++ .../HttpReportMarkDoneActionTest.java | 168 ++++++++++++++++++ 10 files changed, 624 insertions(+), 10 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md index b5950c669988..b8a2201e951c 100644 --- a/docs/content/flink/sql-write.md +++ b/docs/content/flink/sql-write.md @@ -261,11 +261,13 @@ CREATE TABLE my_partitioned_table ( 'partition.time-interval'='1 d', 'partition.idle-time-to-done'='15 m', 'partition.mark-done-action'='done-partition' - -- You can also customize a PartitionMarkDoneAction to mark the partition completed. - -- 'partition.mark-done-action'='done-partition,custom', - -- 'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction' ); ``` + +You can also customize a PartitionMarkDoneAction to mark the partition completed. +- partition.mark-done-action: custom +- partition.mark-done-action.custom.class: The partition mark done class for implement PartitionMarkDoneAction interface (e.g. org.apache.paimon.CustomPartitionMarkDoneAction). + Define a class CustomPartitionMarkDoneAction to implement the PartitionMarkDoneAction interface. ```java package org.apache.paimon; @@ -282,6 +284,26 @@ public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction { } ``` +Paimon also support http-report partition mark done action, this action will report the partition to the remote http server. +- partition.mark-done-action: http-report +- partition.mark-done-action.url : Action will report the partition to the remote http server. +- partition.mark-done-action.timeout : Http client connection timeout and default is 5s. + +Http Request body : +```json +{ + "table": "table fullName", + "path": "table location path", + "partition": "mark done partition" +} +``` +Http Response body : +```json +{ + "result": "success" +} +``` + 1. Firstly, you need to define the time parser of the partition and the time interval between partitions in order to determine when the partition can be properly marked done. 2. Secondly, you need to define idle-time, which determines how long it takes for the partition to have no new data, diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 1e7aa59ed869..ad6e1f46f735 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -663,7 +663,7 @@
partition.mark-done-action
"success-file" String - Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.
1. 'success-file': add '_success' file to directory.
2. 'done-partition': add 'xxx.done' partition to metastore.
3. 'mark-event': mark partition event to metastore.
4. 'custom': use policy class to create a mark-partition policy.
Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'. + Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.
1. 'success-file': add '_success' file to directory.
2. 'done-partition': add 'xxx.done' partition to metastore.
3. 'mark-event': mark partition event to metastore.
4. 'http-report': report partition mark done to remote http server.
5. 'custom': use policy class to create a mark-partition policy.
Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.
partition.mark-done-action.custom.class
@@ -671,6 +671,18 @@ String The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action. + +
partition.mark-done-action.timeout
+ 5 s + Duration + Http client connect timeout, This can only be used by http-report partition mark done action. + + +
partition.mark-done-action.url
+ (none) + String + Mark done action will reports the partition to the remote http server, This can only be used by http-report partition mark done action. +
partition.timestamp-formatter
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 93bc23a41f58..0561f3983ab2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1220,7 +1220,10 @@ public class CoreOptions implements Serializable { .text("3. 'mark-event': mark partition event to metastore.") .linebreak() .text( - "4. 'custom': use policy class to create a mark-partition policy.") + "4. 'http-report': report partition mark done to remote http server.") + .linebreak() + .text( + "5. 'custom': use policy class to create a mark-partition policy.") .linebreak() .text( "Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.") @@ -1234,6 +1237,20 @@ public class CoreOptions implements Serializable { "The partition mark done class for implement" + " PartitionMarkDoneAction interface. Only work in custom mark-done-action."); + public static final ConfigOption PARTITION_MARK_DONE_ACTION_URL = + key("partition.mark-done-action.url") + .stringType() + .noDefaultValue() + .withDescription( + "Mark done action will reports the partition to the remote http server, This can only be used by http-report partition mark done action."); + + public static final ConfigOption PARTITION_MARK_DONE_ACTION_TIMEOUT = + key("partition.mark-done-action.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(5)) + .withDescription( + "Http client connect timeout, This can only be used by http-report partition mark done action."); + public static final ConfigOption METASTORE_PARTITIONED_TABLE = key("metastore.partitioned-table") .booleanType() @@ -2262,6 +2279,14 @@ public String partitionTimestampPattern() { return options.get(PARTITION_TIMESTAMP_PATTERN); } + public String httpReportMarkDoneActionUrl() { + return options.get(PARTITION_MARK_DONE_ACTION_URL); + } + + public Duration httpReportMarkDoneActionTimeout() { + return options.get(PARTITION_MARK_DONE_ACTION_TIMEOUT); + } + public String partitionMarkDoneCustomClass() { return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS); } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java new file mode 100644 index 000000000000..53cab41c1659 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition.actions; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.rest.DefaultErrorHandler; +import org.apache.paimon.rest.HttpClient; +import org.apache.paimon.rest.HttpClientOptions; +import org.apache.paimon.rest.RESTClient; +import org.apache.paimon.rest.RESTObjectMapper; +import org.apache.paimon.rest.RESTRequest; +import org.apache.paimon.rest.RESTResponse; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; + +/** Report partition submission information to remote http server. */ +public class HttpReportMarkDoneAction implements PartitionMarkDoneAction { + + private final RESTClient client; + + private final FileStoreTable fileStoreTable; + + private static final String RESPONSE_SUCCESS = "SUCCESS"; + + public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()), + String.format( + "Parameter %s must be non-empty when you use `http-report` partition mark done action.", + PARTITION_MARK_DONE_ACTION_URL.key())); + this.fileStoreTable = fileStoreTable; + HttpClientOptions httpClientOptions = + new HttpClientOptions( + options.httpReportMarkDoneActionUrl(), + Optional.of(options.httpReportMarkDoneActionTimeout()), + Optional.of(options.httpReportMarkDoneActionTimeout()), + RESTObjectMapper.create(), + 1, + DefaultErrorHandler.getInstance()); + this.client = new HttpClient(httpClientOptions); + } + + @Override + public void markDone(String partition) throws Exception { + HttpReportMarkDoneResponse response = + client.post( + null, + new HttpReportMarkDoneRequest( + fileStoreTable.fullName(), + fileStoreTable.location().toString(), + partition), + HttpReportMarkDoneResponse.class, + Collections.emptyMap()); + Preconditions.checkArgument( + reportIsSuccess(response), + String.format( + "The http-report actions response attribute `result` should be 'SUCCESS' but is '%s'.", + response.getResult())); + } + + public boolean reportIsSuccess(HttpReportMarkDoneResponse response) { + return response != null && RESPONSE_SUCCESS.equalsIgnoreCase(response.getResult()); + } + + @Override + public void close() throws IOException { + try { + this.client.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** RestRequest only for HttpReportMarkDoneAction. */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class HttpReportMarkDoneRequest implements RESTRequest { + private static final String MARK_DONE_PARTITION = "partition"; + private static final String TABLE = "table"; + private static final String PATH = "path"; + + @JsonProperty(MARK_DONE_PARTITION) + private final String partition; + + @JsonProperty(TABLE) + private final String table; + + @JsonProperty(PATH) + private final String path; + + @JsonCreator + public HttpReportMarkDoneRequest( + @JsonProperty(TABLE) String table, + @JsonProperty(PATH) String path, + @JsonProperty(MARK_DONE_PARTITION) String partition) { + this.table = table; + this.path = path; + this.partition = partition; + } + + @JsonGetter(MARK_DONE_PARTITION) + public String getPartition() { + return partition; + } + + @JsonGetter(TABLE) + public String getTable() { + return table; + } + + @JsonGetter(PATH) + public String getPath() { + return path; + } + } + + /** Response only for HttpReportMarkDoneAction. */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class HttpReportMarkDoneResponse implements RESTResponse { + private static final String RESULT = "result"; + + @JsonProperty(RESULT) + private final String result; + + public HttpReportMarkDoneResponse(@JsonProperty(RESULT) String result) { + this.result = result; + } + + @JsonGetter(RESULT) + public String getResult() { + return result; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java index f5259f22054a..6a2bad0849e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java @@ -40,6 +40,7 @@ public interface PartitionMarkDoneAction extends Closeable { String SUCCESS_FILE = "success-file"; String DONE_PARTITION = "done-partition"; String MARK_EVENT = "mark-event"; + String HTTP_REPORT = "http-report"; String CUSTOM = "custom"; void markDone(String partition) throws Exception; @@ -59,6 +60,9 @@ static List createActions( case MARK_EVENT: return new MarkPartitionDoneEventAction( createPartitionHandler(fileStoreTable, options)); + createMetastoreClient(fileStoreTable, options)); + case HTTP_REPORT: + return new HttpReportMarkDoneAction(fileStoreTable, options); case CUSTOM: return generateCustomMarkDoneAction(cl, options); default: diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java index 2862e5ef02ed..d08d5af54df2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java @@ -22,6 +22,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.rest.exceptions.RESTException; import org.apache.paimon.rest.responses.ErrorResponse; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -50,7 +51,7 @@ public class HttpClient implements RESTClient { private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); - private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL"; + private static final String THREAD_NAME = "PAIMON-HTTP-CLIENT-THREAD-POOL"; private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); private final OkHttpClient okHttpClient; @@ -81,7 +82,11 @@ void setErrorHandler(ErrorHandler errorHandler) { public T get( String path, Class responseType, Map headers) { Request request = - new Request.Builder().url(uri + path).get().headers(Headers.of(headers)).build(); + new Request.Builder() + .url(getRequestUrl(path)) + .get() + .headers(Headers.of(headers)) + .build(); return exec(request, responseType); } @@ -92,7 +97,7 @@ public T post( RequestBody requestBody = buildRequestBody(body); Request request = new Request.Builder() - .url(uri + path) + .url(getRequestUrl(path)) .post(requestBody) .headers(Headers.of(headers)) .build(); @@ -105,7 +110,11 @@ public T post( @Override public T delete(String path, Map headers) { Request request = - new Request.Builder().url(uri + path).delete().headers(Headers.of(headers)).build(); + new Request.Builder() + .url(getRequestUrl(path)) + .delete() + .headers(Headers.of(headers)) + .build(); return exec(request, null); } @@ -116,7 +125,7 @@ public T delete( RequestBody requestBody = buildRequestBody(body); Request request = new Request.Builder() - .url(uri + path) + .url(getRequestUrl(path)) .delete(requestBody) .headers(Headers.of(headers)) .build(); @@ -169,6 +178,10 @@ private RequestBody buildRequestBody(RESTRequest body) throws JsonProcessingExce return RequestBody.create(OBJECT_MAPPER.writeValueAsBytes(body), MEDIA_TYPE); } + private String getRequestUrl(String path) { + return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path; + } + private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions) { BlockingQueue workQueue = new SynchronousQueue<>(); ExecutorService executorService = diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java new file mode 100644 index 000000000000..7b13e95da3a7 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** Mock a http web service locally. */ +public class TestHttpWebServer { + + private MockWebServer mockWebServer; + private final ObjectMapper objectMapper = RESTObjectMapper.create(); + private String baseUrl; + private final String path; + + public TestHttpWebServer(String path) { + this.path = path; + } + + public void start() throws Exception { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + baseUrl = mockWebServer.url(path).toString(); + } + + public void stop() throws IOException { + mockWebServer.shutdown(); + } + + public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception { + return mockWebServer.takeRequest(timeout, unit); + } + + public void enqueueResponse(String body, Integer code) { + MockResponse mockResponseObj = generateMockResponse(body, code); + enqueueResponse(mockResponseObj); + } + + public void enqueueResponse(MockResponse mockResponseObj) { + mockWebServer.enqueue(mockResponseObj); + } + + public void enqueueResponse(RESTResponse response, Integer code) + throws JsonProcessingException { + enqueueResponse(createResponseBody(response), code); + } + + public String getBaseUrl() { + return baseUrl; + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + + public MockResponse generateMockResponse(String data, Integer code) { + return new MockResponse() + .setResponseCode(code) + .setBody(data) + .addHeader("Content-Type", "application/json"); + } + + public String createResponseBody(RESTResponse response) throws JsonProcessingException { + return objectMapper.writeValueAsString(response); + } + + public T readRequestBody(String body, Class requestType) + throws JsonProcessingException { + return objectMapper.readValue(body, requestType); + } + + public T readResponseBody(String body, Class responseType) + throws JsonProcessingException { + return objectMapper.readValue(body, responseType); + } +} diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 84d4622b02b8..a02b6afb6e8c 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -99,6 +99,14 @@ under the License. test + + org.apache.paimon + paimon-core + ${project.version} + test-jar + test + + org.apache.flink flink-connector-test-utils diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index 63edb8bb4c98..21c014c83cfe 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -22,8 +22,10 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction; import org.apache.paimon.fs.Path; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction; import org.apache.paimon.partition.actions.PartitionMarkDoneAction; import org.apache.paimon.partition.file.SuccessFile; +import org.apache.paimon.rest.TestHttpWebServer; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataType; @@ -31,6 +33,9 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; + +import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -39,9 +44,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; import static org.assertj.core.api.Assertions.assertThat; @@ -213,6 +220,101 @@ public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) thr .containsExactlyInAnyOrder("partKey0=0/partKey1=1/", "partKey0=1/partKey1=0/"); } + @ParameterizedTest + @MethodSource("testArguments") + public void testHttpReportPartitionMarkDoneAction(boolean hasPk, String invoker) + throws Exception { + + TestHttpWebServer server = new TestHttpWebServer(""); + server.start(); + try { + Map options = new HashMap<>(); + options.put( + PARTITION_MARK_DONE_ACTION.key(), + PartitionMarkDoneAction.SUCCESS_FILE + + "," + + PartitionMarkDoneAction.HTTP_REPORT); + options.put(PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl()); + + FileStoreTable table = prepareTable(hasPk, options); + + HttpReportMarkDoneAction.HttpReportMarkDoneResponse expectResponse = + new HttpReportMarkDoneAction.HttpReportMarkDoneResponse("success"); + server.enqueueResponse(expectResponse, 200); + server.enqueueResponse(expectResponse, 200); + + switch (invoker) { + case "action": + createAction( + MarkPartitionDoneAction.class, + "mark_partition_done", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--partition", + "partKey0=0,partKey1=1", + "--partition", + "partKey0=1,partKey1=0") + .run(); + break; + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS"); + SuccessFile successFile1 = SuccessFile.safelyFromPath(table.fileIO(), successPath1); + assertThat(successFile1).isNotNull(); + + Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS"); + SuccessFile successFile2 = SuccessFile.safelyFromPath(table.fileIO(), successPath2); + assertThat(successFile2).isNotNull(); + + RecordedRequest recordedRequest = server.takeRequest(10, TimeUnit.SECONDS); + RecordedRequest recordedRequest2 = server.takeRequest(10, TimeUnit.SECONDS); + + assertRequest(server, table, recordedRequest, "partKey0=0/partKey1=1/"); + assertRequest(server, table, recordedRequest2, "partKey0=1/partKey1=0/"); + + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + server.stop(); + } + } + + public static void assertRequest( + TestHttpWebServer server, + FileStoreTable table, + RecordedRequest recordedRequest, + String exceptPartition) + throws JsonProcessingException { + String requestBody = recordedRequest.getBody().readUtf8(); + HttpReportMarkDoneAction.HttpReportMarkDoneRequest request = + server.readRequestBody( + requestBody, HttpReportMarkDoneAction.HttpReportMarkDoneRequest.class); + + assertThat( + request.getPath().equals(table.location().toString()) + && request.getPartition().equals(exceptPartition) + && request.getTable().equals(table.fullName())) + .isTrue(); + } + private FileStoreTable prepareTable(boolean hasPk) throws Exception { return prepareTable(hasPk, Collections.emptyMap()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java new file mode 100644 index 000000000000..8aaab58805c3 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.FileIOFinder; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneRequest; +import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneResponse; +import org.apache.paimon.rest.TestHttpWebServer; +import org.apache.paimon.rest.exceptions.BadRequestException; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; + +import okhttp3.mockwebserver.RecordedRequest; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_TIMEOUT; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; +import static org.apache.paimon.utils.InternalRowUtilsTest.ROW_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case fo {@link HttpReportMarkDoneAction}. */ +public class HttpReportMarkDoneActionTest { + + private static TestHttpWebServer server; + + private static final String partition = "partition"; + private static FileStoreTable fileStoreTable; + @Rule public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void startServer() throws Exception { + server = new TestHttpWebServer(""); + server.start(); + fileStoreTable = createFileStoreTable(); + } + + @After + public void stopServer() throws Exception { + server.stop(); + } + + @Test + public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { + HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction(); + + HttpReportMarkDoneResponse expectedResponse = new HttpReportMarkDoneResponse("success"); + server.enqueueResponse(expectedResponse, 200); + + httpReportMarkDoneAction.markDone(partition); + RecordedRequest request = server.takeRequest(10, TimeUnit.SECONDS); + assertRequest(request); + + String expectedResponse2 = "{\"unknow\" :\"unknow\", \"result\" :\"success\"}"; + server.enqueueResponse(expectedResponse2, 200); + + httpReportMarkDoneAction.markDone(partition); + RecordedRequest request2 = server.takeRequest(10, TimeUnit.SECONDS); + assertRequest(request2); + } + + @Test + public void testHttpReportMarkDoneActionFailedResponse() throws Exception { + HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction(); + + // status failed. + HttpReportMarkDoneResponse failedResponse = new HttpReportMarkDoneResponse("failed"); + server.enqueueResponse(failedResponse, 200); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "The http-report actions response attribute `result` should be 'SUCCESS' but is 'failed'."); + + // Illegal response body. + String unExpectResponse = "{\"unknow\" :\"unknow\"}"; + server.enqueueResponse(unExpectResponse, 200); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "The http-report actions response attribute `result` should be 'SUCCESS' but is 'null'."); + + // 400. + server.enqueueResponse("", 400); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(BadRequestException.class); + } + + public static void assertRequest(RecordedRequest recordedRequest) + throws JsonProcessingException { + String requestBody = recordedRequest.getBody().readUtf8(); + HttpReportMarkDoneRequest request = + server.readRequestBody(requestBody, HttpReportMarkDoneRequest.class); + + assertThat( + request.getPath().equals(fileStoreTable.location().toString()) + && request.getPartition().equals(partition) + && request.getTable().equals(fileStoreTable.fullName())) + .isTrue(); + } + + public static CoreOptions createCoreOptions() { + HashMap httpOptions = new HashMap<>(); + httpOptions.put(PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl()); + httpOptions.put(PARTITION_MARK_DONE_ACTION_TIMEOUT.key(), "2 s"); + return new CoreOptions(httpOptions); + } + + public HttpReportMarkDoneAction createHttpReportMarkDoneAction() { + return new HttpReportMarkDoneAction(fileStoreTable, createCoreOptions()); + } + + public FileStoreTable createFileStoreTable() throws Exception { + org.apache.paimon.fs.Path tablePath = + new org.apache.paimon.fs.Path(folder.newFolder().toURI().toString()); + Options options = new Options(); + options.set(CoreOptions.PATH, tablePath.toString()); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath), + new Schema( + ROW_TYPE.getFields(), + Collections.emptyList(), + Collections.emptyList(), + options.toMap(), + "")); + return FileStoreTableFactory.create( + FileIOFinder.find(tablePath), + tablePath, + tableSchema, + options, + CatalogEnvironment.empty()); + } +} From b1ea74516a16c89d9388416c91e0153a96ecf063 Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Wed, 8 Jan 2025 18:55:27 +0800 Subject: [PATCH 2/6] [core] add params. --- docs/content/flink/sql-write.md | 6 ++++-- .../generated/core_configuration.html | 6 ++++++ .../java/org/apache/paimon/CoreOptions.java | 11 +++++++++++ .../actions/HttpReportMarkDoneAction.java | 18 ++++++++++++++++++ .../HttpReportMarkDoneActionTest.java | 17 ++++++++++++++++- 5 files changed, 55 insertions(+), 3 deletions(-) diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md index b8a2201e951c..895b5a5cf5c8 100644 --- a/docs/content/flink/sql-write.md +++ b/docs/content/flink/sql-write.md @@ -288,13 +288,15 @@ Paimon also support http-report partition mark done action, this action will rep - partition.mark-done-action: http-report - partition.mark-done-action.url : Action will report the partition to the remote http server. - partition.mark-done-action.timeout : Http client connection timeout and default is 5s. +- partition.mark-done-action.params : Http client request params in the request body json. -Http Request body : +Http Post request body : ```json { "table": "table fullName", "path": "table location path", - "partition": "mark done partition" + "partition": "mark done partition", + "params" : "custom params" } ``` Http Response body : diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index ad6e1f46f735..69fda66438a9 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -671,6 +671,12 @@ String The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action. + +
partition.mark-done-action.params
+ (none) + String + Http client request parameters will be written to the request body, This can only be used by http-report partition mark done action. +
partition.mark-done-action.timeout
5 s diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 0561f3983ab2..2044a19cd846 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1251,6 +1251,13 @@ public class CoreOptions implements Serializable { .withDescription( "Http client connect timeout, This can only be used by http-report partition mark done action."); + public static final ConfigOption PARTITION_MARK_DONE_ACTION_PARAMS = + key("partition.mark-done-action.params") + .stringType() + .noDefaultValue() + .withDescription( + "Http client request parameters will be written to the request body, This can only be used by http-report partition mark done action."); + public static final ConfigOption METASTORE_PARTITIONED_TABLE = key("metastore.partitioned-table") .booleanType() @@ -2287,6 +2294,10 @@ public Duration httpReportMarkDoneActionTimeout() { return options.get(PARTITION_MARK_DONE_ACTION_TIMEOUT); } + public String httpReportMarkDoneActionParams() { + return options.get(PARTITION_MARK_DONE_ACTION_PARAMS); + } + public String partitionMarkDoneCustomClass() { return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS); } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java index 53cab41c1659..d40a7b47ffa6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -48,15 +48,21 @@ public class HttpReportMarkDoneAction implements PartitionMarkDoneAction { private final FileStoreTable fileStoreTable; + private final String params; + private static final String RESPONSE_SUCCESS = "SUCCESS"; public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) { + Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()), String.format( "Parameter %s must be non-empty when you use `http-report` partition mark done action.", PARTITION_MARK_DONE_ACTION_URL.key())); + this.fileStoreTable = fileStoreTable; + this.params = options.httpReportMarkDoneActionParams(); + HttpClientOptions httpClientOptions = new HttpClientOptions( options.httpReportMarkDoneActionUrl(), @@ -74,6 +80,7 @@ public void markDone(String partition) throws Exception { client.post( null, new HttpReportMarkDoneRequest( + params, fileStoreTable.fullName(), fileStoreTable.location().toString(), partition), @@ -105,6 +112,7 @@ public static class HttpReportMarkDoneRequest implements RESTRequest { private static final String MARK_DONE_PARTITION = "partition"; private static final String TABLE = "table"; private static final String PATH = "path"; + private static final String PARAMS = "params"; @JsonProperty(MARK_DONE_PARTITION) private final String partition; @@ -115,11 +123,16 @@ public static class HttpReportMarkDoneRequest implements RESTRequest { @JsonProperty(PATH) private final String path; + @JsonProperty(PARAMS) + private final String params; + @JsonCreator public HttpReportMarkDoneRequest( + @JsonProperty(PARAMS) String params, @JsonProperty(TABLE) String table, @JsonProperty(PATH) String path, @JsonProperty(MARK_DONE_PARTITION) String partition) { + this.params = params; this.table = table; this.path = path; this.partition = partition; @@ -139,6 +152,11 @@ public String getTable() { public String getPath() { return path; } + + @JsonGetter(PARAMS) + public String getParams() { + return params; + } } /** Response only for HttpReportMarkDoneAction. */ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java index 8aaab58805c3..9db49e4c4ecf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.concurrent.TimeUnit; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_PARAMS; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_TIMEOUT; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; import static org.apache.paimon.utils.InternalRowUtilsTest.ROW_TYPE; @@ -60,6 +61,7 @@ public class HttpReportMarkDoneActionTest { private static TestHttpWebServer server; private static final String partition = "partition"; + private static String params = "key1=value1,key2=value2"; private static FileStoreTable fileStoreTable; @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -92,6 +94,15 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { httpReportMarkDoneAction.markDone(partition); RecordedRequest request2 = server.takeRequest(10, TimeUnit.SECONDS); assertRequest(request2); + + // test params is null. + params = null; + HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction(); + HttpReportMarkDoneResponse expectedResponse3 = new HttpReportMarkDoneResponse("success"); + server.enqueueResponse(expectedResponse3, 200); + httpReportMarkDoneAction3.markDone(partition); + RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS); + assertRequest(request3); } @Test @@ -129,7 +140,8 @@ public static void assertRequest(RecordedRequest recordedRequest) assertThat( request.getPath().equals(fileStoreTable.location().toString()) && request.getPartition().equals(partition) - && request.getTable().equals(fileStoreTable.fullName())) + && request.getTable().equals(fileStoreTable.fullName()) + && (params == null || params.equals(request.getParams()))) .isTrue(); } @@ -137,6 +149,9 @@ public static CoreOptions createCoreOptions() { HashMap httpOptions = new HashMap<>(); httpOptions.put(PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl()); httpOptions.put(PARTITION_MARK_DONE_ACTION_TIMEOUT.key(), "2 s"); + if (params != null) { + httpOptions.put(PARTITION_MARK_DONE_ACTION_PARAMS.key(), params); + } return new CoreOptions(httpOptions); } From 943bf79e6986b99851d76992f5ce0882229b0e1e Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Thu, 9 Jan 2025 11:30:55 +0800 Subject: [PATCH 3/6] [core] fix comments. --- docs/content/flink/sql-write.md | 6 +++--- .../shortcodes/generated/core_configuration.html | 12 ++++++------ .../src/main/java/org/apache/paimon/CoreOptions.java | 12 ++++++------ .../partition/actions/HttpReportMarkDoneAction.java | 7 ++++--- .../flink/action/MarkPartitionDoneActionITCase.java | 2 +- .../sink/partition/HttpReportMarkDoneActionTest.java | 8 ++++---- 6 files changed, 24 insertions(+), 23 deletions(-) diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md index 895b5a5cf5c8..f8cdf2fd860b 100644 --- a/docs/content/flink/sql-write.md +++ b/docs/content/flink/sql-write.md @@ -286,9 +286,9 @@ public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction { Paimon also support http-report partition mark done action, this action will report the partition to the remote http server. - partition.mark-done-action: http-report -- partition.mark-done-action.url : Action will report the partition to the remote http server. -- partition.mark-done-action.timeout : Http client connection timeout and default is 5s. -- partition.mark-done-action.params : Http client request params in the request body json. +- partition.mark-done-action.http.url : Action will report the partition to the remote http server. +- partition.mark-done-action.http.timeout : Http client connection timeout and default is 5s. +- partition.mark-done-action.http.params : Http client request params in the request body json. Http Post request body : ```json diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 69fda66438a9..d650670bf824 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -672,22 +672,22 @@ The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action. -
partition.mark-done-action.params
+
partition.mark-done-action.http.params
(none) String - Http client request parameters will be written to the request body, This can only be used by http-report partition mark done action. + Http client request parameters will be written to the request body, this can only be used by http-report partition mark done action. -
partition.mark-done-action.timeout
+
partition.mark-done-action.http.timeout
5 s Duration - Http client connect timeout, This can only be used by http-report partition mark done action. + Http client connection timeout, this can only be used by http-report partition mark done action. -
partition.mark-done-action.url
+
partition.mark-done-action.http.url
(none) String - Mark done action will reports the partition to the remote http server, This can only be used by http-report partition mark done action. + Mark done action will reports the partition to the remote http server, this can only be used by http-report partition mark done action.
partition.timestamp-formatter
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 2044a19cd846..ebb425a73f3a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1238,25 +1238,25 @@ public class CoreOptions implements Serializable { + " PartitionMarkDoneAction interface. Only work in custom mark-done-action."); public static final ConfigOption PARTITION_MARK_DONE_ACTION_URL = - key("partition.mark-done-action.url") + key("partition.mark-done-action.http.url") .stringType() .noDefaultValue() .withDescription( - "Mark done action will reports the partition to the remote http server, This can only be used by http-report partition mark done action."); + "Mark done action will reports the partition to the remote http server, this can only be used by http-report partition mark done action."); public static final ConfigOption PARTITION_MARK_DONE_ACTION_TIMEOUT = - key("partition.mark-done-action.timeout") + key("partition.mark-done-action.http.timeout") .durationType() .defaultValue(Duration.ofSeconds(5)) .withDescription( - "Http client connect timeout, This can only be used by http-report partition mark done action."); + "Http client connection timeout, this can only be used by http-report partition mark done action."); public static final ConfigOption PARTITION_MARK_DONE_ACTION_PARAMS = - key("partition.mark-done-action.params") + key("partition.mark-done-action.http.params") .stringType() .noDefaultValue() .withDescription( - "Http client request parameters will be written to the request body, This can only be used by http-report partition mark done action."); + "Http client request parameters will be written to the request body, this can only be used by http-report partition mark done action."); public static final ConfigOption METASTORE_PARTITIONED_TABLE = key("metastore.partitioned-table") diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java index d40a7b47ffa6..c89f88b4c653 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -86,14 +86,14 @@ public void markDone(String partition) throws Exception { partition), HttpReportMarkDoneResponse.class, Collections.emptyMap()); - Preconditions.checkArgument( + Preconditions.checkState( reportIsSuccess(response), String.format( - "The http-report actions response attribute `result` should be 'SUCCESS' but is '%s'.", + "The http-report action's response attribute `result` should be 'SUCCESS' but is '%s'.", response.getResult())); } - public boolean reportIsSuccess(HttpReportMarkDoneResponse response) { + private boolean reportIsSuccess(HttpReportMarkDoneResponse response) { return response != null && RESPONSE_SUCCESS.equalsIgnoreCase(response.getResult()); } @@ -109,6 +109,7 @@ public void close() throws IOException { /** RestRequest only for HttpReportMarkDoneAction. */ @JsonIgnoreProperties(ignoreUnknown = true) public static class HttpReportMarkDoneRequest implements RESTRequest { + private static final String MARK_DONE_PARTITION = "partition"; private static final String TABLE = "table"; private static final String PATH = "path"; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index 21c014c83cfe..d5b840d6e2ae 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -165,7 +165,7 @@ public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String @MethodSource("testArguments") public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception { - Map options = new HashMap<>(); + Map options = new HashMap<>(2); options.put( PARTITION_MARK_DONE_ACTION.key(), PartitionMarkDoneAction.SUCCESS_FILE + "," + PartitionMarkDoneAction.CUSTOM); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java index 9db49e4c4ecf..e62443176d13 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java @@ -113,17 +113,17 @@ public void testHttpReportMarkDoneActionFailedResponse() throws Exception { HttpReportMarkDoneResponse failedResponse = new HttpReportMarkDoneResponse("failed"); server.enqueueResponse(failedResponse, 200); Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(IllegalStateException.class) .hasMessageContaining( - "The http-report actions response attribute `result` should be 'SUCCESS' but is 'failed'."); + "The http-report action's response attribute `result` should be 'SUCCESS' but is 'failed'."); // Illegal response body. String unExpectResponse = "{\"unknow\" :\"unknow\"}"; server.enqueueResponse(unExpectResponse, 200); Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(IllegalStateException.class) .hasMessageContaining( - "The http-report actions response attribute `result` should be 'SUCCESS' but is 'null'."); + "The http-report action's response attribute `result` should be 'SUCCESS' but is 'null'."); // 400. server.enqueueResponse("", 400); From 84069c5700e78e331ad308835d86247f14691cea Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Thu, 9 Jan 2025 16:01:21 +0800 Subject: [PATCH 4/6] [core] rebase master. --- .../actions/HttpReportMarkDoneAction.java | 14 +++-- .../actions/PartitionMarkDoneAction.java | 1 - .../apache/paimon/rest/HttpClientTest.java | 51 +++++++------------ 3 files changed, 23 insertions(+), 43 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java index c89f88b4c653..8148ac25ccae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -19,11 +19,10 @@ package org.apache.paimon.partition.actions; import org.apache.paimon.CoreOptions; -import org.apache.paimon.rest.DefaultErrorHandler; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.rest.HttpClient; import org.apache.paimon.rest.HttpClientOptions; import org.apache.paimon.rest.RESTClient; -import org.apache.paimon.rest.RESTObjectMapper; import org.apache.paimon.rest.RESTRequest; import org.apache.paimon.rest.RESTResponse; import org.apache.paimon.table.FileStoreTable; @@ -37,7 +36,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.Optional; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; @@ -66,11 +64,9 @@ public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions optio HttpClientOptions httpClientOptions = new HttpClientOptions( options.httpReportMarkDoneActionUrl(), - Optional.of(options.httpReportMarkDoneActionTimeout()), - Optional.of(options.httpReportMarkDoneActionTimeout()), - RESTObjectMapper.create(), - 1, - DefaultErrorHandler.getInstance()); + options.httpReportMarkDoneActionTimeout(), + options.httpReportMarkDoneActionTimeout(), + 1); this.client = new HttpClient(httpClientOptions); } @@ -108,6 +104,7 @@ public void close() throws IOException { /** RestRequest only for HttpReportMarkDoneAction. */ @JsonIgnoreProperties(ignoreUnknown = true) + @VisibleForTesting public static class HttpReportMarkDoneRequest implements RESTRequest { private static final String MARK_DONE_PARTITION = "partition"; @@ -162,6 +159,7 @@ public String getParams() { /** Response only for HttpReportMarkDoneAction. */ @JsonIgnoreProperties(ignoreUnknown = true) + @VisibleForTesting public static class HttpReportMarkDoneResponse implements RESTResponse { private static final String RESULT = "result"; diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java index 6a2bad0849e6..71a6e8dda749 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java @@ -60,7 +60,6 @@ static List createActions( case MARK_EVENT: return new MarkPartitionDoneEventAction( createPartitionHandler(fileStoreTable, options)); - createMetastoreClient(fileStoreTable, options)); case HTTP_REPORT: return new HttpReportMarkDoneAction(fileStoreTable, options); case CUSTOM: diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java index 54e7d3a68eee..161dbaf3bb50 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java @@ -24,10 +24,6 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.ErrorResponseResourceType; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,29 +41,28 @@ public class HttpClientTest { private static final String MOCK_PATH = "/v1/api/mock"; private static final String TOKEN = "token"; - private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); - private MockWebServer mockWebServer; + private TestHttpWebServer server; private HttpClient httpClient; private ErrorHandler errorHandler; private MockRESTData mockResponseData; private String mockResponseDataStr; - private ErrorResponse errorResponse; private String errorResponseStr; private Map headers; @Before - public void setUp() throws IOException { - mockWebServer = new MockWebServer(); - mockWebServer.start(); - String baseUrl = mockWebServer.url("").toString(); + public void setUp() throws Exception { + server = new TestHttpWebServer(MOCK_PATH); + server.start(); errorHandler = DefaultErrorHandler.getInstance(); HttpClientOptions httpClientOptions = - new HttpClientOptions(baseUrl, Duration.ofSeconds(3), Duration.ofSeconds(3), 1); + new HttpClientOptions( + server.getBaseUrl(), Duration.ofSeconds(3), Duration.ofSeconds(3), 1); mockResponseData = new MockRESTData(MOCK_PATH); - mockResponseDataStr = OBJECT_MAPPER.writeValueAsString(mockResponseData); - errorResponse = new ErrorResponse(ErrorResponseResourceType.DATABASE, "test", "test", 400); - errorResponseStr = OBJECT_MAPPER.writeValueAsString(errorResponse); + mockResponseDataStr = server.createResponseBody(mockResponseData); + errorResponseStr = + server.createResponseBody( + new ErrorResponse(ErrorResponseResourceType.DATABASE, "test", "test", 400)); httpClient = new HttpClient(httpClientOptions); httpClient.setErrorHandler(errorHandler); CredentialsProvider credentialsProvider = new BearTokenCredentialsProvider(TOKEN); @@ -76,19 +71,19 @@ public void setUp() throws IOException { @After public void tearDown() throws IOException { - mockWebServer.shutdown(); + server.stop(); } @Test public void testGetSuccess() { - mockHttpCallWithCode(mockResponseDataStr, 200); + server.enqueueResponse(mockResponseDataStr, 200); MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class, headers); assertEquals(mockResponseData.data(), response.data()); } @Test public void testGetFail() { - mockHttpCallWithCode(errorResponseStr, 400); + server.enqueueResponse(errorResponseStr, 400); assertThrows( BadRequestException.class, () -> httpClient.get(MOCK_PATH, MockRESTData.class, headers)); @@ -96,7 +91,7 @@ public void testGetFail() { @Test public void testPostSuccess() { - mockHttpCallWithCode(mockResponseDataStr, 200); + server.enqueueResponse(mockResponseDataStr, 200); MockRESTData response = httpClient.post(MOCK_PATH, mockResponseData, MockRESTData.class, headers); assertEquals(mockResponseData.data(), response.data()); @@ -104,7 +99,7 @@ public void testPostSuccess() { @Test public void testPostFail() { - mockHttpCallWithCode(errorResponseStr, 400); + server.enqueueResponse(errorResponseStr, 400); assertThrows( BadRequestException.class, () -> httpClient.post(MOCK_PATH, mockResponseData, ErrorResponse.class, headers)); @@ -112,25 +107,13 @@ public void testPostFail() { @Test public void testDeleteSuccess() { - mockHttpCallWithCode(mockResponseDataStr, 200); + server.enqueueResponse(mockResponseDataStr, 200); assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH, headers)); } @Test public void testDeleteFail() { - mockHttpCallWithCode(errorResponseStr, 400); + server.enqueueResponse(errorResponseStr, 400); assertThrows(BadRequestException.class, () -> httpClient.delete(MOCK_PATH, headers)); } - - private void mockHttpCallWithCode(String body, Integer code) { - MockResponse mockResponseObj = generateMockResponse(body, code); - mockWebServer.enqueue(mockResponseObj); - } - - private MockResponse generateMockResponse(String data, Integer code) { - return new MockResponse() - .setResponseCode(code) - .setBody(data) - .addHeader("Content-Type", "application/json"); - } } From 005416a2f8586dfc4be376084c4324319b245a5d Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Sun, 12 Jan 2025 22:03:24 +0800 Subject: [PATCH 5/6] [core] improvement. --- .../java/org/apache/paimon/CoreOptions.java | 27 +++++++++++++++++++ .../actions/PartitionMarkDoneAction.java | 14 +++------- .../action/MarkPartitionDoneActionITCase.java | 14 ++++------ .../CustomPartitionMarkDoneActionTest.java | 2 +- 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index ebb425a73f3a..afe4c50208f7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -49,6 +49,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -2302,6 +2303,12 @@ public String partitionMarkDoneCustomClass() { return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS); } + public Set partitionMarkDoneActions() { + return Arrays.stream(options.get(PARTITION_MARK_DONE_ACTION).split(",")) + .map(x -> PartitionMarkDoneAction.valueOf(x.replace('-', '_').toUpperCase())) + .collect(Collectors.toCollection(HashSet::new)); + } + public String consumerId() { String consumerId = options.get(CONSUMER_ID); if (consumerId != null && consumerId.isEmpty()) { @@ -3199,4 +3206,24 @@ public enum MaterializedTableRefreshStatus { ACTIVATED, SUSPENDED } + + /** Partition mark done actions. */ + public enum PartitionMarkDoneAction { + SUCCESS_FILE("success-file"), + DONE_PARTITION("done-partition"), + MARK_EVENT("mark-event"), + HTTP_REPORT("http-report"), + CUSTOM("custom"); + + private final String value; + + PartitionMarkDoneAction(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java index 71a6e8dda749..ee12fce528ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java @@ -24,33 +24,27 @@ import org.apache.paimon.utils.StringUtils; import java.io.Closeable; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Action to mark partitions done. */ public interface PartitionMarkDoneAction extends Closeable { - String SUCCESS_FILE = "success-file"; - String DONE_PARTITION = "done-partition"; - String MARK_EVENT = "mark-event"; - String HTTP_REPORT = "http-report"; - String CUSTOM = "custom"; - void markDone(String partition) throws Exception; static List createActions( ClassLoader cl, FileStoreTable fileStoreTable, CoreOptions options) { - return Arrays.stream(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).split(",")) + return options.partitionMarkDoneActions().stream() .map( action -> { - switch (action.toLowerCase()) { + switch (action) { case SUCCESS_FILE: return new SuccessFileMarkDoneAction( fileStoreTable.fileIO(), fileStoreTable.location()); @@ -65,7 +59,7 @@ static List createActions( case CUSTOM: return generateCustomMarkDoneAction(cl, options); default: - throw new UnsupportedOperationException(action); + throw new UnsupportedOperationException(action.toString()); } }) .collect(Collectors.toList()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index d5b840d6e2ae..82f9637faff8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction; import org.apache.paimon.fs.Path; import org.apache.paimon.partition.actions.HttpReportMarkDoneAction; -import org.apache.paimon.partition.actions.PartitionMarkDoneAction; import org.apache.paimon.partition.file.SuccessFile; import org.apache.paimon.rest.TestHttpWebServer; import org.apache.paimon.table.FileStoreTable; @@ -50,6 +49,9 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.HTTP_REPORT; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.SUCCESS_FILE; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for {@link MarkPartitionDoneAction}. */ @@ -166,9 +168,7 @@ public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception { Map options = new HashMap<>(2); - options.put( - PARTITION_MARK_DONE_ACTION.key(), - PartitionMarkDoneAction.SUCCESS_FILE + "," + PartitionMarkDoneAction.CUSTOM); + options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + CUSTOM); options.put( PARTITION_MARK_DONE_CUSTOM_CLASS.key(), MockCustomPartitionMarkDoneAction.class.getName()); @@ -229,11 +229,7 @@ public void testHttpReportPartitionMarkDoneAction(boolean hasPk, String invoker) server.start(); try { Map options = new HashMap<>(); - options.put( - PARTITION_MARK_DONE_ACTION.key(), - PartitionMarkDoneAction.SUCCESS_FILE - + "," - + PartitionMarkDoneAction.HTTP_REPORT); + options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + HTTP_REPORT); options.put(PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl()); FileStoreTable table = prepareTable(hasPk, options); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java index d1599b6c57ea..73ba630b638c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java @@ -32,8 +32,8 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; +import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM; import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits; -import static org.apache.paimon.partition.actions.PartitionMarkDoneAction.CUSTOM; import static org.assertj.core.api.Assertions.assertThat; /** Test for custom PartitionMarkDoneAction. */ From c410748c50e1868f63959458f2270df6c7aff3fe Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Mon, 13 Jan 2025 13:55:39 +0800 Subject: [PATCH 6/6] [core] fix. --- .../actions/HttpReportMarkDoneAction.java | 92 +++++++++++++++---- .../actions/HttpReportMarkDoneException.java | 33 +++++++ .../org/apache/paimon/rest/HttpClient.java | 2 +- .../apache/paimon/rest/TestHttpWebServer.java | 5 +- .../action/MarkPartitionDoneActionITCase.java | 3 +- .../HttpReportMarkDoneActionTest.java | 23 +++-- 6 files changed, 123 insertions(+), 35 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java index 8148ac25ccae..39c17406b339 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -20,11 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.rest.HttpClient; -import org.apache.paimon.rest.HttpClientOptions; -import org.apache.paimon.rest.RESTClient; -import org.apache.paimon.rest.RESTRequest; -import org.apache.paimon.rest.RESTResponse; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; @@ -33,16 +28,37 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializationFeature; + +import okhttp3.Dispatcher; +import okhttp3.Headers; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; +import java.util.Map; +import java.util.concurrent.SynchronousQueue; +import static okhttp3.ConnectionSpec.CLEARTEXT; +import static okhttp3.ConnectionSpec.COMPATIBLE_TLS; +import static okhttp3.ConnectionSpec.MODERN_TLS; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION_URL; +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; /** Report partition submission information to remote http server. */ public class HttpReportMarkDoneAction implements PartitionMarkDoneAction { - private final RESTClient client; + private final OkHttpClient client; + private final String url; + private final ObjectMapper mapper; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); private final FileStoreTable fileStoreTable; @@ -50,6 +66,8 @@ public class HttpReportMarkDoneAction implements PartitionMarkDoneAction { private static final String RESPONSE_SUCCESS = "SUCCESS"; + private static final String THREAD_NAME = "PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD"; + public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) { Preconditions.checkArgument( @@ -60,27 +78,34 @@ public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions optio this.fileStoreTable = fileStoreTable; this.params = options.httpReportMarkDoneActionParams(); - - HttpClientOptions httpClientOptions = - new HttpClientOptions( - options.httpReportMarkDoneActionUrl(), - options.httpReportMarkDoneActionTimeout(), - options.httpReportMarkDoneActionTimeout(), - 1); - this.client = new HttpClient(httpClientOptions); + this.url = options.httpReportMarkDoneActionUrl(); + this.mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + + OkHttpClient.Builder builder = + new OkHttpClient.Builder() + .dispatcher( + new Dispatcher( + createCachedThreadPool( + 1, THREAD_NAME, new SynchronousQueue<>()))) + .retryOnConnectionFailure(true) + .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT)) + .connectTimeout(options.httpReportMarkDoneActionTimeout()) + .readTimeout(options.httpReportMarkDoneActionTimeout()); + + this.client = builder.build(); } @Override public void markDone(String partition) throws Exception { HttpReportMarkDoneResponse response = - client.post( - null, + post( new HttpReportMarkDoneRequest( params, fileStoreTable.fullName(), fileStoreTable.location().toString(), partition), - HttpReportMarkDoneResponse.class, Collections.emptyMap()); Preconditions.checkState( reportIsSuccess(response), @@ -96,7 +121,8 @@ private boolean reportIsSuccess(HttpReportMarkDoneResponse response) { @Override public void close() throws IOException { try { - this.client.close(); + client.dispatcher().cancelAll(); + client.connectionPool().evictAll(); } catch (Exception e) { throw new RuntimeException(e); } @@ -105,7 +131,7 @@ public void close() throws IOException { /** RestRequest only for HttpReportMarkDoneAction. */ @JsonIgnoreProperties(ignoreUnknown = true) @VisibleForTesting - public static class HttpReportMarkDoneRequest implements RESTRequest { + public static class HttpReportMarkDoneRequest { private static final String MARK_DONE_PARTITION = "partition"; private static final String TABLE = "table"; @@ -160,7 +186,7 @@ public String getParams() { /** Response only for HttpReportMarkDoneAction. */ @JsonIgnoreProperties(ignoreUnknown = true) @VisibleForTesting - public static class HttpReportMarkDoneResponse implements RESTResponse { + public static class HttpReportMarkDoneResponse { private static final String RESULT = "result"; @JsonProperty(RESULT) @@ -175,4 +201,30 @@ public String getResult() { return result; } } + + public HttpReportMarkDoneResponse post( + HttpReportMarkDoneRequest body, Map headers) throws IOException { + RequestBody requestBody = RequestBody.create(mapper.writeValueAsBytes(body), MEDIA_TYPE); + Request request = + new Request.Builder() + .url(url) + .post(requestBody) + .headers(Headers.of(headers)) + .build(); + try (Response response = client.newCall(request).execute()) { + String responseBodyStr = response.body() != null ? response.body().string() : null; + if (!response.isSuccessful() || StringUtils.isNullOrWhitespaceOnly(responseBodyStr)) { + throw new HttpReportMarkDoneException( + response.isSuccessful() + ? "ResponseBody is null or empty." + : String.format( + "Response is not successful, response is %s", response)); + } + return mapper.readValue(responseBodyStr, HttpReportMarkDoneResponse.class); + } catch (HttpReportMarkDoneException e) { + throw e; + } catch (Exception e) { + throw new HttpReportMarkDoneException(e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java new file mode 100644 index 000000000000..56d177575472 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition.actions; + +/** Exception for {@link HttpReportMarkDoneAction}. */ +public class HttpReportMarkDoneException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public HttpReportMarkDoneException(Throwable e) { + super("Http request exception.", e); + } + + public HttpReportMarkDoneException(String msg) { + super(msg); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java index d08d5af54df2..bbfee103c7b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java @@ -51,7 +51,7 @@ public class HttpClient implements RESTClient { private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); - private static final String THREAD_NAME = "PAIMON-HTTP-CLIENT-THREAD-POOL"; + private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL"; private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); private final OkHttpClient okHttpClient; diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java index 7b13e95da3a7..c56e18f977c4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestHttpWebServer.java @@ -87,12 +87,11 @@ public String createResponseBody(RESTResponse response) throws JsonProcessingExc return objectMapper.writeValueAsString(response); } - public T readRequestBody(String body, Class requestType) - throws JsonProcessingException { + public T readRequestBody(String body, Class requestType) throws JsonProcessingException { return objectMapper.readValue(body, requestType); } - public T readResponseBody(String body, Class responseType) + public T readResponseBody(String body, Class responseType) throws JsonProcessingException { return objectMapper.readValue(body, responseType); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index 82f9637faff8..0a29aebf22bf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -234,8 +234,7 @@ public void testHttpReportPartitionMarkDoneAction(boolean hasPk, String invoker) FileStoreTable table = prepareTable(hasPk, options); - HttpReportMarkDoneAction.HttpReportMarkDoneResponse expectResponse = - new HttpReportMarkDoneAction.HttpReportMarkDoneResponse("success"); + String expectResponse = "{\"result\":\"success\"}"; server.enqueueResponse(expectResponse, 200); server.enqueueResponse(expectResponse, 200); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java index e62443176d13..b79597cb810f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java @@ -24,9 +24,8 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.actions.HttpReportMarkDoneAction; import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneRequest; -import org.apache.paimon.partition.actions.HttpReportMarkDoneAction.HttpReportMarkDoneResponse; +import org.apache.paimon.partition.actions.HttpReportMarkDoneException; import org.apache.paimon.rest.TestHttpWebServer; -import org.apache.paimon.rest.exceptions.BadRequestException; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; @@ -62,6 +61,8 @@ public class HttpReportMarkDoneActionTest { private static final String partition = "partition"; private static String params = "key1=value1,key2=value2"; + private static final String successResponse = "{\"result\":\"success\"}"; + private static final String failedResponse = "{\"result\":\"failed\"}"; private static FileStoreTable fileStoreTable; @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -81,8 +82,7 @@ public void stopServer() throws Exception { public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction(); - HttpReportMarkDoneResponse expectedResponse = new HttpReportMarkDoneResponse("success"); - server.enqueueResponse(expectedResponse, 200); + server.enqueueResponse(successResponse, 200); httpReportMarkDoneAction.markDone(partition); RecordedRequest request = server.takeRequest(10, TimeUnit.SECONDS); @@ -98,8 +98,7 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { // test params is null. params = null; HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction(); - HttpReportMarkDoneResponse expectedResponse3 = new HttpReportMarkDoneResponse("success"); - server.enqueueResponse(expectedResponse3, 200); + server.enqueueResponse(successResponse, 200); httpReportMarkDoneAction3.markDone(partition); RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS); assertRequest(request3); @@ -110,7 +109,6 @@ public void testHttpReportMarkDoneActionFailedResponse() throws Exception { HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction(); // status failed. - HttpReportMarkDoneResponse failedResponse = new HttpReportMarkDoneResponse("failed"); server.enqueueResponse(failedResponse, 200); Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) .isInstanceOf(IllegalStateException.class) @@ -125,10 +123,17 @@ public void testHttpReportMarkDoneActionFailedResponse() throws Exception { .hasMessageContaining( "The http-report action's response attribute `result` should be 'SUCCESS' but is 'null'."); + // empty response. + server.enqueueResponse("", 200); + Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) + .isInstanceOf(HttpReportMarkDoneException.class) + .hasMessageContaining("ResponseBody is null or empty."); + // 400. - server.enqueueResponse("", 400); + server.enqueueResponse(successResponse, 400); Assertions.assertThatThrownBy(() -> markDoneAction.markDone(partition)) - .isInstanceOf(BadRequestException.class); + .isInstanceOf(HttpReportMarkDoneException.class) + .hasMessageContaining("Response is not successful"); } public static void assertRequest(RecordedRequest recordedRequest)