diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java index 4c6ab9f49..751b9e736 100644 --- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java +++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java @@ -31,17 +31,14 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.aether.ConfigurationProperties; import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.RequestTrace; import org.eclipse.aether.repository.RemoteRepository; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.ArtifactDownload; import org.eclipse.aether.spi.connector.ArtifactUpload; import org.eclipse.aether.spi.connector.MetadataDownload; @@ -70,7 +67,6 @@ import org.eclipse.aether.util.ConfigUtils; import org.eclipse.aether.util.FileUtils; import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; -import org.eclipse.aether.util.concurrency.WorkerThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,14 +77,24 @@ final class BasicRepositoryConnector implements RepositoryConnector { + /** + * The count of threads to be used when downloading artifacts in parallel, default value 5. + */ private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads"; + /** + * The default value for {@link #CONFIG_PROP_THREADS}. + */ + private static final int CONFIG_PROP_THREADS_DEFAULT = 5; + private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums"; private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class ); private final Map providedChecksumsSources; + private final ResolverExecutor resolverExecutor; + private final FileProcessor fileProcessor; private final RemoteRepository repository; @@ -101,23 +107,21 @@ final class BasicRepositoryConnector private final ChecksumPolicyProvider checksumPolicyProvider; - private final int maxThreads; - private final boolean smartChecksums; private final boolean persistedChecksums; - private Executor executor; - private final AtomicBoolean closed; + @SuppressWarnings( "checkstyle:parameternumber" ) BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository, TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider, ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor, - Map providedChecksumsSources ) + Map providedChecksumsSources, + ResolverExecutorService resolverExecutorService ) throws NoRepositoryConnectorException { try @@ -143,36 +147,17 @@ final class BasicRepositoryConnector this.fileProcessor = fileProcessor; this.providedChecksumsSources = providedChecksumsSources; this.closed = new AtomicBoolean( false ); + this.resolverExecutor = resolverExecutorService.getResolverExecutor( + resolverExecutorService.getName( BasicRepositoryConnector.class, repository.getId() ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS, + "maven.artifact.threads" ) ); - maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" ); - smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); - persistedChecksums = + this.smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); + this.persistedChecksums = ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS, ConfigurationProperties.PERSISTED_CHECKSUMS ); } - private Executor getExecutor( Collection artifacts, Collection metadatas ) - { - if ( maxThreads <= 1 ) - { - return DirectExecutor.INSTANCE; - } - int tasks = safe( artifacts ).size() + safe( metadatas ).size(); - if ( tasks <= 1 ) - { - return DirectExecutor.INSTANCE; - } - if ( executor == null ) - { - executor = - new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new WorkerThreadFactory( getClass().getSimpleName() + '-' - + repository.getHost() + '-' ) ); - } - return executor; - } - @Override protected void finalize() throws Throwable @@ -192,10 +177,7 @@ public void close() { if ( closed.compareAndSet( false, true ) ) { - if ( executor instanceof ExecutorService ) - { - ( (ExecutorService) executor ).shutdown(); - } + resolverExecutor.close(); transporter.close(); } } @@ -214,11 +196,13 @@ public void get( Collection artifactDownloads, { failIfClosed(); - Executor executor = getExecutor( artifactDownloads, metadataDownloads ); RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); List checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories(); + Collection mds = safe( metadataDownloads ); + Collection ads = safe( artifactDownloads ); + ArrayList runnables = new ArrayList<>( mds.size() + ads.size() ); - for ( MetadataDownload transfer : safe( metadataDownloads ) ) + for ( MetadataDownload transfer : mds ) { URI location = layout.getLocation( transfer.getMetadata(), false ); @@ -235,10 +219,10 @@ public void get( Collection artifactDownloads, Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksumAlgorithmFactories, checksumLocations, null, listener ); - executor.execute( errorForwarder.wrap( task ) ); + runnables.add( errorForwarder.wrap( task ) ); } - for ( ArtifactDownload transfer : safe( artifactDownloads ) ) + for ( ArtifactDownload transfer : ads ) { Map providedChecksums = Collections.emptyMap(); for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() ) @@ -276,9 +260,10 @@ public void get( Collection artifactDownloads, task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksumAlgorithmFactories, checksumLocations, providedChecksums, listener ); } - executor.execute( errorForwarder.wrap( task ) ); + runnables.add( errorForwarder.wrap( task ) ); } + resolverExecutor.submitBatch( runnables ); errorForwarder.await(); } @@ -622,18 +607,4 @@ private void uploadChecksum( URI location, Object checksum ) } } - - private static class DirectExecutor - implements Executor - { - - static final Executor INSTANCE = new DirectExecutor(); - - @Override - public void execute( Runnable command ) - { - command.run(); - } - - } } diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java index 576d12160..08f6d1c2e 100644 --- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java +++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnectorFactory.java @@ -29,6 +29,7 @@ import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.repository.RemoteRepository; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.RepositoryConnector; import org.eclipse.aether.spi.connector.RepositoryConnectorFactory; import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; @@ -59,6 +60,8 @@ public final class BasicRepositoryConnectorFactory private Map providedChecksumsSources; + private ResolverExecutorService resolverExecutorService; + private float priority; /** @@ -76,13 +79,15 @@ public BasicRepositoryConnectorFactory() RepositoryLayoutProvider layoutProvider, ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor, - Map providedChecksumsSources ) + Map providedChecksumsSources, + ResolverExecutorService resolverExecutorService ) { setTransporterProvider( transporterProvider ); setRepositoryLayoutProvider( layoutProvider ); setChecksumPolicyProvider( checksumPolicyProvider ); setFileProcessor( fileProcessor ); setProvidedChecksumSources( providedChecksumsSources ); + setResolverExecutorService( resolverExecutorService ); } public void initService( ServiceLocator locator ) @@ -92,6 +97,7 @@ public void initService( ServiceLocator locator ) setChecksumPolicyProvider( locator.getService( ChecksumPolicyProvider.class ) ); setFileProcessor( locator.getService( FileProcessor.class ) ); setProvidedChecksumSources( Collections.emptyMap() ); + setResolverExecutorService( locator.getService( ResolverExecutorService.class ) ); } /** @@ -159,6 +165,21 @@ public BasicRepositoryConnectorFactory setProvidedChecksumSources( return this; } + /** + * Sets the resolver executor to use for this component. + * + * @param resolverExecutorService The resolver executor to use, must not be {@code null}. + * @return This component for chaining, never {@code null}. + * @since 1.9.0 + */ + public BasicRepositoryConnectorFactory setResolverExecutorService( ResolverExecutorService resolverExecutorService ) + { + this.resolverExecutorService = requireNonNull( + resolverExecutorService, "resolver executor service cannot be null" + ); + return this; + } + public float getPriority() { return priority; @@ -183,7 +204,7 @@ public RepositoryConnector newInstance( RepositorySystemSession session, RemoteR requireNonNull( repository, "repository cannot be null" ); return new BasicRepositoryConnector( session, repository, transporterProvider, layoutProvider, - checksumPolicyProvider, fileProcessor, providedChecksumsSources ); + checksumPolicyProvider, fileProcessor, providedChecksumsSources, resolverExecutorService ); } } diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java index 69cfc2c83..0ce6c54cd 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/DefaultServiceLocator.java @@ -57,9 +57,11 @@ import org.eclipse.aether.internal.impl.EnhancedLocalRepositoryManagerFactory; import org.eclipse.aether.internal.impl.Maven2RepositoryLayoutFactory; import org.eclipse.aether.internal.impl.SimpleLocalRepositoryManagerFactory; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.impl.filter.DefaultRemoteRepositoryFilterManager; import org.eclipse.aether.internal.impl.slf4j.Slf4jLoggerFactory; import org.eclipse.aether.internal.impl.synccontext.DefaultSyncContextFactory; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactorySelector; import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; import org.eclipse.aether.spi.connector.layout.RepositoryLayoutFactory; @@ -231,6 +233,7 @@ public DefaultServiceLocator() addService( LocalPathComposer.class, DefaultLocalPathComposer.class ); addService( RemoteRepositoryFilterManager.class, DefaultRemoteRepositoryFilterManager.class ); addService( RepositorySystemLifecycle.class, DefaultRepositorySystemLifecycle.class ); + addService( ResolverExecutorService.class, DefaultResolverExecutorService.class ); } private Entry getEntry( Class type, boolean create ) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java index 54e047889..5ff3b39b9 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/impl/guice/AetherModule.java @@ -60,6 +60,7 @@ import org.eclipse.aether.internal.impl.collect.DependencyCollectorDelegate; import org.eclipse.aether.internal.impl.collect.bf.BfDependencyCollector; import org.eclipse.aether.internal.impl.collect.df.DfDependencyCollector; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.impl.filter.DefaultRemoteRepositoryFilterManager; import org.eclipse.aether.internal.impl.filter.GroupIdRemoteRepositoryFilterSource; import org.eclipse.aether.internal.impl.filter.PrefixesRemoteRepositoryFilterSource; @@ -102,6 +103,7 @@ import org.eclipse.aether.internal.impl.slf4j.Slf4jLoggerFactory; import org.eclipse.aether.named.providers.NoopNamedLockFactory; import org.eclipse.aether.spi.checksums.TrustedChecksumsSource; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.checksum.ProvidedChecksumsSource; import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactorySelector; import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider; @@ -229,6 +231,9 @@ protected void configure() bind( RepositorySystemLifecycle.class ) .to( DefaultRepositorySystemLifecycle.class ).in( Singleton.class ); + bind( ResolverExecutorService.class ) + .to( DefaultResolverExecutorService.class ).in( Singleton.class ); + bind( NamedLockFactorySelector.class ).toInstance( new ParameterizedNamedLockFactorySelector() ); bind( SyncContextFactory.class ).to( DefaultSyncContextFactory.class ).in( Singleton.class ); bind( org.eclipse.aether.impl.SyncContextFactory.class ) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java index e56048341..300f127d8 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java @@ -27,11 +27,6 @@ import java.util.List; import java.util.Map; import static java.util.Objects.requireNonNull; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.inject.Named; @@ -48,6 +43,8 @@ import org.eclipse.aether.impl.RemoteRepositoryManager; import org.eclipse.aether.impl.RepositoryConnectorProvider; import org.eclipse.aether.impl.RepositoryEventDispatcher; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.connector.filter.RemoteRepositoryFilter; import org.eclipse.aether.spi.synccontext.SyncContextFactory; import org.eclipse.aether.impl.UpdateCheck; @@ -73,7 +70,6 @@ import org.eclipse.aether.transfer.RepositoryOfflineException; import org.eclipse.aether.util.ConfigUtils; import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; -import org.eclipse.aether.util.concurrency.WorkerThreadFactory; /** */ @@ -83,8 +79,16 @@ public class DefaultMetadataResolver implements MetadataResolver, Service { + /** + * The count of threads to be used when resolving metadata in parallel, default value 4. + */ private static final String CONFIG_PROP_THREADS = "aether.metadataResolver.threads"; + /** + * The default value for {@link #CONFIG_PROP_THREADS}. + */ + private static final int CONFIG_PROP_THREADS_DEFAULT = 4; + private RepositoryEventDispatcher repositoryEventDispatcher; private UpdateCheckManager updateCheckManager; @@ -99,18 +103,22 @@ public class DefaultMetadataResolver private RemoteRepositoryFilterManager remoteRepositoryFilterManager; + private ResolverExecutorService resolverExecutorService; + public DefaultMetadataResolver() { // enables default constructor } + @SuppressWarnings( "checkstyle:parameternumber" ) @Inject DefaultMetadataResolver( RepositoryEventDispatcher repositoryEventDispatcher, UpdateCheckManager updateCheckManager, RepositoryConnectorProvider repositoryConnectorProvider, RemoteRepositoryManager remoteRepositoryManager, SyncContextFactory syncContextFactory, OfflineController offlineController, - RemoteRepositoryFilterManager remoteRepositoryFilterManager ) + RemoteRepositoryFilterManager remoteRepositoryFilterManager, + ResolverExecutorService resolverExecutorService ) { setRepositoryEventDispatcher( repositoryEventDispatcher ); setUpdateCheckManager( updateCheckManager ); @@ -119,6 +127,7 @@ public DefaultMetadataResolver() setSyncContextFactory( syncContextFactory ); setOfflineController( offlineController ); setRemoteRepositoryFilterManager( remoteRepositoryFilterManager ); + setResolverExecutorService( resolverExecutorService ); } public void initService( ServiceLocator locator ) @@ -130,6 +139,7 @@ public void initService( ServiceLocator locator ) setSyncContextFactory( locator.getService( SyncContextFactory.class ) ); setOfflineController( locator.getService( OfflineController.class ) ); setRemoteRepositoryFilterManager( locator.getService( RemoteRepositoryFilterManager.class ) ); + setResolverExecutorService( locator.getService( ResolverExecutorService.class ) ); } public DefaultMetadataResolver setRepositoryEventDispatcher( RepositoryEventDispatcher repositoryEventDispatcher ) @@ -180,6 +190,14 @@ public DefaultMetadataResolver setRemoteRepositoryFilterManager( return this; } + public DefaultMetadataResolver setResolverExecutorService( ResolverExecutorService resolverExecutorService ) + { + this.resolverExecutorService = requireNonNull( resolverExecutorService, + "resolver executor service cannot be null" ); + return this; + } + + @Override public List resolveMetadata( RepositorySystemSession session, Collection requests ) { @@ -374,41 +392,39 @@ else if ( exception == null ) if ( !tasks.isEmpty() ) { - int threads = ConfigUtils.getInteger( session, 4, CONFIG_PROP_THREADS ); - Executor executor = getExecutor( Math.min( tasks.size(), threads ) ); - try + RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); + ArrayList runnables = new ArrayList<>( tasks.size() ); + for ( ResolveTask task : tasks ) { - RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); - - for ( ResolveTask task : tasks ) - { - executor.execute( errorForwarder.wrap( task ) ); - } + runnables.add( errorForwarder.wrap( task ) ); + } + try ( ResolverExecutor resolverExecutor = resolverExecutorService.getResolverExecutor( + resolverExecutorService.getName( DefaultMetadataResolver.class ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS ) ) ) + { + resolverExecutor.submitBatch( runnables ); errorForwarder.await(); + } - for ( ResolveTask task : tasks ) + for ( ResolveTask task : tasks ) + { + /* + * NOTE: Touch after registration with local repo to ensure concurrent resolution is not + * rejected with "already updated" via session data when actual update to local repo is + * still pending. + */ + for ( UpdateCheck check : task.checks ) { - /* - * NOTE: Touch after registration with local repo to ensure concurrent resolution is not - * rejected with "already updated" via session data when actual update to local repo is - * still pending. - */ - for ( UpdateCheck check : task.checks ) - { - updateCheckManager.touchMetadata( task.session, check.setException( task.exception ) ); - } + updateCheckManager.touchMetadata( task.session, check.setException( task.exception ) ); + } - metadataDownloaded( session, task.trace, task.request.getMetadata(), task.request.getRepository(), - task.metadataFile, task.exception ); + metadataDownloaded( session, task.trace, task.request.getMetadata(), task.request.getRepository(), + task.metadataFile, task.exception ); - task.result.setException( task.exception ); - } - } - finally - { - shutdown( executor ); + task.result.setException( task.exception ); } + for ( ResolveTask task : tasks ) { Metadata metadata = task.request.getMetadata(); @@ -531,27 +547,6 @@ private void metadataDownloaded( RepositorySystemSession session, RequestTrace t repositoryEventDispatcher.dispatch( event.build() ); } - private Executor getExecutor( int threads ) - { - if ( threads <= 1 ) - { - return command -> command.run(); - } - else - { - return new ThreadPoolExecutor( threads, threads, 3, TimeUnit.SECONDS, new LinkedBlockingQueue(), - new WorkerThreadFactory( null ) ); - } - } - - private void shutdown( Executor executor ) - { - if ( executor instanceof ExecutorService ) - { - ( (ExecutorService) executor ).shutdown(); - } - } - class ResolveTask implements Runnable { @@ -584,6 +579,7 @@ class ResolveTask this.checks = checks; } + @Override public void run() { Metadata metadata = request.getMetadata(); diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java index 7e9967699..ca246c77a 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java @@ -34,11 +34,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,6 +53,7 @@ import org.eclipse.aether.graph.Dependency; import org.eclipse.aether.graph.DependencyNode; import org.eclipse.aether.impl.ArtifactDescriptorReader; +import org.eclipse.aether.impl.DependencyCollector; import org.eclipse.aether.impl.RemoteRepositoryManager; import org.eclipse.aether.impl.VersionRangeResolver; import org.eclipse.aether.internal.impl.collect.DataPool; @@ -70,15 +67,16 @@ import org.eclipse.aether.resolution.ArtifactDescriptorResult; import org.eclipse.aether.resolution.VersionRangeRequest; import org.eclipse.aether.resolution.VersionRangeResult; +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; import org.eclipse.aether.spi.locator.Service; +import org.eclipse.aether.spi.locator.ServiceLocator; import org.eclipse.aether.util.ConfigUtils; import org.eclipse.aether.util.artifact.ArtifactIdUtils; -import org.eclipse.aether.util.concurrency.WorkerThreadFactory; import org.eclipse.aether.util.graph.manager.DependencyManagerUtils; import org.eclipse.aether.version.Version; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; import static org.eclipse.aether.internal.impl.collect.DefaultDependencyCycle.find; /** @@ -96,7 +94,9 @@ public class BfDependencyCollector /** * The key in the repository session's {@link RepositorySystemSession#getConfigProperties() * configuration properties} used to store a {@link Boolean} flag controlling the resolver's skip mode. - * + *

+ * Visible for testing. + * * @since 1.8.0 */ static final String CONFIG_PROP_SKIPPER = "aether.dependencyCollector.bf.skipper"; @@ -106,14 +106,23 @@ public class BfDependencyCollector * * @since 1.8.0 */ - static final boolean CONFIG_PROP_SKIPPER_DEFAULT = true; + private static final boolean CONFIG_PROP_SKIPPER_DEFAULT = true; /** * The count of threads to be used when collecting POMs in parallel, default value 5. * * @since 1.9.0 */ - static final String CONFIG_PROP_THREADS = "aether.dependencyCollector.bf.threads"; + private static final String CONFIG_PROP_THREADS = "aether.dependencyCollector.bf.threads"; + + /** + * The default value for {@link #CONFIG_PROP_THREADS}. + * + * @since 1.9.0 + */ + private static final int CONFIG_PROP_THREADS_DEFAULT = 5; + + private ResolverExecutorService resolverExecutorService; /** * Default ctor for SL. @@ -129,9 +138,25 @@ public BfDependencyCollector() @Inject BfDependencyCollector( RemoteRepositoryManager remoteRepositoryManager, ArtifactDescriptorReader artifactDescriptorReader, - VersionRangeResolver versionRangeResolver ) + VersionRangeResolver versionRangeResolver, + ResolverExecutorService resolverExecutorService ) { super( remoteRepositoryManager, artifactDescriptorReader, versionRangeResolver ); + setResolverExecutorService( resolverExecutorService ); + } + + @Override + public void initService( ServiceLocator locator ) + { + super.initService( locator ); + setResolverExecutorService( locator.getService( ResolverExecutorService.class ) ); + } + + public DependencyCollector setResolverExecutorService( ResolverExecutorService resolverExecutorService ) + { + this.resolverExecutorService = + requireNonNull( resolverExecutorService, "resolver executor service cannot be null" ); + return this; } @SuppressWarnings( "checkstyle:parameternumber" ) @@ -151,47 +176,52 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr logger.debug( "Collector skip mode enabled" ); } - Args args = - new Args( session, pool, context, versionContext, request, - useSkip ? DependencyResolutionSkipper.defaultSkipper() - : DependencyResolutionSkipper.neverSkipper(), - new ParallelDescriptorResolver( session ) ); - - DependencySelector rootDepSelector = session.getDependencySelector() != null - ? session.getDependencySelector().deriveChildSelector( context ) : null; - DependencyManager rootDepManager = session.getDependencyManager() != null - ? session.getDependencyManager().deriveChildManager( context ) : null; - DependencyTraverser rootDepTraverser = session.getDependencyTraverser() != null - ? session.getDependencyTraverser().deriveChildTraverser( context ) : null; - VersionFilter rootVerFilter = session.getVersionFilter() != null - ? session.getVersionFilter().deriveChildFilter( context ) : null; - - List parents = Collections.singletonList( node ); - for ( Dependency dependency : dependencies ) + try ( ResolverExecutor resolverExecutor = resolverExecutorService.getResolverExecutor( + resolverExecutorService.getName( BfDependencyCollector.class ), + ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, + CONFIG_PROP_THREADS, "maven.artifact.threads" ) ) ) { - RequestTrace childTrace = collectStepTrace( trace, args.request.getRequestContext(), parents, - dependency ); - DependencyProcessingContext processingContext = - new DependencyProcessingContext( rootDepSelector, rootDepManager, rootDepTraverser, - rootVerFilter, childTrace, repositories, managedDependencies, parents, dependency, - PremanagedDependency.create( rootDepManager, dependency, - false, args.premanagedState ) ); - if ( !filter( processingContext ) ) + Args args = + new Args( session, pool, context, versionContext, request, + useSkip ? DependencyResolutionSkipper.defaultSkipper() + : DependencyResolutionSkipper.neverSkipper(), + new ParallelDescriptorResolver( resolverExecutor ) ); + + DependencySelector rootDepSelector = session.getDependencySelector() != null + ? session.getDependencySelector().deriveChildSelector( context ) : null; + DependencyManager rootDepManager = session.getDependencyManager() != null + ? session.getDependencyManager().deriveChildManager( context ) : null; + DependencyTraverser rootDepTraverser = session.getDependencyTraverser() != null + ? session.getDependencyTraverser().deriveChildTraverser( context ) : null; + VersionFilter rootVerFilter = session.getVersionFilter() != null + ? session.getVersionFilter().deriveChildFilter( context ) : null; + + List parents = Collections.singletonList( node ); + for ( Dependency dependency : dependencies ) { - processingContext.withDependency( processingContext.premanagedDependency.getManagedDependency() ); - resolveArtifactDescriptorAsync( args, processingContext, results ); - args.dependencyProcessingQueue.add( processingContext ); + RequestTrace childTrace = collectStepTrace( trace, args.request.getRequestContext(), parents, + dependency ); + DependencyProcessingContext processingContext = + new DependencyProcessingContext( rootDepSelector, rootDepManager, rootDepTraverser, + rootVerFilter, childTrace, repositories, managedDependencies, parents, dependency, + PremanagedDependency.create( rootDepManager, dependency, + false, args.premanagedState ) ); + if ( !filter( processingContext ) ) + { + processingContext.withDependency( processingContext.premanagedDependency.getManagedDependency() ); + resolveArtifactDescriptorAsync( args, processingContext, results ); + args.dependencyProcessingQueue.add( processingContext ); + } } - } - while ( !args.dependencyProcessingQueue.isEmpty() ) - { - processDependency( args, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), - false ); - } + while ( !args.dependencyProcessingQueue.isEmpty() ) + { + processDependency( args, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), + false ); + } - args.resolver.shutdown(); - args.skipper.report(); + args.skipper.report(); + } } @SuppressWarnings( "checkstyle:parameternumber" ) @@ -475,23 +505,22 @@ else if ( descriptorResult == DataPool.NO_DESCRIPTOR ) static class ParallelDescriptorResolver { - final ExecutorService executorService; + final ResolverExecutor executor; /** * Artifact ID -> Future of DescriptorResolutionResult */ final Map> results = new ConcurrentHashMap<>( 256 ); - final Logger logger = LoggerFactory.getLogger( getClass() ); - ParallelDescriptorResolver( RepositorySystemSession session ) + ParallelDescriptorResolver( ResolverExecutor executor ) { - this.executorService = getExecutorService( session ); + this.executor = executor; } void resolveDescriptors( Artifact artifact, Callable callable ) { results.computeIfAbsent( ArtifactIdUtils.toId( artifact ), - key -> this.executorService.submit( callable ) ); + key -> this.executor.submit( callable ) ); } void cacheVersionRangeDescriptor( Artifact artifact, DescriptorResolutionResult resolutionResult ) @@ -504,19 +533,6 @@ Future find( Artifact artifact ) { return results.get( ArtifactIdUtils.toId( artifact ) ); } - - void shutdown() - { - executorService.shutdown(); - } - - private ExecutorService getExecutorService( RepositorySystemSession session ) - { - int nThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" ); - logger.debug( "Created thread pool with {} threads to resolve descriptors.", nThreads ); - return new ThreadPoolExecutor( nThreads, nThreads, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), - new WorkerThreadFactory( getClass().getSimpleName() ) ); - } } static class DescriptorResolutionResult diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java new file mode 100644 index 000000000..15643bd30 --- /dev/null +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutor.java @@ -0,0 +1,92 @@ +package org.eclipse.aether.internal.impl.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.eclipse.aether.spi.concurrency.ResolverExecutor; + +import static java.util.Objects.requireNonNull; + +/** + * Default implementation of {@link ResolverExecutor}. + *

+ * It relies on ctor passed {@link ExecutorService} that may be {@code null}, in which case "direct invocation" (on + * caller thread) happens, otherwise the non-null executor service is used. + */ +final class DefaultResolverExecutor implements ResolverExecutor +{ + private final ExecutorService executorService; + + DefaultResolverExecutor( final ExecutorService executorService ) + { + this.executorService = executorService; + } + + @Override + public void submitBatch( Collection tasks ) + { + requireNonNull( tasks ); + if ( tasks.size() == 1 ) + { + directlyExecute( Executors.callable( tasks.iterator().next() ) ); + } + else + { + for ( Runnable task : tasks ) + { + submit( Executors.callable( task ) ); + } + } + } + + @Override + public Future submit( Callable task ) + { + requireNonNull( task ); + return executorService == null ? directlyExecute( task ) : executorService.submit( task ); + } + + @Override + public void close() + { + executorService.shutdown(); + } + + private static Future directlyExecute( Callable task ) + { + CompletableFuture future; + try + { + future = CompletableFuture.completedFuture( task.call() ); + } + catch ( Exception e ) + { + future = new CompletableFuture<>(); + future.completeExceptionally( e ); + } + return future; + } +} \ No newline at end of file diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java new file mode 100644 index 000000000..82e386509 --- /dev/null +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/concurrency/DefaultResolverExecutorService.java @@ -0,0 +1,128 @@ +package org.eclipse.aether.internal.impl.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import javax.inject.Named; +import javax.inject.Singleton; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.aether.spi.concurrency.ResolverExecutor; +import org.eclipse.aether.spi.concurrency.ResolverExecutorService; +import org.eclipse.aether.util.concurrency.WorkerThreadFactory; + +import static java.util.Objects.hash; +import static java.util.Objects.requireNonNull; + +/** + * Default implementation of {@link ResolverExecutor}. + */ +@Singleton +@Named +public final class DefaultResolverExecutorService implements ResolverExecutorService +{ + @Override + public Name getName( Class service, String... discriminators ) + { + requireNonNull( service ); + return new NameImpl( DefaultResolverExecutorService.class.getName() + + "-" + service.getSimpleName() + + String.join( "-", discriminators ) ); + } + + @Override + public ResolverExecutor getResolverExecutor( Name name, int maxThreads ) + { + requireNonNull( name ); + if ( maxThreads < 1 ) + { + throw new IllegalArgumentException( "maxThreads must be greater than zero" ); + } + + final ExecutorService executorService; + if ( maxThreads == 1 ) // direct + { + executorService = null; + } + else // shared && pooled + { + executorService = createExecutorService( name, maxThreads ); + } + return new DefaultResolverExecutor( executorService ); + } + + /** + * Creates am {@link ExecutorService} that allows its core threads to die off in case of inactivity, and allows + * for proper garbage collection. This is important detail, as these instances are kept within session data, and + * currently there is no way to shut down them. + */ + private ExecutorService createExecutorService( Name name, int maxThreads ) + { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + maxThreads, + maxThreads, + 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new WorkerThreadFactory( name.asString() + "-" ) + ); + threadPoolExecutor.allowCoreThreadTimeOut( true ); + return threadPoolExecutor; + } + + private static class NameImpl implements Name + { + private final String nameString; + + private NameImpl( String nameString ) + { + this.nameString = nameString; + } + + @Override + public String asString() + { + return nameString; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + NameImpl other = (NameImpl) o; + return nameString.equals( other.nameString ); + } + + @Override + public int hashCode() + { + return hash( nameString ); + } + } +} \ No newline at end of file diff --git a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java index bb5003300..ca06b1882 100644 --- a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java +++ b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/DefaultMetadataResolverTest.java @@ -31,6 +31,7 @@ import java.util.Set; import org.eclipse.aether.DefaultRepositorySystemSession; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.impl.filter.DefaultRemoteRepositoryFilterManager; import org.eclipse.aether.internal.impl.filter.Filters; import org.eclipse.aether.internal.test.util.TestFileUtils; @@ -91,6 +92,7 @@ public void setup() resolver.setSyncContextFactory( new StubSyncContextFactory() ); resolver.setOfflineController( new DefaultOfflineController() ); resolver.setRemoteRepositoryFilterManager( remoteRepositoryFilterManager ); + resolver.setResolverExecutorService( new DefaultResolverExecutorService() ); repository = new RemoteRepository.Builder( "test-DMRT", "default", TestFileUtils.createTempDir().toURI().toURL().toString() ).build(); diff --git a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java index cd6aa1532..5568a4b8b 100644 --- a/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java +++ b/maven-resolver-impl/src/test/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollectorTest.java @@ -33,6 +33,7 @@ import org.eclipse.aether.internal.impl.StubRemoteRepositoryManager; import org.eclipse.aether.internal.impl.StubVersionRangeResolver; import org.eclipse.aether.internal.impl.collect.DependencyCollectorDelegateTestSupport; +import org.eclipse.aether.internal.impl.concurrency.DefaultResolverExecutorService; import org.eclipse.aether.internal.test.util.DependencyGraphParser; import org.eclipse.aether.util.graph.manager.TransitiveDependencyManager; import org.eclipse.aether.util.graph.selector.ExclusionDependencySelector; @@ -67,6 +68,7 @@ protected void setupCollector() collector.setArtifactDescriptorReader( newReader( "" ) ); collector.setVersionRangeResolver( new StubVersionRangeResolver() ); collector.setRemoteRepositoryManager( new StubRemoteRepositoryManager() ); + ((BfDependencyCollector) collector).setResolverExecutorService( new DefaultResolverExecutorService() ); } private Dependency newDep( String coords, String scope, Collection exclusions ) diff --git a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java new file mode 100644 index 000000000..8d8ec855c --- /dev/null +++ b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutor.java @@ -0,0 +1,59 @@ +package org.eclipse.aether.spi.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.Closeable; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * Component providing {@link java.util.concurrent.Executor}-like service across resolver. Instances are to be treated + * like resources, best in try-with-resource constructs. + * + * @since 1.9.0 + */ +public interface ResolverExecutor extends Closeable +{ + /** + * Submits a batch of {@link Runnable} tasks for execution. If collection has size greater than 1, this method + * will submit all tasks just like {@link #submit(Callable)} does. Otherwise, "direct", on caller thread execution + * happens. Several resolver components may deal "sequentially" with tasks and rely on this behaviour for + * performance purposes. + *

+ * Error handling: this method will never throw. If you are interested in possible outcome of submitted + * {@link Runnable} use some helper like the {@code RunnableErrorForwarder} in resolver utilities module. + */ + void submitBatch( Collection batch ); + + /** + * Submits a {@link Callable} task for execution. This call may block if thread pool is full. In certain + * circumstances this method may choose to directly invoke task (on caller thread) instead to submit it. + *

+ * Error handling: this method will never throw. + */ + Future submit( Callable task ); + + /** + * Caller notifies that is not using this instance anymore, it is up to implementation to shut it down, or do + * whatever is needed. + */ + void close(); +} diff --git a/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java new file mode 100644 index 000000000..f41f3a311 --- /dev/null +++ b/maven-resolver-spi/src/main/java/org/eclipse/aether/spi/concurrency/ResolverExecutorService.java @@ -0,0 +1,52 @@ +package org.eclipse.aether.spi.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Component providing {@link ResolverExecutor} instances on demand. + * + * @since 1.9.0 + */ +public interface ResolverExecutorService +{ + /** + * A hierarchical name to name threads for executors. + */ + interface Name + { + String asString(); + } + + /** + * Creates {@link Name} with given parameters. + * + * @param service The service that is to use executor, never {@code null}. + * @param discriminators Potential (sub) discriminators, if needed. + */ + Name getName( Class service, String... discriminators ); + + /** + * Returns a new resolver executor for requester service. + * + * @param name A key for service, multiple components using same key will share same executor. + * @param maxThreads The count of configured threads (must be bigger than zero). + */ + ResolverExecutor getResolverExecutor( Name name, int maxThreads ); +}