diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 8f9d2336336e..9816a516acf1 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -538,8 +538,10 @@ class BeamModulePlugin implements Plugin {
         cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
         cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
         cdap_api : "io.cdap.cdap:cdap-api:$cdap_version",
+        cdap_api_commons : "io.cdap.cdap:cdap-api-common:$cdap_version",
         cdap_common : "io.cdap.cdap:cdap-common:$cdap_version",
         cdap_etl_api : "io.cdap.cdap:cdap-etl-api:$cdap_version",
+        cdap_etl_api_spark : "io.cdap.cdap:cdap-etl-api-spark:$cdap_version",
         cdap_plugin_service_now : "io.cdap.plugin:servicenow-plugins:1.1.0",
         checker_qual : "org.checkerframework:checker-qual:$checkerframework_version",
         classgraph : "io.github.classgraph:classgraph:$classgraph_version",
@@ -693,6 +695,7 @@ class BeamModulePlugin implements Plugin {
         spark3_sql : "org.apache.spark:spark-sql_2.12:$spark3_version",
         spark3_streaming : "org.apache.spark:spark-streaming_2.12:$spark3_version",
         stax2_api : "org.codehaus.woodstox:stax2-api:4.2.1",
+        tephra : "org.apache.tephra:tephra-api:0.15.0-incubating",
         testcontainers_base : "org.testcontainers:testcontainers:$testcontainers_version",
         testcontainers_clickhouse : "org.testcontainers:clickhouse:$testcontainers_version",
         testcontainers_elasticsearch : "org.testcontainers:elasticsearch:$testcontainers_version",
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
index 4ef361924d75..9fe7d305a296 100644
--- a/sdks/java/io/cdap/build.gradle
+++ b/sdks/java/io/cdap/build.gradle
@@ -38,15 +38,22 @@ interface for integration with CDAP plugins."""
  */
 
 dependencies {
-  implementation library.java.guava
   implementation library.java.cdap_api
-  implementation library.java.cdap_common
+  implementation library.java.cdap_api_commons
+  // NOTE(review): log4j-over-slf4j is excluded, presumably to avoid clashing SLF4J bridges - confirm.
+  implementation (library.java.cdap_common) {
+    exclude module: "log4j-over-slf4j"
+  }
+  implementation library.java.cdap_etl_api
+  implementation library.java.cdap_etl_api_spark
   implementation library.java.jackson_core
   implementation library.java.jackson_databind
+  implementation library.java.guava
   implementation library.java.slf4j_api
+  implementation library.java.tephra
   implementation project(path: ":sdks:java:core", configuration: "shadow")
   testImplementation library.java.cdap_plugin_service_now
-  testImplementation library.java.cdap_etl_api
   testImplementation library.java.vendored_guava_26_0_jre
   testImplementation library.java.junit
+  testImplementation project(path: ":runners:direct-java", configuration: "shadow")
 }
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
new file mode 100644
index 000000000000..06b174062df0
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.DatasetInstantiationException;
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.Dataset;
+import io.cdap.cdap.api.dataset.DatasetManagementException;
+import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.api.metadata.Metadata;
+import io.cdap.cdap.api.metadata.MetadataEntity;
+import io.cdap.cdap.api.metadata.MetadataException;
+import io.cdap.cdap.api.metadata.MetadataScope;
+import io.cdap.cdap.api.plugin.PluginProperties;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.Lookup;
+import io.cdap.cdap.etl.api.StageMetrics;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.action.SettableArguments;
+import io.cdap.cdap.etl.api.batch.BatchContext;
+import io.cdap.cdap.etl.api.lineage.field.FieldOperation;
+import java.net.URL;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Base {@link BatchContext} shared by the batch source, batch sink and streaming source contexts
+ * of this package.
+ *
+ * <p>Only three pieces of state hold real values: the logical start time (captured when the
+ * instance is created), the {@link FailureCollector} and the {@link InputFormatProvider}. Every
+ * other accessor returns {@code null} or {@code false}, and every mutating method is a no-op.
+ */
+@SuppressWarnings({"TypeParameterUnusedInFormals", "nullness"})
+public abstract class BatchContextImpl implements BatchContext {
+
+  /** Collects the validation failures reported through this context. */
+  private final FailureCollectorWrapper failureCollector = new FailureCollectorWrapper();
+
+  /**
+   * This should be set after {@link SubmitterLifecycle#prepareRun(Object)} call with passing this
+   * context object as a param.
+   */
+  protected InputFormatProvider inputFormatProvider;
+
+  /** Wall-clock time at which this context was instantiated. */
+  private final Timestamp startTime = new Timestamp(System.currentTimeMillis());
+
+  /**
+   * Returns the provider stored in {@link #inputFormatProvider}, or {@code null} if none has been
+   * assigned yet.
+   */
+  public InputFormatProvider getInputFormatProvider() {
+    return inputFormatProvider;
+  }
+
+  // ---- Stage description: not tracked by this implementation. ----
+
+  @Override
+  public String getStageName() {
+    return null;
+  }
+
+  @Override
+  public String getNamespace() {
+    return null;
+  }
+
+  @Override
+  public String getPipelineName() {
+    return null;
+  }
+
+  /** Returns the creation time of this context, in milliseconds since the epoch. */
+  @Override
+  public long getLogicalStartTime() {
+    return this.startTime.getTime();
+  }
+
+  @Override
+  public StageMetrics getMetrics() {
+    return null;
+  }
+
+  // ---- Plugin access: not supported, always null. ----
+
+  @Override
+  public PluginProperties getPluginProperties() {
+    return null;
+  }
+
+  @Override
+  public PluginProperties getPluginProperties(String pluginId) {
+    return null;
+  }
+
+  @Override
+  public <T> Class<T> loadPluginClass(String pluginId) {
+    return null;
+  }
+
+  @Override
+  public <T> T newPluginInstance(String pluginId) throws InstantiationException {
+    return null;
+  }
+
+  // ---- Schemas: not tracked, always null. ----
+
+  @Nullable
+  @Override
+  public Schema getInputSchema() {
+    return null;
+  }
+
+  @Override
+  public @Nullable Map<String, Schema> getInputSchemas() {
+    return null;
+  }
+
+  @Override
+  public @Nullable Schema getOutputSchema() {
+    return null;
+  }
+
+  @Override
+  public Map<String, Schema> getOutputPortSchemas() {
+    return null;
+  }
+
+  // ---- Dataset administration: creation is ignored and no dataset is ever reported. ----
+
+  @Override
+  public void createDataset(String datasetName, String typeName, DatasetProperties properties)
+      throws DatasetManagementException {}
+
+  @Override
+  public boolean datasetExists(String datasetName) throws DatasetManagementException {
+    return false;
+  }
+
+  @Override
+  public SettableArguments getArguments() {
+    return null;
+  }
+
+  /** Returns the single {@link FailureCollectorWrapper} owned by this context. */
+  @Override
+  public FailureCollector getFailureCollector() {
+    return this.failureCollector;
+  }
+
+  // ---- Service discovery: not supported, always null. ----
+
+  @Nullable
+  @Override
+  public URL getServiceURL(String applicationId, String serviceId) {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public URL getServiceURL(String serviceId) {
+    return null;
+  }
+
+  // ---- Metadata: reads return null, writes are ignored. ----
+
+  @Override
+  public Map<MetadataScope, Metadata> getMetadata(MetadataEntity metadataEntity)
+      throws MetadataException {
+    return null;
+  }
+
+  @Override
+  public Metadata getMetadata(MetadataScope scope, MetadataEntity metadataEntity)
+      throws MetadataException {
+    return null;
+  }
+
+  @Override
+  public void addProperties(MetadataEntity metadataEntity, Map<String, String> properties) {}
+
+  @Override
+  public void addTags(MetadataEntity metadataEntity, String... tags) {}
+
+  @Override
+  public void addTags(MetadataEntity metadataEntity, Iterable<String> tags) {}
+
+  @Override
+  public void removeMetadata(MetadataEntity metadataEntity) {}
+
+  @Override
+  public void removeProperties(MetadataEntity metadataEntity) {}
+
+  @Override
+  public void removeProperties(MetadataEntity metadataEntity, String... keys) {}
+
+  @Override
+  public void removeTags(MetadataEntity metadataEntity) {}
+
+  @Override
+  public void removeTags(MetadataEntity metadataEntity, String... tags) {}
+
+  /** Field-level lineage operations are accepted and discarded. */
+  @Override
+  public void record(List<FieldOperation> fieldOperations) {}
+
+  // ---- Dataset access: lookups return null, release/discard are ignored. ----
+
+  @Override
+  public <T extends Dataset> T getDataset(String name) throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public <T extends Dataset> T getDataset(String namespace, String name)
+      throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public <T extends Dataset> T getDataset(String name, Map<String, String> arguments)
+      throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public <T extends Dataset> T getDataset(
+      String namespace, String name, Map<String, String> arguments)
+      throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public void releaseDataset(Dataset dataset) {}
+
+  @Override
+  public void discardDataset(Dataset dataset) {}
+
+  @Override
+  public <T> Lookup<T> provide(String table, Map<String, String> arguments) {
+    return null;
+  }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
new file mode 100644
index 000000000000..f0374f7793df
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.batch.Output;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+
+/**
+ * {@link BatchSinkContext} flavour of {@link BatchContextImpl}: outputs handed to it are ignored
+ * and preview mode is always reported as disabled.
+ */
+public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {
+
+  /** Reports preview mode as disabled, unconditionally. */
+  @Override
+  public boolean isPreviewEnabled() {
+    return false;
+  }
+
+  /** Accepts the output without recording it anywhere. */
+  @Override
+  public void addOutput(Output output) {}
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
new file mode 100644
index 000000000000..98532936035d
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.batch.Input;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+
+/**
+ * {@link BatchSourceContext} flavour of {@link BatchContextImpl}: it remembers the input format
+ * provider of the input it is given and always reports preview mode as disabled.
+ */
+public class BatchSourceContextImpl extends BatchContextImpl implements BatchSourceContext {
+
+  /**
+   * Stores the input format provider carried by {@code input} in the inherited {@code
+   * inputFormatProvider} field.
+   *
+   * @param input the input to read from; must be an {@code Input.InputFormatProviderInput}
+   * @throws IllegalArgumentException if {@code input} is {@code null} or of any other input type
+   */
+  @Override
+  public void setInput(Input input) {
+    // Fail with a descriptive message instead of a bare ClassCastException/NullPointerException.
+    if (!(input instanceof Input.InputFormatProviderInput)) {
+      String actualType = input == null ? "null" : input.getClass().getName();
+      throw new IllegalArgumentException(
+          "Unsupported input type " + actualType + ", expected Input.InputFormatProviderInput");
+    }
+    this.inputFormatProvider = ((Input.InputFormatProviderInput) input).getInputFormatProvider();
+  }
+
+  /** Reports preview mode as disabled, unconditionally. */
+  @Override
+  public boolean isPreviewEnabled() {
+    return false;
+  }
+
+  /** Always returns zero. */
+  @Override
+  public int getMaxPreviewRecords() {
+    return 0;
+  }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
new file mode 100644
index 000000000000..d697909d02ef
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import java.util.ArrayList;
+import javax.annotation.Nullable;
+
+/**
+ * {@link FailureCollector} that accumulates every reported {@link ValidationFailure} in an
+ * in-memory list.
+ */
+public class FailureCollectorWrapper implements FailureCollector {
+  /** Failures in the order they were added; never reassigned. */
+  private final ArrayList<ValidationFailure> failuresCollection;
+
+  /** Creates a collector that holds no failures. */
+  public FailureCollectorWrapper() {
+    this.failuresCollection = new ArrayList<>();
+  }
+
+  /**
+   * Records a new failure.
+   *
+   * @param message description of the failure
+   * @param correctiveAction suggested fix, may be {@code null}
+   * @return the failure that was created and stored
+   */
+  @Override
+  public ValidationFailure addFailure(String message, @Nullable String correctiveAction) {
+    ValidationFailure validationFailure = new ValidationFailure(message, correctiveAction);
+    failuresCollection.add(validationFailure);
+
+    return validationFailure;
+  }
+
+  /**
+   * Throws when at least one failure has been recorded; otherwise returns an exception built from
+   * the (empty) failure list without throwing it.
+   *
+   * @return a {@link ValidationException} for the empty failure list
+   * @throws ValidationException if any failure has been added
+   */
+  @Override
+  public ValidationException getOrThrowException() throws ValidationException {
+    // Both outcomes use the same exception object; only its delivery differs.
+    ValidationException validationException = new ValidationException(this.failuresCollection);
+    if (failuresCollection.isEmpty()) {
+      return validationException;
+    }
+
+    throw validationException;
+  }
+
+  /** Returns the backing list itself (not a copy), so later additions are visible through it. */
+  @Override
+  public ArrayList<ValidationFailure> getValidationFailures() {
+    return this.failuresCollection;
+  }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java new file mode 100644 index 000000000000..7c09ba19f5fa --- /dev/null +++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cdap.context; + +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.DatasetManagementException; +import io.cdap.cdap.etl.api.streaming.StreamingSourceContext; +import javax.annotation.Nullable; +import org.apache.tephra.TransactionFailureException; + +/** Class for creating context object of different CDAP classes with stream source type. 
*/ +public class StreamingSourceContextImpl extends BatchContextImpl implements StreamingSourceContext { + + @Override + public void registerLineage(String referenceName, @Nullable Schema schema) + throws DatasetManagementException, TransactionFailureException {} + + @Override + public boolean isPreviewEnabled() { + return false; + } +} diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java new file mode 100644 index 000000000000..f6548ccdf932 --- /dev/null +++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Context for CDAP classes. 
*/ +@Experimental(Kind.SOURCE_SINK) +package org.apache.beam.sdk.io.cdap.context; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java new file mode 100644 index 000000000000..8f679fe3fc08 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cdap.context; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.validation.ValidationException; +import java.sql.Timestamp; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test class for {@link BatchContextImpl}. 
*/ +@RunWith(JUnit4.class) +public class BatchContextImplTest { + + @Test + public void getLogicalStartTime() { + /** arrange */ + Timestamp expectedStartTime = new Timestamp(System.currentTimeMillis()); + BatchContextImpl context = new BatchSourceContextImpl(); + + /** act */ + long actualStartTime = context.getLogicalStartTime(); + + /** assert */ + assertTrue((expectedStartTime.getTime() - actualStartTime) <= 100); + } + + @Test + public void getFailureCollector() { + /** arrange */ + BatchContextImpl context = new BatchSinkContextImpl(); + + /** act */ + FailureCollector failureCollector = context.getFailureCollector(); + + /** assert */ + ValidationException validationException = failureCollector.getOrThrowException(); + assertEquals(0, validationException.getFailures().size()); + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java new file mode 100644 index 000000000000..0e35c8a06a59 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import java.util.ArrayList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link FailureCollectorWrapper}. */
+@RunWith(JUnit4.class)
+public class FailureCollectorWrapperTest {
+
+  /** Once a failure has been added, asking for the exception throws it. */
+  @Test
+  public void addFailure() {
+    // arrange
+    FailureCollectorWrapper failureCollectorWrapper = new FailureCollectorWrapper();
+
+    // act
+    failureCollectorWrapper.addFailure("An error has occurred", null);
+
+    // assert
+    assertThrows(ValidationException.class, () -> failureCollectorWrapper.getOrThrowException());
+  }
+
+  /** A collector with failures throws; an empty collector returns the exception instead. */
+  @Test
+  public void getOrThrowException() {
+    // arrange
+    String errorMessage = "An error has occurred";
+    String expectedMessage = "Errors were encountered during validation. An error has occurred";
+
+    FailureCollectorWrapper failureCollectorWrapper = new FailureCollectorWrapper();
+    FailureCollectorWrapper emptyFailureCollectorWrapper = new FailureCollectorWrapper();
+    failureCollectorWrapper.addFailure(errorMessage, null);
+
+    // act && assert
+    ValidationException e =
+        assertThrows(
+            ValidationException.class, () -> failureCollectorWrapper.getOrThrowException());
+    assertEquals(expectedMessage, e.getMessage());
+
+    // With an empty collector the exception is returned, not thrown, and carries no failures.
+    ValidationException returnedException = emptyFailureCollectorWrapper.getOrThrowException();
+    assertEquals(0, returnedException.getFailures().size());
+    assertEquals(0, emptyFailureCollectorWrapper.getValidationFailures().size());
+  }
+
+  /** The failure list reflects exactly what was added. */
+  @Test
+  public void getValidationFailures() {
+    // arrange
+    String errorMessage = "An error has occurred";
+
+    FailureCollectorWrapper failureCollectorWrapper = new FailureCollectorWrapper();
+    FailureCollectorWrapper emptyFailureCollectorWrapper = new FailureCollectorWrapper();
+    failureCollectorWrapper.addFailure(errorMessage, null);
+
+    // act
+    ArrayList<ValidationFailure> exceptionCollector =
+        failureCollectorWrapper.getValidationFailures();
+    ArrayList<ValidationFailure> emptyExceptionCollector =
+        emptyFailureCollectorWrapper.getValidationFailures();
+
+    // assert
+    assertEquals(1, exceptionCollector.size());
+    assertEquals(errorMessage, exceptionCollector.get(0).getMessage());
+    assertEquals(0, emptyExceptionCollector.size());
+  }
+}