diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
deleted file mode 100644
index fd0f495ca29da..0000000000000
--- a/core/src/main/java/org/apache/spark/api/java/Optional.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Like {@code java.util.Optional} in Java 8, {@code scala.Option} in Scala, and
- * {@code com.google.common.base.Optional} in Google Guava, this class represents a
- * value of a given type that may or may not exist. It is used in methods that wish
- * to optionally return a value, in preference to returning {@code null}.
- *
- * In fact, the class here is a reimplementation of the essential API of both
- * {@code java.util.Optional} and {@code com.google.common.base.Optional}. From
- * {@code java.util.Optional}, it implements:
- *
- * <ul>
- *   <li>{@link #empty()}</li>
- *   <li>{@link #of(Object)}</li>
- *   <li>{@link #ofNullable(Object)}</li>
- *   <li>{@link #get()}</li>
- *   <li>{@link #orElse(Object)}</li>
- *   <li>{@link #isPresent()}</li>
- * </ul>
- *
- * From {@code com.google.common.base.Optional} it implements:
- *
- * <ul>
- *   <li>{@link #absent()}</li>
- *   <li>{@link #of(Object)}</li>
- *   <li>{@link #fromNullable(Object)}</li>
- *   <li>{@link #get()}</li>
- *   <li>{@link #or(Object)}</li>
- *   <li>{@link #orNull()}</li>
- *   <li>{@link #isPresent()}</li>
- * </ul>
- *
- * {@code java.util.Optional} itself was not used because at the time, the
- * project did not require Java 8. Using {@code com.google.common.base.Optional}
- * has in the past caused serious library version conflicts with Guava that can't
- * be resolved by shading. Hence this work-alike clone.
- *
- * @param <T> type of value held inside
- */
-public final class Optional<T> implements Serializable {
-
- private static final Optional<?> EMPTY = new Optional<>();
-
- private final T value;
-
- private Optional() {
- this.value = null;
- }
-
- private Optional(T value) {
- Preconditions.checkNotNull(value);
- this.value = value;
- }
-
- // java.util.Optional API (subset)
-
- /**
- * @return an empty {@code Optional}
- */
- public static <T> Optional<T> empty() {
- @SuppressWarnings("unchecked")
- Optional<T> t = (Optional<T>) EMPTY;
- return t;
- }
-
- /**
- * @param value non-null value to wrap
- * @return {@code Optional} wrapping this value
- * @throws NullPointerException if value is null
- */
- public static <T> Optional<T> of(T value) {
- return new Optional<>(value);
- }
-
- /**
- * @param value value to wrap, which may be null
- * @return {@code Optional} wrapping this value, which may be empty
- */
- public static <T> Optional<T> ofNullable(T value) {
- if (value == null) {
- return empty();
- } else {
- return of(value);
- }
- }
-
- /**
- * @return the value wrapped by this {@code Optional}
- * @throws NullPointerException if this is empty (contains no value)
- */
- public T get() {
- Preconditions.checkNotNull(value);
- return value;
- }
-
- /**
- * @param other value to return if this is empty
- * @return this {@code Optional}'s value if present, or else the given value
- */
- public T orElse(T other) {
- return value != null ? value : other;
- }
-
- /**
- * @return true iff this {@code Optional} contains a value (non-empty)
- */
- public boolean isPresent() {
- return value != null;
- }
-
- // Guava API (subset)
- // of(), get() and isPresent() are identically present in the Guava API
-
- /**
- * @return an empty {@code Optional}
- */
- public static <T> Optional<T> absent() {
- return empty();
- }
-
- /**
- * @param value value to wrap, which may be null
- * @return {@code Optional} wrapping this value, which may be empty
- */
- public static <T> Optional<T> fromNullable(T value) {
- return ofNullable(value);
- }
-
- /**
- * @param other value to return if this is empty
- * @return this {@code Optional}'s value if present, or else the given value
- */
- public T or(T other) {
- return value != null ? value : other;
- }
-
- /**
- * @return this {@code Optional}'s value if present, or else null
- */
- public T orNull() {
- return value;
- }
-
- // Common methods
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof Optional)) {
- return false;
- }
- Optional<?> other = (Optional<?>) obj;
- return Objects.equals(value, other.value);
- }
-
- @Override
- public int hashCode() {
- return value == null ? 0 : value.hashCode();
- }
-
- @Override
- public String toString() {
- return value == null ? "Optional.empty" : String.format("Optional[%s]", value);
- }
-
-}
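
This deletion drops Spark's work-alike class in favor of the JDK one. For callers that relied on the Guava-flavored subset documented above, each removed method has a direct java.util.Optional counterpart. The sketch below is illustrative only, not part of this diff, and the class name is invented:

    // Hypothetical migration sketch: Guava-style methods from the removed class
    // mapped onto their java.util.Optional equivalents.
    import java.util.Optional;

    public class OptionalMigrationSketch {
      public static void main(String[] args) {
        Optional<String> none = Optional.empty();                                   // was Optional.absent()
        Optional<String> maybe = Optional.ofNullable(System.getenv("SPARK_HOME"));  // was Optional.fromNullable(...)
        String value = maybe.orElse("unset");                                       // was maybe.or("unset")
        String nullable = none.orElse(null);                                        // was none.orNull()
        System.out.println(value + " / " + nullable);
      }
    }
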
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 80a4f84087466..5d7f2d2f6b8ef 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
import java.{lang => jl}
import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, List => JList, Optional}
import scala.collection.JavaConverters._
import scala.language.implicitConversions
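
Optional enters this file because the outer-join methods on JavaPairRDD wrap the possibly-missing side of each pair in one. A minimal, hypothetical caller-side sketch (class name and data invented here) of how that reads with java.util.Optional:

    // Hedged illustration, not part of this diff: leftOuterJoin yields
    // Tuple2<V, Optional<W>> values, now using java.util.Optional.
    import java.util.Arrays;

    import scala.Tuple2;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LeftOuterJoinSketch {
      public static void main(String[] args) {
        try (JavaSparkContext sc = new JavaSparkContext("local", "LeftOuterJoinSketch")) {
          JavaPairRDD<String, Integer> left = sc.parallelizePairs(
              Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
          JavaPairRDD<String, Integer> right = sc.parallelizePairs(
              Arrays.asList(new Tuple2<>("a", 10)));
          // The right-hand value is present for "a" and empty for "b".
          left.leftOuterJoin(right)
              .mapValues(pair -> pair._1() + pair._2().orElse(0))
              .collect()
              .forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        }
      }
    }
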
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 91ae1002abd21..de55c47bbc73b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
import java.{lang => jl}
import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap, Optional}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 09c83849e26b2..b6ce950acc8f3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
import java.io.Closeable
import java.util
-import java.util.{Map => JMap}
+import java.util.{Map => JMap, Optional}
import scala.collection.JavaConverters._
import scala.language.implicitConversions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index fd96052f95d3f..16730e89df696 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.java
import java.{util => ju}
import java.util.Map.Entry
+import java.util.Optional
import scala.collection.mutable
diff --git a/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
deleted file mode 100644
index 4b97c18198c1a..0000000000000
--- a/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests {@link Optional}.
- */
-public class OptionalSuite {
-
- @Test
- public void testEmpty() {
- Assert.assertFalse(Optional.empty().isPresent());
- Assert.assertNull(Optional.empty().orNull());
- Assert.assertEquals("foo", Optional.empty().or("foo"));
- Assert.assertEquals("foo", Optional.empty().orElse("foo"));
- }
-
- @Test(expected = NullPointerException.class)
- public void testEmptyGet() {
- Optional.empty().get();
- }
-
- @Test
- public void testAbsent() {
- Assert.assertFalse(Optional.absent().isPresent());
- Assert.assertNull(Optional.absent().orNull());
- Assert.assertEquals("foo", Optional.absent().or("foo"));
- Assert.assertEquals("foo", Optional.absent().orElse("foo"));
- }
-
- @Test(expected = NullPointerException.class)
- public void testAbsentGet() {
- Optional.absent().get();
- }
-
- @Test
- public void testOf() {
- Assert.assertTrue(Optional.of(1).isPresent());
- Assert.assertNotNull(Optional.of(1).orNull());
- Assert.assertEquals(Integer.valueOf(1), Optional.of(1).get());
- Assert.assertEquals(Integer.valueOf(1), Optional.of(1).or(2));
- Assert.assertEquals(Integer.valueOf(1), Optional.of(1).orElse(2));
- }
-
- @Test(expected = NullPointerException.class)
- public void testOfWithNull() {
- Optional.of(null);
- }
-
- @Test
- public void testOfNullable() {
- Assert.assertTrue(Optional.ofNullable(1).isPresent());
- Assert.assertNotNull(Optional.ofNullable(1).orNull());
- Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).get());
- Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).or(2));
- Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).orElse(2));
- Assert.assertFalse(Optional.ofNullable(null).isPresent());
- Assert.assertNull(Optional.ofNullable(null).orNull());
- Assert.assertEquals(Integer.valueOf(2), Optional.ofNullable(null).or(2));
- Assert.assertEquals(Integer.valueOf(2), Optional.ofNullable(null).orElse(2));
- }
-
- @Test
- public void testFromNullable() {
- Assert.assertTrue(Optional.fromNullable(1).isPresent());
- Assert.assertNotNull(Optional.fromNullable(1).orNull());
- Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).get());
- Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).or(2));
- Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).orElse(2));
- Assert.assertFalse(Optional.fromNullable(null).isPresent());
- Assert.assertNull(Optional.fromNullable(null).orNull());
- Assert.assertEquals(Integer.valueOf(2), Optional.fromNullable(null).or(2));
- Assert.assertEquals(Integer.valueOf(2), Optional.fromNullable(null).orElse(2));
- }
-
-}
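
With this suite gone, coverage comes from the JDK class itself. One behavioral difference worth noting: the removed get() threw NullPointerException on an empty value (as the deleted testEmptyGet expected), whereas java.util.Optional.get() throws NoSuchElementException. A small sketch of the equivalent checks, with an invented class name:

    // Hedged sketch, not part of this diff: empty-get and fallback behavior
    // of java.util.Optional under JUnit 4.
    import java.util.NoSuchElementException;
    import java.util.Optional;

    import org.junit.Assert;
    import org.junit.Test;

    public class JavaOptionalBehaviorSketch {

      @Test(expected = NoSuchElementException.class)
      public void emptyGetThrowsNoSuchElement() {
        Optional.empty().get();
      }

      @Test
      public void nullableAndFallbacks() {
        Assert.assertFalse(Optional.ofNullable(null).isPresent());
        Assert.assertEquals("foo", Optional.empty().orElse("foo"));
      }
    }
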
diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
index 1d2b05ebc2503..737e8a30eb4d8 100644
--- a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -37,7 +37,6 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.util.Utils;
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 3992ab7049bdd..f20ea34e70d2e 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.*;
import org.apache.spark.Accumulator;
@@ -67,7 +68,6 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.partial.BoundedDouble;
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index 9d8bd7fd11ebd..16f77b38eaa84 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.regex.Pattern;
import scala.Tuple2;
@@ -26,7 +27,6 @@
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
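
Only the import changes in this example; the mapping function itself keeps working because orElse() exists on both the removed class and java.util.Optional. Below is a minimal sketch of such a function, assuming the usual Function3<K, Optional<V>, State<S>, T> shape used with mapWithState (the holder class name is invented):

    // Hedged sketch, not part of this diff: a stateful word-count mapping
    // function whose Optional parameter is now java.util.Optional.
    import java.util.Optional;

    import scala.Tuple2;

    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.State;

    public class StatefulCountSketch {
      static final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>
          mappingFunc = (word, one, state) -> {
            // Add the new count (if any) to the running total kept in state.
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            state.update(sum);
            return new Tuple2<>(word, sum);
          };
    }
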
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 851fa2334501d..2f3b421629bab 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,9 +36,15 @@ object MimaExcludes {
// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"),
+ // [SPARK-19287] JavaPairRDD flatMapValues requires function returning Iterable, not Iterator
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.java.JavaPairRDD.flatMapValues"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues")
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues"),
+
+ // [SPARK-24109] Remove class SnappyOutputStreamWrapper
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"),
+
+ // [SPARK-25362][JavaAPI] Replace Spark Optional class with Java Optional
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.Optional")
)
// Exclude rules for 2.4.x
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index dcd698c860d8b..b245cb0127941 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -17,9 +17,11 @@
package org.apache.spark.streaming
+import java.util.Optional
+
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaUtils, Optional}
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
import org.apache.spark.api.java.function.{Function3 => JFunction3, Function4 => JFunction4}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c3c13df651ccd..cfafb42faa9c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java
import java.{lang => jl}
import java.lang.{Iterable => JIterable}
-import java.util.{List => JList}
+import java.util.{List => JList, Optional}
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.Partitioner
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, Optional}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction,
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index b1367b8f2aed2..738d9324def80 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import scala.Tuple2;
@@ -34,7 +35,6 @@
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.api.java.JavaPairDStream;
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index b154f0e3ac455..924ef2d0b73d3 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -36,7 +36,6 @@
import org.junit.Test;
import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index c7cde5674f547..a715d1c2cfe24 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -47,7 +47,6 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;