From aed8c2457f8abff5b50d579dc3a4938cc36c5be1 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 4 Oct 2018 13:12:25 -0700 Subject: [PATCH 1/3] fix java foreachBatch --- .../sql/streaming/DataStreamWriter.scala | 2 +- .../JavaDataStreamReaderWriterSuite.java | 64 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index e9a15214d952f..b23e86a786459 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -380,7 +380,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.4.0 */ @InterfaceStability.Evolving - def foreachBatch(function: VoidFunction2[Dataset[T], Long]): DataStreamWriter[T] = { + def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T] = { foreachBatch((batchDs: Dataset[T], batchId: Long) => function.call(batchDs, batchId)) } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java new file mode 100644 index 0000000000000..9a51e95ce08a7 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java @@ -0,0 +1,64 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { + spark = new TestSparkSession(); + input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { + Utils.deleteRecursively(new File(input)); + spark.stop(); + spark = null; + } + + @Test + public void testForeachBatchAPI() { + StreamingQuery query = spark + .readStream() + .textFile(input) + .writeStream() + .foreachBatch(new VoidFunction2, Long>() { + @Override + public void call(Dataset v1, Long v2) throws Exception { + } + }) + .start(); + query.stop(); + } +} From ce60b8acfd93551a5de1cfac80004a0a7336c332 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 4 Oct 2018 14:40:07 -0700 Subject: [PATCH 2/3] testForeachAPI --- .../JavaDataStreamReaderWriterSuite.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java index 9a51e95ce08a7..7eaa5a09f8ef1 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java @@ -25,6 +25,7 @@ import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.test.TestSparkSession; @@ -61,4 +62,28 @@ public void call(Dataset v1, Long v2) throws Exception { .start(); query.stop(); } + + @Test + public void testForeachAPI() { + StreamingQuery query = spark + .readStream() + .textFile(input) + .writeStream() + .foreach(new ForeachWriter() { + @Override + public boolean open(long partitionId, long epochId) { + return true; + } + + @Override + public void process(String value) { + } + + @Override + public void close(Throwable errorOrNull) { + } + }) + .start(); + query.stop(); + } } From 999b6beee920a8f755028a90e96955527283c205 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 4 Oct 2018 22:21:33 -0700 Subject: [PATCH 3/3] style --- .../JavaDataStreamReaderWriterSuite.java | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java index 7eaa5a09f8ef1..48cdb2642d830 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package test.org.apache.spark.sql.streaming; @@ -43,47 +43,47 @@ public void setUp() { @After public void tearDown() { - Utils.deleteRecursively(new File(input)); - spark.stop(); - spark = null; + try { + Utils.deleteRecursively(new File(input)); + } finally { + spark.stop(); + spark = null; + } } @Test public void testForeachBatchAPI() { StreamingQuery query = spark - .readStream() - .textFile(input) - .writeStream() - .foreachBatch(new VoidFunction2, Long>() { - @Override - public void call(Dataset v1, Long v2) throws Exception { - } - }) - .start(); + .readStream() + .textFile(input) + .writeStream() + .foreachBatch(new VoidFunction2, Long>() { + @Override + public void call(Dataset v1, Long v2) throws Exception {} + }) + .start(); query.stop(); } @Test public void testForeachAPI() { StreamingQuery query = spark - .readStream() - .textFile(input) - .writeStream() - .foreach(new ForeachWriter() { - @Override - public boolean open(long partitionId, long epochId) { - return true; - } + .readStream() + .textFile(input) + .writeStream() + .foreach(new ForeachWriter() { + @Override + public boolean open(long partitionId, long epochId) { + return true; + } - @Override - public void process(String value) { - } + @Override + public void process(String value) {} - @Override - public void close(Throwable errorOrNull) { - } - }) - .start(); + @Override + public void close(Throwable errorOrNull) {} + }) + .start(); query.stop(); } }