From 8600e141c733937518b10933e12a9ae7ea4f3370 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 17:48:45 -0800 Subject: [PATCH 1/9] unit tests for Util; refactored getLocalHost for testing purposes, but should be able to clean up after converting to Java --- .../scala/org/apache/samza/util/Util.scala | 51 ++- .../java/org/apache/samza/util/TestUtil.java | 329 ++++++++++++++++++ .../org/apache/samza/util/TestUtil.scala | 44 --- 3 files changed, 372 insertions(+), 52 deletions(-) create mode 100644 samza-core/src/test/java/org/apache/samza/util/TestUtil.java delete mode 100644 samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 1323cd2ec6..41dda05dc1 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -35,7 +35,7 @@ import scala.collection.JavaConverters._ object Util extends Logging { - private val FALLBACK_VERSION = "0.0.1" + val FALLBACK_VERSION = "0.0.1" val Random = new Random /** @@ -120,27 +120,31 @@ object Util extends Logging { } } + def getLocalHost: InetAddress = { + doGetLocalHost(new NetworkingUtil) + } + /** * Returns the the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback * * @return the [[java.net.InetAddress]] which represents the localhost */ - def getLocalHost: InetAddress = { - val localHost = InetAddress.getLocalHost + def doGetLocalHost(networkingUtil: NetworkingUtil): InetAddress = { + val localHost = networkingUtil.inetAddressGetLocalHost if (localHost.isLoopbackAddress) { debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName)) val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) { - NetworkInterface.getNetworkInterfaces.asScala.toList + networkingUtil.networkInterfaceGetNetworkInterfaces.asScala.toList } else { - NetworkInterface.getNetworkInterfaces.asScala.toList.reverse + networkingUtil.networkInterfaceGetNetworkInterfaces.asScala.toList.reverse } for (networkInterface <- networkInterfaces) { - val addresses = networkInterface.getInetAddresses.asScala.toList + val addresses = networkingUtil.networkInterfaceGetInetAddresses(networkInterface).asScala.toList .filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress) if (addresses.nonEmpty) { val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) - debug("Found an external IP address %s which represents the localhost.".format(address.getHostAddress)) - return InetAddress.getByAddress(address.getAddress) + debug("Found an external IP address %s which represents the localhost.".format(networkingUtil.inetAddressGetHostAddress(address))) + return networkingUtil.inetAddressGetByAddress(networkingUtil.inetAddressGetAddress(address)) } } } @@ -178,3 +182,34 @@ object Util extends Logging { } } + +/** + * Do this so Powermockito can mock the system classes. + * Powermockito doesn't seem to work as well with Scala singletons. + * In Java, it seems like it will work to use Powermock without this wrapper. + */ +class NetworkingUtil { + def inetAddressGetLocalHost: InetAddress = { + InetAddress.getLocalHost + } + + def inetAddressGetByAddress(address: Array[Byte]): InetAddress = { + InetAddress.getByAddress(address) + } + + def inetAddressGetHostAddress(inetAddress: InetAddress): String = { + inetAddress.getHostAddress + } + + def inetAddressGetAddress(inetAddress: InetAddress): Array[Byte] = { + inetAddress.getAddress + } + + def networkInterfaceGetNetworkInterfaces: java.util.Enumeration[NetworkInterface] = { + NetworkInterface.getNetworkInterfaces + } + + def networkInterfaceGetInetAddresses(networkInterface: NetworkInterface): java.util.Enumeration[InetAddress] = { + networkInterface.getInetAddresses + } +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java new file mode 100644 index 0000000000..b13958ab11 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -0,0 +1,329 @@ +package org.apache.samza.util; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalMatchers.aryEq; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest({NetworkingUtil.class}) +public class TestUtil { + private static final String CONFIG_KEY = "config.key"; + private static final String CONFIG_VALUE = "value"; + private static final String NEW_CONFIG_KEY = "new.rewritten.config.key"; + private static final String REWRITER_NAME = "propertyRewriter"; + private static final String OTHER_REWRITER_NAME = "otherPropertyRewriter"; + + @Test + public void testEnvVarEscape() { + // no special characters in original + String noSpecialCharacters = "hello world 123 .?!"; + assertEquals(noSpecialCharacters, Util.envVarEscape(noSpecialCharacters)); + + String withSpecialCharacters = "quotation \" apostrophe '"; + String escaped = "quotation \\\" apostrophe \\'"; + assertEquals(escaped, Util.envVarEscape(withSpecialCharacters)); + } + + /** + * It's difficult to explicitly test having an actual version and using the fallback, due to the usage of methods of + * Class. + */ + @Test + public void testGetSamzaVersion() { + String utilImplementationVersion = Util.class.getPackage().getImplementationVersion(); + String expectedVersion = (utilImplementationVersion != null) ? utilImplementationVersion : Util.FALLBACK_VERSION(); + assertEquals(expectedVersion, Util.getSamzaVersion()); + } + + /** + * It's difficult to explicitly test having an actual version and using the fallback, due to the usage of methods of + * Class. + */ + @Test + public void testGetTaskClassVersion() { + // cannot find app nor task + assertEquals(Util.FALLBACK_VERSION(), Util.getTaskClassVersion(new MapConfig())); + + // only app + String appClassVersion = MyAppClass.class.getPackage().getImplementationVersion(); + String expectedAppClassVersion = (appClassVersion != null) ? appClassVersion : Util.FALLBACK_VERSION(); + Config config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName())); + assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); + + // only task + String taskClassVersion = MyTaskClass.class.getPackage().getImplementationVersion(); + String expectedTaskClassVersion = (taskClassVersion != null) ? taskClassVersion : Util.FALLBACK_VERSION(); + config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS, MyTaskClass.class.getName())); + assertEquals(expectedTaskClassVersion, Util.getTaskClassVersion(config)); + + // both app and task; choose app + config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName(), + // shouldn't even try to load the task class + TaskConfig.TASK_CLASS, "this_is_not_a_class")); + assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); + } + + @Test + public void testGetLocalHostNotLoopbackAddress() throws UnknownHostException { + mockStatic(InetAddress.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(false); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + assertEquals(inetAddressLocalHost, Util.getLocalHost()); + } + + @Test + public void testGetLocalHostLoopbackAddressNoExternalAddressFound() throws Exception { + mockStatic(InetAddress.class, NetworkInterface.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(true); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + + // network interfaces return addresses which are not external + InetAddress linkLocalAddress = mock(InetAddress.class); + when(linkLocalAddress.isLinkLocalAddress()).thenReturn(true); + InetAddress loopbackAddress = mock(InetAddress.class); + when(loopbackAddress.isLinkLocalAddress()).thenReturn(false); + when(loopbackAddress.isLoopbackAddress()).thenReturn(true); + NetworkInterface networkInterface0 = mock(NetworkInterface.class); + when(networkInterface0.getInetAddresses()).thenReturn( + Collections.enumeration(Arrays.asList(linkLocalAddress, loopbackAddress))); + NetworkInterface networkInterface1 = mock(NetworkInterface.class); + when(networkInterface1.getInetAddresses()).thenReturn( + Collections.enumeration(Collections.singletonList(loopbackAddress))); + when(NetworkInterface.getNetworkInterfaces()).thenReturn( + Collections.enumeration(Arrays.asList(networkInterface0, networkInterface1))); + + assertEquals(inetAddressLocalHost, Util.getLocalHost()); + } + + @Test + public void testGetLocalHostExternalInet4Address() throws Exception { + mockStatic(InetAddress.class, NetworkInterface.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(true); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + + InetAddress linkLocalAddress = mock(InetAddress.class); + when(linkLocalAddress.isLinkLocalAddress()).thenReturn(true); + Inet4Address externalInet4Address = mock(Inet4Address.class); + when(externalInet4Address.isLinkLocalAddress()).thenReturn(false); + when(externalInet4Address.isLoopbackAddress()).thenReturn(false); + byte[] externalInet4AddressBytes = new byte[]{0, 1, 2, 3}; + when(externalInet4Address.getAddress()).thenReturn(externalInet4AddressBytes); + InetAddress otherExternalAddress = mock(InetAddress.class); // not Inet4Address + when(otherExternalAddress.isLinkLocalAddress()).thenReturn(false); + when(otherExternalAddress.isLoopbackAddress()).thenReturn(false); + + NetworkInterface networkInterfaceLinkLocal = mock(NetworkInterface.class); + when(networkInterfaceLinkLocal.getInetAddresses()).thenReturn( + Collections.enumeration(Collections.singletonList(linkLocalAddress))); + NetworkInterface networkInterfaceExternal = mock(NetworkInterface.class); + when(networkInterfaceExternal.getInetAddresses()).thenReturn( + Collections.enumeration(Arrays.asList(otherExternalAddress, externalInet4Address))); + when(NetworkInterface.getNetworkInterfaces()).thenReturn( + Collections.enumeration(Arrays.asList(networkInterfaceLinkLocal, networkInterfaceExternal))); + + InetAddress finalInetAddress = mock(InetAddress.class); + when(InetAddress.getByAddress(aryEq(externalInet4AddressBytes))).thenReturn(finalInetAddress); + + assertEquals(finalInetAddress, Util.getLocalHost()); + } + + @Test + public void testGetLocalHostExternalAddressNotInet4Address() throws Exception { + mockStatic(InetAddress.class, NetworkInterface.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(true); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + + byte[] externalAddressBytes = new byte[]{0, 1, 2, 3, 4, 5}; + InetAddress externalAddress = mock(InetAddress.class); + when(externalAddress.isLinkLocalAddress()).thenReturn(false); + when(externalAddress.isLoopbackAddress()).thenReturn(false); + when(externalAddress.getAddress()).thenReturn(externalAddressBytes); + + NetworkInterface networkInterface = mock(NetworkInterface.class); + when(networkInterface.getInetAddresses()).thenReturn( + Collections.enumeration(Collections.singletonList(externalAddress))); + when(NetworkInterface.getNetworkInterfaces()).thenReturn( + Collections.enumeration(Collections.singletonList(networkInterface))); + + InetAddress finalInetAddress = mock(InetAddress.class); + when(InetAddress.getByAddress(aryEq(externalAddressBytes))).thenReturn(finalInetAddress); + + assertEquals(finalInetAddress, Util.getLocalHost()); + } + + @Test + public void testRewriteConfig() { + Map baseConfigMap = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); + + // no rewriters + Map fullConfig = new HashMap<>(baseConfigMap); + assertEquals(fullConfig, Util.rewriteConfig(new MapConfig(fullConfig))); + + // one rewriter + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + Map expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); + + // only apply rewriters from rewriters list + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, OTHER_REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); + + // two rewriters; second rewriter overwrites configs from first + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME + "," + OTHER_REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); + } + + @Test(expected = SamzaException.class) + public void testRewriteConfigNoClassForConfigRewriterName() { + Config config = + new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, "unknownRewriter")); + Util.rewriteConfig(config); + } + + @Test(expected = SamzaException.class) + public void testRewriteConfigRewriterClassDoesNotExist() { + Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, REWRITER_NAME, + String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class")); + Util.rewriteConfig(config); + } + + @Test + public void testApplyRewriter() { + // new property + Map fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + NewPropertyRewriter.class.getName()); + Map expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + + // update property + fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + + // remove property + fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + DeletePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.remove(CONFIG_KEY); + assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + } + + @Test(expected = SamzaException.class) + public void testApplyRewriterNoClassForConfigRewriterName() { + Map fullConfig = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); + Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); + } + + @Test(expected = SamzaException.class) + public void testApplyRewriterClassDoesNotExist() { + Map fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + "not_a_class"); + Config expectedConfig = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, NEW_CONFIG_KEY, CONFIG_VALUE)); + assertEquals(expectedConfig, Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + } + + /** + * No requirement for this test that this extends any other class. Just need some placeholder class. + */ + public static class MyAppClass { + } + + /** + * No requirement for this test that this extends any other class. Just need some placeholder class. + */ + public static class MyTaskClass { + } + + /** + * Adds a new config entry for the key {@link #NEW_CONFIG_KEY} which has the same value as {@link #CONFIG_KEY}. + */ + public static class NewPropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + ImmutableMap.Builder newConfigMapBuilder = new ImmutableMap.Builder<>(); + newConfigMapBuilder.putAll(config); + newConfigMapBuilder.put(NEW_CONFIG_KEY, config.get(CONFIG_KEY)); + return new MapConfig(newConfigMapBuilder.build()); + } + } + + /** + * If an entry at {@link #NEW_CONFIG_KEY} exists, overwrites it to be the value concatenated with itself. Otherwise, + * updates the entry at {@link #CONFIG_KEY} to be the value concatenated to itself. + */ + public static class UpdatePropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + Map newConfigMap = new HashMap<>(config); + if (config.containsKey(NEW_CONFIG_KEY)) { + // for testing overwriting of new configs + newConfigMap.put(NEW_CONFIG_KEY, config.get(NEW_CONFIG_KEY) + config.get(NEW_CONFIG_KEY)); + } else { + newConfigMap.put(CONFIG_KEY, config.get(CONFIG_KEY) + config.get(CONFIG_KEY)); + } + return new MapConfig(newConfigMap); + } + } + + /** + * Removes config entry for the key {@link #CONFIG_KEY} and {@link #NEW_CONFIG_KEY}. + */ + public static class DeletePropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + Map newConfigMap = new HashMap<>(config); + newConfigMap.remove(CONFIG_KEY); + return new MapConfig(newConfigMap); + } + } +} diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala deleted file mode 100644 index ba3b5df483..0000000000 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util - -import org.junit.Assert._ -import org.junit.Test -import org.apache.samza.config.MapConfig - -class TestUtil { - @Test - def testGetLocalHost(): Unit = { - assertNotNull(Util.getLocalHost) - } - - @Test - def testGetObjExistingClass() { - val obj = Util.getObj("org.apache.samza.config.MapConfig", classOf[MapConfig]) - assertNotNull(obj) - assertEquals(classOf[MapConfig], obj.getClass()) - } - - @Test(expected = classOf[ClassNotFoundException]) - def testGetObjNonexistentClass() { - Util.getObj("this.class.does.NotExist", classOf[Object]) - assert(false, "This should not get hit.") - } -} From 60ddccaebd4aa9a9b737c0c9c53366441baa761b Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 17:50:35 -0800 Subject: [PATCH 2/9] remove unused methods and methods for reflection --- .../org/apache/samza/config/SystemConfig.java | 4 +- .../samza/execution/LocalJobPlanner.java | 5 +- .../samza/runtime/LocalApplicationRunner.java | 4 +- .../apache/samza/util/DiagnosticsUtil.java | 2 +- .../samza/diagnostics/DiagnosticsManager.java | 29 ++++------- .../MetricsSnapshotReporterFactory.scala | 6 +-- .../org/apache/samza/util/CommandLine.scala | 8 ++-- .../samza/util/CoordinatorStreamUtil.scala | 2 +- .../scala/org/apache/samza/util/Util.scala | 48 ------------------- .../kafka/KafkaCheckpointManagerFactory.scala | 4 +- .../validation/YarnJobValidationTool.java | 10 ++-- 11 files changed, 29 insertions(+), 93 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java index bfbc2979e9..7b44a3a08f 100644 --- a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java @@ -29,7 +29,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; /** * Config helper methods related to systems. @@ -117,7 +117,7 @@ public Map getSystemFactories() { systemName -> { String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException( String.format("A stream uses system %s, which is missing from the configuration.", systemName))); - return Util.getObj(systemFactoryClassName, SystemFactory.class); + return ReflectionUtil.getObj(systemFactoryClassName, SystemFactory.class); })); return systemFactories; diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java index 902ea50fc9..000e55a4e0 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java @@ -36,7 +36,7 @@ import org.apache.samza.metadatastore.MetadataStoreFactory; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.StreamSpec; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.apache.samza.zk.ZkMetadataStoreFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,7 +210,8 @@ private MetadataStore getMetadataStore() { if (metadataStoreFactoryClass == null) { metadataStoreFactoryClass = DEFAULT_METADATA_STORE_FACTORY; } - MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); + MetadataStoreFactory metadataStoreFactory = + ReflectionUtil.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); return metadataStoreFactory.getMetadataStore(STREAM_CREATION_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap()); } } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 44adfde3a9..bf6dfce2d8 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -64,7 +64,6 @@ import org.apache.samza.task.TaskFactoryUtil; import org.apache.samza.util.CoordinatorStreamUtil; import org.apache.samza.util.ReflectionUtil; -import org.apache.samza.util.Util; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkMetadataStoreFactory; import org.slf4j.Logger; @@ -384,7 +383,8 @@ private void cleanup() { */ private MetadataStore getMetadataStoreForRunID() { String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY); - MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); + MetadataStoreFactory metadataStoreFactory = + ReflectionUtil.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap()); } diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java index 4d2a3bcc1e..e3ff250812 100644 --- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java @@ -132,7 +132,7 @@ public static Optional> buildD } // Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager - SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class); + SystemFactory systemFactory = ReflectionUtil.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class); SystemProducer systemProducer = systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap()); DiagnosticsManager diagnosticsManager = diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java index 80ddf0dc5f..0900c138fd 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java @@ -21,11 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; @@ -36,11 +34,9 @@ import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import scala.collection.JavaConverters; /** @@ -146,24 +142,15 @@ public DiagnosticsManager(String jobName, resetTime = Instant.now(); try { - - Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", - JavaConverters.collectionAsScalaIterableConverter( - Collections.singletonList(new Tuple2, Object>(DiagnosticsManager.class, this))) - .asScala() - .toSeq()); - + ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", + Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class)); LOG.info("Attached log4j diagnostics appender."); - } catch (ClassNotFoundException | InstantiationException | InvocationTargetException e) { + } catch (Exception e) { try { - Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", - JavaConverters.collectionAsScalaIterableConverter( - Collections.singletonList(new Tuple2, Object>(DiagnosticsManager.class, this))) - .asScala() - .toSeq()); - LOG.info("Attached log4j diagnostics appender."); - } catch (ClassNotFoundException | InstantiationException | InvocationTargetException ex) { - + ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", + Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class)); + LOG.info("Attached log4j2 diagnostics appender."); + } catch (Exception ex) { LOG.warn( "Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.", ex); diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index ec5f4d971f..441d834f9c 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -19,7 +19,7 @@ package org.apache.samza.metrics.reporter -import org.apache.samza.util.{Logging, StreamUtil, Util} +import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util} import org.apache.samza.SamzaException import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig} import org.apache.samza.metrics.MetricsReporter @@ -53,7 +53,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName)) - val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) + val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory]) info("Got system factory %s." format systemFactory) @@ -71,7 +71,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val serde = if (serdeName != null) { JavaOptionals.toRichOptional(serializerConfig.getSerdeFactoryClass(serdeName)).toOption match { case Some(serdeClassName) => - Util.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config) + ReflectionUtil.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config) case _ => null } } else { diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala index 14dd1c915f..b97afad119 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala @@ -33,19 +33,19 @@ import scala.collection.JavaConverters._ */ class CommandLine { val parser = new OptionParser() - val configFactoryOpt = + val configFactoryOpt = parser.accepts("config-factory", "The config factory to use to read your config file.") .withRequiredArg .ofType(classOf[java.lang.String]) .describedAs("com.foo.bar.ClassName") .defaultsTo(classOf[PropertiesConfigFactory].getName) val configPathOpt = - parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + + parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + "If multiple files are given they are all used with later files overriding any values that appear in earlier files.") .withRequiredArg .ofType(classOf[URI]) .describedAs("path") - val configOverrideOpt = + val configOverrideOpt = parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.") .withRequiredArg .ofType(classOf[KeyValuePair]) @@ -63,7 +63,7 @@ class CommandLine { // Set up the job parameters. val configFactoryClassName = options.valueOf(configFactoryOpt) val configPaths = options.valuesOf(configPathOpt) - configFactory = Util.getObj(configFactoryClassName, classOf[ConfigFactory]) + configFactory = ReflectionUtil.getObj(configFactoryClassName, classOf[ConfigFactory]) val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv => (kv.key, kv.value)).toMap val configs: Buffer[java.util.Map[String, String]] = configPaths.asScala.map(configFactory.getConfig) diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index 810345e005..f108387a2c 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -86,7 +86,7 @@ object CoordinatorStreamUtil extends Logging { val systemConfig = new SystemConfig(config) val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName)) - Util.getObj(systemFactoryClassName, classOf[SystemFactory]) + ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory]) } /** diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 41dda05dc1..798bac99fb 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -36,39 +36,12 @@ import scala.collection.JavaConverters._ object Util extends Logging { val FALLBACK_VERSION = "0.0.1" - val Random = new Random /** * Make an environment variable string safe to pass. */ def envVarEscape(str: String) = str.replace("\"", "\\\"").replace("'", "\\'") - /** - * Get a random number >= startInclusive, and < endExclusive. - */ - def randomBetween(startInclusive: Int, endExclusive: Int) = - startInclusive + Random.nextInt(endExclusive - startInclusive) - - /** - * Instantiate an object of type T from a given className. - * - * Deprecated: Use [[ReflectionUtil.getObj(String, Class)]] instead. - */ - @Deprecated - def getObj[T](className: String, clazz: Class[T]) = { - try { - Class - .forName(className) - .newInstance - .asInstanceOf[T] - } catch { - case e: Throwable => { - error("Unable to create an instance for class %s." format className, e) - throw e - } - } - } - def getSamzaVersion(): String = { Option(this.getClass.getPackage.getImplementationVersion) .getOrElse({ @@ -99,27 +72,6 @@ object Util extends Logging { } } - /** - * Instantiate an object from given className, and given constructor parameters. - * - * Deprecated: Use [[ReflectionUtil.getObjWithArgs(String, Class, ConstructorArgument...)]] instead. - */ - @Deprecated - @throws[ClassNotFoundException] - @throws[InstantiationException] - @throws[InvocationTargetException] - def getObj(className: String, constructorParams: (Class[_], Object)*) = { - try { - Class.forName(className).getDeclaredConstructor(constructorParams.map(x => x._1): _*) - .newInstance(constructorParams.map(x => x._2): _*) - } catch { - case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => { - warn("Could not instantiate an instance for class %s." format className, e) - throw e - } - } - } - def getLocalHost: InetAddress = { doGetLocalHost(new NetworkingUtil) } diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index b20704a61e..706ae65d34 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -26,7 +26,7 @@ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.{StreamSpec, SystemFactory} import org.apache.samza.system.kafka.KafkaStreamSpec import org.apache.samza.util.ScalaJavaUtil.JavaOptionals -import org.apache.samza.util.{KafkaUtil, Logging, Util, _} +import org.apache.samza.util.{KafkaUtil, Logging, _} class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { @@ -45,7 +45,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin .toOption .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format checkpointSystemName)) - val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory]) + val checkpointSystemFactory = ReflectionUtil.getObj(checkpointSystemFactoryName, classOf[SystemFactory]) val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config) info(s"Creating a KafkaCheckpointManager to consume from $checkpointTopic") diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index 376e113051..bbcc976386 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -33,25 +33,21 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.samza.SamzaException; -import org.apache.samza.clustermanager.ClusterBasedJobCoordinator; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; -import org.apache.samza.coordinator.stream.CoordinatorStreamManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; -import org.apache.samza.job.model.JobModel; import org.apache.samza.job.yarn.ClientHelper; import org.apache.samza.metrics.JmxMetricsAccessor; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.MetricsValidator; import org.apache.samza.storage.ChangelogStreamManager; +import org.apache.samza.util.CommandLine; import org.apache.samza.util.CoordinatorStreamUtil; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.apache.samza.util.hadoop.HttpFileSystem; -import org.apache.samza.util.CommandLine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,7 +192,7 @@ public static void main(String [] args) throws Exception { MetricsValidator validator = null; if (options.has(validatorOpt)) { String validatorClass = options.valueOf(validatorOpt); - validator = Util.getObj(validatorClass, MetricsValidator.class); + validator = ReflectionUtil.getObj(validatorClass, MetricsValidator.class); } YarnConfiguration hadoopConfig = new YarnConfiguration(); From 716281fc1fe5f183d525cd852f4dc379afbf3b0a Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 19:12:00 -0800 Subject: [PATCH 3/9] one more test for Util --- .../src/test/java/org/apache/samza/util/TestUtil.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java index b13958ab11..1e3691b9db 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -216,6 +216,17 @@ public void testRewriteConfig() { assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); } + /** + * This fails because Util will interpret the empty string value as a single rewriter which has the empty string as a + * name, and there is no rewriter class config for a rewriter name which is the empty string. + * TODO: should this be fixed to interpret the empty string as an empty list? + */ + @Test(expected = SamzaException.class) + public void testRewriteConfigConfigRewritersEmptyString() { + Config config = new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, "")); + Util.rewriteConfig(config); + } + @Test(expected = SamzaException.class) public void testRewriteConfigNoClassForConfigRewriterName() { Config config = From 176b9fb009e5bfa5a5c0b890161df0b9b065ab85 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 19:18:57 -0800 Subject: [PATCH 4/9] add JavaUtil, to be converted into Util in next commit --- .../java/org/apache/samza/util/JavaUtil.java | 178 ++++++++++++++++++ .../java/org/apache/samza/util/TestUtil.java | 57 +++--- 2 files changed, 207 insertions(+), 28 deletions(-) create mode 100644 samza-core/src/main/java/org/apache/samza/util/JavaUtil.java diff --git a/samza-core/src/main/java/org/apache/samza/util/JavaUtil.java b/samza-core/src/main/java/org/apache/samza/util/JavaUtil.java new file mode 100644 index 0000000000..0a541c7846 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/JavaUtil.java @@ -0,0 +1,178 @@ +package org.apache.samza.util; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import com.google.common.collect.Lists; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.TaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JavaUtil { + private static final Logger LOG = LoggerFactory.getLogger(JavaUtil.class); + + static final String FALLBACK_VERSION = "0.0.1"; + + /** + * Make an environment variable string safe to pass. + */ + public static String envVarEscape(String str) { + return str.replace("\"", "\\\"").replace("'", "\\'"); + } + + public static String getSamzaVersion() { + return Optional.ofNullable(JavaUtil.class.getPackage().getImplementationVersion()).orElseGet(() -> { + LOG.warn("Unable to find implementation samza version in jar's meta info. Defaulting to {}", FALLBACK_VERSION); + return FALLBACK_VERSION; + }); + } + + public static String getTaskClassVersion(Config config) { + try { + Optional appClass = Optional.ofNullable(new ApplicationConfig(config).getAppClass()); + if (appClass.isPresent()) { + return Optional.ofNullable(Class.forName(appClass.get()).getPackage().getImplementationVersion()) + .orElse(FALLBACK_VERSION); + } else { + Optional taskClass = new TaskConfig(config).getTaskClass(); + if (taskClass.isPresent()) { + return Optional.ofNullable(Class.forName(taskClass.get()).getPackage().getImplementationVersion()) + .orElse(FALLBACK_VERSION); + } else { + LOG.warn("Unable to find app class or task class. Defaulting to {}", FALLBACK_VERSION); + return FALLBACK_VERSION; + } + } + } catch (Exception e) { + LOG.warn(String.format("Ran into exception while trying to get version of app or task. Defaulting to %s", + FALLBACK_VERSION), e); + return FALLBACK_VERSION; + } + } + + /** + * Returns the the first host address which is not the loopback address, or {@link InetAddress#getLocalHost} as a + * fallback. + * + * @return the {@link InetAddress} which represents the localhost + */ + public static InetAddress getLocalHost() { + try { + return doGetLocalHost(new NetworkingUtil()); + } catch (Exception e) { + throw new SamzaException("Error while getting localhost", e); + } + } + + private static InetAddress doGetLocalHost(NetworkingUtil networkingUtil) + throws UnknownHostException, SocketException { + InetAddress localHost = networkingUtil.inetAddressGetLocalHost(); + if (localHost.isLoopbackAddress()) { + LOG.debug("Hostname {} resolves to a loopback address, trying to resolve an external IP address.", + localHost.getHostName()); + List networkInterfaces; + if (System.getProperty("os.name").startsWith("Windows")) { + networkInterfaces = Collections.list(networkingUtil.networkInterfaceGetNetworkInterfaces()); + } else { + networkInterfaces = Lists.reverse(Collections.list(networkingUtil.networkInterfaceGetNetworkInterfaces())); + } + for (NetworkInterface networkInterface : networkInterfaces) { + List addresses = + Collections.list(networkingUtil.networkInterfaceGetInetAddresses(networkInterface)) + .stream() + .filter(address -> !(address.isLinkLocalAddress() || address.isLoopbackAddress())) + .collect(Collectors.toList()); + if (!addresses.isEmpty()) { + InetAddress address = addresses.stream() + .filter(addr -> addr instanceof Inet4Address) + .findFirst() + .orElseGet(() -> addresses.get(0)); + LOG.debug("Found an external IP address {} which represents the localhost.", + networkingUtil.inetAddressGetHostAddress(address)); + return networkingUtil.inetAddressGetByAddress(networkingUtil.inetAddressGetAddress(address)); + } + } + } + return localHost; + } + + /** + * Re-writes configuration using a ConfigRewriter, if one is defined. If there is no ConfigRewriter defined for the + * job, then this method is a no-op. + * + * @param config The config to re-write + * @return re-written config + */ + public static Config rewriteConfig(Config config) { + Optional configRewriterNamesOptional = new JobConfig(config).getConfigRewriters(); + if (configRewriterNamesOptional.isPresent()) { + String[] configRewriterNames = configRewriterNamesOptional.get().split(","); + Config rewrittenConfig = config; + for (String configRewriterName : configRewriterNames) { + rewrittenConfig = applyRewriter(rewrittenConfig, configRewriterName); + } + return rewrittenConfig; + } else { + return config; + } + } + + /** + * Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config. + * @param config the config to re-write + * @param rewriterName the name of the rewriter to apply + * @return the rewritten config + */ + public static Config applyRewriter(Config config, String rewriterName) { + String rewriterClassName = new JobConfig(config).getConfigRewriterClass(rewriterName) + .orElseThrow(() -> new SamzaException( + String.format("Unable to find class config for config rewriter %s.", rewriterName))); + ConfigRewriter rewriter = ReflectionUtil.getObj(rewriterClassName, ConfigRewriter.class); + LOG.info("Re-writing config with {}", rewriter); + return rewriter.rewrite(rewriterName, config); + } + + /** + * Do this so Powermockito can mock the system classes. + * Powermockito doesn't seem to work as well with Scala singletons. + * In Java, it seems like it will work to use Powermock without this wrapper. + */ + static class NetworkingUtil { + public InetAddress inetAddressGetLocalHost() throws UnknownHostException { + return InetAddress.getLocalHost(); + } + + public InetAddress inetAddressGetByAddress(byte[] address) throws UnknownHostException { + return InetAddress.getByAddress(address); + } + + public String inetAddressGetHostAddress(InetAddress inetAddress) { + return inetAddress.getHostAddress(); + } + + public byte[] inetAddressGetAddress(InetAddress inetAddress) { + return inetAddress.getAddress(); + } + + public Enumeration networkInterfaceGetNetworkInterfaces() throws SocketException { + return NetworkInterface.getNetworkInterfaces(); + } + + public Enumeration networkInterfaceGetInetAddresses(NetworkInterface networkInterface) { + return networkInterface.getInetAddresses(); + } + } +} diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java index 1e3691b9db..b634995e14 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -29,7 +29,7 @@ @RunWith(PowerMockRunner.class) -@PrepareForTest({NetworkingUtil.class}) +@PrepareForTest({JavaUtil.NetworkingUtil.class}) public class TestUtil { private static final String CONFIG_KEY = "config.key"; private static final String CONFIG_VALUE = "value"; @@ -41,11 +41,11 @@ public class TestUtil { public void testEnvVarEscape() { // no special characters in original String noSpecialCharacters = "hello world 123 .?!"; - assertEquals(noSpecialCharacters, Util.envVarEscape(noSpecialCharacters)); + assertEquals(noSpecialCharacters, JavaUtil.envVarEscape(noSpecialCharacters)); String withSpecialCharacters = "quotation \" apostrophe '"; String escaped = "quotation \\\" apostrophe \\'"; - assertEquals(escaped, Util.envVarEscape(withSpecialCharacters)); + assertEquals(escaped, JavaUtil.envVarEscape(withSpecialCharacters)); } /** @@ -54,9 +54,10 @@ public void testEnvVarEscape() { */ @Test public void testGetSamzaVersion() { - String utilImplementationVersion = Util.class.getPackage().getImplementationVersion(); - String expectedVersion = (utilImplementationVersion != null) ? utilImplementationVersion : Util.FALLBACK_VERSION(); - assertEquals(expectedVersion, Util.getSamzaVersion()); + String utilImplementationVersion = JavaUtil.class.getPackage().getImplementationVersion(); + String expectedVersion = + (utilImplementationVersion != null) ? utilImplementationVersion : JavaUtil.FALLBACK_VERSION; + assertEquals(expectedVersion, JavaUtil.getSamzaVersion()); } /** @@ -66,25 +67,25 @@ public void testGetSamzaVersion() { @Test public void testGetTaskClassVersion() { // cannot find app nor task - assertEquals(Util.FALLBACK_VERSION(), Util.getTaskClassVersion(new MapConfig())); + assertEquals(JavaUtil.FALLBACK_VERSION, JavaUtil.getTaskClassVersion(new MapConfig())); // only app String appClassVersion = MyAppClass.class.getPackage().getImplementationVersion(); - String expectedAppClassVersion = (appClassVersion != null) ? appClassVersion : Util.FALLBACK_VERSION(); + String expectedAppClassVersion = (appClassVersion != null) ? appClassVersion : JavaUtil.FALLBACK_VERSION; Config config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName())); - assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); + assertEquals(expectedAppClassVersion, JavaUtil.getTaskClassVersion(config)); // only task String taskClassVersion = MyTaskClass.class.getPackage().getImplementationVersion(); - String expectedTaskClassVersion = (taskClassVersion != null) ? taskClassVersion : Util.FALLBACK_VERSION(); + String expectedTaskClassVersion = (taskClassVersion != null) ? taskClassVersion : JavaUtil.FALLBACK_VERSION; config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS, MyTaskClass.class.getName())); - assertEquals(expectedTaskClassVersion, Util.getTaskClassVersion(config)); + assertEquals(expectedTaskClassVersion, JavaUtil.getTaskClassVersion(config)); // both app and task; choose app config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName(), // shouldn't even try to load the task class TaskConfig.TASK_CLASS, "this_is_not_a_class")); - assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); + assertEquals(expectedAppClassVersion, JavaUtil.getTaskClassVersion(config)); } @Test @@ -93,7 +94,7 @@ public void testGetLocalHostNotLoopbackAddress() throws UnknownHostException { InetAddress inetAddressLocalHost = mock(InetAddress.class); when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(false); when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); - assertEquals(inetAddressLocalHost, Util.getLocalHost()); + assertEquals(inetAddressLocalHost, JavaUtil.getLocalHost()); } @Test @@ -118,7 +119,7 @@ public void testGetLocalHostLoopbackAddressNoExternalAddressFound() throws Excep when(NetworkInterface.getNetworkInterfaces()).thenReturn( Collections.enumeration(Arrays.asList(networkInterface0, networkInterface1))); - assertEquals(inetAddressLocalHost, Util.getLocalHost()); + assertEquals(inetAddressLocalHost, JavaUtil.getLocalHost()); } @Test @@ -151,7 +152,7 @@ public void testGetLocalHostExternalInet4Address() throws Exception { InetAddress finalInetAddress = mock(InetAddress.class); when(InetAddress.getByAddress(aryEq(externalInet4AddressBytes))).thenReturn(finalInetAddress); - assertEquals(finalInetAddress, Util.getLocalHost()); + assertEquals(finalInetAddress, JavaUtil.getLocalHost()); } @Test @@ -176,7 +177,7 @@ public void testGetLocalHostExternalAddressNotInet4Address() throws Exception { InetAddress finalInetAddress = mock(InetAddress.class); when(InetAddress.getByAddress(aryEq(externalAddressBytes))).thenReturn(finalInetAddress); - assertEquals(finalInetAddress, Util.getLocalHost()); + assertEquals(finalInetAddress, JavaUtil.getLocalHost()); } @Test @@ -185,7 +186,7 @@ public void testRewriteConfig() { // no rewriters Map fullConfig = new HashMap<>(baseConfigMap); - assertEquals(fullConfig, Util.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(fullConfig, JavaUtil.rewriteConfig(new MapConfig(fullConfig))); // one rewriter fullConfig = new HashMap<>(baseConfigMap); @@ -193,7 +194,7 @@ public void testRewriteConfig() { fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); Map expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(new MapConfig(expectedConfigMap), JavaUtil.rewriteConfig(new MapConfig(fullConfig))); // only apply rewriters from rewriters list fullConfig = new HashMap<>(baseConfigMap); @@ -203,7 +204,7 @@ public void testRewriteConfig() { UpdatePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(new MapConfig(expectedConfigMap), JavaUtil.rewriteConfig(new MapConfig(fullConfig))); // two rewriters; second rewriter overwrites configs from first fullConfig = new HashMap<>(baseConfigMap); @@ -213,7 +214,7 @@ public void testRewriteConfig() { UpdatePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(new MapConfig(expectedConfigMap), JavaUtil.rewriteConfig(new MapConfig(fullConfig))); } /** @@ -224,21 +225,21 @@ public void testRewriteConfig() { @Test(expected = SamzaException.class) public void testRewriteConfigConfigRewritersEmptyString() { Config config = new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, "")); - Util.rewriteConfig(config); + JavaUtil.rewriteConfig(config); } @Test(expected = SamzaException.class) public void testRewriteConfigNoClassForConfigRewriterName() { Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, "unknownRewriter")); - Util.rewriteConfig(config); + JavaUtil.rewriteConfig(config); } @Test(expected = SamzaException.class) public void testRewriteConfigRewriterClassDoesNotExist() { Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, REWRITER_NAME, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class")); - Util.rewriteConfig(config); + JavaUtil.rewriteConfig(config); } @Test @@ -249,7 +250,7 @@ public void testApplyRewriter() { NewPropertyRewriter.class.getName()); Map expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(new MapConfig(expectedConfigMap), JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); // update property fullConfig = @@ -257,7 +258,7 @@ public void testApplyRewriter() { UpdatePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(new MapConfig(expectedConfigMap), JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); // remove property fullConfig = @@ -265,13 +266,13 @@ public void testApplyRewriter() { DeletePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.remove(CONFIG_KEY); - assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(new MapConfig(expectedConfigMap), JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } @Test(expected = SamzaException.class) public void testApplyRewriterNoClassForConfigRewriterName() { Map fullConfig = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); - Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); + JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); } @Test(expected = SamzaException.class) @@ -280,7 +281,7 @@ public void testApplyRewriterClassDoesNotExist() { ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class"); Config expectedConfig = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, NEW_CONFIG_KEY, CONFIG_VALUE)); - assertEquals(expectedConfig, Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(expectedConfig, JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } /** From 248fff758a4a401e67ace408809e1aa79e9113be Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 19:22:23 -0800 Subject: [PATCH 5/9] rename JavaUtil to Util --- .../samza/util/{JavaUtil.java => Util.java} | 6 +- .../org/apache/samza/util/HttpUtil.scala | 3 +- .../scala/org/apache/samza/util/Util.scala | 167 ------------------ .../java/org/apache/samza/util/TestUtil.java | 56 +++--- 4 files changed, 32 insertions(+), 200 deletions(-) rename samza-core/src/main/java/org/apache/samza/util/{JavaUtil.java => Util.java} (97%) delete mode 100644 samza-core/src/main/scala/org/apache/samza/util/Util.scala diff --git a/samza-core/src/main/java/org/apache/samza/util/JavaUtil.java b/samza-core/src/main/java/org/apache/samza/util/Util.java similarity index 97% rename from samza-core/src/main/java/org/apache/samza/util/JavaUtil.java rename to samza-core/src/main/java/org/apache/samza/util/Util.java index 0a541c7846..1223d26cb8 100644 --- a/samza-core/src/main/java/org/apache/samza/util/JavaUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/Util.java @@ -21,8 +21,8 @@ import org.slf4j.LoggerFactory; -public class JavaUtil { - private static final Logger LOG = LoggerFactory.getLogger(JavaUtil.class); +public class Util { + private static final Logger LOG = LoggerFactory.getLogger(Util.class); static final String FALLBACK_VERSION = "0.0.1"; @@ -34,7 +34,7 @@ public static String envVarEscape(String str) { } public static String getSamzaVersion() { - return Optional.ofNullable(JavaUtil.class.getPackage().getImplementationVersion()).orElseGet(() -> { + return Optional.ofNullable(Util.class.getPackage().getImplementationVersion()).orElseGet(() -> { LOG.warn("Unable to find implementation samza version in jar's meta info. Defaulting to {}", FALLBACK_VERSION); return FALLBACK_VERSION; }); diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala index ea5eb5a8cc..577bba63da 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala @@ -25,9 +25,8 @@ import java.io.{BufferedReader, IOException, InputStream, InputStreamReader} import java.net.{HttpURLConnection, URL} import org.apache.samza.SamzaException -import org.apache.samza.util.Util.{error, warn} -object HttpUtil { +object HttpUtil extends Logging { /** * Reads a URL and returns the response body as a string. Retries in an exponential backoff, but does no other error handling. diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala deleted file mode 100644 index 798bac99fb..0000000000 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util - - -import java.lang.reflect.InvocationTargetException - -import org.apache.samza.config._ -import org.apache.samza.SamzaException -import java.net.Inet4Address -import java.net.InetAddress -import java.net.NetworkInterface -import java.util.Random - -import org.apache.samza.util.ScalaJavaUtil.JavaOptionals - -import scala.collection.JavaConverters._ - - -object Util extends Logging { - val FALLBACK_VERSION = "0.0.1" - - /** - * Make an environment variable string safe to pass. - */ - def envVarEscape(str: String) = str.replace("\"", "\\\"").replace("'", "\\'") - - def getSamzaVersion(): String = { - Option(this.getClass.getPackage.getImplementationVersion) - .getOrElse({ - warn("Unable to find implementation samza version in jar's meta info. Defaulting to %s" format FALLBACK_VERSION) - FALLBACK_VERSION - }) - } - - def getTaskClassVersion(config: Config): String = { - try { - val appClass = Option(new ApplicationConfig(config).getAppClass) - if (appClass.isDefined) { - Option.apply(Class.forName(appClass.get).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION) - } else { - val taskClass = new TaskConfig(config).getTaskClass - if (taskClass.isPresent) { - Option.apply(Class.forName(taskClass.get()).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION) - } else { - warn("Unable to find app class or task class. Defaulting to %s" format FALLBACK_VERSION) - FALLBACK_VERSION - } - } - } catch { - case e: Exception => { - warn("Unable to find implementation version in jar's meta info. Defaulting to %s" format FALLBACK_VERSION) - FALLBACK_VERSION - } - } - } - - def getLocalHost: InetAddress = { - doGetLocalHost(new NetworkingUtil) - } - - /** - * Returns the the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback - * - * @return the [[java.net.InetAddress]] which represents the localhost - */ - def doGetLocalHost(networkingUtil: NetworkingUtil): InetAddress = { - val localHost = networkingUtil.inetAddressGetLocalHost - if (localHost.isLoopbackAddress) { - debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName)) - val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) { - networkingUtil.networkInterfaceGetNetworkInterfaces.asScala.toList - } else { - networkingUtil.networkInterfaceGetNetworkInterfaces.asScala.toList.reverse - } - for (networkInterface <- networkInterfaces) { - val addresses = networkingUtil.networkInterfaceGetInetAddresses(networkInterface).asScala.toList - .filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress) - if (addresses.nonEmpty) { - val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) - debug("Found an external IP address %s which represents the localhost.".format(networkingUtil.inetAddressGetHostAddress(address))) - return networkingUtil.inetAddressGetByAddress(networkingUtil.inetAddressGetAddress(address)) - } - } - } - localHost - } - - /** - * Re-writes configuration using a ConfigRewriter, if one is defined. If - * there is no ConfigRewriter defined for the job, then this method is a - * no-op. - * - * @param config The config to re-write - * @return re-written config - */ - def rewriteConfig(config: Config): Config = { - JavaOptionals.toRichOptional(new JobConfig(config).getConfigRewriters).toOption match { - case Some(rewriters) => rewriters.split(",").foldLeft(config)(applyRewriter(_, _)) - case _ => config - } - } - - /** - * Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config. - * @param config the config to re-write - * @param rewriterName the name of the rewriter to apply - * @return the rewritten config - */ - def applyRewriter(config: Config, rewriterName: String): Config = { - val rewriterClassName = JavaOptionals.toRichOptional(new JobConfig(config).getConfigRewriterClass(rewriterName)) - .toOption - .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)) - val rewriter = ReflectionUtil.getObj(rewriterClassName, classOf[ConfigRewriter]) - info("Re-writing config with " + rewriter) - rewriter.rewrite(rewriterName, config) - } - -} - -/** - * Do this so Powermockito can mock the system classes. - * Powermockito doesn't seem to work as well with Scala singletons. - * In Java, it seems like it will work to use Powermock without this wrapper. - */ -class NetworkingUtil { - def inetAddressGetLocalHost: InetAddress = { - InetAddress.getLocalHost - } - - def inetAddressGetByAddress(address: Array[Byte]): InetAddress = { - InetAddress.getByAddress(address) - } - - def inetAddressGetHostAddress(inetAddress: InetAddress): String = { - inetAddress.getHostAddress - } - - def inetAddressGetAddress(inetAddress: InetAddress): Array[Byte] = { - inetAddress.getAddress - } - - def networkInterfaceGetNetworkInterfaces: java.util.Enumeration[NetworkInterface] = { - NetworkInterface.getNetworkInterfaces - } - - def networkInterfaceGetInetAddresses(networkInterface: NetworkInterface): java.util.Enumeration[InetAddress] = { - networkInterface.getInetAddresses - } -} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java index b634995e14..4cb418b32c 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -29,7 +29,7 @@ @RunWith(PowerMockRunner.class) -@PrepareForTest({JavaUtil.NetworkingUtil.class}) +@PrepareForTest({Util.NetworkingUtil.class}) public class TestUtil { private static final String CONFIG_KEY = "config.key"; private static final String CONFIG_VALUE = "value"; @@ -41,11 +41,11 @@ public class TestUtil { public void testEnvVarEscape() { // no special characters in original String noSpecialCharacters = "hello world 123 .?!"; - assertEquals(noSpecialCharacters, JavaUtil.envVarEscape(noSpecialCharacters)); + assertEquals(noSpecialCharacters, Util.envVarEscape(noSpecialCharacters)); String withSpecialCharacters = "quotation \" apostrophe '"; String escaped = "quotation \\\" apostrophe \\'"; - assertEquals(escaped, JavaUtil.envVarEscape(withSpecialCharacters)); + assertEquals(escaped, Util.envVarEscape(withSpecialCharacters)); } /** @@ -54,10 +54,10 @@ public void testEnvVarEscape() { */ @Test public void testGetSamzaVersion() { - String utilImplementationVersion = JavaUtil.class.getPackage().getImplementationVersion(); + String utilImplementationVersion = Util.class.getPackage().getImplementationVersion(); String expectedVersion = - (utilImplementationVersion != null) ? utilImplementationVersion : JavaUtil.FALLBACK_VERSION; - assertEquals(expectedVersion, JavaUtil.getSamzaVersion()); + (utilImplementationVersion != null) ? utilImplementationVersion : Util.FALLBACK_VERSION; + assertEquals(expectedVersion, Util.getSamzaVersion()); } /** @@ -67,25 +67,25 @@ public void testGetSamzaVersion() { @Test public void testGetTaskClassVersion() { // cannot find app nor task - assertEquals(JavaUtil.FALLBACK_VERSION, JavaUtil.getTaskClassVersion(new MapConfig())); + assertEquals(Util.FALLBACK_VERSION, Util.getTaskClassVersion(new MapConfig())); // only app String appClassVersion = MyAppClass.class.getPackage().getImplementationVersion(); - String expectedAppClassVersion = (appClassVersion != null) ? appClassVersion : JavaUtil.FALLBACK_VERSION; + String expectedAppClassVersion = (appClassVersion != null) ? appClassVersion : Util.FALLBACK_VERSION; Config config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName())); - assertEquals(expectedAppClassVersion, JavaUtil.getTaskClassVersion(config)); + assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); // only task String taskClassVersion = MyTaskClass.class.getPackage().getImplementationVersion(); - String expectedTaskClassVersion = (taskClassVersion != null) ? taskClassVersion : JavaUtil.FALLBACK_VERSION; + String expectedTaskClassVersion = (taskClassVersion != null) ? taskClassVersion : Util.FALLBACK_VERSION; config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS, MyTaskClass.class.getName())); - assertEquals(expectedTaskClassVersion, JavaUtil.getTaskClassVersion(config)); + assertEquals(expectedTaskClassVersion, Util.getTaskClassVersion(config)); // both app and task; choose app config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName(), // shouldn't even try to load the task class TaskConfig.TASK_CLASS, "this_is_not_a_class")); - assertEquals(expectedAppClassVersion, JavaUtil.getTaskClassVersion(config)); + assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); } @Test @@ -94,7 +94,7 @@ public void testGetLocalHostNotLoopbackAddress() throws UnknownHostException { InetAddress inetAddressLocalHost = mock(InetAddress.class); when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(false); when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); - assertEquals(inetAddressLocalHost, JavaUtil.getLocalHost()); + assertEquals(inetAddressLocalHost, Util.getLocalHost()); } @Test @@ -119,7 +119,7 @@ public void testGetLocalHostLoopbackAddressNoExternalAddressFound() throws Excep when(NetworkInterface.getNetworkInterfaces()).thenReturn( Collections.enumeration(Arrays.asList(networkInterface0, networkInterface1))); - assertEquals(inetAddressLocalHost, JavaUtil.getLocalHost()); + assertEquals(inetAddressLocalHost, Util.getLocalHost()); } @Test @@ -152,7 +152,7 @@ public void testGetLocalHostExternalInet4Address() throws Exception { InetAddress finalInetAddress = mock(InetAddress.class); when(InetAddress.getByAddress(aryEq(externalInet4AddressBytes))).thenReturn(finalInetAddress); - assertEquals(finalInetAddress, JavaUtil.getLocalHost()); + assertEquals(finalInetAddress, Util.getLocalHost()); } @Test @@ -177,7 +177,7 @@ public void testGetLocalHostExternalAddressNotInet4Address() throws Exception { InetAddress finalInetAddress = mock(InetAddress.class); when(InetAddress.getByAddress(aryEq(externalAddressBytes))).thenReturn(finalInetAddress); - assertEquals(finalInetAddress, JavaUtil.getLocalHost()); + assertEquals(finalInetAddress, Util.getLocalHost()); } @Test @@ -186,7 +186,7 @@ public void testRewriteConfig() { // no rewriters Map fullConfig = new HashMap<>(baseConfigMap); - assertEquals(fullConfig, JavaUtil.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(fullConfig, Util.rewriteConfig(new MapConfig(fullConfig))); // one rewriter fullConfig = new HashMap<>(baseConfigMap); @@ -194,7 +194,7 @@ public void testRewriteConfig() { fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); Map expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), JavaUtil.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); // only apply rewriters from rewriters list fullConfig = new HashMap<>(baseConfigMap); @@ -204,7 +204,7 @@ public void testRewriteConfig() { UpdatePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), JavaUtil.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); // two rewriters; second rewriter overwrites configs from first fullConfig = new HashMap<>(baseConfigMap); @@ -214,7 +214,7 @@ public void testRewriteConfig() { UpdatePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), JavaUtil.rewriteConfig(new MapConfig(fullConfig))); + assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); } /** @@ -225,21 +225,21 @@ public void testRewriteConfig() { @Test(expected = SamzaException.class) public void testRewriteConfigConfigRewritersEmptyString() { Config config = new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, "")); - JavaUtil.rewriteConfig(config); + Util.rewriteConfig(config); } @Test(expected = SamzaException.class) public void testRewriteConfigNoClassForConfigRewriterName() { Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, "unknownRewriter")); - JavaUtil.rewriteConfig(config); + Util.rewriteConfig(config); } @Test(expected = SamzaException.class) public void testRewriteConfigRewriterClassDoesNotExist() { Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, REWRITER_NAME, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class")); - JavaUtil.rewriteConfig(config); + Util.rewriteConfig(config); } @Test @@ -250,7 +250,7 @@ public void testApplyRewriter() { NewPropertyRewriter.class.getName()); Map expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); // update property fullConfig = @@ -258,7 +258,7 @@ public void testApplyRewriter() { UpdatePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); // remove property fullConfig = @@ -266,13 +266,13 @@ public void testApplyRewriter() { DeletePropertyRewriter.class.getName()); expectedConfigMap = new HashMap<>(fullConfig); expectedConfigMap.remove(CONFIG_KEY); - assertEquals(new MapConfig(expectedConfigMap), JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } @Test(expected = SamzaException.class) public void testApplyRewriterNoClassForConfigRewriterName() { Map fullConfig = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); - JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); + Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); } @Test(expected = SamzaException.class) @@ -281,7 +281,7 @@ public void testApplyRewriterClassDoesNotExist() { ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class"); Config expectedConfig = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, NEW_CONFIG_KEY, CONFIG_VALUE)); - assertEquals(expectedConfig, JavaUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + assertEquals(expectedConfig, Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } /** From e43c7ec751c12cb7923269951d969f61f329bb09 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 19:46:08 -0800 Subject: [PATCH 6/9] move rewriteConfig and applyRewriter to ConfigUtil --- .../org/apache/samza/util/ConfigUtil.java | 54 +++--- .../main/java/org/apache/samza/util/Util.java | 36 ---- .../samza/checkpoint/CheckpointTool.scala | 4 +- .../samza/coordinator/JobModelManager.scala | 4 +- .../org/apache/samza/util/TestConfigUtil.java | 182 +++++++++++++++--- .../java/org/apache/samza/util/TestUtil.java | 158 --------------- 6 files changed, 192 insertions(+), 246 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index 8567ecdb7e..7d86bf52a0 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -19,16 +19,18 @@ package org.apache.samza.util; -import java.util.HashMap; -import java.util.Map; +import java.util.Optional; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConfigUtil { + private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class); + /** * Re-writes configuration using a ConfigRewriter, if one is defined. If * there is no ConfigRewriter defined for the job, then this method is a @@ -37,28 +39,32 @@ public class ConfigUtil { * @param config The config to re-write * @return rewrited configs */ - static public Config rewriteConfig(Config config) { - try { - final String rewriters = config.get(JobConfig.CONFIG_REWRITERS, ""); - if (!rewriters.isEmpty()) { - Map resultConfig = new HashMap<>(config); - for (String rewriter : rewriters.split(",")) { - String rewriterClassCfg = String.format(JobConfig.CONFIG_REWRITER_CLASS, rewriter); - String rewriterClass = config.get(rewriterClassCfg, ""); - if (rewriterClass.isEmpty()) { - throw new SamzaException( - "Unable to find class config for config rewriter: " + rewriterClassCfg); - } - ConfigRewriter configRewriter = (ConfigRewriter) Class.forName(rewriterClass).newInstance(); - Config rewritedConfig = configRewriter.rewrite(rewriter, config); - resultConfig.putAll(rewritedConfig); - } - return new MapConfig(resultConfig); - } else { - return config; + public static Config rewriteConfig(Config config) { + Optional configRewriterNamesOptional = new JobConfig(config).getConfigRewriters(); + if (configRewriterNamesOptional.isPresent()) { + String[] configRewriterNames = configRewriterNamesOptional.get().split(","); + Config rewrittenConfig = config; + for (String configRewriterName : configRewriterNames) { + rewrittenConfig = applyRewriter(rewrittenConfig, configRewriterName); } - } catch (Exception e) { - throw new RuntimeException(e); + return rewrittenConfig; + } else { + return config; } } + + /** + * Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config. + * @param config the config to re-write + * @param rewriterName the name of the rewriter to apply + * @return the rewritten config + */ + public static Config applyRewriter(Config config, String rewriterName) { + String rewriterClassName = new JobConfig(config).getConfigRewriterClass(rewriterName) + .orElseThrow(() -> new SamzaException( + String.format("Unable to find class config for config rewriter %s.", rewriterName))); + ConfigRewriter rewriter = ReflectionUtil.getObj(rewriterClassName, ConfigRewriter.class); + LOG.info("Re-writing config with {}", rewriter); + return rewriter.rewrite(rewriterName, config); + } } diff --git a/samza-core/src/main/java/org/apache/samza/util/Util.java b/samza-core/src/main/java/org/apache/samza/util/Util.java index 1223d26cb8..7af1259d25 100644 --- a/samza-core/src/main/java/org/apache/samza/util/Util.java +++ b/samza-core/src/main/java/org/apache/samza/util/Util.java @@ -109,42 +109,6 @@ private static InetAddress doGetLocalHost(NetworkingUtil networkingUtil) return localHost; } - /** - * Re-writes configuration using a ConfigRewriter, if one is defined. If there is no ConfigRewriter defined for the - * job, then this method is a no-op. - * - * @param config The config to re-write - * @return re-written config - */ - public static Config rewriteConfig(Config config) { - Optional configRewriterNamesOptional = new JobConfig(config).getConfigRewriters(); - if (configRewriterNamesOptional.isPresent()) { - String[] configRewriterNames = configRewriterNamesOptional.get().split(","); - Config rewrittenConfig = config; - for (String configRewriterName : configRewriterNames) { - rewrittenConfig = applyRewriter(rewrittenConfig, configRewriterName); - } - return rewrittenConfig; - } else { - return config; - } - } - - /** - * Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config. - * @param config the config to re-write - * @param rewriterName the name of the rewriter to apply - * @return the rewritten config - */ - public static Config applyRewriter(Config config, String rewriterName) { - String rewriterClassName = new JobConfig(config).getConfigRewriterClass(rewriterName) - .orElseThrow(() -> new SamzaException( - String.format("Unable to find class config for config rewriter %s.", rewriterName))); - ConfigRewriter rewriter = ReflectionUtil.getObj(rewriterClassName, ConfigRewriter.class); - LOG.info("Re-writing config with {}", rewriter); - return rewriter.rewrite(rewriterName, config); - } - /** * Do this so Powermockito can mock the system classes. * Powermockito doesn't seem to work as well with Scala singletons. diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 74ab562b07..eff1e73955 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -32,7 +32,7 @@ import org.apache.samza.container.TaskName import org.apache.samza.job.JobRunner.info import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.{CommandLine, CoordinatorStreamUtil, Logging, ReflectionUtil, Util} +import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, Logging, ReflectionUtil, Util} import org.apache.samza.Partition import org.apache.samza.SamzaException @@ -138,7 +138,7 @@ object CheckpointTool { val options = cmdline.parser.parse(args: _*) val userConfig = cmdline.loadConfig(options) val jobConfig = JobPlanner.generateSingleJobConfig(userConfig) - val rewrittenConfig = Util.rewriteConfig(jobConfig) + val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig) info(s"Using the rewritten config: $rewrittenConfig") val tool = CheckpointTool(rewrittenConfig, cmdline.newOffsets) tool.run() diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 76245b84ec..07055de1fc 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -47,7 +47,7 @@ import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.runtime.LocationId import org.apache.samza.system._ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals -import org.apache.samza.util.{Logging, ReflectionUtil, Util} +import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil, Util} import scala.collection.JavaConverters import scala.collection.JavaConversions._ @@ -274,7 +274,7 @@ object JobModelManager extends Logging { filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)) .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)). - foldLeft(config)(Util.applyRewriter(_, _)) + foldLeft(config)(ConfigUtil.applyRewriter(_, _)) case _ => config } } diff --git a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java index f771a99010..218eb179fd 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java @@ -21,52 +21,186 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.collect.ImmutableMap; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class TestConfigUtil { - Map configMap = new HashMap<>(); + private static final String CONFIG_KEY = "config.key"; + private static final String CONFIG_VALUE = "value"; + private static final String NEW_CONFIG_KEY = "new.rewritten.config.key"; + private static final String REWRITER_NAME = "propertyRewriter"; + private static final String OTHER_REWRITER_NAME = "otherPropertyRewriter"; - @Before - public void setup() { - configMap.put("job.config.rewriter.testRewriter.class", TestConfigRewriter.class.getName()); - configMap.put("job.config.rewriter.testNoneRewriter.class", ""); + @Test + public void testRewriteConfig() { + Map baseConfigMap = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); + + // no rewriters + Map fullConfig = new HashMap<>(baseConfigMap); + assertEquals(fullConfig, ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // rewriter that adds property + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + Map expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // rewriter that updates property + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // rewriter that removes property + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + DeletePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.remove(CONFIG_KEY); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // only apply rewriters from rewriters list + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, OTHER_REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + // two rewriters; second rewriter overwrites configs from first + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME + "," + OTHER_REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); } - @Test - public void testRewriterWithConfigRewriter() { - configMap.put("job.config.rewriters", "testRewriter"); - configMap.put("job.config.rewriter.testRewriter.value", "rewrittenTest"); + /** + * This fails because Util will interpret the empty string value as a single rewriter which has the empty string as a + * name, and there is no rewriter class config for a rewriter name which is the empty string. + * TODO: should this be fixed to interpret the empty string as an empty list? + */ + @Test(expected = SamzaException.class) + public void testRewriteConfigConfigRewritersEmptyString() { + Config config = new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, "")); + ConfigUtil.rewriteConfig(config); + } + + @Test(expected = SamzaException.class) + public void testRewriteConfigNoClassForConfigRewriterName() { + Config config = + new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, "unknownRewriter")); + ConfigUtil.rewriteConfig(config); + } - Config config = ConfigUtil.rewriteConfig(new MapConfig(configMap)); - assertEquals("rewrittenTest", config.get("value")); + @Test(expected = SamzaException.class) + public void testRewriteConfigRewriterClassDoesNotExist() { + Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, REWRITER_NAME, + String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class")); + ConfigUtil.rewriteConfig(config); } @Test - public void testGetRewriterWithoutConfigRewriter() { - Config config = ConfigUtil.rewriteConfig(new MapConfig(configMap)); - assertEquals(config, new MapConfig(configMap)); + public void testApplyRewriter() { + // new property + Map fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + NewPropertyRewriter.class.getName()); + Map expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + + // update property + fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + + // remove property + fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + DeletePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.remove(CONFIG_KEY); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } - @Test (expected = RuntimeException.class) - public void testGetRewriterWithExceptoion() { - configMap.put("job.config.rewriters", "testNoneRewriter"); - ConfigUtil.rewriteConfig(new MapConfig(configMap)); + @Test(expected = SamzaException.class) + public void testApplyRewriterNoClassForConfigRewriterName() { + Map fullConfig = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); + ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); + } + + @Test(expected = SamzaException.class) + public void testApplyRewriterClassDoesNotExist() { + Map fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + "not_a_class"); + Config expectedConfig = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, NEW_CONFIG_KEY, CONFIG_VALUE)); + assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + } + + /** + * Adds a new config entry for the key {@link #NEW_CONFIG_KEY} which has the same value as {@link #CONFIG_KEY}. + */ + public static class NewPropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + ImmutableMap.Builder newConfigMapBuilder = new ImmutableMap.Builder<>(); + newConfigMapBuilder.putAll(config); + newConfigMapBuilder.put(NEW_CONFIG_KEY, config.get(CONFIG_KEY)); + return new MapConfig(newConfigMapBuilder.build()); + } + } + + /** + * If an entry at {@link #NEW_CONFIG_KEY} exists, overwrites it to be the value concatenated with itself. Otherwise, + * updates the entry at {@link #CONFIG_KEY} to be the value concatenated to itself. + */ + public static class UpdatePropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + Map newConfigMap = new HashMap<>(config); + if (config.containsKey(NEW_CONFIG_KEY)) { + // for testing overwriting of new configs + newConfigMap.put(NEW_CONFIG_KEY, config.get(NEW_CONFIG_KEY) + config.get(NEW_CONFIG_KEY)); + } else { + newConfigMap.put(CONFIG_KEY, config.get(CONFIG_KEY) + config.get(CONFIG_KEY)); + } + return new MapConfig(newConfigMap); + } } - public static class TestConfigRewriter implements ConfigRewriter { + /** + * Removes config entry for the key {@link #CONFIG_KEY} and {@link #NEW_CONFIG_KEY}. + */ + public static class DeletePropertyRewriter implements ConfigRewriter { @Override public Config rewrite(String name, Config config) { - Map configMap = new HashMap<>(config); - configMap.putAll(config.subset(String.format("job.config.rewriter.%s.", name))); - return new MapConfig(configMap); + Map newConfigMap = new HashMap<>(config); + newConfigMap.remove(CONFIG_KEY); + return new MapConfig(newConfigMap); } } } diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java index 4cb418b32c..4a331480c6 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -6,14 +6,9 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import com.google.common.collect.ImmutableMap; -import org.apache.samza.SamzaException; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigRewriter; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.junit.Test; @@ -31,12 +26,6 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({Util.NetworkingUtil.class}) public class TestUtil { - private static final String CONFIG_KEY = "config.key"; - private static final String CONFIG_VALUE = "value"; - private static final String NEW_CONFIG_KEY = "new.rewritten.config.key"; - private static final String REWRITER_NAME = "propertyRewriter"; - private static final String OTHER_REWRITER_NAME = "otherPropertyRewriter"; - @Test public void testEnvVarEscape() { // no special characters in original @@ -180,110 +169,6 @@ public void testGetLocalHostExternalAddressNotInet4Address() throws Exception { assertEquals(finalInetAddress, Util.getLocalHost()); } - @Test - public void testRewriteConfig() { - Map baseConfigMap = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); - - // no rewriters - Map fullConfig = new HashMap<>(baseConfigMap); - assertEquals(fullConfig, Util.rewriteConfig(new MapConfig(fullConfig))); - - // one rewriter - fullConfig = new HashMap<>(baseConfigMap); - fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); - fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); - Map expectedConfigMap = new HashMap<>(fullConfig); - expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); - - // only apply rewriters from rewriters list - fullConfig = new HashMap<>(baseConfigMap); - fullConfig.put(JobConfig.CONFIG_REWRITERS, OTHER_REWRITER_NAME); - fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); - fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), - UpdatePropertyRewriter.class.getName()); - expectedConfigMap = new HashMap<>(fullConfig); - expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); - - // two rewriters; second rewriter overwrites configs from first - fullConfig = new HashMap<>(baseConfigMap); - fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME + "," + OTHER_REWRITER_NAME); - fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); - fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), - UpdatePropertyRewriter.class.getName()); - expectedConfigMap = new HashMap<>(fullConfig); - expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.rewriteConfig(new MapConfig(fullConfig))); - } - - /** - * This fails because Util will interpret the empty string value as a single rewriter which has the empty string as a - * name, and there is no rewriter class config for a rewriter name which is the empty string. - * TODO: should this be fixed to interpret the empty string as an empty list? - */ - @Test(expected = SamzaException.class) - public void testRewriteConfigConfigRewritersEmptyString() { - Config config = new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, "")); - Util.rewriteConfig(config); - } - - @Test(expected = SamzaException.class) - public void testRewriteConfigNoClassForConfigRewriterName() { - Config config = - new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, "unknownRewriter")); - Util.rewriteConfig(config); - } - - @Test(expected = SamzaException.class) - public void testRewriteConfigRewriterClassDoesNotExist() { - Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, REWRITER_NAME, - String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class")); - Util.rewriteConfig(config); - } - - @Test - public void testApplyRewriter() { - // new property - Map fullConfig = - ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), - NewPropertyRewriter.class.getName()); - Map expectedConfigMap = new HashMap<>(fullConfig); - expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); - - // update property - fullConfig = - ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), - UpdatePropertyRewriter.class.getName()); - expectedConfigMap = new HashMap<>(fullConfig); - expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); - assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); - - // remove property - fullConfig = - ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), - DeletePropertyRewriter.class.getName()); - expectedConfigMap = new HashMap<>(fullConfig); - expectedConfigMap.remove(CONFIG_KEY); - assertEquals(new MapConfig(expectedConfigMap), Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); - } - - @Test(expected = SamzaException.class) - public void testApplyRewriterNoClassForConfigRewriterName() { - Map fullConfig = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); - Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); - } - - @Test(expected = SamzaException.class) - public void testApplyRewriterClassDoesNotExist() { - Map fullConfig = - ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), - "not_a_class"); - Config expectedConfig = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, NEW_CONFIG_KEY, CONFIG_VALUE)); - assertEquals(expectedConfig, Util.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); - } - /** * No requirement for this test that this extends any other class. Just need some placeholder class. */ @@ -295,47 +180,4 @@ public static class MyAppClass { */ public static class MyTaskClass { } - - /** - * Adds a new config entry for the key {@link #NEW_CONFIG_KEY} which has the same value as {@link #CONFIG_KEY}. - */ - public static class NewPropertyRewriter implements ConfigRewriter { - @Override - public Config rewrite(String name, Config config) { - ImmutableMap.Builder newConfigMapBuilder = new ImmutableMap.Builder<>(); - newConfigMapBuilder.putAll(config); - newConfigMapBuilder.put(NEW_CONFIG_KEY, config.get(CONFIG_KEY)); - return new MapConfig(newConfigMapBuilder.build()); - } - } - - /** - * If an entry at {@link #NEW_CONFIG_KEY} exists, overwrites it to be the value concatenated with itself. Otherwise, - * updates the entry at {@link #CONFIG_KEY} to be the value concatenated to itself. - */ - public static class UpdatePropertyRewriter implements ConfigRewriter { - @Override - public Config rewrite(String name, Config config) { - Map newConfigMap = new HashMap<>(config); - if (config.containsKey(NEW_CONFIG_KEY)) { - // for testing overwriting of new configs - newConfigMap.put(NEW_CONFIG_KEY, config.get(NEW_CONFIG_KEY) + config.get(NEW_CONFIG_KEY)); - } else { - newConfigMap.put(CONFIG_KEY, config.get(CONFIG_KEY) + config.get(CONFIG_KEY)); - } - return new MapConfig(newConfigMap); - } - } - - /** - * Removes config entry for the key {@link #CONFIG_KEY} and {@link #NEW_CONFIG_KEY}. - */ - public static class DeletePropertyRewriter implements ConfigRewriter { - @Override - public Config rewrite(String name, Config config) { - Map newConfigMap = new HashMap<>(config); - newConfigMap.remove(CONFIG_KEY); - return new MapConfig(newConfigMap); - } - } } From 9ade79e2263d1a44bfed7acc37aaf4cf580671df Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 19:53:54 -0800 Subject: [PATCH 7/9] fix license and build issues --- .../main/java/org/apache/samza/util/Util.java | 26 +++++++++++++++---- .../org/apache/samza/util/FileUtil.scala | 2 -- .../java/org/apache/samza/util/TestUtil.java | 18 +++++++++++++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/util/Util.java b/samza-core/src/main/java/org/apache/samza/util/Util.java index 7af1259d25..ac85b15007 100644 --- a/samza-core/src/main/java/org/apache/samza/util/Util.java +++ b/samza-core/src/main/java/org/apache/samza/util/Util.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.util; import java.net.Inet4Address; @@ -14,8 +32,6 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigRewriter; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +51,9 @@ public static String envVarEscape(String str) { public static String getSamzaVersion() { return Optional.ofNullable(Util.class.getPackage().getImplementationVersion()).orElseGet(() -> { - LOG.warn("Unable to find implementation samza version in jar's meta info. Defaulting to {}", FALLBACK_VERSION); - return FALLBACK_VERSION; - }); + LOG.warn("Unable to find implementation samza version in jar's meta info. Defaulting to {}", FALLBACK_VERSION); + return FALLBACK_VERSION; + }); } public static String getTaskClassVersion(Config config) { diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index f45f28e901..d416340686 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -25,8 +25,6 @@ import java.io._ import java.nio.file._ import java.util.zip.CRC32 -import org.apache.samza.util.Util.info - class FileUtil extends Logging { /** * Writes checksum & data to a file diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java index 4a331480c6..b96fb73e81 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.util; import java.net.Inet4Address; From 9f000618f5124a5c3e39e5f532df9298bb063a75 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Fri, 15 Nov 2019 20:07:51 -0800 Subject: [PATCH 8/9] remove NetworkingUtil wrapper since powermock works more cleanly with Java --- .../main/java/org/apache/samza/util/Util.java | 59 ++++--------------- .../java/org/apache/samza/util/TestUtil.java | 2 +- 2 files changed, 13 insertions(+), 48 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/util/Util.java b/samza-core/src/main/java/org/apache/samza/util/Util.java index ac85b15007..875b6da6d9 100644 --- a/samza-core/src/main/java/org/apache/samza/util/Util.java +++ b/samza-core/src/main/java/org/apache/samza/util/Util.java @@ -24,7 +24,6 @@ import java.net.SocketException; import java.net.UnknownHostException; import java.util.Collections; -import java.util.Enumeration; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -87,72 +86,38 @@ public static String getTaskClassVersion(Config config) { */ public static InetAddress getLocalHost() { try { - return doGetLocalHost(new NetworkingUtil()); + return doGetLocalHost(); } catch (Exception e) { throw new SamzaException("Error while getting localhost", e); } } - private static InetAddress doGetLocalHost(NetworkingUtil networkingUtil) - throws UnknownHostException, SocketException { - InetAddress localHost = networkingUtil.inetAddressGetLocalHost(); + private static InetAddress doGetLocalHost() throws UnknownHostException, SocketException { + InetAddress localHost = InetAddress.getLocalHost(); if (localHost.isLoopbackAddress()) { LOG.debug("Hostname {} resolves to a loopback address, trying to resolve an external IP address.", localHost.getHostName()); List networkInterfaces; if (System.getProperty("os.name").startsWith("Windows")) { - networkInterfaces = Collections.list(networkingUtil.networkInterfaceGetNetworkInterfaces()); + networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); } else { - networkInterfaces = Lists.reverse(Collections.list(networkingUtil.networkInterfaceGetNetworkInterfaces())); + networkInterfaces = Lists.reverse(Collections.list(NetworkInterface.getNetworkInterfaces())); } for (NetworkInterface networkInterface : networkInterfaces) { - List addresses = - Collections.list(networkingUtil.networkInterfaceGetInetAddresses(networkInterface)) - .stream() - .filter(address -> !(address.isLinkLocalAddress() || address.isLoopbackAddress())) - .collect(Collectors.toList()); + List addresses = Collections.list(networkInterface.getInetAddresses()) + .stream() + .filter(address -> !(address.isLinkLocalAddress() || address.isLoopbackAddress())) + .collect(Collectors.toList()); if (!addresses.isEmpty()) { InetAddress address = addresses.stream() .filter(addr -> addr instanceof Inet4Address) .findFirst() .orElseGet(() -> addresses.get(0)); - LOG.debug("Found an external IP address {} which represents the localhost.", - networkingUtil.inetAddressGetHostAddress(address)); - return networkingUtil.inetAddressGetByAddress(networkingUtil.inetAddressGetAddress(address)); + LOG.debug("Found an external IP address {} which represents the localhost.", address.getHostAddress()); + return InetAddress.getByAddress(address.getAddress()); } } } return localHost; } - - /** - * Do this so Powermockito can mock the system classes. - * Powermockito doesn't seem to work as well with Scala singletons. - * In Java, it seems like it will work to use Powermock without this wrapper. - */ - static class NetworkingUtil { - public InetAddress inetAddressGetLocalHost() throws UnknownHostException { - return InetAddress.getLocalHost(); - } - - public InetAddress inetAddressGetByAddress(byte[] address) throws UnknownHostException { - return InetAddress.getByAddress(address); - } - - public String inetAddressGetHostAddress(InetAddress inetAddress) { - return inetAddress.getHostAddress(); - } - - public byte[] inetAddressGetAddress(InetAddress inetAddress) { - return inetAddress.getAddress(); - } - - public Enumeration networkInterfaceGetNetworkInterfaces() throws SocketException { - return NetworkInterface.getNetworkInterfaces(); - } - - public Enumeration networkInterfaceGetInetAddresses(NetworkInterface networkInterface) { - return networkInterface.getInetAddresses(); - } - } -} +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java index b96fb73e81..2eb2e89833 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -42,7 +42,7 @@ @RunWith(PowerMockRunner.class) -@PrepareForTest({Util.NetworkingUtil.class}) +@PrepareForTest(Util.class) // need this to be able to use powermock with system classes like InetAddress public class TestUtil { @Test public void testEnvVarEscape() { From f7656f701c7fc2e861eda604e75ec5c83de338f9 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Mon, 18 Nov 2019 13:22:08 -0800 Subject: [PATCH 9/9] change Util.class access in index.scaml --- samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml index 5c31ccacb6..7fe6305138 100644 --- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml +++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml @@ -25,7 +25,7 @@ -@ val appMasterClasspath: String = scala.util.Properties.javaClassPath -@ val javaVmVersion: String = scala.util.Properties.javaVmVersion -@ val javaVmName: String = scala.util.Properties.javaVmName --@ val samzaVersion: String = org.apache.samza.util.Util.getClass.getPackage.getImplementationVersion +-@ val samzaVersion: String = classOf[org.apache.samza.util.Util].getPackage.getImplementationVersion - attributes("title") = jobName %div.col-xs-2.menu