diff --git a/pom.xml b/pom.xml index 8bc0ff6..b603354 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ dnsjava dnsjava - 3.0.2 + 3.4.2 com.google.guava @@ -57,7 +57,7 @@ org.slf4j slf4j-api - 1.7.30 + 1.7.32 junit @@ -242,9 +242,9 @@ maven-compiler-plugin 3.8.1 - 9 - 9 - 9 + 8 + 8 + 8 diff --git a/src/main/java/com/spotify/dns/CachingLookupFactory.java b/src/main/java/com/spotify/dns/CachingLookupFactory.java index 98656e1..2482186 100644 --- a/src/main/java/com/spotify/dns/CachingLookupFactory.java +++ b/src/main/java/com/spotify/dns/CachingLookupFactory.java @@ -23,11 +23,13 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.concurrent.ExecutionException; import org.xbill.DNS.Lookup; +import org.xbill.DNS.lookup.LookupSession; /** * Caches Lookup instances using a per-thread cache; this is so that different threads will never * get the same instance of Lookup. Lookup instances are not thread-safe. */ +@Deprecated class CachingLookupFactory implements LookupFactory { private final LookupFactory delegate; private final ThreadLocal> cacheHolder; @@ -51,4 +53,9 @@ public Lookup forName(final String fqdn) { throw new DnsException(e); } } + + @Override + public LookupSession sessionForName(String fqdn) { + throw new java.lang.UnsupportedOperationException("Session not supported with caching lookup"); + } } diff --git a/src/main/java/com/spotify/dns/DnsSrvResolver.java b/src/main/java/com/spotify/dns/DnsSrvResolver.java index 849d35d..b20623c 100644 --- a/src/main/java/com/spotify/dns/DnsSrvResolver.java +++ b/src/main/java/com/spotify/dns/DnsSrvResolver.java @@ -17,6 +17,7 @@ package com.spotify.dns; import java.util.List; +import java.util.concurrent.CompletionStage; /** * Contract for doing SRV lookups. @@ -25,10 +26,24 @@ public interface DnsSrvResolver { /** * Does a DNS SRV lookup for the supplied fully qualified domain name, and returns the * matching results. + * @deprecated + * This method is deprecated in favor of the asynchronous version. + * Use {@link DnsSrvResolver#resolveAsync(String)} instead * * @param fqdn a DNS name to query for * @return a possibly empty list of matching records * @throws DnsException if there was an error doing the DNS lookup */ + @Deprecated List resolve(String fqdn); + + /** + * Does a DNS SRV lookup for the supplied fully qualified domain name, and returns the + * matching results. + * + * @param fqdn a DNS name to query for + * @return a possibly empty list of matching records + * @throws DnsException if there was an error doing the DNS lookup + */ + CompletionStage> resolveAsync(String fqdn); } diff --git a/src/main/java/com/spotify/dns/DnsSrvResolvers.java b/src/main/java/com/spotify/dns/DnsSrvResolvers.java index c23f77a..2c20cba 100644 --- a/src/main/java/com/spotify/dns/DnsSrvResolvers.java +++ b/src/main/java/com/spotify/dns/DnsSrvResolvers.java @@ -23,6 +23,9 @@ import java.net.UnknownHostException; import java.time.Duration; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; + import org.xbill.DNS.ExtendedResolver; import org.xbill.DNS.Resolver; @@ -46,6 +49,7 @@ public static final class DnsSrvResolverBuilder { private final long dnsLookupTimeoutMillis; private final long retentionDurationMillis; private final List servers; + private final Executor executor; private DnsSrvResolverBuilder() { this(null, @@ -53,6 +57,7 @@ private DnsSrvResolverBuilder() { false, SECONDS.toMillis(DEFAULT_DNS_TIMEOUT_SECONDS), HOURS.toMillis(DEFAULT_RETENTION_DURATION_HOURS), + null, null); } @@ -62,13 +67,15 @@ private DnsSrvResolverBuilder( boolean cacheLookups, long dnsLookupTimeoutMillis, long retentionDurationMillis, - List servers) { + List servers, + Executor executor) { this.reporter = reporter; this.retainData = retainData; this.cacheLookups = cacheLookups; this.dnsLookupTimeoutMillis = dnsLookupTimeoutMillis; this.retentionDurationMillis = retentionDurationMillis; this.servers = servers; + this.executor = executor; } public DnsSrvResolver build() { @@ -79,7 +86,7 @@ public DnsSrvResolver build() { // or if that's empty, localhost. resolver = servers == null ? new ExtendedResolver() : - new ExtendedResolver(servers.toArray(new String[servers.size()])); + new ExtendedResolver(servers.toArray(new String[0])); } catch (UnknownHostException e) { throw new RuntimeException(e); } @@ -88,7 +95,8 @@ public DnsSrvResolver build() { final Duration timeoutDuration = Duration.ofMillis(dnsLookupTimeoutMillis); resolver.setTimeout(timeoutDuration); - LookupFactory lookupFactory = new SimpleLookupFactory(resolver); + LookupFactory lookupFactory = executor == null ? new SimpleLookupFactory(resolver, ForkJoinPool.commonPool()) : + new SimpleLookupFactory(resolver, executor); if (cacheLookups) { lookupFactory = new CachingLookupFactory(lookupFactory); @@ -109,27 +117,37 @@ public DnsSrvResolver build() { public DnsSrvResolverBuilder metered(DnsReporter reporter) { return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, - retentionDurationMillis, servers); + retentionDurationMillis, servers, executor); } public DnsSrvResolverBuilder retainingDataOnFailures(boolean retainData) { return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, - retentionDurationMillis, servers); + retentionDurationMillis, servers, executor); } + /** + * @deprecated + * CachingLookups will be removed in the future as it doesn't work with `resolveAsync` + */ + @Deprecated public DnsSrvResolverBuilder cachingLookups(boolean cacheLookups) { return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, - retentionDurationMillis, servers); + retentionDurationMillis, servers, executor); } public DnsSrvResolverBuilder dnsLookupTimeoutMillis(long dnsLookupTimeoutMillis) { return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, - retentionDurationMillis, servers); + retentionDurationMillis, servers, executor); } public DnsSrvResolverBuilder retentionDurationMillis(long retentionDurationMillis) { return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, - retentionDurationMillis, servers); + retentionDurationMillis, servers, executor); + } + + public DnsSrvResolverBuilder executor(Executor executor) { + return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, + retentionDurationMillis, servers, executor); } /** @@ -143,7 +161,7 @@ public DnsSrvResolverBuilder retentionDurationMillis(long retentionDurationMilli */ public DnsSrvResolverBuilder servers(List servers) { return new DnsSrvResolverBuilder(reporter, retainData, cacheLookups, dnsLookupTimeoutMillis, - retentionDurationMillis, servers); + retentionDurationMillis, servers, executor); } } diff --git a/src/main/java/com/spotify/dns/LookupFactory.java b/src/main/java/com/spotify/dns/LookupFactory.java index 946b24d..eec69dd 100644 --- a/src/main/java/com/spotify/dns/LookupFactory.java +++ b/src/main/java/com/spotify/dns/LookupFactory.java @@ -17,15 +17,28 @@ package com.spotify.dns; import org.xbill.DNS.Lookup; +import org.xbill.DNS.lookup.LookupSession; /** - * Library-internal interface used for finding or creating {@link Lookup} instances. + * Library-internal interface used for finding or creating {@link LookupSession} or {@link Lookup} instances. */ interface LookupFactory { /** * Returns a {@link Lookup} instance capable of doing SRV lookups for the supplied FQDN. + * @deprecated + * This synchronous method is being deprecated. + * Use {@link LookupFactory#sessionForName(String)} instead + * * @param fqdn the name to do lookups for * @return a Lookup instance */ + @Deprecated Lookup forName(String fqdn); + + /** + * Returns a {@link LookupSession} instance capable of doing SRV lookups for the supplied FQDN. + * @param fqdn the name to do lookups for + * @return a Lookup instance + */ + LookupSession sessionForName(String fqdn); } diff --git a/src/main/java/com/spotify/dns/MeteredDnsSrvResolver.java b/src/main/java/com/spotify/dns/MeteredDnsSrvResolver.java index 6fcf6aa..6ab8335 100644 --- a/src/main/java/com/spotify/dns/MeteredDnsSrvResolver.java +++ b/src/main/java/com/spotify/dns/MeteredDnsSrvResolver.java @@ -16,12 +16,14 @@ package com.spotify.dns; +import static com.google.common.base.Throwables.throwIfUnchecked; import static java.util.Objects.requireNonNull; import com.spotify.dns.statistics.DnsReporter; import com.spotify.dns.statistics.DnsTimingContext; import java.util.List; +import java.util.concurrent.CompletionStage; /** * Tracks metrics for DnsSrvResolver calls. @@ -35,28 +37,54 @@ class MeteredDnsSrvResolver implements DnsSrvResolver { this.reporter = requireNonNull(reporter, "reporter"); } + @Override + public List resolve(String fqdn) { + // Only catch and report RuntimeException to avoid Error's since that would + // most likely only aggravate any condition that causes them to be thrown. + + final DnsTimingContext resolveTimer = reporter.resolveTimer(); + + final List result; + + try { + result = delegate.resolve(fqdn); + } catch (RuntimeException error) { + reporter.reportFailure(error); + throw error; + } finally { + resolveTimer.stop(); + } + + if (result.isEmpty()) { + reporter.reportEmpty(); + } + + return result; + } + @Override - public List resolve(String fqdn) { + public CompletionStage> resolveAsync(String fqdn) { // Only catch and report RuntimeException to avoid Error's since that would // most likely only aggravate any condition that causes them to be thrown. final DnsTimingContext resolveTimer = reporter.resolveTimer(); - final List result; - - try { - result = delegate.resolve(fqdn); - } catch (RuntimeException error) { - reporter.reportFailure(error); - throw error; - } finally { - resolveTimer.stop(); - } - - if (result.isEmpty()) { - reporter.reportEmpty(); - } + return delegate + .resolveAsync(fqdn) + .handle( + (result, error) -> { + resolveTimer.stop(); + if (error == null) { + if (result.isEmpty()) { + reporter.reportEmpty(); + } - return result; + return result; + } else { + reporter.reportFailure(error); + throwIfUnchecked(error); + throw new RuntimeException(error); + } + }); } } diff --git a/src/main/java/com/spotify/dns/RetainingDnsSrvResolver.java b/src/main/java/com/spotify/dns/RetainingDnsSrvResolver.java index 653f3aa..f2a16c9 100644 --- a/src/main/java/com/spotify/dns/RetainingDnsSrvResolver.java +++ b/src/main/java/com/spotify/dns/RetainingDnsSrvResolver.java @@ -23,6 +23,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; /** @@ -70,4 +71,29 @@ public List resolve(final String fqdn) { throw new RuntimeException(e); } } + + @Override + public CompletionStage> resolveAsync(final String fqdn) { + requireNonNull(fqdn, "fqdn"); + return delegate.resolveAsync(fqdn).handle((nodes, e) -> { + if (e == null){ + // No nodes resolved? Return stale data. + if (nodes.isEmpty()) { + List cached = cache.getIfPresent(fqdn); + return (cached != null) ? cached : nodes; + } + + cache.put(fqdn, nodes); + + return nodes; + } else{ + if (cache.getIfPresent(fqdn) != null) { + return cache.getIfPresent(fqdn); + } + + throwIfUnchecked(e); + throw new RuntimeException(e); + } + }); + } } diff --git a/src/main/java/com/spotify/dns/ServiceResolvingChangeNotifier.java b/src/main/java/com/spotify/dns/ServiceResolvingChangeNotifier.java index a91d4a4..82fdfeb 100644 --- a/src/main/java/com/spotify/dns/ServiceResolvingChangeNotifier.java +++ b/src/main/java/com/spotify/dns/ServiceResolvingChangeNotifier.java @@ -19,7 +19,6 @@ import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableSet; -import java.util.List; import java.util.Set; import java.util.function.Function; import org.slf4j.Logger; @@ -53,7 +52,7 @@ class ServiceResolvingChangeNotifier extends AbstractChangeNotifier * and put into a set. The set will then be compared to the previous set and if a * change is detected, the notifier will fire. * - *

An optional {@link ErrorHandler} can be used to reacto on {@link DnsException}s thrown + *

An optional {@link ErrorHandler} can be used to react on {@link DnsException}s thrown * by the {@link DnsSrvResolver}. * * @param resolver The resolver to use. @@ -88,45 +87,42 @@ public void run() { return; } - final List nodes; - try { - nodes = resolver.resolve(fqdn); - } catch (DnsException e) { - if (errorHandler != null) { - errorHandler.handle(fqdn, e); + resolver.resolveAsync(fqdn).whenComplete((nodes, e) -> { + if (e instanceof DnsException) { + if (errorHandler != null) { + errorHandler.handle(fqdn, (DnsException) e); + } + log.error(e.getMessage(), e); + fireIfFirstError(); + } else if (e != null) { + log.error(e.getMessage(), e); + fireIfFirstError(); + } else { + final Set current; + try { + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (LookupResult node : nodes) { + T transformed = resultTransformer.apply(node); + builder.add(requireNonNull(transformed, "transformed")); + } + current = builder.build(); + } catch (Exception transformerException) { + log.error(transformerException.getMessage(), transformerException); + fireIfFirstError(); + return; + } + + if (ChangeNotifiers.isNoLongerInitial(current, records) || !current.equals(records)) { + // This means that any subsequent DNS error will be ignored and the existing result will be kept + waitingForFirstEvent = false; + final ChangeNotification changeNotification = + newChangeNotification(current, records); + records = current; + + fireRecordsUpdated(changeNotification); + } } - log.error(e.getMessage(), e); - fireIfFirstError(); - return; - } catch (Exception e) { - log.error(e.getMessage(), e); - fireIfFirstError(); - return; - } - - final Set current; - try { - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (LookupResult node : nodes) { - T transformed = resultTransformer.apply(node); - builder.add(requireNonNull(transformed, "transformed")); - } - current = builder.build(); - } catch (Exception e) { - log.error(e.getMessage(), e); - fireIfFirstError(); - return; - } - - if (ChangeNotifiers.isNoLongerInitial(current, records) || !current.equals(records)) { - // This means that any subsequent DNS error will be ignored and the existing result will be kept - waitingForFirstEvent = false; - final ChangeNotification changeNotification = - newChangeNotification(current, records); - records = current; - - fireRecordsUpdated(changeNotification); - } + }); } private void fireIfFirstError() { diff --git a/src/main/java/com/spotify/dns/SimpleLookupFactory.java b/src/main/java/com/spotify/dns/SimpleLookupFactory.java index 3954f02..533cc22 100644 --- a/src/main/java/com/spotify/dns/SimpleLookupFactory.java +++ b/src/main/java/com/spotify/dns/SimpleLookupFactory.java @@ -16,25 +16,50 @@ package com.spotify.dns; +import static java.util.Objects.requireNonNull; + import org.xbill.DNS.DClass; import org.xbill.DNS.Lookup; import org.xbill.DNS.Resolver; import org.xbill.DNS.TextParseException; import org.xbill.DNS.Type; +import org.xbill.DNS.lookup.LookupSession; -/** - * A LookupFactory that always returns new instances. - */ -public class SimpleLookupFactory implements LookupFactory { +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +/** A LookupFactory that always returns new instances. */ +public class SimpleLookupFactory implements LookupFactory { + private final LookupSession session; private final Resolver resolver; + /** + * @deprecated + * Deprecated to avoid overloading forkjoin common pool. + * Use {@link SimpleLookupFactory#SimpleLookupFactory(Executor)} instead. + */ + @Deprecated public SimpleLookupFactory() { - this(null); + this(Lookup.getDefaultResolver()); } + /** + * @deprecated + * Deprecated to avoid overloading forkjoin common pool. + * Use {@link SimpleLookupFactory#SimpleLookupFactory(Resolver, Executor)} instead. + */ public SimpleLookupFactory(Resolver resolver) { + this(resolver, ForkJoinPool.commonPool()); + } + + public SimpleLookupFactory(Executor executor) { + this(Lookup.getDefaultResolver(), executor); + } + + public SimpleLookupFactory(Resolver resolver, Executor executor) { + requireNonNull(executor); this.resolver = resolver; + this.session = LookupSession.builder().resolver(resolver).executor(executor).build(); } @Override @@ -49,4 +74,9 @@ public Lookup forName(String fqdn) { throw new DnsException("unable to create lookup for name: " + fqdn, e); } } + + @Override + public LookupSession sessionForName(String fqdn) { + return session; + } } diff --git a/src/main/java/com/spotify/dns/XBillDnsSrvResolver.java b/src/main/java/com/spotify/dns/XBillDnsSrvResolver.java index ccc5f83..b874cc1 100644 --- a/src/main/java/com/spotify/dns/XBillDnsSrvResolver.java +++ b/src/main/java/com/spotify/dns/XBillDnsSrvResolver.java @@ -22,15 +22,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xbill.DNS.DClass; import org.xbill.DNS.Lookup; +import org.xbill.DNS.Name; import org.xbill.DNS.Record; import org.xbill.DNS.SRVRecord; +import org.xbill.DNS.TextParseException; +import org.xbill.DNS.Type; +import org.xbill.DNS.lookup.LookupSession; +import org.xbill.DNS.lookup.NoSuchDomainException; +import org.xbill.DNS.lookup.NoSuchRRSetException; import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; /** - * A DnsSrvResolver implementation that uses the dnsjava implementation from xbill.org: - * http://www.xbill.org/dnsjava/ + * A DnsSrvResolver implementation that uses the dnsjava implementation: + * https://github.com/dnsjava/dnsjava */ class XBillDnsSrvResolver implements DnsSrvResolver { private static final Logger LOG = LoggerFactory.getLogger(XBillDnsSrvResolver.class); @@ -53,27 +62,73 @@ public List resolve(final String fqdn) { // fallthrough case Lookup.TYPE_NOT_FOUND: LOG.warn("No results returned for query '{}'; result from XBill: {} - {}", - fqdn, lookup.getResult(), lookup.getErrorString()); + fqdn, lookup.getResult(), lookup.getErrorString()); return ImmutableList.of(); default: throw new DnsException( - String.format("Lookup of '%s' failed with code: %d - %s ", - fqdn, lookup.getResult(), lookup.getErrorString())); + String.format("Lookup of '%s' failed with code: %d - %s ", + fqdn, lookup.getResult(), lookup.getErrorString())); } } + @Override + public CompletionStage> resolveAsync(final String fqdn) { + LookupSession lookup = lookupFactory.sessionForName(fqdn); + Name name; + try { + name = Name.fromString(fqdn); + } catch (TextParseException e) { + throw new DnsException("unable to create lookup for name: " + fqdn, e); + } + + return lookup.lookupAsync(name, Type.SRV, DClass.IN).handle((result, ex) ->{ + if (ex == null){ + return toLookupResults(result); + } else{ + Throwable cause = ex; + if (ex instanceof CompletionException && ex.getCause() != null) { + cause = ex.getCause(); + } + if (cause instanceof NoSuchRRSetException || cause instanceof NoSuchDomainException) { + LOG.warn("No results returned for query '{}'; result from dnsjava: {}", + fqdn, ex.getMessage()); + return ImmutableList.of(); + } + throw new DnsException( + String.format("Lookup of '%s' failed: %s ", fqdn, ex.getMessage()), ex); + } + }); + } + + private static List toLookupResults(org.xbill.DNS.lookup.LookupResult queryResult) { + ImmutableList.Builder builder = ImmutableList.builder(); + + for (Record record: queryResult.getRecords()) { + if (record instanceof SRVRecord) { + SRVRecord srvRecord = (SRVRecord) record; + builder.add(LookupResult.create(srvRecord.getTarget().toString(), + srvRecord.getPort(), + srvRecord.getPriority(), + srvRecord.getWeight(), + srvRecord.getTTL())); + } + } + + return builder.build(); + } + private static List toLookupResults(Record[] queryResult) { ImmutableList.Builder builder = ImmutableList.builder(); if (queryResult != null) { - for (Record record: queryResult) { + for (Record record : queryResult) { if (record instanceof SRVRecord) { SRVRecord srvRecord = (SRVRecord) record; builder.add(LookupResult.create(srvRecord.getTarget().toString(), - srvRecord.getPort(), - srvRecord.getPriority(), - srvRecord.getWeight(), - srvRecord.getTTL())); + srvRecord.getPort(), + srvRecord.getPriority(), + srvRecord.getWeight(), + srvRecord.getTTL())); } } } diff --git a/src/test/java/com/spotify/dns/DnsLookupPerformanceTest.java b/src/test/java/com/spotify/dns/DnsLookupPerformanceTest.java new file mode 100644 index 0000000..fad3c90 --- /dev/null +++ b/src/test/java/com/spotify/dns/DnsLookupPerformanceTest.java @@ -0,0 +1,100 @@ +package com.spotify.dns; + +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class DnsLookupPerformanceTest { + private static AtomicInteger successCount = new AtomicInteger(0); + + private static DnsSrvResolver resolver = DnsSrvResolvers.newBuilder() + .cachingLookups(false) + .retainingDataOnFailures(false) + .dnsLookupTimeoutMillis(5000) + .executor(Executors.newFixedThreadPool(10)) + .build(); + + @Test + @Ignore("Needs network access and is timing dependent") + public void runTest() throws InterruptedException { + int numThreads = 3; + final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List records = List.of( + "_spotify-noop._http.services.gew1.spotify.net.", + "_spotify-noop._http.services.guc3.spotify.net.", + "_spotify-noop._http.services.gae2.spotify.net.", + "_spotify-palindrome._grpc.services.gae2.spotify.net.", + "_spotify-palindrome._grpc.services.gew1.spotify.net.", + "_spotify-concat._grpc.services.gew1.spotify.net.", + "_spotify-concat._grpc.services.guc3.spotify.net.", + "_spotify-concat._hm.services.gae2.spotify.net.", + "_spotify-concat._hm.services.gew1.spotify.net.", + "_spotify-concat._hm.services.guc3.spotify.net.", + "_spotify-fabric-test._grpc.services.gae2.spotify.net.", + "_spotify-fabric-test._grpc.services.gew1.spotify.net.", + "_spotify-fabric-test._grpc.services.guc3.spotify.net.", + "_spotify-fabric-test._hm.services.gae2.spotify.net.", + "_spotify-fabric-test._hm.services.gew1.spotify.net.", + "_spotify-fabric-test._hm.services.guc3.spotify.net.", + "_spotify-fabric-load-generator._grpc.services.gae2.spotify.net.", + "_spotify-fabric-load-generator._grpc.services.gew1.spotify.net.", + "_spotify-fabric-load-generator._grpc.services.guc3.spotify.net.", + "_spotify-client._tcp.spotify.com"); + + CountDownLatch done = new CountDownLatch(records.size() * 2); + records.stream() + .forEach( + fqdn -> { + executorService.submit(() -> resolve(fqdn, done)); + CompletableFuture.runAsync(DnsLookupPerformanceTest::blockCommonPool) + .whenComplete((v, ex) -> done.countDown()); + }); + done.await(1, TimeUnit.MINUTES); + executorService.shutdown(); + + int failureCount = records.size() - successCount.get(); + + System.out.println("Number of threads: " + numThreads); + System.out.println("Number of records: " + records.size()); + System.out.println("Failed lookups: " + failureCount); + + assertThat(failureCount, equalTo(0)); + } + + private static void blockCommonPool() { + try { + Thread.sleep(10_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private static void resolve(String fqdn, CountDownLatch done) { + try { + System.out.println("Resolving: " + fqdn); + List results = resolver.resolveAsync(fqdn).toCompletableFuture().get(); + + if(!results.isEmpty()) { + successCount.incrementAndGet(); + System.out.println(fqdn + "...ok!"); + } else { + System.err.format("%s ... failed!\n", fqdn); + } + } catch (Exception e) { + System.err.format("%s ... failed!\n", fqdn); + e.printStackTrace(System.err); + } finally { + done.countDown(); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/spotify/dns/DnsSrvResolversIT.java b/src/test/java/com/spotify/dns/DnsSrvResolversIT.java index aa00966..919db52 100644 --- a/src/test/java/com/spotify/dns/DnsSrvResolversIT.java +++ b/src/test/java/com/spotify/dns/DnsSrvResolversIT.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.hamcrest.Matchers; import org.junit.Before; @@ -54,8 +55,9 @@ public void setUp() { } @Test - public void shouldReturnResultsForValidQuery() { + public void shouldReturnResultsForValidQuery() throws ExecutionException, InterruptedException { assertThat(resolver.resolve("_spotify-client._tcp.spotify.com").isEmpty(), is(false)); + assertThat(resolver.resolveAsync("_spotify-client._tcp.spotify.com").toCompletableFuture().get().isEmpty(), is(false)); } @Test @@ -79,7 +81,7 @@ public void testCorrectSequenceOfNotifications() { } @Test - public void shouldTrackMetricsWhenToldTo() { + public void shouldTrackMetricsWhenToldTo() throws ExecutionException, InterruptedException { final DnsReporter reporter = mock(DnsReporter.class); final DnsTimingContext timingReporter = mock(DnsTimingContext.class); @@ -88,12 +90,22 @@ public void shouldTrackMetricsWhenToldTo() { .build(); when(reporter.resolveTimer()).thenReturn(timingReporter); - resolver.resolve("_spotify-client._tcp.sto.spotify.net"); + resolver.resolveAsync("_spotify-client._tcp.sto.spotify.net").toCompletableFuture().get(); verify(timingReporter).stop(); verify(reporter, never()).reportFailure(isA(RuntimeException.class)); verify(reporter, times(1)).reportEmpty(); } + @Test + public void shouldFailForBadHostNamesAsync() throws Exception { + try { + resolver.resolveAsync("nonexistenthost").toCompletableFuture().get(); + } + catch (DnsException e) { + assertThat(e.getMessage(), containsString("host not found")); + } + } + @Test public void shouldFailForBadHostNames() { try { @@ -112,6 +124,7 @@ public void shouldReturnResultsUsingSpecifiedServers() throws Exception { .servers(List.of(server)) .build(); assertThat(resolver.resolve("_spotify-client._tcp.spotify.com").isEmpty(), is(false)); + assertThat(resolver.resolveAsync("_spotify-client._tcp.spotify.com").toCompletableFuture().get().isEmpty(), is(false)); } @Test diff --git a/src/test/java/com/spotify/dns/DnsSrvWatchersTest.java b/src/test/java/com/spotify/dns/DnsSrvWatchersTest.java index d4a440e..5965e60 100644 --- a/src/test/java/com/spotify/dns/DnsSrvWatchersTest.java +++ b/src/test/java/com/spotify/dns/DnsSrvWatchersTest.java @@ -6,10 +6,13 @@ import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; +import org.xbill.DNS.lookup.NoSuchDomainException; public class DnsSrvWatchersTest { @@ -82,5 +85,14 @@ public List resolve(String fqdn) { return null; } } + + @Override + public CompletionStage> resolveAsync(String fqdn) { + if (this.fqdn.equals(fqdn)) { + return CompletableFuture.completedFuture(List.of(result)); + } else { + return CompletableFuture.failedFuture(new DnsException(this.fqdn + " != " + fqdn)); + } + } } } diff --git a/src/test/java/com/spotify/dns/MeteredDnsSrvResolverTest.java b/src/test/java/com/spotify/dns/MeteredDnsSrvResolverTest.java index 4986201..acc582e 100644 --- a/src/test/java/com/spotify/dns/MeteredDnsSrvResolverTest.java +++ b/src/test/java/com/spotify/dns/MeteredDnsSrvResolverTest.java @@ -26,6 +26,9 @@ import com.spotify.dns.statistics.DnsReporter; import com.spotify.dns.statistics.DnsTimingContext; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -79,6 +82,17 @@ public void shouldCountSuccessful() throws Exception { verify(reporter, never()).reportFailure(RUNTIME_EXCEPTION); } + @Test + public void shouldCountSuccessfulAsync() throws Exception { + CompletableFuture> completedNotEmpty = CompletableFuture.completedFuture(NOT_EMPTY); + when(delegate.resolveAsync(FQDN)).thenReturn(completedNotEmpty); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + verify(reporter, never()).reportEmpty(); + verify(reporter, never()).reportFailure(RUNTIME_EXCEPTION); + } + @Test public void shouldReportEmpty() throws Exception { when(delegate.resolve(FQDN)).thenReturn(EMPTY); @@ -89,6 +103,17 @@ public void shouldReportEmpty() throws Exception { verify(reporter, never()).reportFailure(RUNTIME_EXCEPTION); } + @Test + public void shouldReportEmptyAsync() throws Exception { + CompletableFuture> completedEmpty = CompletableFuture.completedFuture(EMPTY); + when(delegate.resolveAsync(FQDN)).thenReturn(completedEmpty); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + verify(reporter).reportEmpty(); + verify(reporter, never()).reportFailure(RUNTIME_EXCEPTION); + } + @Test public void shouldReportRuntimeException() throws Exception { when(delegate.resolve(FQDN)).thenThrow(RUNTIME_EXCEPTION); @@ -104,6 +129,21 @@ public void shouldReportRuntimeException() throws Exception { verify(reporter).reportFailure(RUNTIME_EXCEPTION); } + @Test + public void shouldReportRuntimeExceptionAsync() throws Exception { + when(delegate.resolveAsync(FQDN)).thenReturn(CompletableFuture.failedFuture((RUNTIME_EXCEPTION))); + + try { + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + fail("resolve should have thrown exception"); + } catch(ExecutionException e) { + assertEquals(RUNTIME_EXCEPTION, e.getCause()); + } + + verify(reporter, never()).reportEmpty(); + verify(reporter).reportFailure(RUNTIME_EXCEPTION); + } + @Test public void shouldNotReportError() throws Exception { when(delegate.resolve(FQDN)).thenThrow(ERROR); @@ -118,4 +158,19 @@ public void shouldNotReportError() throws Exception { verify(reporter, never()).reportEmpty(); verify(reporter, never()).reportFailure(RUNTIME_EXCEPTION); } + + @Test + public void shouldNotReportErrorAsync() throws Exception { + when(delegate.resolveAsync(FQDN)).thenReturn(CompletableFuture.failedFuture(ERROR)); + + try { + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + fail("resolve should have thrown exception"); + } catch(ExecutionException e) { + assertEquals(ERROR, e.getCause()); + } + + verify(reporter, never()).reportEmpty(); + verify(reporter, never()).reportFailure(RUNTIME_EXCEPTION); + } } diff --git a/src/test/java/com/spotify/dns/RetainingDnsSrvResolverTest.java b/src/test/java/com/spotify/dns/RetainingDnsSrvResolverTest.java index 0f0ef59..fb245ab 100644 --- a/src/test/java/com/spotify/dns/RetainingDnsSrvResolverTest.java +++ b/src/test/java/com/spotify/dns/RetainingDnsSrvResolverTest.java @@ -24,6 +24,9 @@ import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -60,6 +63,13 @@ public void shouldReturnResultsFromDelegate() { assertThat(resolver.resolve(FQDN), equalTo(nodes1)); } + @Test + public void shouldReturnResultsFromDelegateAsync() throws ExecutionException, InterruptedException { + when(delegate.resolveAsync(FQDN)).thenReturn(CompletableFuture.completedFuture(nodes1)); + + assertThat(resolver.resolveAsync(FQDN).toCompletableFuture().get(), equalTo(nodes1)); + } + @Test public void shouldReturnResultsFromDelegateEachTime() { when(delegate.resolve(FQDN)).thenReturn(nodes1).thenReturn(nodes2); @@ -69,6 +79,17 @@ public void shouldReturnResultsFromDelegateEachTime() { assertThat(resolver.resolve(FQDN), equalTo(nodes2)); } + @Test + public void shouldReturnResultsFromDelegateEachTimeAsync() throws ExecutionException, InterruptedException { + when(delegate.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(nodes1)) + .thenReturn(CompletableFuture.completedFuture(nodes2)); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + assertThat(resolver.resolveAsync(FQDN).toCompletableFuture().get(), equalTo(nodes2)); + } + @Test public void shouldRetainDataIfNewResultEmpty() { when(delegate.resolve(FQDN)).thenReturn(nodes1).thenReturn(nodes()); @@ -78,17 +99,39 @@ public void shouldRetainDataIfNewResultEmpty() { assertThat(resolver.resolve(FQDN), equalTo(nodes1)); } + @Test + public void shouldRetainDataIfNewResultEmptyAsync() throws ExecutionException, InterruptedException { + when(delegate.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(nodes1)) + .thenReturn(CompletableFuture.completedFuture(nodes())); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + assertThat(resolver.resolveAsync(FQDN).toCompletableFuture().get(), equalTo(nodes1)); + } + @Test public void shouldRetainDataOnFailure() { when(delegate.resolve(FQDN)) - .thenReturn(nodes1) - .thenThrow(new DnsException("expected")); + .thenReturn(nodes1) + .thenThrow(new DnsException("expected")); resolver.resolve(FQDN); assertThat(resolver.resolve(FQDN), equalTo(nodes1)); } + @Test + public void shouldRetainDataOnFailureAsync() throws ExecutionException, InterruptedException { + when(delegate.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(nodes1)) + .thenReturn(CompletableFuture.failedFuture(new DnsException("expected"))); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + assertThat(resolver.resolveAsync(FQDN).toCompletableFuture().get(), equalTo(nodes1)); + } + @Test public void shouldThrowOnFailureAndNoDataAvailable() { when(delegate.resolve(FQDN)).thenThrow(new DnsException("expected")); @@ -99,6 +142,17 @@ public void shouldThrowOnFailureAndNoDataAvailable() { resolver.resolve(FQDN); } + @Test + public void shouldThrowOnFailureAndNoDataAvailableAsync() throws ExecutionException, InterruptedException { + DnsException cause = new DnsException("expected"); + when(delegate.resolveAsync(FQDN)).thenReturn(CompletableFuture.failedFuture(cause)); + + thrown.expect(ExecutionException.class); + thrown.expectCause(is(cause)); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + } + @Test public void shouldReturnEmptyOnEmptyAndNoDataAvailable() { when(delegate.resolve(FQDN)).thenReturn(nodes()); @@ -106,11 +160,18 @@ public void shouldReturnEmptyOnEmptyAndNoDataAvailable() { assertThat(resolver.resolve(FQDN).isEmpty(), is(true)); } + @Test + public void shouldReturnEmptyOnEmptyAndNoDataAvailableAsync() throws ExecutionException, InterruptedException { + when(delegate.resolveAsync(FQDN)).thenReturn(CompletableFuture.completedFuture(nodes())); + + assertThat(resolver.resolveAsync(FQDN).toCompletableFuture().get().isEmpty(), is(true)); + } + @Test public void shouldNotStoreEmptyResults() { when(delegate.resolve(FQDN)) - .thenReturn(nodes()) - .thenThrow(new DnsException("expected")); + .thenReturn(nodes()) + .thenThrow(new DnsException("expected")); resolver.resolve(FQDN); @@ -120,11 +181,26 @@ public void shouldNotStoreEmptyResults() { resolver.resolve(FQDN); } + @Test + public void shouldNotStoreEmptyResultsAsync() throws ExecutionException, InterruptedException { + DnsException cause = new DnsException("expected"); + when(delegate.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(nodes())) + .thenReturn(CompletableFuture.failedFuture(cause)); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + thrown.expect(ExecutionException.class); + thrown.expectCause(is(cause)); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + } + @Test public void shouldNotRetainPastEndOfRetentionOnEmptyResults() throws Exception { when(delegate.resolve(FQDN)) - .thenReturn(nodes("aresult")) - .thenReturn(nodes()); + .thenReturn(nodes("aresult")) + .thenReturn(nodes()); resolver.resolve(FQDN); @@ -134,12 +210,26 @@ public void shouldNotRetainPastEndOfRetentionOnEmptyResults() throws Exception { assertThat(resolver.resolve(FQDN).isEmpty(), is(true)); } + @Test + public void shouldNotRetainPastEndOfRetentionOnEmptyResultsAsync() throws Exception { + when(delegate.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(nodes("aresult"))) + .thenReturn(CompletableFuture.completedFuture(nodes())); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + // expire retained entry + Thread.sleep(RETENTION_TIME_MILLIS); + + assertThat(resolver.resolveAsync(FQDN).toCompletableFuture().get().isEmpty(), is(true)); + } + @Test public void shouldNotRetainPastEndOfRetentionOnException() throws Exception { DnsException expected = new DnsException("expected"); when(delegate.resolve(FQDN)) - .thenReturn(nodes("aresult")) - .thenThrow(expected); + .thenReturn(nodes("aresult")) + .thenThrow(expected); resolver.resolve(FQDN); @@ -151,6 +241,23 @@ public void shouldNotRetainPastEndOfRetentionOnException() throws Exception { resolver.resolve(FQDN); } + @Test + public void shouldNotRetainPastEndOfRetentionOnExceptionAsync() throws Exception { + DnsException expected = new DnsException("expected"); + when(delegate.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(nodes("aresult"))) + .thenReturn(CompletableFuture.failedFuture(expected)); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + + // expire retained entry + Thread.sleep(RETENTION_TIME_MILLIS); + + thrown.expectCause(equalTo(expected)); + + resolver.resolveAsync(FQDN).toCompletableFuture().get(); + } + @Test public void shouldThrowIfRetentionNegative() { thrown.expect(IllegalArgumentException.class); diff --git a/src/test/java/com/spotify/dns/ServiceResolvingChangeNotifierTest.java b/src/test/java/com/spotify/dns/ServiceResolvingChangeNotifierTest.java index 2044c45..9e01fc7 100644 --- a/src/test/java/com/spotify/dns/ServiceResolvingChangeNotifierTest.java +++ b/src/test/java/com/spotify/dns/ServiceResolvingChangeNotifierTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.junit.Before; import org.junit.Test; @@ -61,8 +62,9 @@ public void shouldCallListenerOnChange() { LookupResult result1 = result("host", 1234); LookupResult result2 = result("host", 4321); - when(resolver.resolve(FQDN)) - .thenReturn(of(result1), of(result1, result2)); + when(resolver.resolve(FQDN)).thenReturn(of(result1), of(result1, result2)); + when(resolver.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(of(result1)), CompletableFuture.completedFuture(of(result1, result2))); sut.run(); sut.run(); @@ -94,7 +96,9 @@ public void shouldCallListenerOnSet() { LookupResult result = result("host", 1234); when(resolver.resolve(FQDN)) - .thenReturn(of(result)); + .thenReturn(of(result)); + when(resolver.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(of(result))); sut.run(); sut.setListener(listener, true); @@ -118,7 +122,9 @@ public void shouldReturnImmutableSets() { LookupResult result1 = result("host", 1234); LookupResult result2 = result("host", 4321); when(resolver.resolve(FQDN)) - .thenReturn(of(result1), of(result1, result2)); + .thenReturn(of(result1), of(result1, result2)); + when(resolver.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(of(result1)), CompletableFuture.completedFuture(of(result1, result2))); sut.run(); sut.setListener(listener, true); @@ -152,7 +158,9 @@ public void shouldOnlyChangeIfTransformedValuesChange() { LookupResult result1 = result("host", 1234); LookupResult result2 = result("host", 4321); when(resolver.resolve(FQDN)) - .thenReturn(of(result1), of(result1, result2)); + .thenReturn(of(result1), of(result1, result2)); + when(resolver.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(of(result1)), CompletableFuture.completedFuture(of(result1, result2))); sut.run(); sut.run(); @@ -189,10 +197,15 @@ public void shouldDoSomethingWithNulls() { ChangeNotifier.Listener listener = mock(ChangeNotifier.Listener.class); when(resolver.resolve(FQDN)) - .thenReturn(of( + .thenReturn(of( + result("host1", 1234), + result("host2", 1234), + result("host3", 1234))); + when(resolver.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.completedFuture(of( result("host1", 1234), result("host2", 1234), - result("host3", 1234))); + result("host3", 1234)))); when(f.apply(any(LookupResult.class))) .thenReturn("foo", null, "bar"); @@ -213,7 +226,9 @@ public void shouldCallErrorHandlerOnResolveErrors() { DnsException exception = new DnsException("something wrong"); when(resolver.resolve(FQDN)) - .thenThrow(exception); + .thenThrow(exception); + when(resolver.resolveAsync(FQDN)) + .thenReturn(CompletableFuture.failedFuture(exception)); sut.setListener(listener, false); sut.run(); diff --git a/src/test/java/com/spotify/dns/SimpleLookupFactoryTest.java b/src/test/java/com/spotify/dns/SimpleLookupFactoryTest.java index 1630e3a..a1af511 100644 --- a/src/test/java/com/spotify/dns/SimpleLookupFactoryTest.java +++ b/src/test/java/com/spotify/dns/SimpleLookupFactoryTest.java @@ -27,6 +27,9 @@ import org.junit.rules.ExpectedException; import org.xbill.DNS.Lookup; import org.xbill.DNS.TextParseException; +import org.xbill.DNS.lookup.LookupSession; + +import java.util.concurrent.ForkJoinPool; public class SimpleLookupFactoryTest { @@ -37,7 +40,7 @@ public class SimpleLookupFactoryTest { @Before public void setUp() { - factory = new SimpleLookupFactory(); + factory = new SimpleLookupFactory(ForkJoinPool.commonPool()); } @Test @@ -46,7 +49,12 @@ public void shouldCreateLookups() { } @Test - public void shouldCreateNewLookupsEachTime() { + public void shouldCreateLookupSession() { + assertThat(factory.sessionForName("some.domain."), is(notNullValue())); + } + + @Test + public void shouldNotCreateNewLookupsEachTime() { Lookup first = factory.forName("some.other.name."); Lookup second = factory.forName("some.other.name."); @@ -54,10 +62,10 @@ public void shouldCreateNewLookupsEachTime() { } @Test - public void shouldRethrowXBillExceptions() { - thrown.expect(DnsException.class); - thrown.expectCause(isA(TextParseException.class)); + public void shouldCreateNewLookupSessionEachTime() { + LookupSession firstSession = factory.sessionForName("some.other.name."); + LookupSession secondSession = factory.sessionForName("some.other.name."); - factory.forName("bad\\1 name"); + assertThat(firstSession == secondSession, is(true)); } } diff --git a/src/test/java/com/spotify/dns/examples/BasicUsage.java b/src/test/java/com/spotify/dns/examples/BasicUsage.java index 07636cc..b8728f6 100644 --- a/src/test/java/com/spotify/dns/examples/BasicUsage.java +++ b/src/test/java/com/spotify/dns/examples/BasicUsage.java @@ -16,7 +16,6 @@ package com.spotify.dns.examples; -import com.spotify.dns.DnsException; import com.spotify.dns.DnsSrvResolver; import com.spotify.dns.DnsSrvResolvers; import com.spotify.dns.LookupResult; @@ -25,7 +24,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.util.List; public final class BasicUsage { @@ -49,15 +47,15 @@ public static void main(String[] args) throws IOException { if (line == null || line.isEmpty()) { quit = true; } else { - try { - List nodes = resolver.resolve(line); - - for (LookupResult node : nodes) { - System.out.println(node); + resolver.resolveAsync(line).whenComplete((nodes, e) -> { + if (e == null) { + for (LookupResult node : nodes) { + System.out.println(node); + } + } else { + e.printStackTrace(System.out); } - } catch (DnsException e) { - e.printStackTrace(System.out); - } + }); } } }