Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,11 @@ static void populateConfiguration(
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format(
"The configuration class %s is missing a setter %s for %s with type %s",
config.getClass(),
setterName,
fieldName,
coder.getEncodedTypeDescriptor().getType().getTypeName()),
"The configuration class %s is missing a setter %s for %s",
config.getClass(), setterName, fieldName),
e);
}
method.invoke(
config, coder.decode(entry.getValue().getPayload().newInput(), Coder.Context.NESTED));
method.invoke(config, coder.decode(entry.getValue().getPayload().newInput()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -401,22 +402,26 @@ abstract Builder<K, V> setTimestampPolicyFactory(
public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
External.Configuration config) {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
for (String topic : config.topics) {
listBuilder.add(topic);
for (byte[] topic : config.topics) {
listBuilder.add(utf8String(topic));
}
setTopics(listBuilder.build());

Class keyDeserializer = resolveClass(config.keyDeserializer);
String keyDeserializerClassName = utf8String(config.keyDeserializer);
Class keyDeserializer = resolveClass(keyDeserializerClassName);
setKeyDeserializer(keyDeserializer);
setKeyCoder(resolveCoder(keyDeserializer));

Class valueDeserializer = resolveClass(config.valueDeserializer);
String valueDeserializerClassName = utf8String(config.valueDeserializer);
Class valueDeserializer = resolveClass(valueDeserializerClassName);
setValueDeserializer(valueDeserializer);
setValueCoder(resolveCoder(valueDeserializer));

Map<String, Object> consumerConfig = new HashMap<>();
for (KV<String, String> kv : config.consumerConfig) {
consumerConfig.put(kv.getKey(), kv.getValue());
for (KV<byte[], byte[]> kv : config.consumerConfig) {
String key = utf8String(kv.getKey());
String value = utf8String(kv.getValue());
consumerConfig.put(key, value);
}
// Key and Value Deserializers always have to be in the config.
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
Expand Down Expand Up @@ -475,24 +480,24 @@ public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
public static class Configuration {

// All byte arrays are UTF-8 encoded strings
private Iterable<KV<String, String>> consumerConfig;
private Iterable<String> topics;
private String keyDeserializer;
private String valueDeserializer;
private Iterable<KV<byte[], byte[]>> consumerConfig;
private Iterable<byte[]> topics;
private byte[] keyDeserializer;
private byte[] valueDeserializer;

public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {
this.consumerConfig = consumerConfig;
}

public void setTopics(Iterable<String> topics) {
public void setTopics(Iterable<byte[]> topics) {
this.topics = topics;
}

public void setKeyDeserializer(String keyDeserializer) {
public void setKeyDeserializer(byte[] keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}

public void setValueDeserializer(String valueDeserializer) {
public void setValueDeserializer(byte[] valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
}
Expand Down Expand Up @@ -1360,21 +1365,24 @@ abstract static class Builder<K, V>
@Override
public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
External.Configuration configuration) {
setTopic(configuration.topic);
String topic = utf8String(configuration.topic);
setTopic(topic);

Map<String, Object> producerConfig = new HashMap<>();
for (KV<String, String> kv : configuration.producerConfig) {
producerConfig.put(kv.getKey(), kv.getValue());
for (KV<byte[], byte[]> kv : configuration.producerConfig) {
String key = utf8String(kv.getKey());
String value = utf8String(kv.getValue());
producerConfig.put(key, value);
}
Class keySerializer = resolveClass(configuration.keySerializer);
Class valSerializer = resolveClass(configuration.valueSerializer);
Class keySerializer = resolveClass(utf8String(configuration.keySerializer));
Class valSerializer = resolveClass(utf8String(configuration.valueSerializer));

WriteRecords<K, V> writeRecords =
KafkaIO.<K, V>writeRecords()
.withProducerConfigUpdates(producerConfig)
.withKeySerializer(keySerializer)
.withValueSerializer(valSerializer)
.withTopic(configuration.topic);
.withTopic(topic);
setWriteRecordsTransform(writeRecords);

return build();
Expand All @@ -1397,24 +1405,24 @@ public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
public static class Configuration {

// All byte arrays are UTF-8 encoded strings
private Iterable<KV<String, String>> producerConfig;
private String topic;
private String keySerializer;
private String valueSerializer;
private Iterable<KV<byte[], byte[]>> producerConfig;
private byte[] topic;
private byte[] keySerializer;
private byte[] valueSerializer;

public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
public void setProducerConfig(Iterable<KV<byte[], byte[]>> producerConfig) {
this.producerConfig = producerConfig;
}

public void setTopic(String topic) {
public void setTopic(byte[] topic) {
this.topic = topic;
}

public void setKeySerializer(String keySerializer) {
public void setKeySerializer(byte[] keySerializer) {
this.keySerializer = keySerializer;
}

public void setValueSerializer(String valueSerializer) {
public void setValueSerializer(byte[] valueSerializer) {
this.valueSerializer = valueSerializer;
}
}
Expand Down Expand Up @@ -1683,6 +1691,10 @@ static <T> NullableCoder<T> inferCoder(
String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
}

/**
 * Decodes a UTF-8 encoded byte array into a {@link String}.
 *
 * @param bytes UTF-8 encoded bytes; must not be null
 * @return the decoded string
 */
private static String utf8String(byte[] bytes) {
  // Use the JDK's StandardCharsets constant rather than the vendored Guava
  // Charsets class: Guava's Charsets is deprecated in favor of the stdlib
  // constant, and the fully-qualified name needs no extra import.
  return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
}

private static Class resolveClass(String className) {
try {
return Class.forName(className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.expansion.ExpansionService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -74,28 +76,28 @@ public void testConstructKafkaRead() throws Exception {
"topics",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(listAsBytes(topics)))
.build())
.putConfiguration(
"consumer_config",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:kv:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig)))
.build())
.putConfiguration(
"key_deserializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(keyDeserializer)))
.build())
.putConfiguration(
"value_deserializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
.build())
.build();
Expand Down Expand Up @@ -159,28 +161,28 @@ public void testConstructKafkaWrite() throws Exception {
.putConfiguration(
"topic",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(topic)))
.build())
.putConfiguration(
"producer_config",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:kv:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(mapAsBytes(producerConfig)))
.build())
.putConfiguration(
"key_serializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(keySerializer)))
.build())
.putConfiguration(
"value_serializer",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:string_utf8:v1")
.addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(valueSerializer)))
.build())
.build();
Expand Down Expand Up @@ -246,30 +248,37 @@ public void testConstructKafkaWrite() throws Exception {
}

private static byte[] listAsBytes(List<String> stringList) throws IOException {
IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
List<byte[]> bytesList =
stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
coder.encode(stringList, baos);
coder.encode(bytesList, baos);
return baos.toByteArray();
}

private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
IterableCoder<KV<String, String>> coder =
IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
List<KV<String, String>> stringList =
IterableCoder<KV<byte[], byte[]>> coder =
IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
List<KV<byte[], byte[]>> bytesList =
stringMap.entrySet().stream()
.map(kv -> KV.of(kv.getKey(), kv.getValue()))
.map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue())))
.collect(Collectors.toList());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
coder.encode(stringList, baos);
coder.encode(bytesList, baos);
return baos.toByteArray();
}

private static byte[] encodeString(String str) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
StringUtf8Coder.of().encode(str, baos);
ByteArrayCoder.of().encode(utf8Bytes(str), baos);
return baos.toByteArray();
}

/**
 * Encodes a {@link String} into its UTF-8 byte representation.
 *
 * @param str the string to encode; must not be null
 * @return the UTF-8 encoded bytes
 * @throws NullPointerException if {@code str} is null
 */
private static byte[] utf8Bytes(String str) {
  // java.util.Objects.requireNonNull is behaviorally identical to Guava's
  // Preconditions.checkNotNull here (throws NullPointerException carrying the
  // given message), and StandardCharsets.UTF_8 replaces the deprecated Guava
  // Charsets constant — both drop a vendored-Guava dependency for the stdlib.
  java.util.Objects.requireNonNull(str, "String must not be null.");
  return str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
}

private static class TestStreamObserver<T> implements StreamObserver<T> {

private T result;
Expand Down
62 changes: 47 additions & 15 deletions sdks/python/apache_beam/io/external/generate_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

from __future__ import absolute_import

from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
from apache_beam import ExternalTransform
from apache_beam import pvalue
from apache_beam.coders import VarIntCoder
from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
from apache_beam.transforms import ptransform


class GenerateSequence(ExternalTransform):
class GenerateSequence(ptransform.PTransform):
"""
An external PTransform which provides a bounded or unbounded stream of
integers.
Expand All @@ -45,19 +49,47 @@ class GenerateSequence(ExternalTransform):

Experimental; no backwards compatibility guarantees.
"""

URN = 'beam:external:java:generate_sequence:v1'

def __init__(self, start, stop=None,
elements_per_period=None, max_read_time=None,
expansion_service=None):
super(GenerateSequence, self).__init__(
self.URN,
ImplicitSchemaPayloadBuilder(
{
'start': start,
'stop': stop,
'elements_per_period': elements_per_period,
'max_read_time': max_read_time,
}
),
expansion_service)
expansion_service='localhost:8097'):
super(GenerateSequence, self).__init__()
self.start = start
self.stop = stop
self.elements_per_period = elements_per_period
self.max_read_time = max_read_time
self.expansion_service = expansion_service

def expand(self, pbegin):
if not isinstance(pbegin, pvalue.PBegin):
raise Exception("GenerateSequence must be a root transform")

coder = VarIntCoder()
coder_urn = ['beam:coder:varint:v1']
args = {
'start':
ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.start))
}
if self.stop:
args['stop'] = ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.stop))
if self.elements_per_period:
args['elements_per_period'] = ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.elements_per_period))
if self.max_read_time:
args['max_read_time'] = ConfigValue(
coder_urn=coder_urn,
payload=coder.encode(self.max_read_time))

payload = ExternalConfigurationPayload(configuration=args)
return pbegin.apply(
ExternalTransform(
self.URN,
payload.SerializeToString(),
self.expansion_service))
Loading