Skip to content
This repository was archived by the owner on Dec 3, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions google-cloud-core-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;
Expand All @@ -51,8 +49,8 @@ public class GrpcTransportOptions implements TransportOptions {
private transient ExecutorFactory<ScheduledExecutorService> executorFactory;

/** Shared thread pool executor. */
private static final Resource<ScheduledExecutorService> EXECUTOR =
new Resource<ScheduledExecutorService>() {
private static final SharedResourceHolder.Resource<ScheduledExecutorService> EXECUTOR =
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
@Override
public ScheduledExecutorService create() {
ScheduledThreadPoolExecutor service =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.grpc;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Throwables;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This class was copied from grpc-core to prevent dependence on an unstable API that may be subject
* to changes
* (https://github.com/grpc/grpc-java/blob/d07ecbe037d2705a1c9f4b6345581f860e505b56/core/src/main/java/io/grpc/internal/LogExceptionRunnable.java)
*
* <p>A simple wrapper for a {@link Runnable} that logs any exception thrown by it, before
* re-throwing it.
*/
final class LogExceptionRunnable implements Runnable {

private static final Logger log = Logger.getLogger(LogExceptionRunnable.class.getName());

private final Runnable task;

public LogExceptionRunnable(Runnable task) {
this.task = checkNotNull(task, "task");
}

@Override
public void run() {
try {
task.run();
} catch (Throwable t) {
log.log(Level.SEVERE, "Exception while executing runnable " + task, t);
Throwables.throwIfUnchecked(t);
throw new AssertionError(t);
}
}

@Override
public String toString() {
return "LogExceptionRunnable(" + task + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.grpc;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.IdentityHashMap;
import java.util.concurrent.*;

/**
* This class was copied from grpc-core to prevent dependence on an unstable API that may be subject
* to changes
* (https://github.com/grpc/grpc-java/blob/d07ecbe037d2705a1c9f4b6345581f860e505b56/core/src/main/java/io/grpc/internal/SharedResourceHolder.java)
*
* <p>A holder for shared resource singletons.
*
* <p>Components like client channels and servers need certain resources, e.g. a thread pool, to
* run. If the user has not provided such resources, these components will use a default one, which
* is shared as a static resource. This class holds these default resources and manages their
* life-cycles.
*
* <p>A resource is identified by the reference of a {@link Resource} object, which is typically a
* singleton, provided to the get() and release() methods. Each Resource object (not its class) maps
* to an object cached in the holder.
*
* <p>Resources are ref-counted and shut down after a delay when the ref-count reaches zero.
*/
final class SharedResourceHolder {
static final long DESTROY_DELAY_SECONDS = 1;

// The sole holder instance.
private static final SharedResourceHolder holder =
new SharedResourceHolder(
new ScheduledExecutorFactory() {
@Override
public ScheduledExecutorService createScheduledExecutor() {
return Executors.newSingleThreadScheduledExecutor(
getThreadFactory("grpc-shared-destroyer-%d", true));
}
});

private final IdentityHashMap<Resource<?>, Instance> instances = new IdentityHashMap<>();

private final ScheduledExecutorFactory destroyerFactory;

private ScheduledExecutorService destroyer;

// Visible to tests that would need to create instances of the holder.
SharedResourceHolder(ScheduledExecutorFactory destroyerFactory) {
this.destroyerFactory = destroyerFactory;
}

private static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
return new ThreadFactoryBuilder().setDaemon(daemon).setNameFormat(nameFormat).build();
}

/**
* Try to get an existing instance of the given resource. If an instance does not exist, create a
* new one with the given factory.
*
* @param resource the singleton object that identifies the requested static resource
*/
public static <T> T get(Resource<T> resource) {
return holder.getInternal(resource);
}

/**
* Releases an instance of the given resource.
*
* <p>The instance must have been obtained from {@link #get(Resource)}. Otherwise will throw
* IllegalArgumentException.
*
* <p>Caller must not release a reference more than once. It's advisory that you clear the
* reference to the instance with the null returned by this method.
*
* @param resource the singleton Resource object that identifies the released static resource
* @param instance the released static resource
* @return a null which the caller can use to clear the reference to that instance.
*/
public static <T> T release(final Resource<T> resource, final T instance) {
return holder.releaseInternal(resource, instance);
}

/**
* Visible to unit tests.
*
* @see #get(Resource)
*/
@SuppressWarnings("unchecked")
synchronized <T> T getInternal(Resource<T> resource) {
Instance instance = instances.get(resource);
if (instance == null) {
instance = new Instance(resource.create());
instances.put(resource, instance);
}
if (instance.destroyTask != null) {
instance.destroyTask.cancel(false);
instance.destroyTask = null;
}
instance.refcount++;
return (T) instance.payload;
}

/** Visible to unit tests. */
synchronized <T> T releaseInternal(final Resource<T> resource, final T instance) {
final Instance cached = instances.get(resource);
if (cached == null) {
throw new IllegalArgumentException("No cached instance found for " + resource);
}
Preconditions.checkArgument(instance == cached.payload, "Releasing the wrong instance");
Preconditions.checkState(cached.refcount > 0, "Refcount has already reached zero");
cached.refcount--;
if (cached.refcount == 0) {
Preconditions.checkState(cached.destroyTask == null, "Destroy task already scheduled");
// Schedule a delayed task to destroy the resource.
if (destroyer == null) {
destroyer = destroyerFactory.createScheduledExecutor();
}
cached.destroyTask =
destroyer.schedule(
new LogExceptionRunnable(
new Runnable() {
@Override
public void run() {
synchronized (SharedResourceHolder.this) {
// Refcount may have gone up since the task was scheduled. Re-check it.
if (cached.refcount == 0) {
try {
resource.close(instance);
} finally {
instances.remove(resource);
if (instances.isEmpty()) {
destroyer.shutdown();
destroyer = null;
}
}
}
}
}
}),
DESTROY_DELAY_SECONDS,
TimeUnit.SECONDS);
}
// Always returning null
return null;
}

/** Defines a resource, and the way to create and destroy instances of it. */
public interface Resource<T> {
/** Create a new instance of the resource. */
T create();

/** Destroy the given instance. */
void close(T instance);
}

interface ScheduledExecutorFactory {
ScheduledExecutorService createScheduledExecutor();
}

private static class Instance {
final Object payload;
int refcount;
ScheduledFuture<?> destroyTask;

Instance(Object payload) {
this.payload = payload;
}
}
}
Loading