From 73b444287333456a5e067bb446d83d268ce576ef Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Mon, 6 Sep 2021 15:21:54 +0200 Subject: [PATCH 1/6] [BEAM-12634]: Provide generic Autoscaler interface, - Provide DefaultAutoscaler implementation - Add new autoscaler field to JmsIO --- .../apache/beam/sdk/io/jms/AutoScaler.java | 37 ++++++++++++++++ .../beam/sdk/io/jms/DefaultAutoscaler.java | 43 +++++++++++++++++++ .../org/apache/beam/sdk/io/jms/JmsIO.java | 30 +++++++++++++ 3 files changed, 110 insertions(+) create mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java create mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java new file mode 100644 index 000000000000..009a2c3b2c5d --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.jms; + +import java.io.Serializable; + +/** + * An interface allowing the implementation of getBacklogBytes and getSplitBacklogBytes. + * + * <p>
The interface also provides the following methods: - start to start the autoscaler - stop to + * stop the autoscaler + */ +public interface AutoScaler extends Serializable { + + void start(); + + long getTotalBacklogBytes(); + + long getSplitBacklogBytes(); + + void stop(); +} diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java new file mode 100644 index 000000000000..d623101819db --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; + +/** + * Default implementation of {@link AutoScaler}. Returns {@link + * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader#BACKLOG_UNKNOWN} as default value for both + * getTotalBacklogBytes and getSplitBacklogBytes. 
+ */ +public class DefaultAutoscaler implements AutoScaler { + @Override + public void start() {} + + @Override + public long getTotalBacklogBytes() { + return BACKLOG_UNKNOWN; + } + + @Override + public long getSplitBacklogBytes() { + return BACKLOG_UNKNOWN; + } + + @Override + public void stop() {} +} diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 4999e10ee9e8..d71959704d1c 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -196,6 +196,8 @@ public abstract static class Read extends PTransform> abstract @Nullable Coder getCoder(); + abstract @Nullable AutoScaler getAutoScaler(); + abstract Builder builder(); @AutoValue.Builder @@ -218,6 +220,8 @@ abstract static class Builder { abstract Builder setCoder(Coder coder); + abstract Builder setAutoScaler(AutoScaler autoScaler); + abstract Read build(); } @@ -344,6 +348,11 @@ public Read withCoder(Coder coder) { return builder().setCoder(coder).build(); } + public Read withAutoScaler(AutoScaler autoScaler) { + checkArgument(autoScaler != null, "autoScaler can not be null"); + return builder().setAutoScaler(autoScaler).build(); + } + @Override public PCollection expand(PBegin input) { checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); @@ -447,6 +456,7 @@ static class UnboundedJmsReader extends UnboundedReader { private Connection connection; private Session session; private MessageConsumer consumer; + private AutoScaler autoScaler; private T currentMessage; private Instant currentTimestamp; @@ -474,6 +484,12 @@ public boolean start() throws IOException { } connection.start(); this.connection = connection; + if (spec.getAutoScaler() == null) { + this.autoScaler = new DefaultAutoscaler(); + } else { + this.autoScaler = spec.getAutoScaler(); + } + this.autoScaler.start(); 
} catch (Exception e) { throw new IOException("Error connecting to JMS", e); } @@ -544,6 +560,16 @@ public CheckpointMark getCheckpointMark() { return checkpointMark; } + @Override + public long getSplitBacklogBytes() { + return this.autoScaler.getSplitBacklogBytes(); + } + + @Override + public long getTotalBacklogBytes() { + return this.autoScaler.getTotalBacklogBytes(); + } + @Override public UnboundedSource getCurrentSource() { return source; @@ -565,6 +591,10 @@ public void close() throws IOException { connection.close(); connection = null; } + if (autoScaler != null) { + autoScaler.stop(); + autoScaler = null; + } } catch (Exception e) { throw new IOException(e); } From ed03a74c845a149887764c4f51846c64007958dd Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Mon, 6 Sep 2021 16:17:53 +0200 Subject: [PATCH 2/6] [BEAM-12634]: Add Unit Tests for DefaultAutoScaler and mock custom Autoscaler --- sdks/java/io/jms/build.gradle | 1 + .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/sdks/java/io/jms/build.gradle b/sdks/java/io/jms/build.gradle index 69cd88f81d1f..3f9d0c719ec6 100644 --- a/sdks/java/io/jms/build.gradle +++ b/sdks/java/io/jms/build.gradle @@ -36,6 +36,7 @@ dependencies { testCompile library.java.activemq_kahadb_store testCompile library.java.activemq_client testCompile library.java.junit + testCompile library.java.mockito_core testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index c335f8ab11de..9e2c1ce57ef5 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -17,12 +17,17 @@ */ package org.apache.beam.sdk.io.jms; +import static 
org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.lang.reflect.Proxy; @@ -421,6 +426,54 @@ public void testCheckpointMarkDefaultCoder() throws Exception { CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark); } + @Test + public void testDefaultAutoscaler() throws IOException { + JmsIO.Read spec = + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withQueue(QUEUE); + JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); + JmsIO.UnboundedJmsReader reader = source.createReader(null, null); + + // start the reader and check getSplitBacklogBytes and getTotalBacklogBytes values + reader.start(); + assertEquals(BACKLOG_UNKNOWN, reader.getSplitBacklogBytes()); + assertEquals(BACKLOG_UNKNOWN, reader.getTotalBacklogBytes()); + reader.close(); + } + + @Test + public void testCustomAutoscaler() throws IOException { + long excpectedSplitBacklogBytes = 1000L; + long excpectedTotalBacklogBytes = 1111L; + + AutoScaler autoScaler = mock(DefaultAutoscaler.class); + when(autoScaler.getSplitBacklogBytes()).thenReturn(excpectedSplitBacklogBytes); + when(autoScaler.getTotalBacklogBytes()).thenReturn(excpectedTotalBacklogBytes); + JmsIO.Read spec = + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withQueue(QUEUE) + .withAutoScaler(autoScaler); + + JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); + JmsIO.UnboundedJmsReader 
reader = source.createReader(null, null); + + // start the reader and check getSplitBacklogBytes and getTotalBacklogBytes values + reader.start(); + verify(autoScaler, times(1)).start(); + assertEquals(excpectedSplitBacklogBytes, reader.getSplitBacklogBytes()); + verify(autoScaler, times(1)).getSplitBacklogBytes(); + assertEquals(excpectedTotalBacklogBytes, reader.getTotalBacklogBytes()); + verify(autoScaler, times(1)).getTotalBacklogBytes(); + reader.close(); + verify(autoScaler, times(1)).stop(); + } + private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); From 1ff185507c72fed7e94a2bc63195ce712de13f53 Mon Sep 17 00:00:00 2001 From: rvballada Date: Thu, 9 Sep 2021 10:40:13 +0200 Subject: [PATCH 3/6] [BEAM-12634] => Apply suggestions from code review - Delete getSplitBacklogBytes method - Add new comments Co-authored-by: Lukasz Cwik --- .../java/org/apache/beam/sdk/io/jms/AutoScaler.java | 12 +++++++----- .../apache/beam/sdk/io/jms/DefaultAutoscaler.java | 3 +-- .../main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 3 +++ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java index 009a2c3b2c5d..aad6c6840aa5 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java @@ -20,18 +20,20 @@ import java.io.Serializable; /** - * An interface allowing the implementation of getBacklogBytes and getSplitBacklogBytes. - * - * <p>
The interface also provides the following methods: - start to start the autoscaler - stop to - * stop the autoscaler + * Enables users to specify their own `JMS` backlog reporters enabling {@link JmsIO} to report {@link UnboundedSource#getTotalBacklogBytes}. */ public interface AutoScaler extends Serializable { + /** + * The {@link AutoScaler} is started when the {@link UnboundedJmsReader} is started. + */ void start(); long getTotalBacklogBytes(); - long getSplitBacklogBytes(); + /** + * The {@link AutoScaler} is stopped when the {@link UnboundedJmsReader} is closed. + */ void stop(); } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java index d623101819db..98207a624322 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java @@ -21,8 +21,7 @@ /** * Default implementation of {@link AutoScaler}. Returns {@link - * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader#BACKLOG_UNKNOWN} as default value for both - * getTotalBacklogBytes and getSplitBacklogBytes. + * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader#BACKLOG_UNKNOWN} as the default value. */ public class DefaultAutoscaler implements AutoScaler { @Override diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index d71959704d1c..64d223362686 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -348,6 +348,9 @@ public Read withCoder(Coder coder) { return builder().setCoder(coder).build(); } + /** + * Sets the {@link AutoScaler} to use for reporting backlog during the execution of this source. 
+ */ public Read withAutoScaler(AutoScaler autoScaler) { checkArgument(autoScaler != null, "autoScaler can not be null"); return builder().setAutoScaler(autoScaler).build(); From a0e933957b07d65174987bb1af7e0fc7f19b9425 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Thu, 9 Sep 2021 10:52:58 +0200 Subject: [PATCH 4/6] [BEAM-12634]: Fix tests and some JavaDoc comments consecutively to code review --- .../java/org/apache/beam/sdk/io/jms/AutoScaler.java | 13 +++++-------- .../apache/beam/sdk/io/jms/DefaultAutoscaler.java | 5 ----- .../main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 5 ----- .../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 4 ---- 4 files changed, 5 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java index aad6c6840aa5..29620c863e66 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java @@ -18,22 +18,19 @@ package org.apache.beam.sdk.io.jms; import java.io.Serializable; +import org.apache.beam.sdk.io.UnboundedSource; /** - * Enables users to specify their own `JMS` backlog reporters enabling {@link JmsIO} to report {@link UnboundedSource#getTotalBacklogBytes}. + * Enables users to specify their own `JMS` backlog reporters enabling {@link JmsIO} to report + * {@link UnboundedSource.UnboundedReader#getTotalBacklogBytes()}. */ public interface AutoScaler extends Serializable { - /** - * The {@link AutoScaler} is started when the {@link UnboundedJmsReader} is started. - */ + /** The {@link AutoScaler} is started when the {@link JmsIO.UnboundedJmsReader} is started. */ void start(); long getTotalBacklogBytes(); - - /** - * The {@link AutoScaler} is stopped when the {@link UnboundedJmsReader} is closed. 
- */ + /** The {@link AutoScaler} is stopped when the {@link JmsIO.UnboundedJmsReader} is closed. */ void stop(); } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java index 98207a624322..2b05cf630bf8 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/DefaultAutoscaler.java @@ -32,11 +32,6 @@ public long getTotalBacklogBytes() { return BACKLOG_UNKNOWN; } - @Override - public long getSplitBacklogBytes() { - return BACKLOG_UNKNOWN; - } - @Override public void stop() {} } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 64d223362686..9fa4492cf235 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -563,11 +563,6 @@ public CheckpointMark getCheckpointMark() { return checkpointMark; } - @Override - public long getSplitBacklogBytes() { - return this.autoScaler.getSplitBacklogBytes(); - } - @Override public long getTotalBacklogBytes() { return this.autoScaler.getTotalBacklogBytes(); diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 9e2c1ce57ef5..a9f3c3f004ef 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -446,11 +446,9 @@ public void testDefaultAutoscaler() throws IOException { @Test public void testCustomAutoscaler() throws IOException { - long excpectedSplitBacklogBytes = 1000L; long excpectedTotalBacklogBytes = 1111L; AutoScaler autoScaler = mock(DefaultAutoscaler.class); - 
when(autoScaler.getSplitBacklogBytes()).thenReturn(excpectedSplitBacklogBytes); when(autoScaler.getTotalBacklogBytes()).thenReturn(excpectedTotalBacklogBytes); JmsIO.Read spec = JmsIO.read() @@ -466,8 +464,6 @@ public void testCustomAutoscaler() throws IOException { // start the reader and check getSplitBacklogBytes and getTotalBacklogBytes values reader.start(); verify(autoScaler, times(1)).start(); - assertEquals(excpectedSplitBacklogBytes, reader.getSplitBacklogBytes()); - verify(autoScaler, times(1)).getSplitBacklogBytes(); assertEquals(excpectedTotalBacklogBytes, reader.getTotalBacklogBytes()); verify(autoScaler, times(1)).getTotalBacklogBytes(); reader.close(); From 8117b19b08dc1162c591cca19e659c4476063d72 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Thu, 9 Sep 2021 16:39:19 +0200 Subject: [PATCH 5/6] [BEAM-12634]: Add comment on Autoscaler#getTotalBacklogBytes --- .../main/java/org/apache/beam/sdk/io/jms/AutoScaler.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java index 29620c863e66..a4733556a38a 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java @@ -29,6 +29,13 @@ public interface AutoScaler extends Serializable { /** The {@link AutoScaler} is started when the {@link JmsIO.UnboundedJmsReader} is started. */ void start(); + /** + * Returns the size of the backlog of unread data in the underlying data source represented by all + * splits of this source. + * + * <p>
It should be overridden in order to allow the runner to scale the amount of resources + * allocated to the pipeline. + */ long getTotalBacklogBytes(); /** The {@link AutoScaler} is stopped when the {@link JmsIO.UnboundedJmsReader} is closed. */ From 6251c862d9811d68f7057ebb69d837bc3f43b9a4 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Fri, 10 Sep 2021 08:12:48 -0700 Subject: [PATCH 6/6] Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java --- .../src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java index a4733556a38a..0e023d1aae09 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/AutoScaler.java @@ -32,9 +32,6 @@ public interface AutoScaler extends Serializable { /** * Returns the size of the backlog of unread data in the underlying data source represented by all * splits of this source. - * - * <p>
It should be overridden in order to allow the runner to scale the amount of resources - * allocated to the pipeline. */ long getTotalBacklogBytes();