Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion core/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
* {@link Listener} is responsible for eventually (after an appropriate backoff period) invoking
* {@link #refresh()}.
*
* <p>Implementations <strong>don't need to be thread-safe</strong>. All methods are guaranteed to
* be called sequentially. Additionally, all methods that have side-effects, i.e., {@link #start},
* {@link #shutdown} and {@link #refresh} are called from the same {@link SynchronizationContext} as
* returned by {@link Helper#getSynchronizationContext}.
*
* @since 1.0.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@ThreadSafe
public abstract class NameResolver {
/**
* Returns the authority used to authenticate connections to servers. It <strong>must</strong> be
Expand Down Expand Up @@ -209,18 +213,34 @@ void onAddresses(

/**
* A utility object passed to {@link Factory#newNameResolver(URI, NameResolver.Helper)}.
*
* @since 1.19.0
*/
public abstract static class Helper {
/**
* The port number used in case the target or the underlying naming system doesn't provide a
* port number.
*
* @since 1.19.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Independently of this PR, you should probably backport this to v1.19.x

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure: #5367

*/
public abstract int getDefaultPort();

/**
* If the NameResolver wants to support proxy, it should inquire this {@link ProxyDetector}.
* See documentation on {@link ProxyDetector} about how proxies work in gRPC.
*
* @since 1.19.0
*/
public abstract ProxyDetector getProxyDetector();

/**
* Returns the {@link SynchronizationContext} where {@link #start}, {@link #shutdown} and {@link
* #refresh} are run from.
*
* @since 1.20.0
*/
public SynchronizationContext getSynchronizationContext() {
throw new UnsupportedOperationException("Not implemented");
}
}
}
131 changes: 65 additions & 66 deletions core/src/main/java/io/grpc/internal/DnsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.ProxiedSocketAddress;
import io.grpc.ProxyDetector;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -52,7 +53,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
* A DNS-based {@link NameResolver}.
Expand Down Expand Up @@ -138,19 +138,23 @@ final class DnsNameResolver extends NameResolver {
private final String host;
private final int port;
private final Resource<Executor> executorResource;
@GuardedBy("this")
private final long cacheTtlNanos;
private final SynchronizationContext syncContext;

// Following fields must be accessed from syncContext
private final Stopwatch stopwatch;
private ResolutionResults cachedResolutionResults;
private boolean shutdown;
@GuardedBy("this")
private Executor executor;
@GuardedBy("this")
private boolean resolving;
@GuardedBy("this")
private Listener listener;

private final Runnable resolveRunnable;
// The field must be accessed from syncContext, although the methods on a Listener can be called
// from any thread.
private Listener listener;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this can be accessed outside of the syncContext; it's only changed in start().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field will only be accessed from syncContext, while it can be called from any thread. I have made it more clear in the comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm saying that listener can actually be accessed from any thread. It doesn't change.


DnsNameResolver(@Nullable String nsAuthority, String name, Helper helper,
Resource<Executor> executorResource, Stopwatch stopwatch, boolean isAndroid) {
Preconditions.checkNotNull(helper, "helper");
// TODO: if a DNS server is provided as nsAuthority, use it.
// https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
this.executorResource = executorResource;
Expand All @@ -167,81 +171,65 @@ final class DnsNameResolver extends NameResolver {
port = nameUri.getPort();
}
this.proxyDetector = Preconditions.checkNotNull(helper.getProxyDetector(), "proxyDetector");
this.resolveRunnable = new Resolve(this, stopwatch, getNetworkAddressCacheTtlNanos(isAndroid));
this.cacheTtlNanos = getNetworkAddressCacheTtlNanos(isAndroid);
this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch");
this.syncContext =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little uncomfortable to me. It would be better if the ctor was dumber, and the DNRP constructed the dependencies for this class, rather than DNR constructing its own. Same with proxyDetector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you are suggesting. Are you suggesting instead of passing Helper to the ctor, passing the defaultPort, proxyDetector and syncContext? I don't see much benefit of it rather than a longer argument list. There will be more stuff on Helper that DNR needs to look at. Flatting them out doesn't seem favorable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new form was much clearer for me, because they are the same for every resolver. Having them in the resolver is harder to reason about because I then have to check if they change. I'd actually prefer if Listener was removed from Resolver as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay then, I think helper needs a checkNotNull too, then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Preconditions.checkNotNull(helper.getSynchronizationContext(), "syncContext");
}

@Override
public final String getServiceAuthority() {
public String getServiceAuthority() {
return authority;
}

@Override
public final synchronized void start(Listener listener) {
public void start(Listener listener) {
Preconditions.checkState(this.listener == null, "already started");
executor = SharedResourceHolder.get(executorResource);
this.listener = Preconditions.checkNotNull(listener, "listener");
resolve();
}

@Override
public final synchronized void refresh() {
public void refresh() {
Preconditions.checkState(listener != null, "not started");
resolve();
}

@VisibleForTesting
static final class Resolve implements Runnable {

private final DnsNameResolver resolver;
private final Stopwatch stopwatch;
private final long cacheTtlNanos;
private ResolutionResults cachedResolutionResults = null;
private final class Resolve implements Runnable {
private final Listener savedListener;

Resolve(DnsNameResolver resolver, Stopwatch stopwatch, long cacheTtlNanos) {
this.resolver = resolver;
this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch");
this.cacheTtlNanos = cacheTtlNanos;
Resolve(Listener savedListener) {
this.savedListener = Preconditions.checkNotNull(savedListener, "savedListener");
}

@Override
public void run() {
if (logger.isLoggable(Level.FINER)) {
logger.finer("Attempting DNS resolution of " + resolver.host);
}
Listener savedListener;
synchronized (resolver) {
if (resolver.shutdown || !cacheRefreshRequired()) {
return;
}
savedListener = resolver.listener;
resolver.resolving = true;
logger.finer("Attempting DNS resolution of " + host);
}
try {
resolveInternal(savedListener);
resolveInternal();
} finally {
synchronized (resolver) {
resolver.resolving = false;
}
syncContext.execute(new Runnable() {
@Override
public void run() {
resolving = false;
}
});
}
}

private boolean cacheRefreshRequired() {
return cachedResolutionResults == null
|| cacheTtlNanos == 0
|| (cacheTtlNanos > 0 && stopwatch.elapsed(TimeUnit.NANOSECONDS) > cacheTtlNanos);
}

@VisibleForTesting
void resolveInternal(Listener savedListener) {
void resolveInternal() {
InetSocketAddress destination =
InetSocketAddress.createUnresolved(resolver.host, resolver.port);
InetSocketAddress.createUnresolved(host, port);
ProxiedSocketAddress proxiedAddr;
try {
proxiedAddr = resolver.proxyDetector.proxyFor(destination);
proxiedAddr = proxyDetector.proxyFor(destination);
} catch (IOException e) {
savedListener.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + resolver.host)
.withCause(e));
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
return;
}
if (proxiedAddr != null) {
Expand All @@ -256,37 +244,42 @@ void resolveInternal(Listener savedListener) {
ResolutionResults resolutionResults;
try {
ResourceResolver resourceResolver = null;
if (shouldUseJndi(enableJndi, enableJndiLocalhost, resolver.host)) {
resourceResolver = resolver.getResourceResolver();
if (shouldUseJndi(enableJndi, enableJndiLocalhost, host)) {
resourceResolver = getResourceResolver();
}
resolutionResults = resolveAll(
resolver.addressResolver,
final ResolutionResults results = resolveAll(
addressResolver,
resourceResolver,
enableSrv,
enableTxt,
resolver.host);
cachedResolutionResults = resolutionResults;
if (cacheTtlNanos > 0) {
stopwatch.reset().start();
}
host);
resolutionResults = results;
syncContext.execute(new Runnable() {
@Override
public void run() {
cachedResolutionResults = results;
if (cacheTtlNanos > 0) {
stopwatch.reset().start();
}
}
});
if (logger.isLoggable(Level.FINER)) {
logger.finer("Found DNS results " + resolutionResults + " for " + resolver.host);
logger.finer("Found DNS results " + resolutionResults + " for " + host);
}
} catch (Exception e) {
savedListener.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + resolver.host)
.withCause(e));
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
return;
}
// Each address forms an EAG
List<EquivalentAddressGroup> servers = new ArrayList<>();
for (InetAddress inetAddr : resolutionResults.addresses) {
servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, resolver.port)));
servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
}
servers.addAll(resolutionResults.balancerAddresses);
if (servers.isEmpty()) {
savedListener.onError(Status.UNAVAILABLE.withDescription(
"No DNS backend or balancer addresses found for " + resolver.host));
"No DNS backend or balancer addresses found for " + host));
return;
}

Expand All @@ -298,7 +291,7 @@ void resolveInternal(Listener savedListener) {
parseTxtResults(resolutionResults.txtRecords)) {
try {
serviceConfig =
maybeChooseServiceConfig(possibleConfig, resolver.random, getLocalHostname());
maybeChooseServiceConfig(possibleConfig, random, getLocalHostname());
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Bad service config choice " + possibleConfig, e);
}
Expand All @@ -313,22 +306,28 @@ void resolveInternal(Listener savedListener) {
attrs.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig);
}
} else {
logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{resolver.host});
logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host});
}
savedListener.onAddresses(servers, attrs.build());
}
}

@GuardedBy("this")
private void resolve() {
if (resolving || shutdown) {
if (resolving || shutdown || !cacheRefreshRequired()) {
return;
}
executor.execute(resolveRunnable);
resolving = true;
executor.execute(new Resolve(listener));
}

private boolean cacheRefreshRequired() {
return cachedResolutionResults == null
|| cacheTtlNanos == 0
|| (cacheTtlNanos > 0 && stopwatch.elapsed(TimeUnit.NANOSECONDS) > cacheTtlNanos);
}

@Override
public final synchronized void shutdown() {
public void shutdown() {
if (shutdown) {
return;
}
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ public void run() {

// Must be called from syncContext
private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
syncContext.throwIfNotInThisSynchronizationContext();
if (channelIsActive) {
checkState(nameResolverStarted, "nameResolver is not started");
checkState(lbHelper != null, "lbHelper is null");
Expand Down Expand Up @@ -338,6 +339,7 @@ private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
*/
@VisibleForTesting
void exitIdleMode() {
syncContext.throwIfNotInThisSynchronizationContext();
if (shutdown.get() || panicMode) {
return;
}
Expand Down Expand Up @@ -557,6 +559,11 @@ public int getDefaultPort() {
public ProxyDetector getProxyDetector() {
return proxyDetector;
}

@Override
public SynchronizationContext getSynchronizationContext() {
return syncContext;
}
};
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverHelper);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.grpc.NameResolver;
import io.grpc.ProxyDetector;
import io.grpc.SynchronizationContext;
import java.net.URI;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -30,8 +31,14 @@
/** Unit tests for {@link DnsNameResolverProvider}. */
@RunWith(JUnit4.class)
public class DnsNameResolverProviderTest {

private static final NameResolver.Helper HELPER = new NameResolver.Helper() {
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final NameResolver.Helper helper = new NameResolver.Helper() {
@Override
public int getDefaultPort() {
throw new UnsupportedOperationException("Should not be called");
Expand All @@ -41,6 +48,11 @@ public int getDefaultPort() {
public ProxyDetector getProxyDetector() {
return GrpcUtil.getDefaultProxyDetector();
}

@Override
public SynchronizationContext getSynchronizationContext() {
return syncContext;
}
};

private DnsNameResolverProvider provider = new DnsNameResolverProvider();
Expand All @@ -53,8 +65,8 @@ public void isAvailable() {
@Test
public void newNameResolver() {
assertSame(DnsNameResolver.class,
provider.newNameResolver(URI.create("dns:///localhost:443"), HELPER).getClass());
provider.newNameResolver(URI.create("dns:///localhost:443"), helper).getClass());
assertNull(
provider.newNameResolver(URI.create("notdns:///localhost:443"), HELPER));
provider.newNameResolver(URI.create("notdns:///localhost:443"), helper));
}
}
Loading