-
Notifications
You must be signed in to change notification settings - Fork 146
[MRESOLVER-283] Shared executor service #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,17 +31,14 @@ | |
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Executor; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| import org.eclipse.aether.ConfigurationProperties; | ||
| import org.eclipse.aether.RepositorySystemSession; | ||
| import org.eclipse.aether.RequestTrace; | ||
| import org.eclipse.aether.repository.RemoteRepository; | ||
| import org.eclipse.aether.spi.concurrency.ResolverExecutor; | ||
| import org.eclipse.aether.spi.concurrency.ResolverExecutorService; | ||
| import org.eclipse.aether.spi.connector.ArtifactDownload; | ||
| import org.eclipse.aether.spi.connector.ArtifactUpload; | ||
| import org.eclipse.aether.spi.connector.MetadataDownload; | ||
|
|
@@ -70,7 +67,6 @@ | |
| import org.eclipse.aether.util.ConfigUtils; | ||
| import org.eclipse.aether.util.FileUtils; | ||
| import org.eclipse.aether.util.concurrency.RunnableErrorForwarder; | ||
| import org.eclipse.aether.util.concurrency.WorkerThreadFactory; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -81,14 +77,24 @@ final class BasicRepositoryConnector | |
| implements RepositoryConnector | ||
| { | ||
|
|
||
| /** | ||
| * The count of threads to be used when downloading artifacts in parallel, default value 5. | ||
| */ | ||
| private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads"; | ||
|
|
||
| /** | ||
| * The default value for {@link #CONFIG_PROP_THREADS}. | ||
| */ | ||
| private static final int CONFIG_PROP_THREADS_DEFAULT = 5; | ||
|
|
||
| private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums"; | ||
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class ); | ||
|
|
||
| private final Map<String, ProvidedChecksumsSource> providedChecksumsSources; | ||
|
|
||
| private final ResolverExecutor resolverExecutor; | ||
|
|
||
| private final FileProcessor fileProcessor; | ||
|
|
||
| private final RemoteRepository repository; | ||
|
|
@@ -101,23 +107,21 @@ final class BasicRepositoryConnector | |
|
|
||
| private final ChecksumPolicyProvider checksumPolicyProvider; | ||
|
|
||
| private final int maxThreads; | ||
|
|
||
| private final boolean smartChecksums; | ||
|
|
||
| private final boolean persistedChecksums; | ||
|
|
||
| private Executor executor; | ||
|
|
||
| private final AtomicBoolean closed; | ||
|
|
||
| @SuppressWarnings( "checkstyle:parameternumber" ) | ||
| BasicRepositoryConnector( RepositorySystemSession session, | ||
| RemoteRepository repository, | ||
| TransporterProvider transporterProvider, | ||
| RepositoryLayoutProvider layoutProvider, | ||
| ChecksumPolicyProvider checksumPolicyProvider, | ||
| FileProcessor fileProcessor, | ||
| Map<String, ProvidedChecksumsSource> providedChecksumsSources ) | ||
| Map<String, ProvidedChecksumsSource> providedChecksumsSources, | ||
| ResolverExecutorService resolverExecutorService ) | ||
| throws NoRepositoryConnectorException | ||
| { | ||
| try | ||
|
|
@@ -143,36 +147,17 @@ final class BasicRepositoryConnector | |
| this.fileProcessor = fileProcessor; | ||
| this.providedChecksumsSources = providedChecksumsSources; | ||
| this.closed = new AtomicBoolean( false ); | ||
| this.resolverExecutor = resolverExecutorService.getResolverExecutor( | ||
| resolverExecutorService.getName( BasicRepositoryConnector.class, repository.getId() ), | ||
| ConfigUtils.getInteger( session, CONFIG_PROP_THREADS_DEFAULT, CONFIG_PROP_THREADS, | ||
| "maven.artifact.threads" ) ); | ||
|
|
||
| maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" ); | ||
| smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); | ||
| persistedChecksums = | ||
| this.smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS ); | ||
| this.persistedChecksums = | ||
| ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS, | ||
| ConfigurationProperties.PERSISTED_CHECKSUMS ); | ||
| } | ||
|
|
||
| private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas ) | ||
| { | ||
| if ( maxThreads <= 1 ) | ||
| { | ||
| return DirectExecutor.INSTANCE; | ||
| } | ||
| int tasks = safe( artifacts ).size() + safe( metadatas ).size(); | ||
| if ( tasks <= 1 ) | ||
| { | ||
| return DirectExecutor.INSTANCE; | ||
| } | ||
| if ( executor == null ) | ||
| { | ||
| executor = | ||
| new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS, | ||
| new LinkedBlockingQueue<>(), | ||
| new WorkerThreadFactory( getClass().getSimpleName() + '-' | ||
| + repository.getHost() + '-' ) ); | ||
| } | ||
| return executor; | ||
| } | ||
|
|
||
| @Override | ||
| protected void finalize() | ||
| throws Throwable | ||
|
|
@@ -192,10 +177,7 @@ public void close() | |
| { | ||
| if ( closed.compareAndSet( false, true ) ) | ||
| { | ||
| if ( executor instanceof ExecutorService ) | ||
| { | ||
| ( (ExecutorService) executor ).shutdown(); | ||
| } | ||
| resolverExecutor.close(); | ||
| transporter.close(); | ||
| } | ||
| } | ||
|
|
@@ -214,11 +196,13 @@ public void get( Collection<? extends ArtifactDownload> artifactDownloads, | |
| { | ||
| failIfClosed(); | ||
|
|
||
| Executor executor = getExecutor( artifactDownloads, metadataDownloads ); | ||
| RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); | ||
| List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories(); | ||
| Collection<? extends MetadataDownload> mds = safe( metadataDownloads ); | ||
| Collection<? extends ArtifactDownload> ads = safe( artifactDownloads ); | ||
| ArrayList<Runnable> runnables = new ArrayList<>( mds.size() + ads.size() ); | ||
|
|
||
| for ( MetadataDownload transfer : safe( metadataDownloads ) ) | ||
| for ( MetadataDownload transfer : mds ) | ||
| { | ||
| URI location = layout.getLocation( transfer.getMetadata(), false ); | ||
|
|
||
|
|
@@ -235,10 +219,10 @@ public void get( Collection<? extends ArtifactDownload> artifactDownloads, | |
|
|
||
| Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, | ||
| checksumAlgorithmFactories, checksumLocations, null, listener ); | ||
| executor.execute( errorForwarder.wrap( task ) ); | ||
| runnables.add( errorForwarder.wrap( task ) ); | ||
| } | ||
|
|
||
| for ( ArtifactDownload transfer : safe( artifactDownloads ) ) | ||
| for ( ArtifactDownload transfer : ads ) | ||
| { | ||
| Map<String, String> providedChecksums = Collections.emptyMap(); | ||
| for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() ) | ||
|
|
@@ -276,9 +260,10 @@ public void get( Collection<? extends ArtifactDownload> artifactDownloads, | |
| task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, | ||
| checksumAlgorithmFactories, checksumLocations, providedChecksums, listener ); | ||
| } | ||
| executor.execute( errorForwarder.wrap( task ) ); | ||
| runnables.add( errorForwarder.wrap( task ) ); | ||
| } | ||
|
|
||
| resolverExecutor.submitBatch( runnables ); | ||
| errorForwarder.await(); | ||
| } | ||
|
|
||
|
|
@@ -622,18 +607,4 @@ private void uploadChecksum( URI location, Object checksum ) | |
| } | ||
|
|
||
| } | ||
|
|
||
| private static class DirectExecutor | ||
| implements Executor | ||
| { | ||
|
|
||
| static final Executor INSTANCE = new DirectExecutor(); | ||
|
|
||
| @Override | ||
| public void execute( Runnable command ) | ||
| { | ||
| command.run(); | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given this being deleted, I think it may introduce additional cost for DF.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reintroduced direct strategy as well, two out of 3 users of resolver executor now relies on it. |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also deprecate this property since it does not reflect reality. MD is not artifacts, but this connector does both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should just remove it? We are going for 1.9 after all....