From 03de550a09536188d3d82cd09a7c7d9e850eaf9a Mon Sep 17 00:00:00 2001 From: Denis Pyshev Date: Mon, 11 Jul 2022 10:07:42 +0300 Subject: [PATCH 01/14] BEAM-13592 Add getOrderingKey in org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage Add orderingKey from proto PubsubMessage to beam Pubsubmessage. Add a new coder for a PubsubMessage with all fields. Propagate this new view of a PubsubMessage to PubsubIO as a new API call. Add a unit test for a coder. --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 18 ++++- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 28 +++++-- .../sdk/io/gcp/pubsub/PubsubMessageCoder.java | 78 +++++++++++++++++++ .../sdk/io/gcp/pubsub/PubsubMessages.java | 5 +- .../io/gcp/pubsub/PubsubUnboundedSource.java | 38 ++++++--- .../io/gcp/pubsub/PubsubMessageCoderTest.java | 63 +++++++++++++++ 6 files changed, 212 insertions(+), 18 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index eea908378967..48a14679c5b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -524,6 +524,16 @@ public static Read readMessagesWithAttributesAndMessageId() { .build(); } + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will contain a {@link PubsubMessage#getPayload() payload}, {@link + * PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId() + * messageId} and {PubsubMessage#getOrderingKey() orderingKey} from PubSub. + */ + public static Read readMessagesWithAllAttributesAndMessageIdAndOrderingKey() { + return Read.newBuilder().setCoder(PubsubMessageCoder.of()).setNeedsOrderingKey(true).build(); + } + /** * Returns A {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud * Pub/Sub stream. @@ -735,6 +745,8 @@ public abstract static class Read extends PTransform> abstract boolean getNeedsMessageId(); + abstract boolean getNeedsOrderingKey(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction parseFn) { @@ -743,6 +755,7 @@ static Builder newBuilder(SerializableFunction parseFn) builder.setPubsubClientFactory(FACTORY); builder.setNeedsAttributes(false); builder.setNeedsMessageId(false); + builder.setNeedsOrderingKey(false); return builder; } @@ -781,6 +794,8 @@ abstract static class Builder { abstract Builder setNeedsMessageId(boolean needsMessageId); + abstract Builder setNeedsOrderingKey(boolean needsOrderingKey); + abstract Builder setClock(Clock clock); abstract Read build(); @@ -988,7 +1003,8 @@ public PCollection expand(PBegin input) { getTimestampAttribute(), getIdAttribute(), getNeedsAttributes(), - getNeedsMessageId()); + getNeedsMessageId(), + getNeedsOrderingKey()); PCollection read; PCollection preParse = input.apply(source); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index 3801c2d71f1e..549daf92657f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -25,7 +25,7 @@ /** * Class representing a Pub/Sub message. Each message contains a single message payload, a map of - * attached attributes, and a message id. + * attached attributes, a message id and an ordering key. */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -40,21 +40,34 @@ abstract static class Impl { abstract @Nullable String getMessageId(); + abstract @Nullable String getOrderingKey(); + static Impl create( - byte[] payload, @Nullable Map attributes, @Nullable String messageId) { - return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId); + byte[] payload, + @Nullable Map attributes, + @Nullable String messageId, + @Nullable String orderingKey) { + return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId, orderingKey); } } private Impl impl; public PubsubMessage(byte[] payload, @Nullable Map attributes) { - this(payload, attributes, null); + this(payload, attributes, null, null); } public PubsubMessage( byte[] payload, @Nullable Map attributes, @Nullable String messageId) { - impl = Impl.create(payload, attributes, messageId); + impl = Impl.create(payload, attributes, messageId, null); + } + + public PubsubMessage( + byte[] payload, + @Nullable Map attributes, + @Nullable String messageId, + @Nullable String orderingKey) { + impl = Impl.create(payload, attributes, messageId, orderingKey); } /** Returns the main PubSub message. */ @@ -78,6 +91,11 @@ public byte[] getPayload() { return impl.getMessageId(); } + /** Returns the ordering key of the message. */ + public @Nullable String getOrderingKey() { + return impl.getOrderingKey(); + } + @Override public String toString() { return impl.toString(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java new file mode 100644 index 000000000000..53136d606c26 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for PubsubMessage including all fields of a PubSub message from server. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class PubsubMessageCoder extends CustomCoder { + // A message's payload cannot be null + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's attributes can be null. + private static final Coder> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + // A message's messageId cannot be null + private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); + // A message's publish time, populated by server + private static final Coder PUBLISH_TIME_CODER = ProtoCoder.of(Timestamp.class); + // A message's ordering key can be null + private static final Coder ORDERING_KEY_CODER = NullableCoder.of(StringUtf8Coder.of()); + + public static Coder of(TypeDescriptor ignored) { + return of(); + } + + public static PubsubMessageCoder of() { + return new PubsubMessageCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + // TODO(discuss what to do with publish_time field) + PUBLISH_TIME_CODER.encode(Timestamp.getDefaultInstance(), outStream); + ORDERING_KEY_CODER.encode(value.getOrderingKey(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + Map attributes = ATTRIBUTES_CODER.decode(inStream); + String messageId = MESSAGE_ID_CODER.decode(inStream); + PUBLISH_TIME_CODER.decode(inStream); + String orderingKey = ORDERING_KEY_CODER.decode(inStream); + return new PubsubMessage(payload, attributes, messageId, orderingKey); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index 1f0025a003a1..239b207cb4f3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -45,7 +45,10 @@ public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) { return new PubsubMessage( - input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId()); + input.getData().toByteArray(), + input.getAttributesMap(), + input.getMessageId(), + input.getOrderingKey()); } // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 857ef1cceb86..28e4601bda9f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1200,6 +1200,9 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Whether this source should include the messageId from PubSub. */ private final boolean needsMessageId; + /** Whether this source should include the orderingKey from PubSub. */ + private final boolean needsOrderingKey; + @VisibleForTesting PubsubUnboundedSource( Clock clock, @@ -1210,7 +1213,8 @@ public void populateDisplayData(DisplayData.Builder builder) { @Nullable String timestampAttribute, @Nullable String idAttribute, boolean needsAttributes, - boolean needsMessageId) { + boolean needsMessageId, + boolean needsOrderingKey) { checkArgument( (topic == null) != (subscription == null), "Exactly one of topic and subscription must be given"); @@ -1223,6 +1227,7 @@ public void populateDisplayData(DisplayData.Builder builder) { this.idAttribute = idAttribute; this.needsAttributes = needsAttributes; this.needsMessageId = needsMessageId; + this.needsOrderingKey = needsOrderingKey; } /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ @@ -1243,6 +1248,7 @@ public PubsubUnboundedSource( timestampAttribute, idAttribute, needsAttributes, + false, false); } @@ -1265,6 +1271,7 @@ public PubsubUnboundedSource( timestampAttribute, idAttribute, needsAttributes, + false, false); } @@ -1287,7 +1294,8 @@ public PubsubUnboundedSource( timestampAttribute, idAttribute, needsAttributes, - needsMessageId); + needsMessageId, + false); } /** Get the project path. */ @@ -1333,6 +1341,10 @@ public boolean getNeedsMessageId() { return needsMessageId; } + public boolean getNeedsOrderingKey() { + return needsOrderingKey; + } + @Override public PCollection expand(PBegin input) { SerializableFunction function; @@ -1342,16 +1354,20 @@ public PCollection expand(PBegin input) { function = new DeserializeBytesIntoPubsubMessagePayloadOnly(); } Coder messageCoder; - if (getNeedsMessageId()) { - messageCoder = - getNeedsAttributes() - ? PubsubMessageWithAttributesAndMessageIdCoder.of() - : PubsubMessageWithMessageIdCoder.of(); + if (getNeedsOrderingKey()) { + messageCoder = PubsubMessageCoder.of(); } else { - messageCoder = - getNeedsAttributes() - ? PubsubMessageWithAttributesCoder.of() - : PubsubMessagePayloadOnlyCoder.of(); + if (getNeedsMessageId()) { + messageCoder = + getNeedsAttributes() + ? PubsubMessageWithAttributesAndMessageIdCoder.of() + : PubsubMessageWithMessageIdCoder.of(); + } else { + messageCoder = + getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of(); + } } PCollection messages = input diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java new file mode 100644 index 000000000000..9c03ea558dd3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageCoderTest { + + private static final String DATA = "testData"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final String ORDERING_KEY = "key123"; + private static final Coder TEST_CODER = PubsubMessageCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage( + DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID, ORDERING_KEY); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} From d6f5ae47180b4a89b37cebaf3ad8eaca8fd8261c Mon Sep 17 00:00:00 2001 From: Denis Pyshev Date: Mon, 11 Jul 2022 11:25:51 +0300 Subject: [PATCH 02/14] Add CHANGES entry --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index a2d47753afe5..d5c1bfb99e66 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,7 @@ * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Previously available in Java sdk, Python sdk now also supports logging level overrides per module. ([#18222](https://github.com/apache/beam/issues/18222)). +* Added support for accessing GCP PubSub Message ordering keys (Java) ([BEAM-13592](https://issues.apache.org/jira/browse/BEAM-13592)) ## Breaking Changes From 267cd10980f228af79b7084212343738435c0c11 Mon Sep 17 00:00:00 2001 From: Denis Pyshev Date: Tue, 12 Jul 2022 16:20:52 +0300 Subject: [PATCH 03/14] Rename transform name according to review comment --- .../main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 48a14679c5b6..f90f1644235f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -530,7 +530,7 @@ public static Read readMessagesWithAttributesAndMessageId() { * PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId() * messageId} and {PubsubMessage#getOrderingKey() orderingKey} from PubSub. */ - public static Read readMessagesWithAllAttributesAndMessageIdAndOrderingKey() { + public static Read readMessagesWithAttributesAndMessageIdAndOrderingKey() { return Read.newBuilder().setCoder(PubsubMessageCoder.of()).setNeedsOrderingKey(true).build(); } From bc97c6b90863f18e42070287e418df2b2d14f041 Mon Sep 17 00:00:00 2001 From: Denis Pyshev Date: Tue, 19 Jul 2022 15:02:09 +0300 Subject: [PATCH 04/14] Update to pass ordering key --- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index 239b207cb4f3..e5e8abcb0c1e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -40,6 +40,10 @@ public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { if (messageId != null) { message.setMessageId(messageId); } + String orderingKey = input.getOrderingKey(); + if (orderingKey != null) { + message.setOrderingKey(orderingKey); + } return message.build(); } From 7be63b94f4d1c0f818333f5ee74991cfb723f3fa Mon Sep 17 00:00:00 2001 From: Denis Pyshev Date: Mon, 11 Jul 2022 11:25:51 +0300 Subject: [PATCH 05/14] Add CHANGES entry From bd1050a77040a3d0834ac5cc337f82bf0d4e1433 Mon Sep 17 00:00:00 2001 From: egalpin Date: Wed, 20 Jul 2022 10:03:42 -0400 Subject: [PATCH 06/14] Adds ordering key to OutgoingMessage builder, adds new coders to pubsub registrar --- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 3 + .../pubsub/PubsubCoderProviderRegistrar.java | 6 +- ...sageWithAttributesAndOrderingKeyCoder.java | 74 +++++++++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 8af2971e4f22..b3cddb2a62d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -318,6 +318,9 @@ public static OutgoingMessage of( if (message.getAttributeMap() != null) { builder.putAllAttributes(message.getAttributeMap()); } + if (message.getOrderingKey() != null) { + builder.setOrderingKey(message.getOrderingKey()); + } return of(builder.build(), timestampMsSinceEpoch, recordId); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index c10c60d717a5..35bf3513a6a4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -37,6 +37,10 @@ public List getCoderProviders() { TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), - PubsubMessageWithAttributesAndMessageIdCoder.of())); + PubsubMessageWithAttributesAndMessageIdCoder.of()), + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), + PubsubMessageWithAttributesAndOrderingKeyCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java new file mode 100644 index 000000000000..eb87dcc2f572 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for PubsubMessage including all fields of a PubSub message from server. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class PubsubMessageWithAttributesAndOrderingKeyCoder extends CustomCoder { + // A message's payload cannot be null + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's attributes can be null. + private static final Coder> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + // A message's publish time, populated by server + private static final Coder PUBLISH_TIME_CODER = ProtoCoder.of(Timestamp.class); + // A message's ordering key can be null + private static final Coder ORDERING_KEY_CODER = NullableCoder.of(StringUtf8Coder.of()); + + public static Coder of(TypeDescriptor ignored) { + return of(); + } + + public static PubsubMessageWithAttributesAndOrderingKeyCoder of() { + return new PubsubMessageWithAttributesAndOrderingKeyCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); + // TODO(discuss what to do with publish_time field) + PUBLISH_TIME_CODER.encode(Timestamp.getDefaultInstance(), outStream); + ORDERING_KEY_CODER.encode(value.getOrderingKey(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + Map attributes = ATTRIBUTES_CODER.decode(inStream); + PUBLISH_TIME_CODER.decode(inStream); + String orderingKey = ORDERING_KEY_CODER.decode(inStream); + return new PubsubMessage(payload, attributes, null, orderingKey); + } +} From e2a7c445ef707eaf52a99a9d0508cba770679e12 Mon Sep 17 00:00:00 2001 From: egalpin Date: Wed, 20 Jul 2022 11:12:20 -0400 Subject: [PATCH 07/14] Fixes pubsub bounded writer allowing for orderingKey --- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index f90f1644235f..c620f689cccb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1309,6 +1309,7 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed validatePubsubMessage(message); payload = message.getPayload(); Map attributes = message.getAttributeMap(); + String orderingKey = message.getOrderingKey(); if (payload.length > maxPublishBatchByteSize) { String msg = @@ -1324,13 +1325,18 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed publish(); } + com.google.pubsub.v1.PubsubMessage.Builder msgBuilder = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(payload)) + .putAllAttributes(attributes); + + if (orderingKey != null) { + msgBuilder.setOrderingKey(orderingKey); + } + // NOTE: The record id is always null. output.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes) - .build(), + OutgoingMessage.of(msgBuilder.build(), c.timestamp().getMillis(), null)); currentOutputBytes += payload.length; From bc60b746a7ca4055aee8b7b5cb0f3ddff8caea6f Mon Sep 17 00:00:00 2001 From: egalpin Date: Wed, 20 Jul 2022 16:22:23 -0400 Subject: [PATCH 08/14] Alters order of pubsub message support in registrar --- .../pubsub/PubsubCoderProviderRegistrar.java | 7 +- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 9 +-- ...sageWithAttributesAndOrderingKeyCoder.java | 74 ------------------- 3 files changed, 5 insertions(+), 85 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index 35bf3513a6a4..c561bad3dd7b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -31,16 +31,13 @@ public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar { @Override public List getCoderProviders() { return ImmutableList.of( + CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), - PubsubMessageWithAttributesAndMessageIdCoder.of()), - CoderProviders.forCoder( - TypeDescriptor.of(PubsubMessage.class), - PubsubMessageWithAttributesAndOrderingKeyCoder.of()), - CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of())); + PubsubMessageWithAttributesAndMessageIdCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index c620f689cccb..17bce913a875 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1327,18 +1327,15 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed com.google.pubsub.v1.PubsubMessage.Builder msgBuilder = com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes); + .setData(ByteString.copyFrom(payload)) + .putAllAttributes(attributes); if (orderingKey != null) { msgBuilder.setOrderingKey(orderingKey); } // NOTE: The record id is always null. - output.add( - OutgoingMessage.of(msgBuilder.build(), - c.timestamp().getMillis(), - null)); + output.add(OutgoingMessage.of(msgBuilder.build(), c.timestamp().getMillis(), null)); currentOutputBytes += payload.length; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java deleted file mode 100644 index eb87dcc2f572..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndOrderingKeyCoder.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import com.google.protobuf.Timestamp; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** A coder for PubsubMessage including all fields of a PubSub message from server. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public class PubsubMessageWithAttributesAndOrderingKeyCoder extends CustomCoder { - // A message's payload cannot be null - private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); - // A message's attributes can be null. - private static final Coder> ATTRIBUTES_CODER = - NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - // A message's publish time, populated by server - private static final Coder PUBLISH_TIME_CODER = ProtoCoder.of(Timestamp.class); - // A message's ordering key can be null - private static final Coder ORDERING_KEY_CODER = NullableCoder.of(StringUtf8Coder.of()); - - public static Coder of(TypeDescriptor ignored) { - return of(); - } - - public static PubsubMessageWithAttributesAndOrderingKeyCoder of() { - return new PubsubMessageWithAttributesAndOrderingKeyCoder(); - } - - @Override - public void encode(PubsubMessage value, OutputStream outStream) throws IOException { - PAYLOAD_CODER.encode(value.getPayload(), outStream); - ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); - // TODO(discuss what to do with publish_time field) - PUBLISH_TIME_CODER.encode(Timestamp.getDefaultInstance(), outStream); - ORDERING_KEY_CODER.encode(value.getOrderingKey(), outStream); - } - - @Override - public PubsubMessage decode(InputStream inStream) throws IOException { - byte[] payload = PAYLOAD_CODER.decode(inStream); - Map attributes = ATTRIBUTES_CODER.decode(inStream); - PUBLISH_TIME_CODER.decode(inStream); - String orderingKey = ORDERING_KEY_CODER.decode(inStream); - return new PubsubMessage(payload, attributes, null, orderingKey); - } -} From 46497d1a2f1f3c521889d32d455caed1e874d007 Mon Sep 17 00:00:00 2001 From: egalpin Date: Thu, 21 Jul 2022 09:31:56 -0400 Subject: [PATCH 09/14] Removed publishTime and messageId in grpc pubsub client publish --- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 3 ++- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 2 ++ .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index f70d30a55d63..9a70cda8ae73 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -190,7 +190,8 @@ private SubscriberBlockingStub subscriberStub() throws IOException { public int publish(TopicPath topic, List outgoingMessages) throws IOException { PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath()); for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage.Builder message = outgoingMessage.message().toBuilder(); + PubsubMessage.Builder message = + outgoingMessage.message().toBuilder().clearMessageId().clearPublishTime(); if (timestampAttribute != null) { message.putAttributes( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index d33ebf8917d4..a3037cc35113 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -133,6 +133,8 @@ public int publish(TopicPath topic, List outgoingMessages) thro if (!outgoingMessage.message().getOrderingKey().isEmpty()) { pubsubMessage.setOrderingKey(outgoingMessage.message().getOrderingKey()); } + + // N.B. publishTime and messageId are intentionally not set on the message that is published pubsubMessages.add(pubsubMessage); } PublishRequest request = new PublishRequest().setMessages(pubsubMessages); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index e5e8abcb0c1e..89879657a2b1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -40,6 +40,7 @@ public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { if (messageId != null) { message.setMessageId(messageId); } + String orderingKey = input.getOrderingKey(); if (orderingKey != null) { message.setOrderingKey(orderingKey); From c3ea5633d71413dc9c740b5cfddc384e6210c354 Mon Sep 17 00:00:00 2001 From: egalpin Date: Thu, 21 Jul 2022 16:08:46 -0400 Subject: [PATCH 10/14] Attempts to allow different pubsub root url for PubsubIO.Write --- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 7 ++ .../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 14 +++- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 12 ++- .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 14 +++- .../sdk/io/gcp/pubsub/PubsubTestClient.java | 40 +++++++++ .../io/gcp/pubsub/PubsubUnboundedSink.java | 82 +++++++++++++++++-- .../PubSubWritePayloadTranslationTest.java | 14 +++- .../gcp/pubsub/PubsubUnboundedSinkTest.java | 12 ++- 8 files changed, 180 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index b3cddb2a62d1..48d647779917 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -49,6 +49,13 @@ public interface PubsubClientFactory extends Serializable { * {@code timestampAttribute} and {@code idAttribute} to store custom timestamps/ids within * message metadata. */ + PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException; + PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) throws IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 9a70cda8ae73..7cc57b7a4c00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import avro.shaded.com.google.common.base.Objects; import com.google.auth.Credentials; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.AcknowledgeRequest; @@ -84,11 +85,22 @@ private static class PubsubGrpcClientFactory implements PubsubClientFactory { public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) throws IOException { + + return newClient(timestampAttribute, idAttribute, options, null); + } + + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + String rootUrlOverride) + throws IOException { return new PubsubGrpcClient( timestampAttribute, idAttribute, DEFAULT_TIMEOUT_S, - channelForRootUrl(options.getPubsubRootUrl()), + channelForRootUrl(Objects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())), options.getGcpCredential()); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 17bce913a875..0acb2663a42b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1108,6 +1108,8 @@ public abstract static class Write extends PTransform, PDone> /** The format function for input PubsubMessage objects. */ abstract SerializableFunction getFormatFn(); + abstract @Nullable String getPubsubRootUrl(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction formatFn) { @@ -1137,6 +1139,8 @@ abstract static class Builder { abstract Builder setFormatFn(SerializableFunction formatFn); + abstract Builder setPubsubRootUrl(String pubsubRootUrl); + abstract Write build(); } @@ -1216,6 +1220,10 @@ public Write withIdAttribute(String idAttribute) { return toBuilder().setIdAttribute(idAttribute).build(); } + public Write withPubsubRootUrl(String pubsubRootUrl) { + return toBuilder().setPubsubRootUrl(pubsubRootUrl).build(); + } + @Override public PDone expand(PCollection input) { if (getTopicProvider() == null) { @@ -1255,8 +1263,8 @@ public PDone expand(PCollection input) { MoreObjects.firstNonNull( getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), MoreObjects.firstNonNull( - getMaxBatchBytesSize(), - PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES))); + getMaxBatchBytesSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES), + getPubsubRootUrl())); } throw new RuntimeException(); // cases are exhaustive. } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index a3037cc35113..308f3606a53e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import avro.shaded.com.google.common.base.Objects; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; @@ -73,6 +74,17 @@ private static HttpRequestInitializer chainHttpRequestInitializer( public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) throws IOException { + + return newClient(timestampAttribute, idAttribute, options, null); + } + + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + String rootUrlOverride) + throws IOException { Pubsub pubsub = new Pubsub.Builder( Transport.getTransport(), @@ -82,7 +94,7 @@ public PubsubClient newClient( // Do not log 404. It clutters the output and is possibly even required by the // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(options.getPubsubRootUrl()) + .setRootUrl(Objects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index d025d85240c4..6c2d3af1877f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -106,6 +106,16 @@ public static PubsubTestClientFactory createFactoryForPublish( activate( () -> setPublishState(expectedTopic, expectedOutgoingMessages, failingOutgoingMessages)); return new PubsubTestClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) @@ -137,6 +147,16 @@ public static PubsubTestClientFactory createFactoryForPull( activate( () -> setPullState(expectedSubscription, clock, ackTimeoutSec, expectedIncomingMessages)); return new PubsubTestClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) @@ -183,6 +203,16 @@ public void close() throws IOException { }); } + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) @@ -294,6 +324,16 @@ public void close() throws IOException { numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls); } + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index b3ea42ed6522..cc3009c73131 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -198,6 +198,8 @@ private static class WriterFn extends DoFn private final int publishBatchSize; private final int publishBatchBytes; + private final String pubsubRootUrl; + /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */ private transient @Nullable PubsubClient pubsubClient; @@ -218,6 +220,24 @@ private static class WriterFn extends DoFn this.idAttribute = idAttribute; this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; + this.pubsubRootUrl = null; + } + + WriterFn( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int publishBatchSize, + int publishBatchBytes, + String pubsubRootUrl) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.timestampAttribute = timestampAttribute; + this.idAttribute = idAttribute; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + this.pubsubRootUrl = pubsubRootUrl; } /** BLOCKING Send {@code messages} as a batch to Pubsub. */ @@ -238,7 +258,10 @@ public void startBundle(StartBundleContext c) throws Exception { checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); pubsubClient = pubsubFactory.newClient( - timestampAttribute, idAttribute, c.getPipelineOptions().as(PubsubOptions.class)); + timestampAttribute, + idAttribute, + c.getPipelineOptions().as(PubsubOptions.class), + pubsubRootUrl); } @ProcessElement @@ -327,6 +350,8 @@ public void populateDisplayData(DisplayData.Builder builder) { */ private final RecordIdMethod recordIdMethod; + private final String pubsubRootUrl; + @VisibleForTesting PubsubUnboundedSink( PubsubClientFactory pubsubFactory, @@ -337,7 +362,8 @@ public void populateDisplayData(DisplayData.Builder builder) { int publishBatchSize, int publishBatchBytes, Duration maxLatency, - RecordIdMethod recordIdMethod) { + RecordIdMethod recordIdMethod, + String pubsubRootUrl) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.timestampAttribute = timestampAttribute; @@ -346,6 +372,7 @@ public void populateDisplayData(DisplayData.Builder builder) { this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; this.maxLatency = maxLatency; + this.pubsubRootUrl = pubsubRootUrl; this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE : recordIdMethod; } @@ -364,7 +391,28 @@ public PubsubUnboundedSink( DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, - RecordIdMethod.RANDOM); + RecordIdMethod.RANDOM, + null); + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int numShards, + String pubsubRootUrl) { + this( + pubsubFactory, + topic, + timestampAttribute, + idAttribute, + numShards, + DEFAULT_PUBLISH_BATCH_SIZE, + DEFAULT_PUBLISH_BATCH_BYTES, + DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM, + pubsubRootUrl); } public PubsubUnboundedSink( @@ -384,7 +432,30 @@ public PubsubUnboundedSink( publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, - RecordIdMethod.RANDOM); + RecordIdMethod.RANDOM, + null); + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int numShards, + int publishBatchSize, + int publishBatchBytes, + String pubsubRootUrl) { + this( + pubsubFactory, + topic, + timestampAttribute, + idAttribute, + numShards, + publishBatchSize, + publishBatchBytes, + DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM, + pubsubRootUrl); } /** Get the topic being written to. */ public TopicPath getTopic() { @@ -451,7 +522,8 @@ public PDone expand(PCollection input) { outer.timestampAttribute, outer.idAttribute, outer.publishBatchSize, - outer.publishBatchBytes))); + outer.publishBatchBytes, + outer.pubsubRootUrl))); return PDone.in(input.getPipeline()); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index ea722ac70c48..ec7039815e70 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -67,7 +67,8 @@ public void testTranslateSinkWithTopic() throws Exception { 0, 0, Duration.ZERO, - null); + null, + pubsubRootUrl); PubsubUnboundedSink.PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSink); PCollection input = pipeline.apply(Create.of(new byte[0])); PDone output = input.apply(pubsubSink); @@ -96,7 +97,16 @@ public void testTranslateSinkWithTopicOverridden() throws Exception { ValueProvider runtimeProvider = pipeline.newProvider(TOPIC); PubsubUnboundedSink pubsubUnboundedSinkSink = new PubsubUnboundedSink( - null, runtimeProvider, TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, 0, 0, Duration.ZERO, null); + null, + runtimeProvider, + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + 0, + 0, + 0, + Duration.ZERO, + null, + pubsubRootUrl); PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSinkSink); PCollection input = pipeline.apply(Create.of(new byte[0])); PDone output = input.apply(pubsubSink); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f8cd86ee463c..61c9f2f6cfd7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -119,7 +119,8 @@ public void sendOneMessage() throws IOException { batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + pubsubRootUrl); p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp(ATTRIBUTES))).apply(sink); p.run(); } @@ -149,7 +150,8 @@ public void sendOneMessageWithoutAttributes() throws IOException { 1 /* batchSize */, 1 /* batchBytes */, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + pubsubRootUrl); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp(null /* attributes */))) .apply(sink); @@ -188,7 +190,8 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + pubsubRootUrl); p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink); p.run(); } @@ -231,7 +234,8 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + pubsubRootUrl); p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink); p.run(); } From 92e4d39d4f8ccb3e4d9936fe5f1056d2902d6f75 Mon Sep 17 00:00:00 2001 From: egalpin Date: Fri, 22 Jul 2022 11:08:29 -0400 Subject: [PATCH 11/14] Fixes pubsub tests root url --- .../io/gcp/pubsub/PubSubWritePayloadTranslationTest.java | 4 ++-- .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index ec7039815e70..75f484d9d29a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -68,7 +68,7 @@ public void testTranslateSinkWithTopic() throws Exception { 0, Duration.ZERO, null, - pubsubRootUrl); + null); PubsubUnboundedSink.PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSink); PCollection input = pipeline.apply(Create.of(new byte[0])); PDone output = input.apply(pubsubSink); @@ -106,7 +106,7 @@ public void testTranslateSinkWithTopicOverridden() throws Exception { 0, Duration.ZERO, null, - pubsubRootUrl); + null); PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSinkSink); PCollection input = pipeline.apply(Create.of(new byte[0])); PDone output = input.apply(pubsubSink); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 61c9f2f6cfd7..418f65551e1a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -120,7 +120,7 @@ public void sendOneMessage() throws IOException { batchBytes, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC, - pubsubRootUrl); + null); p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp(ATTRIBUTES))).apply(sink); p.run(); } @@ -151,7 +151,7 @@ public void sendOneMessageWithoutAttributes() throws IOException { 1 /* batchBytes */, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC, - pubsubRootUrl); + null); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp(null /* attributes */))) .apply(sink); @@ -191,7 +191,7 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { batchBytes, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC, - pubsubRootUrl); + null); p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink); p.run(); } @@ -235,7 +235,7 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { batchBytes, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC, - pubsubRootUrl); + null); p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink); p.run(); } From 0d622d9605e1da6739f1a357735458895a289d3d Mon Sep 17 00:00:00 2001 From: egalpin Date: Mon, 25 Jul 2022 15:52:46 -0400 Subject: [PATCH 12/14] Puts PubsubMessageCoder last in registrar --- .../beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index c561bad3dd7b..7f6d686ecb52 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -31,13 +31,13 @@ public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar { @Override public List getCoderProviders() { return ImmutableList.of( - CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), - PubsubMessageWithAttributesAndMessageIdCoder.of())); + PubsubMessageWithAttributesAndMessageIdCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of())); } } From 8d502294e919d7e710bc0ec4f797acb19d77f771 Mon Sep 17 00:00:00 2001 From: egalpin Date: Mon, 25 Jul 2022 16:26:47 -0400 Subject: [PATCH 13/14] Uses MoreObjects over Objects --- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 4 ++-- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 7cc57b7a4c00..6db088faccdd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -20,7 +20,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; -import avro.shaded.com.google.common.base.Objects; import com.google.auth.Credentials; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.AcknowledgeRequest; @@ -58,6 +57,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -100,7 +100,7 @@ public PubsubClient newClient( timestampAttribute, idAttribute, DEFAULT_TIMEOUT_S, - channelForRootUrl(Objects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())), + channelForRootUrl(MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())), options.getGcpCredential()); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 308f3606a53e..9a008041fc68 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -19,7 +19,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; -import avro.shaded.com.google.common.base.Objects; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; @@ -49,6 +48,7 @@ import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -94,7 +94,7 @@ public PubsubClient newClient( // Do not log 404. It clutters the output and is possibly even required by the // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(Objects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())) + .setRootUrl(MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) .build(); From fd7deddcbafe15cd97f23cb012971de2b3540980 Mon Sep 17 00:00:00 2001 From: egalpin Date: Thu, 29 Sep 2022 11:04:48 -0400 Subject: [PATCH 14/14] Renames PubsubMessageCoder to PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder --- .../sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java | 4 +++- .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 5 ++++- ...sageWithAttributesAndMessageIdAndOrderingKeyCoder.java} | 7 ++++--- .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 2 +- ...WithAttributesAndMessageIdAndOrderingKeyCoderTest.java} | 7 ++++--- 5 files changed, 16 insertions(+), 9 deletions(-) rename sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/{PubsubMessageCoder.java => PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java} (92%) rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/{PubsubMessageCoderTest.java => PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java} (89%) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index 7f6d686ecb52..dacd5b6ebe58 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -38,6 +38,8 @@ public List getCoderProviders() { CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesAndMessageIdCoder.of()), - CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of())); + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), + PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 0acb2663a42b..f095a03e1167 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -531,7 +531,10 @@ public static Read readMessagesWithAttributesAndMessageId() { * messageId} and {PubsubMessage#getOrderingKey() orderingKey} from PubSub. */ public static Read readMessagesWithAttributesAndMessageIdAndOrderingKey() { - return Read.newBuilder().setCoder(PubsubMessageCoder.of()).setNeedsOrderingKey(true).build(); + return Read.newBuilder() + .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of()) + .setNeedsOrderingKey(true) + .build(); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java similarity index 92% rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java index 53136d606c26..7c2a4250e87c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java @@ -35,7 +35,8 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class PubsubMessageCoder extends CustomCoder { +public class PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder + extends CustomCoder { // A message's payload cannot be null private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); // A message's attributes can be null. @@ -52,8 +53,8 @@ public static Coder of(TypeDescriptor ignored) { return of(); } - public static PubsubMessageCoder of() { - return new PubsubMessageCoder(); + public static PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder of() { + return new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder(); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 28e4601bda9f..562ce824a51a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1355,7 +1355,7 @@ public PCollection expand(PBegin input) { } Coder messageCoder; if (getNeedsOrderingKey()) { - messageCoder = PubsubMessageCoder.of(); + messageCoder = PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of(); } else { if (getNeedsMessageId()) { messageCoder = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java similarity index 89% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java index 9c03ea558dd3..15d4a75f1793 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageCoderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java @@ -31,16 +31,17 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Unit tests for {@link PubsubMessageCoder}. */ +/** Unit tests for {@link PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder}. */ @RunWith(JUnit4.class) -public class PubsubMessageCoderTest { +public class PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest { private static final String DATA = "testData"; private static final String MESSAGE_ID = "testMessageId"; private static final Map ATTRIBUTES = new ImmutableMap.Builder().put("1", "hello").build(); private static final String ORDERING_KEY = "key123"; - private static final Coder TEST_CODER = PubsubMessageCoder.of(); + private static final Coder TEST_CODER = + PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of(); private static final PubsubMessage TEST_VALUE = new PubsubMessage( DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID, ORDERING_KEY);