diff --git a/sdks/java/container/Dockerfile b/sdks/java/container/Dockerfile
index 7fb325d7affa..af5c31efc817 100644
--- a/sdks/java/container/Dockerfile
+++ b/sdks/java/container/Dockerfile
@@ -23,6 +23,11 @@
 ADD target/slf4j-api.jar /opt/apache/beam/jars/
 ADD target/slf4j-jdk14.jar /opt/apache/beam/jars/
 ADD target/beam-sdks-java-harness.jar /opt/apache/beam/jars/
+# Required to run cross-language pipelines with KafkaIO
+# TODO May be removed once custom environments are supported
+ADD target/beam-sdks-java-io-kafka.jar /opt/apache/beam/jars/
+ADD target/kafka-clients.jar /opt/apache/beam/jars/
+
 ADD target/linux_amd64/boot /opt/apache/beam/

 ENTRYPOINT ["/opt/apache/beam/boot"]
diff --git a/sdks/java/container/Dockerfile-java11 b/sdks/java/container/Dockerfile-java11
index 25ec30db299c..0ca4fa350e86 100644
--- a/sdks/java/container/Dockerfile-java11
+++ b/sdks/java/container/Dockerfile-java11
@@ -23,6 +23,11 @@
 ADD target/slf4j-api.jar /opt/apache/beam/jars/
 ADD target/slf4j-jdk14.jar /opt/apache/beam/jars/
 ADD target/beam-sdks-java-harness.jar /opt/apache/beam/jars/
+# Required to run cross-language pipelines with KafkaIO
+# TODO May be removed once custom environments are supported
+ADD target/beam-sdks-java-io-kafka.jar /opt/apache/beam/jars/
+ADD target/kafka-clients.jar /opt/apache/beam/jars/
+
 ADD target/linux_amd64/boot /opt/apache/beam/

 ENTRYPOINT ["/opt/apache/beam/boot"]
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 96915668860e..67cd38760542 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -102,6 +102,8 @@ func main() {
     filepath.Join(jarsDir, "slf4j-api.jar"),
     filepath.Join(jarsDir, "slf4j-jdk14.jar"),
     filepath.Join(jarsDir, "beam-sdks-java-harness.jar"),
+    filepath.Join(jarsDir, "beam-sdks-java-io-kafka.jar"),
+    filepath.Join(jarsDir, "kafka-clients.jar"),
   }

   var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar")
diff --git a/sdks/java/container/build.gradle b/sdks/java/container/build.gradle
index b60f3d6fd9a3..f7924a066112 100644
--- a/sdks/java/container/build.gradle
+++ b/sdks/java/container/build.gradle
@@ -41,6 +41,8 @@ dependencies {
   dockerDependency library.java.slf4j_api
   dockerDependency library.java.slf4j_jdk14
   dockerDependency project(path: ":beam-sdks-java-harness", configuration: "shadow")
+  // For executing KafkaIO, e.g. as an external transform
+  dockerDependency project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
 }

 def dockerfileName = project.findProperty('dockerfile') ?: 'Dockerfile'
@@ -50,6 +52,8 @@ task copyDockerfileDependencies(type: Copy) {
   rename "slf4j-api.*", "slf4j-api.jar"
   rename "slf4j-jdk14.*", "slf4j-jdk14.jar"
   rename 'beam-sdks-java-harness-.*.jar', 'beam-sdks-java-harness.jar'
+  rename 'beam-sdks-java-io-kafka.*.jar', 'beam-sdks-java-io-kafka.jar'
+  rename 'kafka-clients.*.jar', 'kafka-clients.jar'
   into "build/target"
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index d542bcc5e587..b54c07945d6b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -458,18 +458,6 @@ private static Coder resolveCoder(Class deserializer) {
         }
         throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
       }
-
-      private static Class resolveClass(String className) {
-        try {
-          return Class.forName(className);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException("Could not find deserializer class: " + className);
-        }
-      }
-
-      private static String utf8String(byte[] bytes) {
-        return new String(bytes, Charsets.UTF_8);
-      }
     }

   /**
@@ -486,7 +474,7 @@ public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
       return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
     }

-    /** Parameters class to expose the transform to an external SDK. */
+    /** Parameters class to expose the Read transform to an external SDK. */
     public static class Configuration {

       // All byte arrays are UTF-8 encoded strings
@@ -1325,12 +1313,77 @@ public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
     abstract Builder<K, V> toBuilder();

     @AutoValue.Builder
-    abstract static class Builder<K, V> {
+    abstract static class Builder<K, V>
+        implements ExternalTransformBuilder<External.Configuration, PCollection<KV<byte[], byte[]>>, PDone> {
       abstract Builder<K, V> setTopic(String topic);

       abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> transform);

       abstract Write<K, V> build();
+
+      @Override
+      public PTransform<PCollection<KV<byte[], byte[]>>, PDone> buildExternal(
+          External.Configuration configuration) {
+        String topic = utf8String(configuration.topic);
+        setTopic(topic);
+
+        Map<String, Object> producerConfig = new HashMap<>();
+        for (KV<byte[], byte[]> kv : configuration.producerConfig) {
+          String key = utf8String(kv.getKey());
+          String value = utf8String(kv.getValue());
+          producerConfig.put(key, value);
+        }
+        Class keySerializer = resolveClass(utf8String(configuration.keySerializer));
+        Class valSerializer = resolveClass(utf8String(configuration.valueSerializer));
+
+        WriteRecords<byte[], byte[]> writeRecords =
+            KafkaIO.<byte[], byte[]>writeRecords()
+                .updateProducerProperties(producerConfig)
+                .withKeySerializer(keySerializer)
+                .withValueSerializer(valSerializer)
+                .withTopic(topic);
+        setWriteRecordsTransform(writeRecords);
+
+        return build();
+      }
+    }
+
+    /** Exposes {@link KafkaIO.Write} as an external transform for cross-language usage. */
+    @AutoService(ExternalTransformRegistrar.class)
+    public static class External implements ExternalTransformRegistrar {
+
+      public static final String URN = "beam:external:java:kafka:write:v1";
+
+      @Override
+      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+        return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
+      }
+
+      /** Parameters class to expose the Write transform to an external SDK. */
+      public static class Configuration {
+
+        // All byte arrays are UTF-8 encoded strings
+        private Iterable<KV<byte[], byte[]>> producerConfig;
+        private byte[] topic;
+        private byte[] keySerializer;
+        private byte[] valueSerializer;
+
+        public void setProducerConfig(Iterable<KV<byte[], byte[]>> producerConfig) {
+          this.producerConfig = producerConfig;
+        }
+
+        public void setTopic(byte[] topic) {
+          this.topic = topic;
+        }
+
+        public void setKeySerializer(byte[] keySerializer) {
+          this.keySerializer = keySerializer;
+        }
+
+        public void setValueSerializer(byte[] valueSerializer) {
+          this.valueSerializer = valueSerializer;
+        }
+      }
     }

     /** Used mostly to reduce using of boilerplate of wrapping {@link WriteRecords} methods. */
@@ -1580,4 +1633,16 @@ static NullableCoder inferCoder(
     throw new RuntimeException(
         String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
   }
+
+  private static String utf8String(byte[] bytes) {
+    return new String(bytes, Charsets.UTF_8);
+  }
+
+  private static Class resolveClass(String className) {
+    try {
+      return Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Could not find class: " + className);
+    }
+  }
 }
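# For reference, Write.External.Configuration above maps one-to-one onto the
# external configuration payload that a non-Java SDK sends to the expansion
# service for "beam:external:java:kafka:write:v1". Below is a minimal Python
# sketch of that payload, reusing the _encode_* helpers added to
# apache_beam/io/external/kafka.py later in this change (the proto import path
# is assumed; broker/topic/serializer values are illustrative only):
from apache_beam.io.external.kafka import _encode_map, _encode_str
from apache_beam.portability.api.external_transforms_pb2 import (
    ExternalConfigurationPayload)

payload = ExternalConfigurationPayload(configuration={
    'producer_config': _encode_map({'bootstrap.servers': 'localhost:9092'}),
    'topic': _encode_str('topic1'),
    'key_serializer': _encode_str(
        'org.apache.kafka.common.serialization.ByteArraySerializer'),
    'value_serializer': _encode_str(
        'org.apache.kafka.common.serialization.ByteArraySerializer'),
})
# payload.SerializeToString() travels in the ExpansionRequest; the Java side
# decodes each field back into a UTF-8 string via the Configuration setters.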
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index d5adbdb3600f..4f2130e3ddcf 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -27,11 +27,17 @@
 import org.apache.beam.model.expansion.v1.ExpansionApi;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.expansion.ExpansionService;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
@@ -39,17 +45,20 @@
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.internal.util.reflection.Whitebox;

 /** Tests for building {@link KafkaIO} externally via the ExpansionService. */
 @RunWith(JUnit4.class)
 public class KafkaIOExternalTest {

   @Test
-  public void testConstructKafkaIO() throws Exception {
+  public void testConstructKafkaRead() throws Exception {
     List<String> topics = ImmutableList.of("topic1", "topic2");
     String keyDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
     String valueDeserializer = "org.apache.kafka.common.serialization.LongDeserializer";
@@ -136,10 +145,112 @@ public void testConstructKafkaIO() throws Exception {
     assertThat(spec.getValueDeserializer().getName(), Matchers.is(valueDeserializer));
   }

+  @Test
+  public void testConstructKafkaWrite() throws Exception {
+    String topic = "topic";
+    String keySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
+    String valueSerializer = "org.apache.kafka.common.serialization.LongSerializer";
+    ImmutableMap<String, String> producerConfig =
+        ImmutableMap.<String, String>builder()
+            .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:port,server2:port")
+            .put("retries", "3")
+            .build();
+
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .putConfiguration(
+                "topic",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(topic)))
+                    .build())
+            .putConfiguration(
+                "producer_config",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:iterable:v1")
+                    .addCoderUrn("beam:coder:kv:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(mapAsBytes(producerConfig)))
+                    .build())
+            .putConfiguration(
+                "key_serializer",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(keySerializer)))
+                    .build())
+            .putConfiguration(
+                "value_serializer",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(valueSerializer)))
+                    .build())
+            .build();
+
+    Pipeline p = Pipeline.create();
+    p.apply(Impulse.create()).apply(WithKeys.of("key"));
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    String inputPCollection =
+        Iterables.getOnlyElement(
+            Iterables.getLast(pipelineProto.getComponents().getTransformsMap().values())
+                .getOutputsMap()
+                .values());
+
+    ExpansionApi.ExpansionRequest request =
+        ExpansionApi.ExpansionRequest.newBuilder()
+            .setComponents(pipelineProto.getComponents())
+            .setTransform(
+                RunnerApi.PTransform.newBuilder()
+                    .setUniqueName("test")
+                    .putInputs("input", inputPCollection)
+                    .setSpec(
+                        RunnerApi.FunctionSpec.newBuilder()
+                            .setUrn("beam:external:java:kafka:write:v1")
+                            .setPayload(payload.toByteString())))
+            .setNamespace("test_namespace")
+            .build();
+
+    ExpansionService expansionService = new ExpansionService();
+    TestStreamObserver<ExpansionApi.ExpansionResponse> observer = new TestStreamObserver<>();
+    expansionService.expand(request, observer);
+
+    ExpansionApi.ExpansionResponse result = observer.result;
+    RunnerApi.PTransform transform = result.getTransform();
+    assertThat(
+        transform.getSubtransformsList(),
+        Matchers.contains(
+            "test_namespacetest/Kafka ProducerRecord", "test_namespacetest/KafkaIO.WriteRecords"));
+    assertThat(transform.getInputsCount(), Matchers.is(1));
+    assertThat(transform.getOutputsCount(), Matchers.is(0));
+
+    RunnerApi.PTransform writeComposite =
+        result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1));
+    RunnerApi.PTransform writeParDo =
+        result
+            .getComponents()
+            .getTransformsOrThrow(
+                result
+                    .getComponents()
+                    .getTransformsOrThrow(writeComposite.getSubtransforms(0))
+                    .getSubtransforms(0));
+
+    RunnerApi.ParDoPayload parDoPayload =
+        RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload());
+    DoFn kafkaWriter = ParDoTranslation.getDoFn(parDoPayload);
+    assertThat(kafkaWriter, Matchers.instanceOf(KafkaWriter.class));
+    KafkaIO.WriteRecords spec =
+        (KafkaIO.WriteRecords) Whitebox.getInternalState(kafkaWriter, "spec");
+
+    assertThat(spec.getProducerConfig(), Matchers.is(producerConfig));
+    assertThat(spec.getTopic(), Matchers.is(topic));
+    assertThat(spec.getKeySerializer().getName(), Matchers.is(keySerializer));
+    assertThat(spec.getValueSerializer().getName(), Matchers.is(valueSerializer));
+  }
+
   private static byte[] listAsBytes(List<String> stringList) throws IOException {
     IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
     List<byte[]> bytesList =
-        stringList.stream().map(KafkaIOExternalTest::rawBytes).collect(Collectors.toList());
+        stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     coder.encode(bytesList, baos);
     return baos.toByteArray();
@@ -150,7 +261,7 @@ private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
         IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
     List<KV<byte[], byte[]>> bytesList =
         stringMap.entrySet().stream()
-            .map(kv -> KV.of(rawBytes(kv.getKey()), rawBytes(kv.getValue())))
+            .map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue())))
             .collect(Collectors.toList());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     coder.encode(bytesList, baos);
@@ -159,11 +270,11 @@ private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {

   private static byte[] encodeString(String str) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    ByteArrayCoder.of().encode(rawBytes(str), baos);
+    ByteArrayCoder.of().encode(utf8Bytes(str), baos);
     return baos.toByteArray();
   }

-  private static byte[] rawBytes(String str) {
+  private static byte[] utf8Bytes(String str) {
     Preconditions.checkNotNull(str, "String must not be null.");
     return str.getBytes(Charsets.UTF_8);
   }
diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py
index b5b49436b0de..b61cf2c205ca 100644
--- a/sdks/python/apache_beam/io/external/kafka.py
+++ b/sdks/python/apache_beam/io/external/kafka.py
@@ -16,11 +16,23 @@
 #
 """
   PTransforms for supporting Kafka in Python pipelines. These transforms do not
-  run a Kafka client in Python. Instead, they expand to ExternalTransform and
-  utilize the Java SDK's Kafka IO. The expansion service will insert Kafka Java
-  transforms before the pipeline is executed. Users currently have to provide
-  the address of the Java expansion service. Flink Users can use the built-in
-  expansion service of the Flink Runner's job server.
+  run a Kafka client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's KafkaIO. In other
+  words: they are cross-language transforms.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.
+
+  If you start Flink's Job Server, the expansion service will be started on
+  port 8097. This is also the configured default for these transforms. For a
+  different address, please set the expansion_service parameter.
+
+  For more information see:
+  - https://beam.apache.org/documentation/runners/flink/
+  - https://beam.apache.org/roadmap/portability/
 """

 from __future__ import absolute_import
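# To make the module docstring above concrete, here is a minimal usage sketch
# for the two transforms in this module. It assumes a locally running Flink
# job server (job endpoint on 8099, built-in expansion service on 8097) and
# hypothetical broker/topic names; it mirrors the Flink runner test further
# below rather than a tested end-to-end pipeline.
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--streaming',
])

with beam.Pipeline(options=options) as p:
  # Write: pair each element with a key; with the default ByteArraySerializer
  # both key and value must already be bytes.
  _ = (p
       | beam.Impulse()
       | beam.Map(lambda x: (b'key', x))
       | WriteToKafka(producer_config={'bootstrap.servers': 'localhost:9092'},
                      topic='out_topic',
                      expansion_service='localhost:8097'))

  # Read: with the default deserializers, keys/values arrive as raw bytes.
  _ = (p
       | ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092'},
                       topics=['in_topic'],
                       expansion_service='localhost:8097'))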
@@ -42,20 +54,6 @@ class ReadFromKafka(ptransform.PTransform):
     each item in the specified Kafka topics. If no Kafka Deserializer for
     key/value is provided, then the data will be returned as a raw byte array.

-    Note: To use this transform, you need to start the Java expansion service.
-    Please refer to the portability documentation on how to do that. The
-    expansion service address has to be provided when instantiating this
-    transform. During pipeline translation this transform will be replaced by
-    the Java SDK's KafkaIO.
-
-    If you start Flink's job server, the expansion service will be started on
-    port 8097. This is also the configured default for this transform. For a
-    different address, please set the expansion_service parameter.
-
-    For more information see:
-    - https://beam.apache.org/documentation/runners/flink/
-    - https://beam.apache.org/roadmap/portability/
-
     Note: Runners need to support translating Read operations in order to use
     this source. At the moment only the Flink Runner supports this.
   """
@@ -102,13 +100,13 @@ def expand(self, pbegin):
     args = {
         'consumer_config':
-            ReadFromKafka._encode_map(self.consumer_config),
+            _encode_map(self.consumer_config),
         'topics':
-            ReadFromKafka._encode_list(self.topics),
+            _encode_list(self.topics),
         'key_deserializer':
-            ReadFromKafka._encode_str(self.key_deserializer),
+            _encode_str(self.key_deserializer),
         'value_deserializer':
-            ReadFromKafka._encode_str(self.value_deserializer),
+            _encode_str(self.value_deserializer),
     }

     payload = ExternalConfigurationPayload(configuration=args)
@@ -118,35 +116,98 @@ def expand(self, pbegin):
             payload.SerializeToString(),
             self.expansion_service))

-  @staticmethod
-  def _encode_map(dict_obj):
-    kv_list = [(key.encode('utf-8'), val.encode('utf-8'))
-               for key, val in dict_obj.items()]
-    coder = IterableCoder(TupleCoder(
-        [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())]))
-    coder_urns = ['beam:coder:iterable:v1',
-                  'beam:coder:kv:v1',
-                  'beam:coder:bytes:v1',
-                  'beam:coder:bytes:v1']
-    return ConfigValue(
-        coder_urn=coder_urns,
-        payload=coder.encode(kv_list))
-
-  @staticmethod
-  def _encode_list(list_obj):
-    encoded_list = [val.encode('utf-8') for val in list_obj]
-    coder = IterableCoder(LengthPrefixCoder(BytesCoder()))
-    coder_urns = ['beam:coder:iterable:v1',
-                  'beam:coder:bytes:v1']
-    return ConfigValue(
-        coder_urn=coder_urns,
-        payload=coder.encode(encoded_list))
-
-  @staticmethod
-  def _encode_str(str_obj):
-    encoded_str = str_obj.encode('utf-8')
-    coder = LengthPrefixCoder(BytesCoder())
-    coder_urns = ['beam:coder:bytes:v1']
-    return ConfigValue(
-        coder_urn=coder_urns,
-        payload=coder.encode(encoded_str))
+
+class WriteToKafka(ptransform.PTransform):
+  """
+    An external PTransform which writes KV data to a specified Kafka topic.
+    If no Kafka Serializer for key/value is provided, then key/value are
+    assumed to be byte arrays.
+  """
+
+  # Default serializer which passes raw bytes to Kafka
+  byte_array_serializer = 'org.apache.kafka.common.serialization.' \
+                          'ByteArraySerializer'
+
+  def __init__(self, producer_config,
+               topic,
+               key_serializer=byte_array_serializer,
+               value_serializer=byte_array_serializer,
+               expansion_service='localhost:8097'):
+    """
+    Initializes a write operation to Kafka.
+
+    :param producer_config: A dictionary containing the producer configuration.
+    :param topic: A Kafka topic name.
+    :param key_serializer: A fully-qualified Java class name of a Kafka
+                           Serializer for the topic's key, e.g.
+                           'org.apache.kafka.common.
+                           serialization.LongSerializer'.
+                           Default: 'org.apache.kafka.common.
+                           serialization.ByteArraySerializer'.
+    :param value_serializer: A fully-qualified Java class name of a Kafka
+                             Serializer for the topic's value, e.g.
+                             'org.apache.kafka.common.
+                             serialization.LongSerializer'.
+                             Default: 'org.apache.kafka.common.
+                             serialization.ByteArraySerializer'.
+    :param expansion_service: The address (host:port) of the ExpansionService.
+    """
+    super(WriteToKafka, self).__init__()
+    self._urn = 'beam:external:java:kafka:write:v1'
+    self.producer_config = producer_config
+    self.topic = topic
+    self.key_serializer = key_serializer
+    self.value_serializer = value_serializer
+    self.expansion_service = expansion_service
+
+  def expand(self, pvalue):
+    args = {
+        'producer_config':
+            _encode_map(self.producer_config),
+        'topic':
+            _encode_str(self.topic),
+        'key_serializer':
+            _encode_str(self.key_serializer),
+        'value_serializer':
+            _encode_str(self.value_serializer),
+    }
+
+    payload = ExternalConfigurationPayload(configuration=args)
+    return pvalue.apply(
+        ExternalTransform(
+            self._urn,
+            payload.SerializeToString(),
+            self.expansion_service))
+
+
+def _encode_map(dict_obj):
+  kv_list = [(key.encode('utf-8'), val.encode('utf-8'))
+             for key, val in dict_obj.items()]
+  coder = IterableCoder(TupleCoder(
+      [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())]))
+  coder_urns = ['beam:coder:iterable:v1',
+                'beam:coder:kv:v1',
+                'beam:coder:bytes:v1',
+                'beam:coder:bytes:v1']
+  return ConfigValue(
+      coder_urn=coder_urns,
+      payload=coder.encode(kv_list))
+
+
+def _encode_list(list_obj):
+  encoded_list = [val.encode('utf-8') for val in list_obj]
+  coder = IterableCoder(LengthPrefixCoder(BytesCoder()))
+  coder_urns = ['beam:coder:iterable:v1',
+                'beam:coder:bytes:v1']
+  return ConfigValue(
+      coder_urn=coder_urns,
+      payload=coder.encode(encoded_list))
+
+
+def _encode_str(str_obj):
+  encoded_str = str_obj.encode('utf-8')
+  coder = LengthPrefixCoder(BytesCoder())
+  coder_urns = ['beam:coder:bytes:v1']
+  return ConfigValue(
+      coder_urn=coder_urns,
+      payload=coder.encode(encoded_str))
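# The serializer parameters above determine what the Java side expects from
# the Python elements: with the default ByteArraySerializer both key and value
# must already be bytes, while e.g. a LongSerializer key pairs with a Python
# int (the combination used in the Flink runner test below). A hypothetical
# preparation step for the all-bytes default (broker/topic names illustrative):
import apache_beam as beam
from apache_beam.io.external.kafka import WriteToKafka

def write_word_counts(pcoll, expansion_service='localhost:8097'):
  # pcoll contains (str, int) pairs; encode both sides to bytes before writing.
  return (pcoll
          | 'ToBytes' >> beam.Map(
              lambda kv: (kv[0].encode('utf-8'), str(kv[1]).encode('utf-8')))
          | WriteToKafka(
              producer_config={'bootstrap.servers': 'localhost:9092'},
              topic='word_counts',
              expansion_service=expansion_service))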
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index b607e5a83dff..33d2bedf6c5a 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -28,8 +28,11 @@
 from tempfile import mkdtemp

 import apache_beam as beam
+from apache_beam import Impulse
+from apache_beam import Map
 from apache_beam.io.external.generate_sequence import GenerateSequence
 from apache_beam.io.external.kafka import ReadFromKafka
+from apache_beam.io.external.kafka import WriteToKafka
 from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PortableOptions
@@ -159,17 +162,15 @@ def test_no_subtransform_composite(self):
     raise unittest.SkipTest("BEAM-4781")

   def test_external_transforms(self):
-    options = self.create_options()
-    options._all_options['parallelism'] = 1
-    options._all_options['streaming'] = True
-
-    expansion_address = "localhost:" + str(FlinkRunnerTest.expansion_port)
+    # TODO Move the expansion service address into PipelineOptions
+    def get_expansion_service():
+      return "localhost:" + str(FlinkRunnerTest.expansion_port)

     with self.create_pipeline() as p:
       res = (
           p
           | GenerateSequence(start=1, stop=10,
-                             expansion_service=expansion_address))
+                             expansion_service=get_expansion_service()))

       assert_that(res, equal_to([i for i in range(1, 10)]))

@@ -189,12 +190,28 @@ def test_external_transforms(self):
                        value_deserializer='org.apache.kafka.'
                                           'common.serialization.'
                                           'LongDeserializer',
-                       expansion_service=expansion_address))
+                       expansion_service=get_expansion_service()))
       self.assertTrue('No resolvable bootstrap urls given in bootstrap.servers'
                       in str(ctx.exception),
                       'Expected to fail due to invalid bootstrap.servers, but '
                       'failed due to:\n%s' % str(ctx.exception))

+    # We just test the expansion but do not execute.
+    # pylint: disable=expression-not-assigned
+    (self.create_pipeline()
+     | Impulse()
+     | Map(lambda input: (1, input))
+     | WriteToKafka(producer_config={'bootstrap.servers':
+                                     'localhost:9092, notvalid2:3531'},
+                    topic='topic1',
+                    key_serializer='org.apache.kafka.'
+                                   'common.serialization.'
+                                   'LongSerializer',
+                    value_serializer='org.apache.kafka.'
+                                     'common.serialization.'
+                                     'ByteArraySerializer',
+                    expansion_service=get_expansion_service()))
+
   def test_flattened_side_input(self):
     # Blocked on support for transcoding
     # https://jira.apache.org/jira/browse/BEAM-6523