diff --git a/google-cloud-core-grpc/pom.xml b/google-cloud-core-grpc/pom.xml index 77e2cd7abf..b926b0d425 100644 --- a/google-cloud-core-grpc/pom.xml +++ b/google-cloud-core-grpc/pom.xml @@ -47,15 +47,10 @@ io.grpc grpc-api - - io.grpc - grpc-core - com.google.http-client google-http-client - junit junit diff --git a/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/GrpcTransportOptions.java b/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/GrpcTransportOptions.java index 841cc8218b..4db83253a3 100644 --- a/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/GrpcTransportOptions.java +++ b/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/GrpcTransportOptions.java @@ -32,8 +32,6 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.TransportOptions; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.grpc.internal.SharedResourceHolder; -import io.grpc.internal.SharedResourceHolder.Resource; import java.io.IOException; import java.io.ObjectInputStream; import java.util.Objects; @@ -51,8 +49,8 @@ public class GrpcTransportOptions implements TransportOptions { private transient ExecutorFactory executorFactory; /** Shared thread pool executor. */ - private static final Resource EXECUTOR = - new Resource() { + private static final SharedResourceHolder.Resource EXECUTOR = + new SharedResourceHolder.Resource() { @Override public ScheduledExecutorService create() { ScheduledThreadPoolExecutor service = diff --git a/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/LogExceptionRunnable.java b/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/LogExceptionRunnable.java new file mode 100644 index 0000000000..40fd6884c1 --- /dev/null +++ b/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/LogExceptionRunnable.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This class was copied from grpc-core to prevent dependence on an unstable API that may be subject + * to changes + * (https://github.com/grpc/grpc-java/blob/d07ecbe037d2705a1c9f4b6345581f860e505b56/core/src/main/java/io/grpc/internal/LogExceptionRunnable.java) + * + *

A simple wrapper for a {@link Runnable} that logs any exception thrown by it, before + * re-throwing it. + */ +final class LogExceptionRunnable implements Runnable { + + private static final Logger log = Logger.getLogger(LogExceptionRunnable.class.getName()); + + private final Runnable task; + + public LogExceptionRunnable(Runnable task) { + this.task = checkNotNull(task, "task"); + } + + @Override + public void run() { + try { + task.run(); + } catch (Throwable t) { + log.log(Level.SEVERE, "Exception while executing runnable " + task, t); + Throwables.throwIfUnchecked(t); + throw new AssertionError(t); + } + } + + @Override + public String toString() { + return "LogExceptionRunnable(" + task + ")"; + } +} diff --git a/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/SharedResourceHolder.java b/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/SharedResourceHolder.java new file mode 100644 index 0000000000..c8fc6a886b --- /dev/null +++ b/google-cloud-core-grpc/src/main/java/com/google/cloud/grpc/SharedResourceHolder.java @@ -0,0 +1,184 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.grpc; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.IdentityHashMap; +import java.util.concurrent.*; + +/** + * This class was copied from grpc-core to prevent dependence on an unstable API that may be subject + * to changes + * (https://github.com/grpc/grpc-java/blob/d07ecbe037d2705a1c9f4b6345581f860e505b56/core/src/main/java/io/grpc/internal/SharedResourceHolder.java) + * + *

A holder for shared resource singletons. + * + *

Components like client channels and servers need certain resources, e.g. a thread pool, to + * run. If the user has not provided such resources, these components will use a default one, which + * is shared as a static resource. This class holds these default resources and manages their + * life-cycles. + * + *

A resource is identified by the reference of a {@link Resource} object, which is typically a + * singleton, provided to the get() and release() methods. Each Resource object (not its class) maps + * to an object cached in the holder. + * + *

Resources are ref-counted and shut down after a delay when the ref-count reaches zero. + */ +final class SharedResourceHolder { + static final long DESTROY_DELAY_SECONDS = 1; + + // The sole holder instance. + private static final SharedResourceHolder holder = + new SharedResourceHolder( + new ScheduledExecutorFactory() { + @Override + public ScheduledExecutorService createScheduledExecutor() { + return Executors.newSingleThreadScheduledExecutor( + getThreadFactory("grpc-shared-destroyer-%d", true)); + } + }); + + private final IdentityHashMap, Instance> instances = new IdentityHashMap<>(); + + private final ScheduledExecutorFactory destroyerFactory; + + private ScheduledExecutorService destroyer; + + // Visible to tests that would need to create instances of the holder. + SharedResourceHolder(ScheduledExecutorFactory destroyerFactory) { + this.destroyerFactory = destroyerFactory; + } + + private static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) { + return new ThreadFactoryBuilder().setDaemon(daemon).setNameFormat(nameFormat).build(); + } + + /** + * Try to get an existing instance of the given resource. If an instance does not exist, create a + * new one with the given factory. + * + * @param resource the singleton object that identifies the requested static resource + */ + public static T get(Resource resource) { + return holder.getInternal(resource); + } + + /** + * Releases an instance of the given resource. + * + *

The instance must have been obtained from {@link #get(Resource)}. Otherwise will throw + * IllegalArgumentException. + * + *

Caller must not release a reference more than once. It's advisory that you clear the + * reference to the instance with the null returned by this method. + * + * @param resource the singleton Resource object that identifies the released static resource + * @param instance the released static resource + * @return a null which the caller can use to clear the reference to that instance. + */ + public static T release(final Resource resource, final T instance) { + return holder.releaseInternal(resource, instance); + } + + /** + * Visible to unit tests. + * + * @see #get(Resource) + */ + @SuppressWarnings("unchecked") + synchronized T getInternal(Resource resource) { + Instance instance = instances.get(resource); + if (instance == null) { + instance = new Instance(resource.create()); + instances.put(resource, instance); + } + if (instance.destroyTask != null) { + instance.destroyTask.cancel(false); + instance.destroyTask = null; + } + instance.refcount++; + return (T) instance.payload; + } + + /** Visible to unit tests. */ + synchronized T releaseInternal(final Resource resource, final T instance) { + final Instance cached = instances.get(resource); + if (cached == null) { + throw new IllegalArgumentException("No cached instance found for " + resource); + } + Preconditions.checkArgument(instance == cached.payload, "Releasing the wrong instance"); + Preconditions.checkState(cached.refcount > 0, "Refcount has already reached zero"); + cached.refcount--; + if (cached.refcount == 0) { + Preconditions.checkState(cached.destroyTask == null, "Destroy task already scheduled"); + // Schedule a delayed task to destroy the resource. + if (destroyer == null) { + destroyer = destroyerFactory.createScheduledExecutor(); + } + cached.destroyTask = + destroyer.schedule( + new LogExceptionRunnable( + new Runnable() { + @Override + public void run() { + synchronized (SharedResourceHolder.this) { + // Refcount may have gone up since the task was scheduled. Re-check it. + if (cached.refcount == 0) { + try { + resource.close(instance); + } finally { + instances.remove(resource); + if (instances.isEmpty()) { + destroyer.shutdown(); + destroyer = null; + } + } + } + } + } + }), + DESTROY_DELAY_SECONDS, + TimeUnit.SECONDS); + } + // Always returning null + return null; + } + + /** Defines a resource, and the way to create and destroy instances of it. */ + public interface Resource { + /** Create a new instance of the resource. */ + T create(); + + /** Destroy the given instance. */ + void close(T instance); + } + + interface ScheduledExecutorFactory { + ScheduledExecutorService createScheduledExecutor(); + } + + private static class Instance { + final Object payload; + int refcount; + ScheduledFuture destroyTask; + + Instance(Object payload) { + this.payload = payload; + } + } +} diff --git a/google-cloud-core-grpc/src/test/java/com/google/cloud/grpc/SharedResourceHolderTest.java b/google-cloud-core-grpc/src/test/java/com/google/cloud/grpc/SharedResourceHolderTest.java new file mode 100644 index 0000000000..ba70a0de55 --- /dev/null +++ b/google-cloud-core-grpc/src/test/java/com/google/cloud/grpc/SharedResourceHolderTest.java @@ -0,0 +1,292 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.grpc; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.*; + +import java.util.LinkedList; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * This class was copied from grpc-core to prevent dependence on an unstable API that may be subject + * to changes + * (https://github.com/grpc/grpc-java/blob/d07ecbe037d2705a1c9f4b6345581f860e505b56/core/src/test/java/io/grpc/internal/SharedResourceHolderTest.java) + * + *

Unit tests for {@link SharedResourceHolder}. + */ +@RunWith(JUnit4.class) +public class SharedResourceHolderTest { + + private final LinkedList> scheduledDestroyTasks = new LinkedList<>(); + + private SharedResourceHolder holder; + + private static class ResourceInstance { + volatile boolean closed; + } + + private static class ResourceFactory implements SharedResourceHolder.Resource { + @Override + public ResourceInstance create() { + return new ResourceInstance(); + } + + @Override + public void close(ResourceInstance instance) { + instance.closed = true; + } + } + + // Defines two kinds of resources + private static final SharedResourceHolder.Resource SHARED_FOO = + new ResourceFactory(); + private static final SharedResourceHolder.Resource SHARED_BAR = + new ResourceFactory(); + + @Before + public void setUp() { + holder = new SharedResourceHolder(new MockExecutorFactory()); + } + + @Test + public void destroyResourceWhenRefCountReachesZero() { + ResourceInstance foo1 = holder.getInternal(SHARED_FOO); + ResourceInstance sharedFoo = foo1; + ResourceInstance foo2 = holder.getInternal(SHARED_FOO); + assertSame(sharedFoo, foo2); + + ResourceInstance bar1 = holder.getInternal(SHARED_BAR); + ResourceInstance sharedBar = bar1; + + foo2 = holder.releaseInternal(SHARED_FOO, foo2); + // foo refcount not reached 0, thus shared foo is not closed + assertTrue(scheduledDestroyTasks.isEmpty()); + assertFalse(sharedFoo.closed); + assertNull(foo2); + + foo1 = holder.releaseInternal(SHARED_FOO, foo1); + assertNull(foo1); + + // foo refcount has reached 0, a destroying task is scheduled + assertEquals(1, scheduledDestroyTasks.size()); + MockScheduledFuture scheduledDestroyTask = scheduledDestroyTasks.poll(); + assertEquals( + SharedResourceHolder.DESTROY_DELAY_SECONDS, + scheduledDestroyTask.getDelay(TimeUnit.SECONDS)); + + // Simluate that the destroyer executes the foo destroying task + scheduledDestroyTask.runTask(); + assertTrue(sharedFoo.closed); + + // After the destroying, obtaining a foo will get a different instance + ResourceInstance foo3 = holder.getInternal(SHARED_FOO); + assertNotSame(sharedFoo, foo3); + + holder.releaseInternal(SHARED_BAR, bar1); + + // bar refcount has reached 0, a destroying task is scheduled + assertEquals(1, scheduledDestroyTasks.size()); + scheduledDestroyTask = scheduledDestroyTasks.poll(); + assertEquals( + SharedResourceHolder.DESTROY_DELAY_SECONDS, + scheduledDestroyTask.getDelay(TimeUnit.SECONDS)); + + // Simulate that the destroyer executes the bar destroying task + scheduledDestroyTask.runTask(); + assertTrue(sharedBar.closed); + } + + @Test + public void cancelDestroyTask() { + ResourceInstance foo1 = holder.getInternal(SHARED_FOO); + ResourceInstance sharedFoo = foo1; + holder.releaseInternal(SHARED_FOO, foo1); + // A destroying task for foo is scheduled + MockScheduledFuture scheduledDestroyTask = scheduledDestroyTasks.poll(); + assertFalse(scheduledDestroyTask.cancelled); + + // obtaining a foo before the destroying task is executed will cancel the destroy + ResourceInstance foo2 = holder.getInternal(SHARED_FOO); + assertTrue(scheduledDestroyTask.cancelled); + assertTrue(scheduledDestroyTasks.isEmpty()); + assertFalse(sharedFoo.closed); + + // And it will be the same foo instance + assertSame(sharedFoo, foo2); + + // Release it and the destroying task is scheduled again + holder.releaseInternal(SHARED_FOO, foo2); + scheduledDestroyTask = scheduledDestroyTasks.poll(); + assertNotNull(scheduledDestroyTask); + assertFalse(scheduledDestroyTask.cancelled); + scheduledDestroyTask.runTask(); + assertTrue(sharedFoo.closed); + } + + @Test + public void releaseWrongInstance() { + ResourceInstance uncached = new ResourceInstance(); + try { + holder.releaseInternal(SHARED_FOO, uncached); + fail("Should throw IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + ResourceInstance cached = holder.getInternal(SHARED_FOO); + try { + holder.releaseInternal(SHARED_FOO, uncached); + fail("Should throw IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + holder.releaseInternal(SHARED_FOO, cached); + } + + @Test + public void overreleaseInstance() { + ResourceInstance foo1 = holder.getInternal(SHARED_FOO); + holder.releaseInternal(SHARED_FOO, foo1); + try { + holder.releaseInternal(SHARED_FOO, foo1); + fail("Should throw IllegalStateException"); + } catch (IllegalStateException e) { + // expected + } + } + + @Test + public void handleInstanceCloseError() { + class ExceptionOnCloseResource implements SharedResourceHolder.Resource { + @Override + public ResourceInstance create() { + return new ResourceInstance(); + } + + @Override + public void close(ResourceInstance instance) { + throw new RuntimeException(); + } + } + + SharedResourceHolder.Resource resource = new ExceptionOnCloseResource(); + ResourceInstance instance = holder.getInternal(resource); + holder.releaseInternal(resource, instance); + MockScheduledFuture scheduledDestroyTask = scheduledDestroyTasks.poll(); + try { + scheduledDestroyTask.runTask(); + fail("Should throw RuntimeException"); + } catch (RuntimeException e) { + // expected + } + + // Future resource fetches should not get the partially-closed one. + assertNotSame(instance, holder.getInternal(resource)); + } + + private class MockExecutorFactory implements SharedResourceHolder.ScheduledExecutorFactory { + @Override + public ScheduledExecutorService createScheduledExecutor() { + ScheduledExecutorService mockExecutor = createNiceMock(ScheduledExecutorService.class); + expect(mockExecutor.schedule(anyObject(Runnable.class), anyLong(), anyObject(TimeUnit.class))) + .andAnswer( + new IAnswer() { + @Override + public ScheduledFuture answer() throws Throwable { + Object[] args = EasyMock.getCurrentArguments(); + Runnable command = (Runnable) args[0]; + long delay = (Long) args[1]; + TimeUnit unit = (TimeUnit) args[2]; + MockScheduledFuture future = + new MockScheduledFuture<>(command, delay, unit); + scheduledDestroyTasks.add(future); + return future; + } + }) + .anyTimes(); + replay(mockExecutor); + return mockExecutor; + } + } + + protected static class MockScheduledFuture implements java.util.concurrent.ScheduledFuture { + private boolean cancelled; + private boolean finished; + final Runnable command; + final long delay; + final TimeUnit unit; + + MockScheduledFuture(Runnable command, long delay, TimeUnit unit) { + this.command = command; + this.delay = delay; + this.unit = unit; + } + + void runTask() { + command.run(); + finished = true; + } + + @Override + public boolean cancel(boolean interrupt) { + if (cancelled || finished) { + return false; + } + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public long getDelay(TimeUnit targetUnit) { + return targetUnit.convert(this.delay, this.unit); + } + + @Override + public int compareTo(Delayed o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + return cancelled || finished; + } + + @Override + public V get() { + throw new UnsupportedOperationException(); + } + + @Override + public V get(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + } +}