From fb96453cedf7aaa4cd51991b6037ba4b6f285e7c Mon Sep 17 00:00:00 2001 From: Tamas Cservenak Date: Wed, 2 Nov 2022 15:19:08 +0100 Subject: [PATCH 1/3] [MRESOLVER-283] Shared executor service More and more component in resolver does parallel processing (BF collector, MD resolver, basic connector), and they all create, maintain their own executor instance. Instead of this, create one shared service component and just reuse it accross resolver. https://issues.apache.org/jira/browse/MRESOLVER-283 --- .../basic/BasicRepositoryConnector.java | 84 ++++------- .../BasicRepositoryConnectorFactory.java | 25 +++- .../aether/impl/DefaultServiceLocator.java | 3 + .../aether/impl/guice/AetherModule.java | 5 + .../impl/DefaultMetadataResolver.java | 105 +++++++------- .../collect/bf/BfDependencyCollector.java | 75 ++++++---- .../concurrency/DefaultResolverExecutor.java | 93 ++++++++++++ .../DefaultResolverExecutorService.java | 136 ++++++++++++++++++ .../impl/DefaultMetadataResolverTest.java | 2 + .../collect/bf/BfDependencyCollectorTest.java | 2 + .../spi/concurrency/ResolverExecutor.java | 60 ++++++++ .../concurrency/ResolverExecutorService.java | 61 ++++++++ 12 files changed, 505 insertions(+), 146 deletions(-) create mode 100644 maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java create mode 100644 maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java create mode 100644 maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java create mode 100644 maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java index 4c6ab9f49..27db76e7f 100644 --- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java +++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java @@ -31,17 +31,14 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.aether.ConfigurationProperties; import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.RequestTrace; import org.eclipse.aether.repository.RemoteRepository; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.ArtifactDownload; import org.eclipse.aether.spi.connector.ArtifactUpload; import org.eclipse.aether.spi.connector.MetadataDownload; @@ -70,7 +67,6 @@ import org.eclipse.aether.util.ConfigUtils; import org.eclipse.aether.util.FileUtils; import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; -import org.eclipse.aether.util.concurrency.WorkerThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,14 +77,24 @@ final class BasicRepositoryConnector implements RepositoryConnector { + /** + * The count of threads to be used when downloading artifacts in parallel, default value 5. + */ private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads"; + /** + * The default value for {@link #CONFIG_PROP_THREADS}. + */ + private static final int CONFIG_PROP_THREADS_DEFAULT = 5; + private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums"; private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class ); private final Map providedChecksumsSources; + private final ResolverExecutor resolverExecutor; + private final FileProcessor fileProcessor; private final RemoteRepository repository; @@ -101,23 +107,21 @@ final class BasicRepositoryConnector private final ChecksumPolicyProvider checksumPolicyProvider; - private final int maxThreads; - private final boolean smartChecksums; private final boolean persistedChecksums; - private Executor executor; - private final AtomicBoolean closed; + @SuppressWarnings( "checkstyle:parameternumber" ) BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository, TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider, ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor, - Map providedChecksumsSources ) + Map providedChecksumsSources, + ResolverExecutorService resolverExecutorService ) throws NoRepositoryConnectorException { try @@ -143,36 +147,17 @@ final class BasicRepositoryConnector this.fileProcessor = fileProcessor; this.providedChecksumsSources = providedChecksumsSources; this.closed = new AtomicBoolean( false ); + this.resolverExecutor = resolverExecutorService.getResolverExecutor( session, + resolverExecutorService.getKey( RepositoryConnector.class, repository.getId() ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS, + "maven.artifact.threads" ) ); - maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" ); smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); persistedChecksums = ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS, ConfigurationProperties.PERSISTED_CHECKSUMS ); } - private Executor getExecutor( Collection artifacts, Collection metadatas ) - { - if ( maxThreads <= 1 ) - { - return DirectExecutor.INSTANCE; - } - int tasks = safe( artifacts ).size() + safe( metadatas ).size(); - if ( tasks <= 1 ) - { - return DirectExecutor.INSTANCE; - } - if ( executor == null ) - { - executor = - new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new WorkerThreadFactory( getClass().getSimpleName() + '-' - + repository.getHost() + '-' ) ); - } - return executor; - } - @Override protected void finalize() throws Throwable @@ -192,10 +177,6 @@ public void close() { if ( closed.compareAndSet( false, true ) ) { - if ( executor instanceof ExecutorService ) - { - ( (ExecutorService) executor ).shutdown(); - } transporter.close(); } } @@ -214,11 +195,13 @@ public void get( Collection artifactDownloads, { failIfClosed(); - Executor executor = getExecutor( artifactDownloads, metadataDownloads ); RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); List checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories(); + Collection mds = safe( metadataDownloads ); + Collection ads = safe( artifactDownloads ); + ArrayList runnable = new ArrayList<>( mds.size() + ads.size() ); - for ( MetadataDownload transfer : safe( metadataDownloads ) ) + for ( MetadataDownload transfer : mds ) { URI location = layout.getLocation( transfer.getMetadata(), false ); @@ -235,10 +218,10 @@ public void get( Collection artifactDownloads, Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksumAlgorithmFactories, checksumLocations, null, listener ); - executor.execute( errorForwarder.wrap( task ) ); + runnable.add( errorForwarder.wrap( task ) ); } - for ( ArtifactDownload transfer : safe( artifactDownloads ) ) + for ( ArtifactDownload transfer : ads ) { Map providedChecksums = Collections.emptyMap(); for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() ) @@ -276,9 +259,10 @@ public void get( Collection artifactDownloads, task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksumAlgorithmFactories, checksumLocations, providedChecksums, listener ); } - executor.execute( errorForwarder.wrap( task ) ); + runnable.add( errorForwarder.wrap( task ) ); } + resolverExecutor.submitBatch( runnable ); errorForwarder.await(); } @@ -622,18 +606,4 @@ private void uploadChecksum( URI location, Object checksum ) } } - - private static class DirectExecutor - implements Executor - { - - static final Executor INSTANCE = new DirectExecutor(); - - @Override - public void execute( Runnable command ) - { - command.run(); - } - - } } diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java index 576d12160..08f6d1c2e 100644 --- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java +++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java @@ -29,6 +29,7 @@ import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.repository.RemoteRepository; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.RepositoryConnector; import org.eclipse.aether.spi.connector.RepositoryConnectorFactory; import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; @@ -59,6 +60,8 @@ public final class BasicRepositoryConnectorFactory private Map providedChecksumsSources; + private ResolverExecutorService resolverExecutorService; + private float priority; /** @@ -76,13 +79,15 @@ public BasicRepositoryConnectorFactory() RepositoryLayoutProvider layoutProvider, ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor, - Map providedChecksumsSources ) + Map providedChecksumsSources, + ResolverExecutorService resolverExecutorService ) { setTransporterProvider( transporterProvider ); setRepositoryLayoutProvider( layoutProvider ); setChecksumPolicyProvider( checksumPolicyProvider ); setFileProcessor( fileProcessor ); setProvidedChecksumSources( providedChecksumsSources ); + setResolverExecutorService( resolverExecutorService ); } public void initService( ServiceLocator locator ) @@ -92,6 +97,7 @@ public void initService( ServiceLocator locator ) setChecksumPolicyProvider( locator.getService( ChecksumPolicyProvider.class ) ); setFileProcessor( locator.getService( FileProcessor.class ) ); setProvidedChecksumSources( Collections.emptyMap() ); + setResolverExecutorService( locator.getService( ResolverExecutorService.class ) ); } /** @@ -159,6 +165,21 @@ public BasicRepositoryConnectorFactory setProvidedChecksumSources( return this; } + /** + * Sets the resolver executor to use for this component. + * + * @param resolverExecutorService The resolver executor to use, must not be {@code null}. + * @return This component for chaining, never {@code null}. + * @since 1.9.0 + */ + public BasicRepositoryConnectorFactory setResolverExecutorService( ResolverExecutorService resolverExecutorService ) + { + this.resolverExecutorService = requireNonNull( + resolverExecutorService, "resolver executor service cannot be null" + ); + return this; + } + public float getPriority() { return priority; @@ -183,7 +204,7 @@ public RepositoryConnector newInstance( RepositorySystemSession session, RemoteR requireNonNull( repository, "repository cannot be null" ); return new BasicRepositoryConnector( session, repository, transporterProvider, layoutProvider, - checksumPolicyProvider, fileProcessor, providedChecksumsSources ); + checksumPolicyProvider, fileProcessor, providedChecksumsSources, resolverExecutorService ); } } diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java index 69cfc2c83..0ce6c54cd 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java @@ -57,9 +57,11 @@ import org.eclipse.aether.internal.impl.EnhancedLocalRepositoryManagerFactory; import org.eclipse.aether.internal.impl.Maven2RepositoryLayoutFactory; import org.eclipse.aether.internal.impl.SimpleLocalRepositoryManagerFactory; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.impl.filter.DefaultRemoteRepositoryFilterManager; import org.eclipse.aether.internal.impl.slf4j.Slf4jLoggerFactory; import org.eclipse.aether.internal.impl.synccontext.DefaultSyncContextFactory; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactorySelector; import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; import org.eclipse.aether.spi.connector.layout.RepositoryLayoutFactory; @@ -231,6 +233,7 @@ public DefaultServiceLocator() addService( LocalPathComposer.class, DefaultLocalPathComposer.class ); addService( RemoteRepositoryFilterManager.class, DefaultRemoteRepositoryFilterManager.class ); addService( RepositorySystemLifecycle.class, DefaultRepositorySystemLifecycle.class ); + addService( ResolverExecutorService.class, DefaultResolverExecutorService.class ); } private Entry getEntry( Class type, boolean create ) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java index 54e047889..5ff3b39b9 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java @@ -60,6 +60,7 @@ import org.eclipse.aether.internal.impl.collect.DependencyCollectorDelegate; import org.eclipse.aether.internal.impl.collect.bf.BfDependencyCollector; import org.eclipse.aether.internal.impl.collect.df.DfDependencyCollector; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.impl.filter.DefaultRemoteRepositoryFilterManager; import org.eclipse.aether.internal.impl.filter.GroupIdRemoteRepositoryFilterSource; import org.eclipse.aether.internal.impl.filter.PrefixesRemoteRepositoryFilterSource; @@ -102,6 +103,7 @@ import org.eclipse.aether.internal.impl.slf4j.Slf4jLoggerFactory; import org.eclipse.aether.named.providers.NoopNamedLockFactory; import org.eclipse.aether.spi.checksums.TrustedChecksumsSource; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.checksum.ProvidedChecksumsSource; import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactorySelector; import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; @@ -229,6 +231,9 @@ protected void configure() bind( RepositorySystemLifecycle.class ) .to( DefaultRepositorySystemLifecycle.class ).in( Singleton.class ); + bind( ResolverExecutorService.class ) + .to( DefaultResolverExecutorService.class ).in( Singleton.class ); + bind( NamedLockFactorySelector.class ).toInstance( new ParameterizedNamedLockFactorySelector() ); bind( SyncContextFactory.class ).to( DefaultSyncContextFactory.class ).in( Singleton.class ); bind( org.eclipse.aether.impl.SyncContextFactory.class ) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java index e56048341..390dd925f 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java @@ -27,11 +27,6 @@ import java.util.List; import java.util.Map; import static java.util.Objects.requireNonNull; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.inject.Named; @@ -48,6 +43,7 @@ import org.eclipse.aether.impl.RemoteRepositoryManager; import org.eclipse.aether.impl.RepositoryConnectorProvider; import org.eclipse.aether.impl.RepositoryEventDispatcher; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.filter.RemoteRepositoryFilter; import org.eclipse.aether.spi.synccontext.SyncContextFactory; import org.eclipse.aether.impl.UpdateCheck; @@ -73,7 +69,6 @@ import org.eclipse.aether.transfer.RepositoryOfflineException; import org.eclipse.aether.util.ConfigUtils; import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; -import org.eclipse.aether.util.concurrency.WorkerThreadFactory; /** */ @@ -83,8 +78,16 @@ public class DefaultMetadataResolver implements MetadataResolver, Service { + /** + * The count of threads to be used when resolving metadata in parallel, default value 4. + */ private static final String CONFIG_PROP_THREADS = "aether.metadataResolver.threads"; + /** + * The default value for {@link #CONFIG_PROP_THREADS}. + */ + private static final int CONFIG_PROP_THREADS_DEFAULT = 4; + private RepositoryEventDispatcher repositoryEventDispatcher; private UpdateCheckManager updateCheckManager; @@ -99,18 +102,22 @@ public class DefaultMetadataResolver private RemoteRepositoryFilterManager remoteRepositoryFilterManager; + private ResolverExecutorService resolverExecutorService; + public DefaultMetadataResolver() { // enables default constructor } + @SuppressWarnings( "checkstyle:parameternumber" ) @Inject DefaultMetadataResolver( RepositoryEventDispatcher repositoryEventDispatcher, UpdateCheckManager updateCheckManager, RepositoryConnectorProvider repositoryConnectorProvider, RemoteRepositoryManager remoteRepositoryManager, SyncContextFactory syncContextFactory, OfflineController offlineController, - RemoteRepositoryFilterManager remoteRepositoryFilterManager ) + RemoteRepositoryFilterManager remoteRepositoryFilterManager, + ResolverExecutorService resolverExecutorService ) { setRepositoryEventDispatcher( repositoryEventDispatcher ); setUpdateCheckManager( updateCheckManager ); @@ -119,6 +126,7 @@ public DefaultMetadataResolver() setSyncContextFactory( syncContextFactory ); setOfflineController( offlineController ); setRemoteRepositoryFilterManager( remoteRepositoryFilterManager ); + setResolverExecutorService( resolverExecutorService ); } public void initService( ServiceLocator locator ) @@ -130,6 +138,7 @@ public void initService( ServiceLocator locator ) setSyncContextFactory( locator.getService( SyncContextFactory.class ) ); setOfflineController( locator.getService( OfflineController.class ) ); setRemoteRepositoryFilterManager( locator.getService( RemoteRepositoryFilterManager.class ) ); + setResolverExecutorService( locator.getService( ResolverExecutorService.class ) ); } public DefaultMetadataResolver setRepositoryEventDispatcher( RepositoryEventDispatcher repositoryEventDispatcher ) @@ -180,6 +189,14 @@ public DefaultMetadataResolver setRemoteRepositoryFilterManager( return this; } + public DefaultMetadataResolver setResolverExecutorService( ResolverExecutorService resolverExecutorService ) + { + this.resolverExecutorService = requireNonNull( resolverExecutorService, + "resolver executor service cannot be null" ); + return this; + } + + @Override public List resolveMetadata( RepositorySystemSession session, Collection requests ) { @@ -374,41 +391,37 @@ else if ( exception == null ) if ( !tasks.isEmpty() ) { - int threads = ConfigUtils.getInteger( session, 4, CONFIG_PROP_THREADS ); - Executor executor = getExecutor( Math.min( tasks.size(), threads ) ); - try + RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); + ArrayList runnable = new ArrayList<>( tasks.size() ); + for ( ResolveTask task : tasks ) { - RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); - - for ( ResolveTask task : tasks ) - { - executor.execute( errorForwarder.wrap( task ) ); - } + runnable.add( errorForwarder.wrap( task ) ); + } - errorForwarder.await(); + resolverExecutorService.getResolverExecutor( session, + resolverExecutorService.getKey( MetadataResolver.class ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS ) ) + .submitBatch( runnable ); + errorForwarder.await(); - for ( ResolveTask task : tasks ) + for ( ResolveTask task : tasks ) + { + /* + * NOTE: Touch after registration with local repo to ensure concurrent resolution is not + * rejected with "already updated" via session data when actual update to local repo is + * still pending. + */ + for ( UpdateCheck check : task.checks ) { - /* - * NOTE: Touch after registration with local repo to ensure concurrent resolution is not - * rejected with "already updated" via session data when actual update to local repo is - * still pending. - */ - for ( UpdateCheck check : task.checks ) - { - updateCheckManager.touchMetadata( task.session, check.setException( task.exception ) ); - } + updateCheckManager.touchMetadata( task.session, check.setException( task.exception ) ); + } - metadataDownloaded( session, task.trace, task.request.getMetadata(), task.request.getRepository(), - task.metadataFile, task.exception ); + metadataDownloaded( session, task.trace, task.request.getMetadata(), task.request.getRepository(), + task.metadataFile, task.exception ); - task.result.setException( task.exception ); - } - } - finally - { - shutdown( executor ); + task.result.setException( task.exception ); } + for ( ResolveTask task : tasks ) { Metadata metadata = task.request.getMetadata(); @@ -531,27 +544,6 @@ private void metadataDownloaded( RepositorySystemSession session, RequestTrace t repositoryEventDispatcher.dispatch( event.build() ); } - private Executor getExecutor( int threads ) - { - if ( threads <= 1 ) - { - return command -> command.run(); - } - else - { - return new ThreadPoolExecutor( threads, threads, 3, TimeUnit.SECONDS, new LinkedBlockingQueue(), - new WorkerThreadFactory( null ) ); - } - } - - private void shutdown( Executor executor ) - { - if ( executor instanceof ExecutorService ) - { - ( (ExecutorService) executor ).shutdown(); - } - } - class ResolveTask implements Runnable { @@ -584,6 +576,7 @@ class ResolveTask this.checks = checks; } + @Override public void run() { Metadata metadata = request.getMetadata(); diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java index 7e9967699..e165d99e5 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java @@ -34,11 +34,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,6 +53,7 @@ import org.eclipse.aether.graph.Dependency; import org.eclipse.aether.graph.DependencyNode; import org.eclipse.aether.impl.ArtifactDescriptorReader; +import org.eclipse.aether.impl.DependencyCollector; import org.eclipse.aether.impl.RemoteRepositoryManager; import org.eclipse.aether.impl.VersionRangeResolver; import org.eclipse.aether.internal.impl.collect.DataPool; @@ -70,15 +67,16 @@ import org.eclipse.aether.resolution.ArtifactDescriptorResult; import org.eclipse.aether.resolution.VersionRangeRequest; import org.eclipse.aether.resolution.VersionRangeResult; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.locator.Service; +import org.eclipse.aether.spi.locator.ServiceLocator; import org.eclipse.aether.util.ConfigUtils; import org.eclipse.aether.util.artifact.ArtifactIdUtils; -import org.eclipse.aether.util.concurrency.WorkerThreadFactory; import org.eclipse.aether.util.graph.manager.DependencyManagerUtils; import org.eclipse.aether.version.Version; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; import static org.eclipse.aether.internal.impl.collect.DefaultDependencyCycle.find; /** @@ -96,7 +94,9 @@ public class BfDependencyCollector /** * The key in the repository session's {@link RepositorySystemSession#getConfigProperties() * configuration properties} used to store a {@link Boolean} flag controlling the resolver's skip mode. - * + *

+ * Visible for testing. + * * @since 1.8.0 */ static final String CONFIG_PROP_SKIPPER = "aether.dependencyCollector.bf.skipper"; @@ -106,14 +106,23 @@ public class BfDependencyCollector * * @since 1.8.0 */ - static final boolean CONFIG_PROP_SKIPPER_DEFAULT = true; + private static final boolean CONFIG_PROP_SKIPPER_DEFAULT = true; /** * The count of threads to be used when collecting POMs in parallel, default value 5. * * @since 1.9.0 */ - static final String CONFIG_PROP_THREADS = "aether.dependencyCollector.bf.threads"; + private static final String CONFIG_PROP_THREADS = "aether.dependencyCollector.bf.threads"; + + /** + * The default value for {@link #CONFIG_PROP_THREADS}. + * + * @since 1.9.0 + */ + private static final int CONFIG_PROP_THREADS_DEFAULT = 5; + + private ResolverExecutorService resolverExecutorService; /** * Default ctor for SL. @@ -129,9 +138,25 @@ public BfDependencyCollector() @Inject BfDependencyCollector( RemoteRepositoryManager remoteRepositoryManager, ArtifactDescriptorReader artifactDescriptorReader, - VersionRangeResolver versionRangeResolver ) + VersionRangeResolver versionRangeResolver, + ResolverExecutorService resolverExecutorService ) { super( remoteRepositoryManager, artifactDescriptorReader, versionRangeResolver ); + setResolverExecutorService( resolverExecutorService ); + } + + @Override + public void initService( ServiceLocator locator ) + { + super.initService( locator ); + setResolverExecutorService( locator.getService( ResolverExecutorService.class ) ); + } + + public DependencyCollector setResolverExecutorService( ResolverExecutorService resolverExecutorService ) + { + this.resolverExecutorService = + requireNonNull( resolverExecutorService, "resolver executor service cannot be null" ); + return this; } @SuppressWarnings( "checkstyle:parameternumber" ) @@ -155,7 +180,10 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr new Args( session, pool, context, versionContext, request, useSkip ? DependencyResolutionSkipper.defaultSkipper() : DependencyResolutionSkipper.neverSkipper(), - new ParallelDescriptorResolver( session ) ); + new ParallelDescriptorResolver( resolverExecutorService.getResolverExecutor( session, + resolverExecutorService.getKey( DependencyCollector.class ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, + CONFIG_PROP_THREADS, "maven.artifact.threads" ) ) ) ); DependencySelector rootDepSelector = session.getDependencySelector() != null ? session.getDependencySelector().deriveChildSelector( context ) : null; @@ -190,7 +218,6 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr false ); } - args.resolver.shutdown(); args.skipper.report(); } @@ -475,23 +502,22 @@ else if ( descriptorResult == DataPool.NO_DESCRIPTOR ) static class ParallelDescriptorResolver { - final ExecutorService executorService; + final ResolverExecutor executor; /** * Artifact ID -> Future of DescriptorResolutionResult */ final Map> results = new ConcurrentHashMap<>( 256 ); - final Logger logger = LoggerFactory.getLogger( getClass() ); - ParallelDescriptorResolver( RepositorySystemSession session ) + ParallelDescriptorResolver( ResolverExecutor executor ) { - this.executorService = getExecutorService( session ); + this.executor = executor; } void resolveDescriptors( Artifact artifact, Callable callable ) { results.computeIfAbsent( ArtifactIdUtils.toId( artifact ), - key -> this.executorService.submit( callable ) ); + key -> this.executor.submit( callable ) ); } void cacheVersionRangeDescriptor( Artifact artifact, DescriptorResolutionResult resolutionResult ) @@ -504,19 +530,6 @@ Future find( Artifact artifact ) { return results.get( ArtifactIdUtils.toId( artifact ) ); } - - void shutdown() - { - executorService.shutdown(); - } - - private ExecutorService getExecutorService( RepositorySystemSession session ) - { - int nThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" ); - logger.debug( "Created thread pool with {} threads to resolve descriptors.", nThreads ); - return new ThreadPoolExecutor( nThreads, nThreads, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), - new WorkerThreadFactory( getClass().getSimpleName() ) ); - } } static class DescriptorResolutionResult diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java new file mode 100644 index 000000000..34a54edb9 --- /dev/null +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java @@ -0,0 +1,93 @@ +package org.eclipse.aether.internal.impl.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.eclipse.aether.spi.concurrency.ResolverExecutor; + +import static java.util.Objects.requireNonNull; + +/** + * Default implementation of {@link ResolverExecutor}. + *

+ * It relies on ctor passed {@link ExecutorService} that may be {@code null}, in which case "direct invocation" (on + * caller thread) happens, otherwise the non-null executor service is used. + */ +final class DefaultResolverExecutor implements ResolverExecutor +{ + private final ExecutorService executorService; + + DefaultResolverExecutor( final ExecutorService executorService ) + { + this.executorService = executorService; + } + + @Override + public void submitBatch( Collection tasks ) + { + requireNonNull( tasks ); + if ( tasks.size() == 1 ) + { + directlyExecute( Executors.callable( tasks.iterator().next() ) ); + } + else + { + for ( Runnable task : tasks ) + { + submit( Executors.callable( task ) ); + } + } + } + + @Override + public void submit( Runnable task ) + { + requireNonNull( task ); + submit( Executors.callable( task ) ); + } + + @Override + public Future submit( Callable task ) + { + requireNonNull( task ); + return executorService == null ? directlyExecute( task ) : executorService.submit( task ); + } + + private static Future directlyExecute( Callable task ) + { + CompletableFuture future; + try + { + future = CompletableFuture.completedFuture( task.call() ); + } + catch ( Exception e ) + { + future = new CompletableFuture<>(); + future.completeExceptionally( e ); + } + return future; + } +} \ No newline at end of file diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java new file mode 100644 index 000000000..648b82472 --- /dev/null +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java @@ -0,0 +1,136 @@ +package org.eclipse.aether.internal.impl.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import javax.inject.Named; +import javax.inject.Singleton; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.aether.RepositorySystemSession; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; +import org.eclipse.aether.util.concurrency.WorkerThreadFactory; + +import static java.util.Objects.hash; +import static java.util.Objects.requireNonNull; + +/** + * Default implementation of {@link ResolverExecutor}. + *

+ * This implementation uses {@link RepositorySystemSession#getData()} to store created {@link ExecutorService} + * instances. It creates instances that may be eventually garbage collected, so no explicit shutdown happens on + * them. When {@code maxThreads} parameter is 1 (accepted values are greater than zero), this implementation assumes + * caller wants "direct execution" (on caller thread) and creates {@link ResolverExecutor} instances accordingly. + */ +@Singleton +@Named +public final class DefaultResolverExecutorService implements ResolverExecutorService +{ + @Override + public Key getKey( Class service, String... discriminators ) + { + requireNonNull( service ); + return new KeyImpl( DefaultResolverExecutorService.class.getName() + + "-" + service.getSimpleName() + + String.join( "-", discriminators ) ); + } + + @Override + public ResolverExecutor getResolverExecutor( RepositorySystemSession session, Key key, int maxThreads ) + { + requireNonNull( session ); + requireNonNull( key ); + if ( maxThreads < 1 ) + { + throw new IllegalArgumentException( "maxThreads must be greater than zero" ); + } + + final ExecutorService executorService; + if ( maxThreads == 1 ) // direct + { + executorService = null; + } + else // shared && pooled + { + executorService = (ExecutorService) session.getData() + .computeIfAbsent( key, () -> createExecutorService( key, maxThreads ) ); + } + return new DefaultResolverExecutor( executorService ); + } + + /** + * Creates am {@link ExecutorService} that allows its core threads to die off in case of inactivity, and allows + * for proper garbage collection. This is important detail, as these instances are kept within session data, and + * currently there is no way to shut down them. + */ + private ExecutorService createExecutorService( Key key, int maxThreads ) + { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + maxThreads, + maxThreads, + 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new WorkerThreadFactory( key.asString() + "-" ) + ); + threadPoolExecutor.allowCoreThreadTimeOut( true ); + return threadPoolExecutor; + } + + private static class KeyImpl implements Key + { + private final String keyString; + + private KeyImpl( String keyString ) + { + this.keyString = keyString; + } + + @Override + public String asString() + { + return keyString; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + KeyImpl key = (KeyImpl) o; + return keyString.equals( key.keyString ); + } + + @Override + public int hashCode() + { + return hash( keyString ); + } + } +} \ No newline at end of file diff --git a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java index bb5003300..ca06b1882 100644 --- a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java +++ b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java @@ -31,6 +31,7 @@ import java.util.Set; import org.eclipse.aether.DefaultRepositorySystemSession; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.impl.filter.DefaultRemoteRepositoryFilterManager; import org.eclipse.aether.internal.impl.filter.Filters; import org.eclipse.aether.internal.test.util.TestFileUtils; @@ -91,6 +92,7 @@ public void setup() resolver.setSyncContextFactory( new StubSyncContextFactory() ); resolver.setOfflineController( new DefaultOfflineController() ); resolver.setRemoteRepositoryFilterManager( remoteRepositoryFilterManager ); + resolver.setResolverExecutorService( new DefaultResolverExecutorService() ); repository = new RemoteRepository.Builder( "test-DMRT", "default", TestFileUtils.createTempDir().toURI().toURL().toString() ).build(); diff --git a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java index cd6aa1532..5568a4b8b 100644 --- a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java +++ b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java @@ -33,6 +33,7 @@ import org.eclipse.aether.internal.impl.StubRemoteRepositoryManager; import org.eclipse.aether.internal.impl.StubVersionRangeResolver; import org.eclipse.aether.internal.impl.collect.DependencyCollectorDelegateTestSupport; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.test.util.DependencyGraphParser; import org.eclipse.aether.util.graph.manager.TransitiveDependencyManager; import org.eclipse.aether.util.graph.selector.ExclusionDependencySelector; @@ -67,6 +68,7 @@ protected void setupCollector() collector.setArtifactDescriptorReader( newReader( "" ) ); collector.setVersionRangeResolver( new StubVersionRangeResolver() ); collector.setRemoteRepositoryManager( new StubRemoteRepositoryManager() ); + ((BfDependencyCollector) collector).setResolverExecutorService( new DefaultResolverExecutorService() ); } private Dependency newDep( String coords, String scope, Collection exclusions ) diff --git a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java new file mode 100644 index 000000000..e90f0348d --- /dev/null +++ b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java @@ -0,0 +1,60 @@ +package org.eclipse.aether.spi.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * Component providing shared {@link java.util.concurrent.Executor}-like service across resolver. + * + * @since 1.9.0 + */ +public interface ResolverExecutor +{ + /** + * Submits a batch of {@link Runnable} tasks for execution. If collection has size greater than 1, this method + * will submit all tasks just like {@link #submit(Runnable)} does. Otherwise, "direct", on caller thread execution + * happens. Several resolver components may deal "sequentially" with tasks and rely on this behaviour for + * performance purposes. + *

+ * Error handling: this method will never throw. If you are interested in possible outcome of submitted + * {@link Runnable} use some helper like the {@code RunnableErrorForwarder} in resolver utilities module. + */ + void submitBatch( Collection batch ); + + /** + * Submits a {@link Runnable} task for execution. This call may block if thread pool is full. In certain + * circumstances this method may choose to directly invoke task (on caller thread) instead to submit it. + *

+ * Error handling: this method will never throw. If you are interested in possible outcome of submitted + * {@link Runnable} use some helper like the {@code RunnableErrorForwarder} in resolver utilities module. + */ + void submit( Runnable task ); + + /** + * Submits a {@link Callable} task for execution. This call may block if thread pool is full. In certain + * circumstances this method may choose to directly invoke task (on caller thread) instead to submit it. + *

+ * Error handling: this method will never throw. + */ + Future submit( Callable task ); +} diff --git a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java new file mode 100644 index 000000000..e7d93fef6 --- /dev/null +++ b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java @@ -0,0 +1,61 @@ +package org.eclipse.aether.spi.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.eclipse.aether.RepositorySystemSession; + +/** + * Component providing {@link ResolverExecutor} instances on demand. + * + * @since 1.9.0 + */ +public interface ResolverExecutorService +{ + /** + * A hierarchical key to identify use cases for executors. Same {@link Key} designates same + * {@link ResolverExecutor}. + */ + interface Key + { + String asString(); + } + + /** + * Creates service key with given parameters. + * + * @param service The service that is to use executor, never {@code null}. + * @param discriminators Potential (sub) discriminators, if needed. + */ + Key getKey( Class service, String... discriminators ); + + /** + * Returns a resolver executor for requester service. The {@code service} parameter is used as "key", meaning + * multiple services using same "key" will share the executor as well. The very first invocation of this method + * may create thread pool as well (using {@code maxThreads} parameter), and subsequent calls with different + * parameter will reuse it, hence the parameter may be neglected. + *

+ * None of parameters may be {@code null}. + * + * @param session The session. + * @param key A key for service, multiple components using same key will share same executor. + * @param maxThreads The count of configured threads (must be bigger than zero). + */ + ResolverExecutor getResolverExecutor( RepositorySystemSession session, Key key, int maxThreads ); +} From d5f7403d990956f7a6974389ee1ba342c04e946a Mon Sep 17 00:00:00 2001 From: Tamas Cservenak Date: Tue, 8 Nov 2022 11:14:37 +0100 Subject: [PATCH 2/3] Do not SHARE --- .../basic/BasicRepositoryConnector.java | 9 ++- .../impl/DefaultMetadataResolver.java | 13 +-- .../collect/bf/BfDependencyCollector.java | 79 ++++++++++--------- .../concurrency/DefaultResolverExecutor.java | 9 +-- .../DefaultResolverExecutorService.java | 38 ++++----- .../spi/concurrency/ResolverExecutor.java | 23 +++--- .../concurrency/ResolverExecutorService.java | 23 ++---- 7 files changed, 91 insertions(+), 103 deletions(-) diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java index 27db76e7f..203a5e515 100644 --- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java +++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java @@ -147,13 +147,13 @@ final class BasicRepositoryConnector this.fileProcessor = fileProcessor; this.providedChecksumsSources = providedChecksumsSources; this.closed = new AtomicBoolean( false ); - this.resolverExecutor = resolverExecutorService.getResolverExecutor( session, - resolverExecutorService.getKey( RepositoryConnector.class, repository.getId() ), + this.resolverExecutor = resolverExecutorService.getResolverExecutor( + resolverExecutorService.getName( BasicRepositoryConnector.class, repository.getId() ), ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS, "maven.artifact.threads" ) ); - smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); - persistedChecksums = + this.smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); + this.persistedChecksums = ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS, ConfigurationProperties.PERSISTED_CHECKSUMS ); } @@ -177,6 +177,7 @@ public void close() { if ( closed.compareAndSet( false, true ) ) { + resolverExecutor.close(); transporter.close(); } } diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java index 390dd925f..facd66e9a 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java @@ -43,6 +43,7 @@ import org.eclipse.aether.impl.RemoteRepositoryManager; import org.eclipse.aether.impl.RepositoryConnectorProvider; import org.eclipse.aether.impl.RepositoryEventDispatcher; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.filter.RemoteRepositoryFilter; import org.eclipse.aether.spi.synccontext.SyncContextFactory; @@ -398,11 +399,13 @@ else if ( exception == null ) runnable.add( errorForwarder.wrap( task ) ); } - resolverExecutorService.getResolverExecutor( session, - resolverExecutorService.getKey( MetadataResolver.class ), - ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS ) ) - .submitBatch( runnable ); - errorForwarder.await(); + try ( ResolverExecutor resolverExecutor = resolverExecutorService.getResolverExecutor( + resolverExecutorService.getName( DefaultMetadataResolver.class ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS ) ) ) + { + resolverExecutor.submitBatch( runnable ); + errorForwarder.await(); + } for ( ResolveTask task : tasks ) { diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java index e165d99e5..ca246c77a 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java @@ -176,49 +176,52 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr logger.debug( "Collector skip mode enabled" ); } - Args args = - new Args( session, pool, context, versionContext, request, - useSkip ? DependencyResolutionSkipper.defaultSkipper() - : DependencyResolutionSkipper.neverSkipper(), - new ParallelDescriptorResolver( resolverExecutorService.getResolverExecutor( session, - resolverExecutorService.getKey( DependencyCollector.class ), - ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, - CONFIG_PROP_THREADS, "maven.artifact.threads" ) ) ) ); - - DependencySelector rootDepSelector = session.getDependencySelector() != null - ? session.getDependencySelector().deriveChildSelector( context ) : null; - DependencyManager rootDepManager = session.getDependencyManager() != null - ? session.getDependencyManager().deriveChildManager( context ) : null; - DependencyTraverser rootDepTraverser = session.getDependencyTraverser() != null - ? session.getDependencyTraverser().deriveChildTraverser( context ) : null; - VersionFilter rootVerFilter = session.getVersionFilter() != null - ? session.getVersionFilter().deriveChildFilter( context ) : null; - - List parents = Collections.singletonList( node ); - for ( Dependency dependency : dependencies ) + try ( ResolverExecutor resolverExecutor = resolverExecutorService.getResolverExecutor( + resolverExecutorService.getName( BfDependencyCollector.class ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, + CONFIG_PROP_THREADS, "maven.artifact.threads" ) ) ) { - RequestTrace childTrace = collectStepTrace( trace, args.request.getRequestContext(), parents, - dependency ); - DependencyProcessingContext processingContext = - new DependencyProcessingContext( rootDepSelector, rootDepManager, rootDepTraverser, - rootVerFilter, childTrace, repositories, managedDependencies, parents, dependency, - PremanagedDependency.create( rootDepManager, dependency, - false, args.premanagedState ) ); - if ( !filter( processingContext ) ) + Args args = + new Args( session, pool, context, versionContext, request, + useSkip ? DependencyResolutionSkipper.defaultSkipper() + : DependencyResolutionSkipper.neverSkipper(), + new ParallelDescriptorResolver( resolverExecutor ) ); + + DependencySelector rootDepSelector = session.getDependencySelector() != null + ? session.getDependencySelector().deriveChildSelector( context ) : null; + DependencyManager rootDepManager = session.getDependencyManager() != null + ? session.getDependencyManager().deriveChildManager( context ) : null; + DependencyTraverser rootDepTraverser = session.getDependencyTraverser() != null + ? session.getDependencyTraverser().deriveChildTraverser( context ) : null; + VersionFilter rootVerFilter = session.getVersionFilter() != null + ? session.getVersionFilter().deriveChildFilter( context ) : null; + + List parents = Collections.singletonList( node ); + for ( Dependency dependency : dependencies ) { - processingContext.withDependency( processingContext.premanagedDependency.getManagedDependency() ); - resolveArtifactDescriptorAsync( args, processingContext, results ); - args.dependencyProcessingQueue.add( processingContext ); + RequestTrace childTrace = collectStepTrace( trace, args.request.getRequestContext(), parents, + dependency ); + DependencyProcessingContext processingContext = + new DependencyProcessingContext( rootDepSelector, rootDepManager, rootDepTraverser, + rootVerFilter, childTrace, repositories, managedDependencies, parents, dependency, + PremanagedDependency.create( rootDepManager, dependency, + false, args.premanagedState ) ); + if ( !filter( processingContext ) ) + { + processingContext.withDependency( processingContext.premanagedDependency.getManagedDependency() ); + resolveArtifactDescriptorAsync( args, processingContext, results ); + args.dependencyProcessingQueue.add( processingContext ); + } } - } - while ( !args.dependencyProcessingQueue.isEmpty() ) - { - processDependency( args, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), - false ); - } + while ( !args.dependencyProcessingQueue.isEmpty() ) + { + processDependency( args, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), + false ); + } - args.skipper.report(); + args.skipper.report(); + } } @SuppressWarnings( "checkstyle:parameternumber" ) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java index 34a54edb9..15643bd30 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java @@ -63,17 +63,16 @@ public void submitBatch( Collection tasks ) } @Override - public void submit( Runnable task ) + public Future submit( Callable task ) { requireNonNull( task ); - submit( Executors.callable( task ) ); + return executorService == null ? directlyExecute( task ) : executorService.submit( task ); } @Override - public Future submit( Callable task ) + public void close() { - requireNonNull( task ); - return executorService == null ? directlyExecute( task ) : executorService.submit( task ); + executorService.shutdown(); } private static Future directlyExecute( Callable task ) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java index 648b82472..82e386509 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java @@ -27,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.spi.concurrency.ResolverExecutor; import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.util.concurrency.WorkerThreadFactory; @@ -37,30 +36,24 @@ /** * Default implementation of {@link ResolverExecutor}. - *

- * This implementation uses {@link RepositorySystemSession#getData()} to store created {@link ExecutorService} - * instances. It creates instances that may be eventually garbage collected, so no explicit shutdown happens on - * them. When {@code maxThreads} parameter is 1 (accepted values are greater than zero), this implementation assumes - * caller wants "direct execution" (on caller thread) and creates {@link ResolverExecutor} instances accordingly. */ @Singleton @Named public final class DefaultResolverExecutorService implements ResolverExecutorService { @Override - public Key getKey( Class service, String... discriminators ) + public Name getName( Class service, String... discriminators ) { requireNonNull( service ); - return new KeyImpl( DefaultResolverExecutorService.class.getName() + return new NameImpl( DefaultResolverExecutorService.class.getName() + "-" + service.getSimpleName() + String.join( "-", discriminators ) ); } @Override - public ResolverExecutor getResolverExecutor( RepositorySystemSession session, Key key, int maxThreads ) + public ResolverExecutor getResolverExecutor( Name name, int maxThreads ) { - requireNonNull( session ); - requireNonNull( key ); + requireNonNull( name ); if ( maxThreads < 1 ) { throw new IllegalArgumentException( "maxThreads must be greater than zero" ); @@ -73,8 +66,7 @@ public ResolverExecutor getResolverExecutor( RepositorySystemSession session, Ke } else // shared && pooled { - executorService = (ExecutorService) session.getData() - .computeIfAbsent( key, () -> createExecutorService( key, maxThreads ) ); + executorService = createExecutorService( name, maxThreads ); } return new DefaultResolverExecutor( executorService ); } @@ -84,32 +76,32 @@ public ResolverExecutor getResolverExecutor( RepositorySystemSession session, Ke * for proper garbage collection. This is important detail, as these instances are kept within session data, and * currently there is no way to shut down them. */ - private ExecutorService createExecutorService( Key key, int maxThreads ) + private ExecutorService createExecutorService( Name name, int maxThreads ) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), - new WorkerThreadFactory( key.asString() + "-" ) + new WorkerThreadFactory( name.asString() + "-" ) ); threadPoolExecutor.allowCoreThreadTimeOut( true ); return threadPoolExecutor; } - private static class KeyImpl implements Key + private static class NameImpl implements Name { - private final String keyString; + private final String nameString; - private KeyImpl( String keyString ) + private NameImpl( String nameString ) { - this.keyString = keyString; + this.nameString = nameString; } @Override public String asString() { - return keyString; + return nameString; } @Override @@ -123,14 +115,14 @@ public boolean equals( Object o ) { return false; } - KeyImpl key = (KeyImpl) o; - return keyString.equals( key.keyString ); + NameImpl other = (NameImpl) o; + return nameString.equals( other.nameString ); } @Override public int hashCode() { - return hash( keyString ); + return hash( nameString ); } } } \ No newline at end of file diff --git a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java index e90f0348d..8d8ec855c 100644 --- a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java +++ b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java @@ -19,20 +19,22 @@ * under the License. */ +import java.io.Closeable; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.Future; /** - * Component providing shared {@link java.util.concurrent.Executor}-like service across resolver. + * Component providing {@link java.util.concurrent.Executor}-like service across resolver. Instances are to be treated + * like resources, best in try-with-resource constructs. * * @since 1.9.0 */ -public interface ResolverExecutor +public interface ResolverExecutor extends Closeable { /** * Submits a batch of {@link Runnable} tasks for execution. If collection has size greater than 1, this method - * will submit all tasks just like {@link #submit(Runnable)} does. Otherwise, "direct", on caller thread execution + * will submit all tasks just like {@link #submit(Callable)} does. Otherwise, "direct", on caller thread execution * happens. Several resolver components may deal "sequentially" with tasks and rely on this behaviour for * performance purposes. *

@@ -41,15 +43,6 @@ public interface ResolverExecutor */ void submitBatch( Collection batch ); - /** - * Submits a {@link Runnable} task for execution. This call may block if thread pool is full. In certain - * circumstances this method may choose to directly invoke task (on caller thread) instead to submit it. - *

- * Error handling: this method will never throw. If you are interested in possible outcome of submitted - * {@link Runnable} use some helper like the {@code RunnableErrorForwarder} in resolver utilities module. - */ - void submit( Runnable task ); - /** * Submits a {@link Callable} task for execution. This call may block if thread pool is full. In certain * circumstances this method may choose to directly invoke task (on caller thread) instead to submit it. @@ -57,4 +50,10 @@ public interface ResolverExecutor * Error handling: this method will never throw. */ Future submit( Callable task ); + + /** + * Caller notifies that is not using this instance anymore, it is up to implementation to shut it down, or do + * whatever is needed. + */ + void close(); } diff --git a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java index e7d93fef6..f41f3a311 100644 --- a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java +++ b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java @@ -19,8 +19,6 @@ * under the License. */ -import org.eclipse.aether.RepositorySystemSession; - /** * Component providing {@link ResolverExecutor} instances on demand. * @@ -29,33 +27,26 @@ public interface ResolverExecutorService { /** - * A hierarchical key to identify use cases for executors. Same {@link Key} designates same - * {@link ResolverExecutor}. + * A hierarchical name to name threads for executors. */ - interface Key + interface Name { String asString(); } /** - * Creates service key with given parameters. + * Creates {@link Name} with given parameters. * * @param service The service that is to use executor, never {@code null}. * @param discriminators Potential (sub) discriminators, if needed. */ - Key getKey( Class service, String... discriminators ); + Name getName( Class service, String... discriminators ); /** - * Returns a resolver executor for requester service. The {@code service} parameter is used as "key", meaning - * multiple services using same "key" will share the executor as well. The very first invocation of this method - * may create thread pool as well (using {@code maxThreads} parameter), and subsequent calls with different - * parameter will reuse it, hence the parameter may be neglected. - *

- * None of parameters may be {@code null}. + * Returns a new resolver executor for requester service. * - * @param session The session. - * @param key A key for service, multiple components using same key will share same executor. + * @param name A key for service, multiple components using same key will share same executor. * @param maxThreads The count of configured threads (must be bigger than zero). */ - ResolverExecutor getResolverExecutor( RepositorySystemSession session, Key key, int maxThreads ); + ResolverExecutor getResolverExecutor( Name name, int maxThreads ); } From 39970d0dbe23d959b0a43d0d944edc2bf31b00d3 Mon Sep 17 00:00:00 2001 From: Tamas Cservenak Date: Tue, 8 Nov 2022 11:18:57 +0100 Subject: [PATCH 3/3] runnable -> runnables --- .../aether/connector/basic/BasicRepositoryConnector.java | 8 ++++---- .../aether/internal/impl/DefaultMetadataResolver.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java index 203a5e515..751b9e736 100644 --- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java +++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java @@ -200,7 +200,7 @@ public void get( Collection artifactDownloads, List checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories(); Collection mds = safe( metadataDownloads ); Collection ads = safe( artifactDownloads ); - ArrayList runnable = new ArrayList<>( mds.size() + ads.size() ); + ArrayList runnables = new ArrayList<>( mds.size() + ads.size() ); for ( MetadataDownload transfer : mds ) { @@ -219,7 +219,7 @@ public void get( Collection artifactDownloads, Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksumAlgorithmFactories, checksumLocations, null, listener ); - runnable.add( errorForwarder.wrap( task ) ); + runnables.add( errorForwarder.wrap( task ) ); } for ( ArtifactDownload transfer : ads ) @@ -260,10 +260,10 @@ public void get( Collection artifactDownloads, task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksumAlgorithmFactories, checksumLocations, providedChecksums, listener ); } - runnable.add( errorForwarder.wrap( task ) ); + runnables.add( errorForwarder.wrap( task ) ); } - resolverExecutor.submitBatch( runnable ); + resolverExecutor.submitBatch( runnables ); errorForwarder.await(); } diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java index facd66e9a..300f127d8 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java @@ -393,17 +393,17 @@ else if ( exception == null ) if ( !tasks.isEmpty() ) { RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); - ArrayList runnable = new ArrayList<>( tasks.size() ); + ArrayList runnables = new ArrayList<>( tasks.size() ); for ( ResolveTask task : tasks ) { - runnable.add( errorForwarder.wrap( task ) ); + runnables.add( errorForwarder.wrap( task ) ); } try ( ResolverExecutor resolverExecutor = resolverExecutorService.getResolverExecutor( resolverExecutorService.getName( DefaultMetadataResolver.class ), ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS ) ) ) { - resolverExecutor.submitBatch( runnable ); + resolverExecutor.submitBatch( runnables ); errorForwarder.await(); }