Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,15 @@ static void populateConfiguration(
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format(
"The configuration class %s is missing a setter %s for %s",
config.getClass(), setterName, fieldName),
"The configuration class %s is missing a setter %s for %s with type %s",
config.getClass(),
setterName,
fieldName,
coder.getEncodedTypeDescriptor().getType().getTypeName()),
e);
}
method.invoke(config, coder.decode(entry.getValue().getPayload().newInput()));
method.invoke(
config, coder.decode(entry.getValue().getPayload().newInput(), Coder.Context.NESTED));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -402,26 +401,22 @@ abstract Builder<K, V> setTimestampPolicyFactory(
public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
External.Configuration config) {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
for (byte[] topic : config.topics) {
listBuilder.add(utf8String(topic));
for (String topic : config.topics) {
listBuilder.add(topic);
}
setTopics(listBuilder.build());

String keyDeserializerClassName = utf8String(config.keyDeserializer);
Class keyDeserializer = resolveClass(keyDeserializerClassName);
Class keyDeserializer = resolveClass(config.keyDeserializer);
setKeyDeserializer(keyDeserializer);
setKeyCoder(resolveCoder(keyDeserializer));

String valueDeserializerClassName = utf8String(config.valueDeserializer);
Class valueDeserializer = resolveClass(valueDeserializerClassName);
Class valueDeserializer = resolveClass(config.valueDeserializer);
setValueDeserializer(valueDeserializer);
setValueCoder(resolveCoder(valueDeserializer));

Map<String, Object> consumerConfig = new HashMap<>();
for (KV<byte[], byte[]> kv : config.consumerConfig) {
String key = utf8String(kv.getKey());
String value = utf8String(kv.getValue());
consumerConfig.put(key, value);
for (KV<String, String> kv : config.consumerConfig) {
consumerConfig.put(kv.getKey(), kv.getValue());
}
// Key and Value Deserializers always have to be in the config.
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
Expand Down Expand Up @@ -480,24 +475,24 @@ public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
public static class Configuration {

// All byte arrays are UTF-8 encoded strings
private Iterable<KV<byte[], byte[]>> consumerConfig;
private Iterable<byte[]> topics;
private byte[] keyDeserializer;
private byte[] valueDeserializer;
private Iterable<KV<String, String>> consumerConfig;
private Iterable<String> topics;
private String keyDeserializer;
private String valueDeserializer;

public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {
public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
this.consumerConfig = consumerConfig;
}

public void setTopics(Iterable<byte[]> topics) {
public void setTopics(Iterable<String> topics) {
this.topics = topics;
}

public void setKeyDeserializer(byte[] keyDeserializer) {
public void setKeyDeserializer(String keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}

public void setValueDeserializer(byte[] valueDeserializer) {
public void setValueDeserializer(String valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
}
Expand Down Expand Up @@ -1365,24 +1360,21 @@ abstract static class Builder<K, V>
@Override
public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
External.Configuration configuration) {
String topic = utf8String(configuration.topic);
setTopic(topic);
setTopic(configuration.topic);

Map<String, Object> producerConfig = new HashMap<>();
for (KV<byte[], byte[]> kv : configuration.producerConfig) {
String key = utf8String(kv.getKey());
String value = utf8String(kv.getValue());
producerConfig.put(key, value);
for (KV<String, String> kv : configuration.producerConfig) {
producerConfig.put(kv.getKey(), kv.getValue());
}
Class keySerializer = resolveClass(utf8String(configuration.keySerializer));
Class valSerializer = resolveClass(utf8String(configuration.valueSerializer));
Class keySerializer = resolveClass(configuration.keySerializer);
Class valSerializer = resolveClass(configuration.valueSerializer);

WriteRecords<K, V> writeRecords =
KafkaIO.<K, V>writeRecords()
.withProducerConfigUpdates(producerConfig)
.withKeySerializer(keySerializer)
.withValueSerializer(valSerializer)
.withTopic(topic);
.withTopic(configuration.topic);
setWriteRecordsTransform(writeRecords);

return build();
Expand All @@ -1405,24 +1397,24 @@ public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
public static class Configuration {

// All byte arrays are UTF-8 encoded strings
private Iterable<KV<byte[], byte[]>> producerConfig;
private byte[] topic;
private byte[] keySerializer;
private byte[] valueSerializer;
private Iterable<KV<String, String>> producerConfig;
private String topic;
private String keySerializer;
private String valueSerializer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you change the types here, this will affect the lookup of the configuration fields in ExpansionService. The lookup is performed based on the type returned by the coder. The test still uses the bytes coder, so this will attempt to look up a byte[] field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to KafkaIOExternalTest, which I see failing.


public void setProducerConfig(Iterable<KV<byte[], byte[]>> producerConfig) {
public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
this.producerConfig = producerConfig;
}

public void setTopic(byte[] topic) {
public void setTopic(String topic) {
this.topic = topic;
}

public void setKeySerializer(byte[] keySerializer) {
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}

public void setValueSerializer(byte[] valueSerializer) {
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
}
Expand Down Expand Up @@ -1691,10 +1683,6 @@ static <T> NullableCoder<T> inferCoder(
String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
}

private static String utf8String(byte[] bytes) {
return new String(bytes, Charsets.UTF_8);
}

private static Class resolveClass(String className) {
try {
return Class.forName(className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.expansion.ExpansionService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -76,28 +74,28 @@ public void testConstructKafkaRead() throws Exception {
"topics",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(listAsBytes(topics)))
.build())
.putConfiguration(
"consumer_config",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:kv:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig)))
.build())
.putConfiguration(
"key_deserializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(encodeString(keyDeserializer)))
.build())
.putConfiguration(
"value_deserializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
.build())
.build();
Expand Down Expand Up @@ -161,28 +159,28 @@ public void testConstructKafkaWrite() throws Exception {
.putConfiguration(
"topic",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(encodeString(topic)))
.build())
.putConfiguration(
"producer_config",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:kv:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(mapAsBytes(producerConfig)))
.build())
.putConfiguration(
"key_serializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(encodeString(keySerializer)))
.build())
.putConfiguration(
"value_serializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.setPayload(ByteString.copyFrom(encodeString(valueSerializer)))
.build())
.build();
Expand Down Expand Up @@ -248,37 +246,30 @@ public void testConstructKafkaWrite() throws Exception {
}

private static byte[] listAsBytes(List<String> stringList) throws IOException {
IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
List<byte[]> bytesList =
stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList());
IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
coder.encode(bytesList, baos);
coder.encode(stringList, baos);
return baos.toByteArray();
}

private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
IterableCoder<KV<byte[], byte[]>> coder =
IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
List<KV<byte[], byte[]>> bytesList =
IterableCoder<KV<String, String>> coder =
IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
List<KV<String, String>> stringList =
stringMap.entrySet().stream()
.map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue())))
.map(kv -> KV.of(kv.getKey(), kv.getValue()))
.collect(Collectors.toList());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
coder.encode(bytesList, baos);
coder.encode(stringList, baos);
return baos.toByteArray();
}

private static byte[] encodeString(String str) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteArrayCoder.of().encode(utf8Bytes(str), baos);
StringUtf8Coder.of().encode(str, baos);
return baos.toByteArray();
}

private static byte[] utf8Bytes(String str) {
Preconditions.checkNotNull(str, "String must not be null.");
return str.getBytes(Charsets.UTF_8);
}

private static class TestStreamObserver<T> implements StreamObserver<T> {

private T result;
Expand Down
62 changes: 15 additions & 47 deletions sdks/python/apache_beam/io/external/generate_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

from __future__ import absolute_import

from apache_beam import ExternalTransform
from apache_beam import pvalue
from apache_beam.coders import VarIntCoder
from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
from apache_beam.transforms import ptransform
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder


class GenerateSequence(ptransform.PTransform):
class GenerateSequence(ExternalTransform):
"""
An external PTransform which provides a bounded or unbounded stream of
integers.
Expand All @@ -49,47 +45,19 @@ class GenerateSequence(ptransform.PTransform):

Experimental; no backwards compatibility guarantees.
"""

URN = 'beam:external:java:generate_sequence:v1'

def __init__(self, start, stop=None,
elements_per_period=None, max_read_time=None,
expansion_service='localhost:8097'):
super(GenerateSequence, self).__init__()
self.start = start
self.stop = stop
self.elements_per_period = elements_per_period
self.max_read_time = max_read_time
self.expansion_service = expansion_service

def expand(self, pbegin):
if not isinstance(pbegin, pvalue.PBegin):
raise Exception("GenerateSequence must be a root transform")

coder = VarIntCoder()
coder_urn = ['beam:coder:varint:v1']
args = {
'start':
ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.start))
}
if self.stop:
args['stop'] = ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.stop))
if self.elements_per_period:
args['elements_per_period'] = ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.elements_per_period))
if self.max_read_time:
args['max_read_time'] = ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.max_read_time))

payload = ExternalConfigurationPayload(configuration=args)
return pbegin.apply(
ExternalTransform(
self.URN,
payload.SerializeToString(),
self.expansion_service))
expansion_service=None):
super(GenerateSequence, self).__init__(
self.URN,
ImplicitSchemaPayloadBuilder(
{
'start': start,
'stop': stop,
'elements_per_period': elements_per_period,
'max_read_time': max_read_time,
}
),
expansion_service)
Loading