diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
new file mode 100644
index 0000000000000..5d4995a05d233
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ * <p>
+ * Implementations can provide catalog functions by implementing additional interfaces for tables,
+ * views, and functions.
+ * <p>
+ * Catalog implementations must implement this marker interface to be loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
+ * required public no-arg constructor. After creating an instance, it will be configured by calling
+ * {@link #initialize(String, CaseInsensitiveStringMap)}.
+ * <p>
+ * Catalog implementations are registered to a name by adding a configuration option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
+ * in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)}, will be passed to the catalog in the
+ * case-insensitive string map of options at initialization, with the prefix removed.
+ * {@code name} is also passed, and is the catalog's name; in this case, "catalog-name".
+ */
+@Experimental
+public interface CatalogPlugin {
+  /**
+   * Called to initialize configuration.
+   * <p>
+   * This method is called once, just after the provider is instantiated.
+   *
+   * @param name the name used to identify and load this catalog
+   * @param options a case-insensitive string map of configuration
+   */
+  void initialize(String name, CaseInsensitiveStringMap options);
+
+  /**
+   * Called to get this catalog's name.
+   * <p>
+   * This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
+   * called to pass the catalog's name.
+   */
+  String name();
+}
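For illustration only (not part of this patch): a minimal plugin that satisfies the contract above — a public no-arg constructor plus `initialize` — might look like the following sketch. The class and package names are made up.

```java
import org.apache.spark.sql.catalog.v2.CatalogPlugin;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical registration: spark.sql.catalog.example=com.example.ExampleCatalog
public class ExampleCatalog implements CatalogPlugin {
  private String name = null;
  private CaseInsensitiveStringMap options = null;

  // Catalogs.load requires a public no-arg constructor.
  public ExampleCatalog() {
  }

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    // Called once, just after instantiation, with the catalog's name and its
    // spark.sql.catalog.example.* properties (prefix removed).
    this.name = name;
    this.options = options;
  }

  @Override
  public String name() {
    return name;
  }
}
```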
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
new file mode 100644
index 0000000000000..efae26636a4bc
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.apache.spark.util.Utils;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+@Private
+public class Catalogs {
+  private Catalogs() {
+  }
+
+  /**
+   * Load and configure a catalog by name.
+   * <p>
+   * This loads, instantiates, and initializes the catalog plugin for each call; it does not cache
+   * or reuse instances.
+   *
+   * @param name a String catalog name
+   * @param conf a SQLConf
+   * @return an initialized CatalogPlugin
+   * @throws SparkException If the plugin class cannot be found or instantiated
+   */
+  public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
+    String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
+    if (pluginClassName == null) {
+      throw new SparkException(String.format(
+          "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
+    }
+
+    ClassLoader loader = Utils.getContextOrSparkClassLoader();
+
+    try {
+      Class<?> pluginClass = loader.loadClass(pluginClassName);
+
+      if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
+        throw new SparkException(String.format(
+            "Plugin class for catalog '%s' does not implement CatalogPlugin: %s",
+            name, pluginClassName));
+      }
+
+      CatalogPlugin plugin = CatalogPlugin.class.cast(pluginClass.newInstance());
+
+      plugin.initialize(name, catalogOptions(name, conf));
+
+      return plugin;
+
+    } catch (ClassNotFoundException e) {
+      throw new SparkException(String.format(
+          "Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName));
+
+    } catch (IllegalAccessException e) {
+      throw new SparkException(String.format(
+          "Failed to call public no-arg constructor for catalog '%s': %s", name, pluginClassName),
+          e);
+
+    } catch (InstantiationException e) {
+      throw new SparkException(String.format(
+          "Failed while instantiating plugin for catalog '%s': %s", name, pluginClassName),
+          e.getCause());
+    }
+  }
+
+  /**
+   * Extracts a named catalog's configuration from a SQLConf.
+   *
+   * @param name a catalog name
+   * @param conf a SQLConf
+   * @return a case-insensitive string map of options starting with spark.sql.catalog.(name).
+   */
+  private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
+    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
+    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
+
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
+      Matcher matcher = prefix.matcher(entry.getKey());
+      if (matcher.matches() && matcher.groupCount() > 0) {
+        options.put(matcher.group(1), entry.getValue());
+      }
+    }
+
+    return options;
+  }
+}
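To make the configuration flow concrete, here is a hedged sketch of how `load` is driven by `SQLConf`. The catalog name `prod`, the `com.example.ExampleCatalog` class, and the property values are invented for the example.

```java
import org.apache.spark.SparkException;
import org.apache.spark.sql.catalog.v2.CatalogPlugin;
import org.apache.spark.sql.catalog.v2.Catalogs;
import org.apache.spark.sql.internal.SQLConf;

public class CatalogLoadExample {
  public static void main(String[] args) throws SparkException {
    SQLConf conf = new SQLConf();
    // The bare property selects the implementation class for catalog "prod"...
    conf.setConfString("spark.sql.catalog.prod", "com.example.ExampleCatalog");
    // ...and prefixed properties become options, with the prefix stripped and keys lower-cased.
    conf.setConfString("spark.sql.catalog.prod.url", "jdbc:postgresql://db/prod");
    conf.setConfString("spark.sql.catalog.prod.User", "spark");

    // Instantiates com.example.ExampleCatalog via its public no-arg constructor, then calls
    // initialize("prod", options) where options = {"url" -> "jdbc:...", "user" -> "spark"}.
    CatalogPlugin catalog = Catalogs.load("prod", conf);
  }
}
```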
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
new file mode 100644
index 0000000000000..8c5a6c61d8658
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import org.apache.spark.annotation.Experimental;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Case-insensitive map of string keys to string values.
+ * <p>
+ * This is used to pass options to v2 implementations to ensure consistent case insensitivity.
+ * <p>
+ * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
+ * keys converted to lower case.
+ */
+@Experimental
+public class CaseInsensitiveStringMap implements Map<String, String> {
+
+  public static CaseInsensitiveStringMap empty() {
+    return new CaseInsensitiveStringMap();
+  }
+
+  private final Map<String, String> delegate;
+
+  private CaseInsensitiveStringMap() {
+    this.delegate = new HashMap<>();
+  }
+
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return delegate.containsValue(value);
+  }
+
+  @Override
+  public String get(Object key) {
+    return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public String put(String key, String value) {
+    return delegate.put(key.toLowerCase(Locale.ROOT), value);
+  }
+
+  @Override
+  public String remove(Object key) {
+    return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public void putAll(Map<? extends String, ? extends String> m) {
+    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear() {
+    delegate.clear();
+  }
+
+  @Override
+  public Set<String> keySet() {
+    return delegate.keySet();
+  }
+
+  @Override
+  public Collection<String> values() {
+    return delegate.values();
+  }
+
+  @Override
+  public Set<Map.Entry<String, String>> entrySet() {
+    return delegate.entrySet();
+  }
+}
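A usage sketch of the map's semantics (the key and value are made up): keys are lower-cased on `put` and on lookup, while values are stored verbatim.

```java
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class CaseInsensitiveStringMapExample {
  public static void main(String[] args) {
    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
    options.put("jdbcUrl", "jdbc:postgresql://db/prod");

    // Lookups ignore key case; the stored value keeps its original case.
    assert "jdbc:postgresql://db/prod".equals(options.get("JDBCURL"));
    assert "jdbc:postgresql://db/prod".equals(options.get("jdbcurl"));

    // Key views come back lower-cased, per the class javadoc.
    assert options.keySet().contains("jdbcurl");
    assert !options.keySet().contains("jdbcUrl");
  }
}
```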
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
new file mode 100644
index 0000000000000..2f55da83e2a49
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+
+public class CatalogLoadingSuite {
+  @Test
+  public void testLoad() throws SparkException {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.test-name", TestCatalogPlugin.class.getCanonicalName());
+
+    CatalogPlugin plugin = Catalogs.load("test-name", conf);
+    Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
+    Assert.assertEquals("Plugin should have correct implementation",
+        TestCatalogPlugin.class, plugin.getClass());
+
+    TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
+    Assert.assertEquals("Options should contain no keys", 0, testPlugin.options.size());
+    Assert.assertEquals("Catalog should have correct name", "test-name", testPlugin.name());
+  }
+
+  @Test
+  public void testInitializationOptions() throws SparkException {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.test-name", TestCatalogPlugin.class.getCanonicalName());
+    conf.setConfString("spark.sql.catalog.test-name.name", "not-catalog-name");
+    conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE");
+
+    CatalogPlugin plugin = Catalogs.load("test-name", conf);
+    Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
+    Assert.assertEquals("Plugin should have correct implementation",
+        TestCatalogPlugin.class, plugin.getClass());
+
+    TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
+
+    Assert.assertEquals("Options should contain only two keys", 2, testPlugin.options.size());
+    Assert.assertEquals("Options should contain correct value for name (not overwritten)",
+        "not-catalog-name", testPlugin.options.get("name"));
+    Assert.assertEquals("Options should contain correct value for key",
+        "valUE", testPlugin.options.get("key"));
+  }
+
+  @Test
+  public void testLoadWithoutConfig() {
+    SQLConf conf = new SQLConf();
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
+
+    Assert.assertTrue("Should complain that implementation is not configured",
+        exc.getMessage()
+            .contains("plugin class not found: spark.sql.catalog.missing is not defined"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("missing"));
+  }
+
+  @Test
+  public void testLoadMissingClass() {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.missing", "com.example.NoSuchCatalogPlugin");
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
+
+    Assert.assertTrue("Should complain that the class is not found",
+        exc.getMessage().contains("Cannot find catalog plugin class"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("missing"));
+    Assert.assertTrue("Should identify the missing class",
+        exc.getMessage().contains("com.example.NoSuchCatalogPlugin"));
+  }
+
+  @Test
+  public void testLoadNonCatalogPlugin() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = InvalidCatalogPlugin.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should complain that class does not implement CatalogPlugin",
+        exc.getMessage().contains("does not implement CatalogPlugin"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("invalid"));
+    Assert.assertTrue("Should identify the class",
+        exc.getMessage().contains(invalidClassName));
+  }
+
+  @Test
+  public void testLoadConstructorFailureCatalogPlugin() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = ConstructorFailureCatalogPlugin.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    RuntimeException exc = intercept(RuntimeException.class, () -> Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should have expected error message",
+        exc.getMessage().contains("Expected failure"));
+  }
+
+  @Test
+  public void testLoadAccessErrorCatalogPlugin() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = AccessErrorCatalogPlugin.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should complain that no public constructor is provided",
+        exc.getMessage().contains("Failed to call public no-arg constructor for catalog"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("invalid"));
+    Assert.assertTrue("Should identify the class",
+        exc.getMessage().contains(invalidClassName));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <E extends Exception> E intercept(Class<E> expected, Callable<?> callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown, expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(expected, actual.getClass());
+        return (E) actual;
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+    // Compiler doesn't catch that Assert.fail will always throw an exception.
+    throw new UnsupportedOperationException("[BUG] Should not reach this statement");
+  }
+}
+
+class TestCatalogPlugin implements CatalogPlugin {
+  String name = null;
+  CaseInsensitiveStringMap options = null;
+
+  TestCatalogPlugin() {
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+    this.name = name;
+    this.options = options;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+}
+
+class ConstructorFailureCatalogPlugin implements CatalogPlugin { // fails in its constructor
+  ConstructorFailureCatalogPlugin() {
+    throw new RuntimeException("Expected failure.");
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+  }
+
+  @Override
+  public String name() {
+    return null;
+  }
+}
+
+class AccessErrorCatalogPlugin implements CatalogPlugin { // no public constructor
+  private AccessErrorCatalogPlugin() {
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+  }
+
+  @Override
+  public String name() {
+    return null;
+  }
+}
+
+class InvalidCatalogPlugin { // doesn't implement CatalogPlugin
+  public void initialize(CaseInsensitiveStringMap options) {
+  }
+}
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
new file mode 100644
index 0000000000000..76392777d42a4
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CaseInsensitiveStringMapSuite {
+  @Test
+  public void testPutAndGet() {
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    options.put("kEy", "valUE");
+
+    Assert.assertEquals("Should return correct value for lower-case key",
+        "valUE", options.get("key"));
+    Assert.assertEquals("Should return correct value for upper-case key",
+        "valUE", options.get("KEY"));
+  }
+
+  @Test
+  public void testKeySet() {
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    options.put("kEy", "valUE");
+
+    Set<String> expectedKeySet = new HashSet<>();
+    expectedKeySet.add("key");
+
+    Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet());
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f6fab76b5301a..5208b9a545cb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
@@ -32,6 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
@@ -620,6 +622,12 @@ class SparkSession private(
    */
   @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
+  @transient private lazy val catalogs = new mutable.HashMap[String, CatalogPlugin]()
+
+  private[sql] def catalog(name: String): CatalogPlugin = synchronized {
+    catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
+  }
+
   /**
    * Returns the specified table/view as a `DataFrame`.
    *
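One consequence worth noting, sketched below with the same invented names as above: `Catalogs.load` is documented as non-caching, so each call constructs a fresh plugin, while the new `SparkSession.catalog(name)` memoizes one instance per name via `getOrElseUpdate`.

```java
import org.apache.spark.SparkException;
import org.apache.spark.sql.catalog.v2.CatalogPlugin;
import org.apache.spark.sql.catalog.v2.Catalogs;
import org.apache.spark.sql.internal.SQLConf;

public class CatalogCachingExample {
  public static void main(String[] args) throws SparkException {
    SQLConf conf = new SQLConf();
    conf.setConfString("spark.sql.catalog.prod", "com.example.ExampleCatalog");

    // Per its javadoc, Catalogs.load does not cache: two calls, two instances.
    CatalogPlugin first = Catalogs.load("prod", conf);
    CatalogPlugin second = Catalogs.load("prod", conf);
    assert first != second;

    // Per-session reuse comes from SparkSession.catalog(name), which stores the
    // first loaded plugin for each name in a synchronized mutable.HashMap.
  }
}
```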