From ef44324230dd9c2605680b183aec17293ab98fa4 Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Mon, 17 Jun 2019 10:48:55 +0200 Subject: [PATCH 1/4] [hotfix][network] Rename taskExecutorLocation to taskExecutorResourceId and small fixes --- .../io/network/NettyShuffleEnvironment.java | 19 ++++++++++--------- .../partition/ResultPartitionFactory.java | 2 +- .../consumer/SingleInputGateFactory.java | 16 ++++++++-------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index a1ee77489d8e6..4b548d343c1d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -87,7 +87,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment(); + this.inputGatesById = new ConcurrentHashMap<>(10); this.resultPartitionFactory = resultPartitionFactory; this.singleInputGateFactory = singleInputGateFactory; this.isClosed = false; @@ -126,11 +126,11 @@ private NettyShuffleEnvironment( public static NettyShuffleEnvironment create( NettyShuffleEnvironmentConfiguration config, - ResourceID taskExecutorLocation, + ResourceID taskExecutorResourceId, TaskEventPublisher taskEventPublisher, MetricGroup metricGroup, IOManager ioManager) { - checkNotNull(taskExecutorLocation); + checkNotNull(taskExecutorResourceId); checkNotNull(ioManager); checkNotNull(taskEventPublisher); checkNotNull(config); @@ -159,7 +159,7 @@ public static NettyShuffleEnvironment create( config.isForcePartitionReleaseOnConsumption()); SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory( - taskExecutorLocation, + taskExecutorResourceId, config, connectionManager, resultPartitionManager, @@ -167,7 +167,7 @@ public static 
NettyShuffleEnvironment create( networkBufferPool); return new NettyShuffleEnvironment( - taskExecutorLocation, + taskExecutorResourceId, config, networkBufferPool, connectionManager, @@ -320,7 +320,7 @@ public boolean updatePartitionInfo( checkArgument(shuffleDescriptor instanceof NettyShuffleDescriptor, "Tried to update unknown channel with unknown ShuffleDescriptor %s.", shuffleDescriptor.getClass().getName()); - inputGate.updateInputChannel(taskExecutorLocation, (NettyShuffleDescriptor) shuffleDescriptor); + inputGate.updateInputChannel(taskExecutorResourceId, (NettyShuffleDescriptor) shuffleDescriptor); return true; } @@ -358,6 +358,7 @@ public void close() { LOG.info("Shutting down the network environment and its components."); // terminate all network connections + //noinspection OverlyBroadCatchBlock try { LOG.debug("Shutting down network connection manager"); connectionManager.shutdown(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 2eb620eae1b20..d15edaab8b002 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -42,7 +42,7 @@ * Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}. 
*/ public class ResultPartitionFactory { - private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class); + private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class); @Nonnull private final ResultPartitionManager partitionManager; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index 60bc70d3e0d40..1d749b0a6d2d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -50,10 +50,10 @@ * Factory for {@link SingleInputGate} to use in {@link NettyShuffleEnvironment}. */ public class SingleInputGateFactory { - private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class); + private static final Logger LOG = LoggerFactory.getLogger(SingleInputGateFactory.class); @Nonnull - private final ResourceID taskExecutorLocation; + private final ResourceID taskExecutorResourceId; private final boolean isCreditBased; @@ -78,13 +78,13 @@ public class SingleInputGateFactory { private final int floatingNetworkBuffersPerGate; public SingleInputGateFactory( - @Nonnull ResourceID taskExecutorLocation, + @Nonnull ResourceID taskExecutorResourceId, @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, @Nonnull ConnectionManager connectionManager, @Nonnull ResultPartitionManager partitionManager, @Nonnull TaskEventPublisher taskEventPublisher, @Nonnull NetworkBufferPool networkBufferPool) { - this.taskExecutorLocation = taskExecutorLocation; + this.taskExecutorResourceId = taskExecutorResourceId; this.isCreditBased = networkConfig.isCreditBased(); this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); 
this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); @@ -194,7 +194,7 @@ private InputChannel createKnownInputChannel( ChannelStatistics channelStatistics, InputChannelMetrics metrics) { ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); - if (inputChannelDescriptor.isLocalTo(taskExecutorLocation)) { + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { // Consuming task is deployed to the same TaskManager as the partition => local channelStatistics.numLocalChannels++; return new LocalInputChannel( @@ -241,9 +241,9 @@ static SupplierWithException createBufferPoolFactory( } private static class ChannelStatistics { - int numLocalChannels = 0; - int numRemoteChannels = 0; - int numUnknownChannels = 0; + int numLocalChannels; + int numRemoteChannels; + int numUnknownChannels; @Override public String toString() { From a9c98a11316e59176de93c9d6def305d0a2c0cf3 Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Mon, 17 Jun 2019 10:12:20 +0200 Subject: [PATCH 2/4] [hotfix] Convert ClassNotFoundException to FlinkException if the type implementing class is not found in InstantiationUtil.instantiate --- .../apache/flink/util/InstantiationUtil.java | 22 ++++++++++++++----- .../HighAvailabilityServicesUtils.java | 18 ++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 644289133b283..d98ba9351a6c2 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -338,13 +338,23 @@ private static ObjectStreamClass getEquivalentSerializer(String classDescriptorN * @param classLoader to use for loading the class * @param type of the instantiated class * @return Instance of the given class name - * @throws ClassNotFoundException if the class could not 
be found + * @throws FlinkException if the class could not be found */ - public static T instantiate(final String className, final Class targetType, final ClassLoader classLoader) throws ClassNotFoundException { - final Class clazz = Class.forName( - className, - false, - classLoader).asSubclass(targetType); + public static T instantiate(final String className, final Class targetType, final ClassLoader classLoader) throws FlinkException { + final Class clazz; + try { + clazz = Class.forName( + className, + false, + classLoader).asSubclass(targetType); + } catch (ClassNotFoundException e) { + throw new FlinkException( + String.format( + "Could not instantiate class '%s' of type '%s'. Please make sure that this class is on your class path.", + className, + targetType.getName()), + e); + } return instantiate(clazz); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 1263dc1fbf943..2cdd5014b78c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -164,20 +164,10 @@ private static HighAvailabilityServices createCustomHAServices(Configuration con final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE); - final HighAvailabilityServicesFactory highAvailabilityServicesFactory; - - try { - highAvailabilityServicesFactory = InstantiationUtil.instantiate( - haServicesClassName, - HighAvailabilityServicesFactory.class, - classLoader); - } catch (Exception e) { - throw new FlinkException( - String.format( - "Could not instantiate the HighAvailabilityServicesFactory '%s'. 
Please make sure that this class is on your class path.", - haServicesClassName), - e); - } + final HighAvailabilityServicesFactory highAvailabilityServicesFactory = InstantiationUtil.instantiate( + haServicesClassName, + HighAvailabilityServicesFactory.class, + classLoader); try { return highAvailabilityServicesFactory.createHAServices(config, executor); From 642f66840b75a1782628d74ec7ec412f6c8c408d Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Thu, 30 May 2019 21:15:26 +0200 Subject: [PATCH 3/4] [FLINK-12706] Introduce ShuffleService with the ShuffleEnvironment factory method and its default Netty implementation --- .../shuffle_service_configuration.html | 16 +++ docs/ops/config.md | 4 + docs/ops/config.zh.md | 4 + .../ConfigOptionsDocGenerator.java | 1 + .../io/network/NettyShuffleEnvironment.java | 88 +----------- .../network/NettyShuffleServiceFactory.java | 126 ++++++++++++++++++ .../shuffle/ShuffleEnvironmentContext.java | 10 +- .../shuffle/ShuffleServiceFactory.java | 41 ++++++ .../runtime/shuffle/ShuffleServiceLoader.java | 41 ++++++ .../shuffle/ShuffleServiceOptions.java | 42 ++++++ .../taskexecutor/TaskManagerServices.java | 12 +- .../NettyShuffleEnvironmentBuilder.java | 2 +- .../shuffle/ShuffleServiceLoaderTest.java | 76 +++++++++++ 13 files changed, 365 insertions(+), 98 deletions(-) create mode 100644 docs/_includes/generated/shuffle_service_configuration.html create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java diff --git 
a/docs/_includes/generated/shuffle_service_configuration.html b/docs/_includes/generated/shuffle_service_configuration.html new file mode 100644 index 0000000000000..4cfffe71c543a --- /dev/null +++ b/docs/_includes/generated/shuffle_service_configuration.html @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + +
KeyDefaultDescription
shuffle-service-factory.class
"org.apache.flink.runtime.io.network.NettyShuffleServiceFactory"The full class name of the shuffle service factory implementation to be used by the cluster. The default implementation uses Netty for network communication and local memory as well disk space to store results on a TaskExecutor.
diff --git a/docs/ops/config.md b/docs/ops/config.md index 9fb660bdea3ce..4cffd9da2db8d 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -132,6 +132,10 @@ The configuration keys in this section are independent of the used resource mana {% include generated/resource_manager_configuration.html %} +### Shuffle Service + +{% include generated/shuffle_service_configuration.html %} + ### YARN {% include generated/yarn_config_configuration.html %} diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md index f79c1a249970a..a127d5c49c410 100644 --- a/docs/ops/config.zh.md +++ b/docs/ops/config.zh.md @@ -132,6 +132,10 @@ The configuration keys in this section are independent of the used resource mana {% include generated/resource_manager_configuration.html %} +### Shuffle Service + +{% include generated/shuffle_service_configuration.html %} + ### YARN {% include generated/yarn_config_configuration.html %} diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java index 0cd9022c3c8bc..0dffb60a3a826 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java @@ -57,6 +57,7 @@ public class ConfigOptionsDocGenerator { static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{ new OptionsClassLocation("flink-core", "org.apache.flink.configuration"), + new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.shuffle"), new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"), new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"), new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"), diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index 4b548d343c1d5..593ca93501c63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -19,14 +19,12 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge; import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge; @@ -35,8 +33,6 @@ import org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge; import org.apache.flink.runtime.io.network.metrics.OutputBuffersGauge; import org.apache.flink.runtime.io.network.metrics.ResultPartitionMetrics; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory; @@ -50,7 +46,6 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; import 
org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; -import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; import org.apache.flink.util.Preconditions; @@ -65,7 +60,6 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; /** * The implementation of {@link ShuffleEnvironment} based on netty network communication, local memory and disk files. @@ -76,10 +70,6 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment>gauge(METRIC_TOTAL_MEMORY_SEGMENT, - networkBufferPool::getTotalNumberOfMemorySegments); - networkGroup.>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, - networkBufferPool::getNumberOfAvailableMemorySegments); - } - // -------------------------------------------------------------------------------------------- // Properties // -------------------------------------------------------------------------------------------- @@ -396,18 +324,4 @@ public boolean isClosed() { return isClosed; } } - - public static NettyShuffleEnvironment fromShuffleContext(ShuffleEnvironmentContext shuffleEnvironmentContext) { - NettyShuffleEnvironmentConfiguration networkConfig = NettyShuffleEnvironmentConfiguration.fromConfiguration( - shuffleEnvironmentContext.getConfiguration(), - shuffleEnvironmentContext.getMaxJvmHeapMemory(), - shuffleEnvironmentContext.isLocalCommunicationOnly(), - shuffleEnvironmentContext.getHostAddress()); - return create( - networkConfig, - shuffleEnvironmentContext.getLocation(), - shuffleEnvironmentContext.getEventPublisher(), - shuffleEnvironmentContext.getParentMetricGroup(), - shuffleEnvironmentContext.getIOManager()); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java new file mode 100644 index 0000000000000..42284452e171c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; +import org.apache.flink.runtime.shuffle.ShuffleServiceFactory; +import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Netty based shuffle service implementation. 
+ */ +public class NettyShuffleServiceFactory implements ShuffleServiceFactory { + + private static final String METRIC_GROUP_NETWORK = "Network"; + private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments"; + private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments"; + + @Override + public NettyShuffleEnvironment createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext) { + checkNotNull(shuffleEnvironmentContext); + NettyShuffleEnvironmentConfiguration networkConfig = NettyShuffleEnvironmentConfiguration.fromConfiguration( + shuffleEnvironmentContext.getConfiguration(), + shuffleEnvironmentContext.getMaxJvmHeapMemory(), + shuffleEnvironmentContext.isLocalCommunicationOnly(), + shuffleEnvironmentContext.getHostAddress()); + return createNettyShuffleEnvironment( + networkConfig, + shuffleEnvironmentContext.getTaskExecutorResourceId(), + shuffleEnvironmentContext.getEventPublisher(), + shuffleEnvironmentContext.getParentMetricGroup(), + shuffleEnvironmentContext.getIOManager()); + } + + @VisibleForTesting + static NettyShuffleEnvironment createNettyShuffleEnvironment( + NettyShuffleEnvironmentConfiguration config, + ResourceID taskExecutorResourceId, + TaskEventPublisher taskEventPublisher, + MetricGroup metricGroup, + IOManager ioManager) { + checkNotNull(config); + checkNotNull(taskExecutorResourceId); + checkNotNull(taskEventPublisher); + checkNotNull(metricGroup); + checkNotNull(ioManager); + + NettyConfig nettyConfig = config.nettyConfig(); + + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + + ConnectionManager connectionManager = nettyConfig != null ? 
+ new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) : + new LocalConnectionManager(); + + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + config.numNetworkBuffers(), + config.networkBufferSize(), + config.networkBuffersPerChannel()); + + registerNetworkMetrics(metricGroup, networkBufferPool); + + ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( + resultPartitionManager, + ioManager, + networkBufferPool, + config.networkBuffersPerChannel(), + config.floatingNetworkBuffersPerGate(), + config.isForcePartitionReleaseOnConsumption()); + + SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory( + taskExecutorResourceId, + config, + connectionManager, + resultPartitionManager, + taskEventPublisher, + networkBufferPool); + + return new NettyShuffleEnvironment( + taskExecutorResourceId, + config, + networkBufferPool, + connectionManager, + resultPartitionManager, + resultPartitionFactory, + singleInputGateFactory); + } + + private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { + MetricGroup networkGroup = metricGroup.addGroup(METRIC_GROUP_NETWORK); + networkGroup.>gauge(METRIC_TOTAL_MEMORY_SEGMENT, + networkBufferPool::getTotalNumberOfMemorySegments); + networkGroup.>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, + networkBufferPool::getNumberOfAvailableMemorySegments); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java index f7108b6b474d1..bde82eb69e81f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java @@ -33,7 +33,7 @@ */ public class ShuffleEnvironmentContext { private final Configuration configuration; - 
private final ResourceID location; + private final ResourceID taskExecutorResourceId; private final long maxJvmHeapMemory; private final boolean localCommunicationOnly; private final InetAddress hostAddress; @@ -43,7 +43,7 @@ public class ShuffleEnvironmentContext { public ShuffleEnvironmentContext( Configuration configuration, - ResourceID location, + ResourceID taskExecutorResourceId, long maxJvmHeapMemory, boolean localCommunicationOnly, InetAddress hostAddress, @@ -51,7 +51,7 @@ public ShuffleEnvironmentContext( MetricGroup parentMetricGroup, IOManager ioManager) { this.configuration = checkNotNull(configuration); - this.location = checkNotNull(location); + this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId); this.maxJvmHeapMemory = maxJvmHeapMemory; this.localCommunicationOnly = localCommunicationOnly; this.hostAddress = checkNotNull(hostAddress); @@ -64,8 +64,8 @@ public Configuration getConfiguration() { return configuration; } - public ResourceID getLocation() { - return location; + public ResourceID getTaskExecutorResourceId() { + return taskExecutorResourceId; } public long getMaxJvmHeapMemory() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java new file mode 100644 index 0000000000000..f63ef1bcd6bcc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; + +/** + * Interface for shuffle service factory implementations. + * + *

<p>This component is a light-weight factory for {@link ShuffleEnvironment}. + * + * @param <P> type of provided result partition writers + * @param <G> type of provided input gates + */ +public interface ShuffleServiceFactory<P extends ResultPartitionWriter, G extends InputGate>
{ + + /** + * Factory method to create a specific local {@link ShuffleEnvironment} implementation. + * + * @param shuffleEnvironmentContext local context + * @return local shuffle service environment implementation + */ + ShuffleEnvironment createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java new file mode 100644 index 0000000000000..57c21029f5a03 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; + +import static org.apache.flink.runtime.shuffle.ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS; + +/** + * Utility to load the pluggable {@link ShuffleServiceFactory} implementations. 
+ */ +public enum ShuffleServiceLoader { + ; + + public static ShuffleServiceFactory loadShuffleServiceFactory(Configuration configuration) throws FlinkException { + String shuffleServiceClassName = configuration.getString(SHUFFLE_SERVICE_FACTORY_CLASS); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + return InstantiationUtil.instantiate( + shuffleServiceClassName, + ShuffleServiceFactory.class, + classLoader); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java new file mode 100644 index 0000000000000..b610def748571 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Options to configure shuffle service. 
+ */ +@SuppressWarnings("WeakerAccess") +public class ShuffleServiceOptions { + + private ShuffleServiceOptions() { + } + + /** + * The full class name of the shuffle service factory implementation to be used by the cluster. + */ + public static final ConfigOption SHUFFLE_SERVICE_FACTORY_CLASS = ConfigOptions + .key("shuffle-service-factory.class") + .defaultValue("org.apache.flink.runtime.io.network.NettyShuffleServiceFactory") + .withDescription("The full class name of the shuffle service factory implementation to be used by the cluster. " + + "The default implementation uses Netty for network communication and local memory as well disk space " + + "to store results on a TaskExecutor."); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 5d6207614ac01..fd9ebfac9257b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -30,11 +30,11 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; +import org.apache.flink.runtime.shuffle.ShuffleServiceLoader; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -241,7 +241,7 @@ public static TaskManagerServices 
fromConfiguration( // start the I/O manager, it will create some temp directories. final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); - final ShuffleEnvironment shuffleEnvironment = createShuffleEnvironment( + final ShuffleEnvironment shuffleEnvironment = createShuffleEnvironment( taskManagerServicesConfiguration, taskEventDispatcher, taskManagerMetricGroup, @@ -304,11 +304,11 @@ public static TaskManagerServices fromConfiguration( taskEventDispatcher); } - private static ShuffleEnvironment createShuffleEnvironment( + private static ShuffleEnvironment createShuffleEnvironment( TaskManagerServicesConfiguration taskManagerServicesConfiguration, TaskEventDispatcher taskEventDispatcher, MetricGroup taskManagerMetricGroup, - IOManager ioManager) { + IOManager ioManager) throws FlinkException { final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext( taskManagerServicesConfiguration.getConfiguration(), @@ -320,7 +320,9 @@ private static ShuffleEnvironment createShuffleEnvironment( taskManagerMetricGroup, ioManager); - return NettyShuffleEnvironment.fromShuffleContext(shuffleEnvironmentContext); + return ShuffleServiceLoader + .loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration()) + .createShuffleEnvironment(shuffleEnvironmentContext); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java index 2269646a7c3c7..9f0a9ce0289e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java @@ -118,7 +118,7 @@ public NettyShuffleEnvironmentBuilder setIOManager(IOManager ioManager) { } public NettyShuffleEnvironment build() { - return 
NettyShuffleEnvironment.create( + return NettyShuffleServiceFactory.createNettyShuffleEnvironment( new NettyShuffleEnvironmentConfiguration( numNetworkBuffers, networkBufferSize, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java new file mode 100644 index 0000000000000..9f4fa620153ee --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.util.FlinkException; + +import org.junit.Test; + +import static org.apache.flink.runtime.shuffle.ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; + +/** + * Test suite for {@link ShuffleServiceLoader} utility. + */ +public class ShuffleServiceLoaderTest { + + @Test + public void testLoadDefaultNettyShuffleServiceFactory() throws FlinkException { + Configuration configuration = new Configuration(); + ShuffleServiceFactory shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration); + assertThat( + "Loaded shuffle service factory is not the default netty implementation", + shuffleServiceFactory, + instanceOf(NettyShuffleServiceFactory.class)); + } + + @Test + public void testLoadCustomShuffleServiceFactory() throws FlinkException { + Configuration configuration = new Configuration(); + configuration.setString(SHUFFLE_SERVICE_FACTORY_CLASS, "org.apache.flink.runtime.shuffle.ShuffleServiceLoaderTest$CustomShuffleServiceFactory"); + ShuffleServiceFactory shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration); + assertThat( + "Loaded shuffle service factory is not the custom test implementation", + shuffleServiceFactory, + instanceOf(CustomShuffleServiceFactory.class)); + } + + @Test(expected = FlinkException.class) + public void testLoadShuffleServiceFactoryFailure() throws FlinkException { + Configuration configuration = new Configuration(); + configuration.setString(SHUFFLE_SERVICE_FACTORY_CLASS, 
"org.apache.flink.runtime.shuffle.UnavailableShuffleServiceFactory"); + ShuffleServiceLoader.loadShuffleServiceFactory(configuration); + } + + /** + * Stub implementation of {@link ShuffleServiceFactory} to test {@link ShuffleServiceLoader} utility. + */ + public static class CustomShuffleServiceFactory implements ShuffleServiceFactory { + @Override + public ShuffleEnvironment createShuffleEnvironment( + ShuffleEnvironmentContext shuffleEnvironmentContext) { + throw new UnsupportedOperationException(); + } + } +} From 7ef50b6df8a0810c151f02aa7634b24eee46216e Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Thu, 6 Jun 2019 16:15:41 +0200 Subject: [PATCH 4/4] [FLINK-12706] Add the ShuffleMaster factory method to ShuffleService and its default Netty implementation --- .../DefaultJobManagerRunnerFactory.java | 6 +++++- .../executiongraph/ExecutionGraph.java | 10 +++++++--- .../executiongraph/ExecutionGraphBuilder.java | 8 +++++--- .../network/NettyShuffleServiceFactory.java | 10 +++++++++- .../flink/runtime/jobmaster/JobMaster.java | 11 +++++++++-- .../DefaultJobMasterServiceFactory.java | 10 ++++++++-- .../runtime/scheduler/DefaultScheduler.java | 7 +++++-- .../scheduler/DefaultSchedulerFactory.java | 7 +++++-- .../runtime/scheduler/LegacyScheduler.java | 19 +++++++++++++------ .../scheduler/LegacySchedulerFactory.java | 7 +++++-- .../runtime/scheduler/SchedulerNGFactory.java | 4 +++- .../shuffle/ShuffleServiceFactory.java | 14 ++++++++++++-- .../runtime/shuffle/ShuffleServiceLoader.java | 2 +- .../CheckpointSettingsSerializableTest.java | 4 +++- .../ExecutionGraphDeploymentTest.java | 4 +++- .../ExecutionGraphRescalingTest.java | 13 +++++++++---- .../ExecutionGraphSchedulingTest.java | 4 +++- .../ExecutionGraphTestUtils.java | 4 +++- .../ExecutionVertexLocalityTest.java | 4 +++- .../PipelinedFailoverRegionBuildingTest.java | 4 +++- .../runtime/jobmaster/JobMasterTest.java | 10 +++++++--- .../shuffle/ShuffleServiceLoaderTest.java | 11 ++++++++--- 22 files changed, 
129 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java index c0707c0ea2324..cab264d671258 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.shuffle.ShuffleServiceLoader; /** * Singleton default factory for {@link JobManagerRunner}. @@ -58,6 +60,7 @@ public JobManagerRunner createJobManagerRunner( final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration); final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration); final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, jobManagerServices.getRestartStrategyFactory()); + final ShuffleMaster shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration); final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory( jobMasterConfiguration, @@ -69,7 +72,8 @@ public JobManagerRunner createJobManagerRunner( heartbeatServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, - schedulerNGFactory); + schedulerNGFactory, + shuffleMaster); return new JobManagerRunner( jobGraph, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 
0c91276803aa1..cdb780ccc3f6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -303,7 +303,7 @@ public class ExecutionGraph implements AccessExecutionGraph { private String jsonPlan; /** Shuffle master to register partitions for task deployment. */ - private final ShuffleMaster shuffleMaster = NettyShuffleMaster.INSTANCE; + private final ShuffleMaster shuffleMaster; // -------------------------------------------------------------------------------------------- // Constructors @@ -405,7 +405,8 @@ public ExecutionGraph( slotProvider, userClassLoader, blobWriter, - allocationTimeout); + allocationTimeout, + NettyShuffleMaster.INSTANCE); } public ExecutionGraph( @@ -419,7 +420,8 @@ public ExecutionGraph( SlotProvider slotProvider, ClassLoader userClassLoader, BlobWriter blobWriter, - Time allocationTimeout) throws IOException { + Time allocationTimeout, + ShuffleMaster shuffleMaster) throws IOException { checkNotNull(futureExecutor); @@ -467,6 +469,8 @@ public ExecutionGraph( "ExecutionGraph is not initialized with proper main thread executor. " + "Call to ExecutionGraph.start(...) 
required."); + this.shuffleMaster = checkNotNull(shuffleMaster); + LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index fa194e7ed255e..ea7e124099e38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; import org.apache.flink.util.DynamicCodeLoadingException; @@ -92,8 +93,8 @@ public static ExecutionGraph buildGraph( MetricGroup metrics, BlobWriter blobWriter, Time allocationTimeout, - Logger log) - throws JobExecutionException, JobException { + Logger log, + ShuffleMaster shuffleMaster) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -129,7 +130,8 @@ public static ExecutionGraph buildGraph( slotProvider, classLoader, blobWriter, - allocationTimeout); + allocationTimeout, + shuffleMaster); } catch (IOException e) { throw new JobException("Could not create the ExecutionGraph.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index 42284452e171c..8a4e04701bffa 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -31,6 +32,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; import org.apache.flink.runtime.shuffle.ShuffleServiceFactory; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; @@ -40,12 +43,17 @@ /** * Netty based shuffle service implementation. 
*/ -public class NettyShuffleServiceFactory implements ShuffleServiceFactory { +public class NettyShuffleServiceFactory implements ShuffleServiceFactory { private static final String METRIC_GROUP_NETWORK = "Network"; private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments"; private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments"; + @Override + public NettyShuffleMaster createShuffleMaster(Configuration configuration) { + return NettyShuffleMaster.INSTANCE; + } + @Override public NettyShuffleEnvironment createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext) { checkNotNull(shuffleEnvironmentContext); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1b1b991032004..aeab67f46b153 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -75,6 +75,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -170,6 +171,8 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final Map> registeredTaskManagers; + private final ShuffleMaster shuffleMaster; + // -------- Mutable fields --------- private SchedulerNG schedulerNG; @@ -207,7 +210,8 @@ public JobMaster( OnCompletionActions jobCompletionActions, FatalErrorHandler fatalErrorHandler, ClassLoader userCodeLoader, - SchedulerNGFactory schedulerNGFactory) throws Exception { + SchedulerNGFactory schedulerNGFactory, + 
ShuffleMaster shuffleMaster) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME)); @@ -253,6 +257,8 @@ public JobMaster( this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker()); + this.shuffleMaster = checkNotNull(shuffleMaster); + this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); this.schedulerNG = createScheduler(jobManagerJobMetricGroup); this.jobStatusListener = null; @@ -277,7 +283,8 @@ private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJob rpcTimeout, blobWriter, jobManagerJobMetricGroup, - jobMasterConfiguration.getSlotRequestTimeout()); + jobMasterConfiguration.getSlotRequestTimeout(), + shuffleMaster); } //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java index c89ecbdda3dbb..60260b064cf83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.shuffle.ShuffleMaster; /** * Default implementation of the {@link JobMasterServiceFactory}. 
@@ -57,6 +58,8 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory { private final SchedulerNGFactory schedulerNGFactory; + private final ShuffleMaster shuffleMaster; + public DefaultJobMasterServiceFactory( JobMasterConfiguration jobMasterConfiguration, SlotPoolFactory slotPoolFactory, @@ -67,7 +70,8 @@ public DefaultJobMasterServiceFactory( HeartbeatServices heartbeatServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, - SchedulerNGFactory schedulerNGFactory) { + SchedulerNGFactory schedulerNGFactory, + ShuffleMaster shuffleMaster) { this.jobMasterConfiguration = jobMasterConfiguration; this.slotPoolFactory = slotPoolFactory; this.schedulerFactory = schedulerFactory; @@ -78,6 +82,7 @@ public DefaultJobMasterServiceFactory( this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory; this.fatalErrorHandler = fatalErrorHandler; this.schedulerNGFactory = schedulerNGFactory; + this.shuffleMaster = shuffleMaster; } @Override @@ -100,6 +105,7 @@ public JobMaster createJobMasterService( jobCompletionActions, fatalErrorHandler, userCodeClassloader, - schedulerNGFactory); + schedulerNGFactory, + shuffleMaster); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 427734a325438..301bd11f2e0a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -52,7 +53,8 @@ public 
DefaultScheduler( final Time rpcTimeout, final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final Time slotRequestTimeout) throws Exception { + final Time slotRequestTimeout, + final ShuffleMaster shuffleMaster) throws Exception { super( log, @@ -68,7 +70,8 @@ public DefaultScheduler( new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(), blobWriter, jobManagerJobMetricGroup, - slotRequestTimeout); + slotRequestTimeout, + shuffleMaster); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index 71f88020b4839..ead7b9f3fe7a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -52,7 +53,8 @@ public SchedulerNG createInstance( final Time rpcTimeout, final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final Time slotRequestTimeout) throws Exception { + final Time slotRequestTimeout, + final ShuffleMaster shuffleMaster) throws Exception { return new DefaultScheduler( log, @@ -67,7 +69,8 @@ public SchedulerNG createInstance( rpcTimeout, blobWriter, jobManagerJobMetricGroup, - slotRequestTimeout); + slotRequestTimeout, + shuffleMaster); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java index 2cbe7d05a8570..71021ab9accef 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java @@ -69,6 +69,7 @@ import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -142,7 +143,8 @@ public LegacyScheduler( final RestartStrategyFactory restartStrategyFactory, final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final Time slotRequestTimeout) throws Exception { + final Time slotRequestTimeout, + final ShuffleMaster shuffleMaster) throws Exception { this.log = checkNotNull(log); this.jobGraph = checkNotNull(jobGraph); @@ -169,12 +171,14 @@ public LegacyScheduler( this.blobWriter = checkNotNull(blobWriter); this.slotRequestTimeout = checkNotNull(slotRequestTimeout); - this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup); + this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster)); } - private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception { + private ExecutionGraph createAndRestoreExecutionGraph( + JobManagerJobMetricGroup currentJobManagerJobMetricGroup, + ShuffleMaster shuffleMaster) throws Exception { - ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup); + ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster); final CheckpointCoordinator checkpointCoordinator = 
newExecutionGraph.getCheckpointCoordinator(); @@ -193,7 +197,9 @@ private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup c return newExecutionGraph; } - private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException { + private ExecutionGraph createExecutionGraph( + JobManagerJobMetricGroup currentJobManagerJobMetricGroup, + ShuffleMaster shuffleMaster) throws JobExecutionException, JobException { return ExecutionGraphBuilder.buildGraph( null, jobGraph, @@ -208,7 +214,8 @@ private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobM currentJobManagerJobMetricGroup, blobWriter, slotRequestTimeout, - log); + log, + shuffleMaster); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java index 12fb5573f70fd..1f22fa9193dbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -61,7 +62,8 @@ public SchedulerNG createInstance( final Time rpcTimeout, final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final Time slotRequestTimeout) throws Exception { + final Time slotRequestTimeout, + final ShuffleMaster shuffleMaster) throws Exception { return new LegacyScheduler( log, @@ -77,6 +79,7 @@ public SchedulerNG createInstance( restartStrategyFactory, blobWriter, jobManagerJobMetricGroup, - 
slotRequestTimeout); + slotRequestTimeout, + shuffleMaster); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java index edd439aca5a6b..56bc044c23990 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -51,6 +52,7 @@ SchedulerNG createInstance( Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, - Time slotRequestTimeout) throws Exception; + Time slotRequestTimeout, + ShuffleMaster shuffleMaster) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java index f63ef1bcd6bcc..6786e4a1c50e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java @@ -18,18 +18,28 @@ package org.apache.flink.runtime.shuffle; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; /** * Interface for shuffle service factory implementations. * - *

This component is a light-weight factory for {@link ShuffleEnvironment}. + *

This component is a light-weight factory for {@link ShuffleMaster} and {@link ShuffleEnvironment}. * + * @param partition shuffle descriptor used for producer/consumer deployment and their data exchange. * @param

type of provided result partition writers * @param type of provided input gates */ -public interface ShuffleServiceFactory

{ +public interface ShuffleServiceFactory { + + /** + * Factory method to create a specific {@link ShuffleMaster} implementation. + * + * @param configuration Flink configuration + * @return shuffle manager implementation + */ + ShuffleMaster createShuffleMaster(Configuration configuration); /** * Factory method to create a specific local {@link ShuffleEnvironment} implementation. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java index 57c21029f5a03..5da78fb32edab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java @@ -30,7 +30,7 @@ public enum ShuffleServiceLoader { ; - public static ShuffleServiceFactory loadShuffleServiceFactory(Configuration configuration) throws FlinkException { + public static ShuffleServiceFactory loadShuffleServiceFactory(Configuration configuration) throws FlinkException { String shuffleServiceClassName = configuration.getString(SHUFFLE_SERVICE_FACTORY_CLASS); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); return InstantiationUtil.instantiate( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index 24e0b85deea8c..74c72438d036f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import 
org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; @@ -118,7 +119,8 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), timeout, - log); + log, + NettyShuffleMaster.INSTANCE); assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks()); assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 85d3258093493..15643a9056678 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -60,6 +60,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -822,7 +823,8 @@ private ExecutionGraph createExecutionGraph(Configuration configuration) throws new UnregisteredMetricsGroup(), blobWriter, timeout, - LoggerFactory.getLogger(getClass())); + LoggerFactory.getLogger(getClass()), + NettyShuffleMaster.INSTANCE); } private static final class ExecutionStageMatcher extends TypeSafeMatcher> { diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java index 4b22f69c8083c..306a835b3cfff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; @@ -77,7 +78,8 @@ public void testExecutionGraphArbitraryDopConstructionTest() throws Exception { new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), - TEST_LOGGER); + TEST_LOGGER, + NettyShuffleMaster.INSTANCE); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(initialParallelism)); @@ -106,7 +108,8 @@ public void testExecutionGraphArbitraryDopConstructionTest() throws Exception { new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), - TEST_LOGGER); + TEST_LOGGER, + NettyShuffleMaster.INSTANCE); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(1)); @@ -135,7 +138,8 @@ public void testExecutionGraphArbitraryDopConstructionTest() throws Exception { new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), - TEST_LOGGER); + TEST_LOGGER, + NettyShuffleMaster.INSTANCE); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(scaleUpParallelism)); @@ -177,7 +181,8 @@ public void testExecutionGraphConstructionFailsRescaleDopExceedMaxParallelism() new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), 
AkkaUtils.getDefaultTimeout(), - TEST_LOGGER); + TEST_LOGGER, + NettyShuffleMaster.INSTANCE); fail("Building the ExecutionGraph with a parallelism higher than the max parallelism should fail."); } catch (JobException e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index ca3c31bc3484f..4b5c98f7f7a4f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -589,7 +590,8 @@ private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slot new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), timeout, - log); + log, + NettyShuffleMaster.INSTANCE); } private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 5ba8263aff53f..f0924fe62afbf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -41,6 +41,7 @@ import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; @@ -409,7 +410,8 @@ public static ExecutionGraph createExecutionGraph( new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), timeout, - TEST_LOGGER); + TEST_LOGGER, + NettyShuffleMaster.INSTANCE); } public static JobVertex createNoOpVertex(int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index bb593f18824f9..5b8ef04ae7f02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -224,7 +225,8 @@ private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), timeout, - log); + log, + NettyShuffleMaster.INSTANCE); } private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception { diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java index 6990858f5f8ab..92e659cb2ce24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.TestLogger; @@ -642,6 +643,7 @@ private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobExcepti new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), timeout, - log); + log, + NettyShuffleMaster.INSTANCE); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 58de718813f74..aa1252dc81185 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -98,6 +98,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; import org.apache.flink.runtime.scheduler.LegacySchedulerFactory; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.KeyGroupRange; import 
org.apache.flink.runtime.state.OperatorStreamStateHandle; @@ -292,7 +293,8 @@ public void testDeclineCheckpointInvocationWithUserException() throws Exception new TestingOnCompletionActions(), testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), - schedulerNGFactory) { + schedulerNGFactory, + NettyShuffleMaster.INSTANCE) { @Override public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) { declineCheckpointMessageFuture.complete(declineCheckpoint.getReason()); @@ -1402,7 +1404,8 @@ public void testTriggerSavepointTimeout() throws Exception { new TestingOnCompletionActions(), testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), - schedulerNGFactory) { + schedulerNGFactory, + NettyShuffleMaster.INSTANCE) { @Override public CompletableFuture triggerSavepoint( @@ -1860,7 +1863,8 @@ private JobMaster createJobMaster( onCompletionActions, testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), - schedulerNGFactory); + schedulerNGFactory, + NettyShuffleMaster.INSTANCE); } private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java index 9f4fa620153ee..c7b99561a713f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java @@ -38,7 +38,7 @@ public class ShuffleServiceLoaderTest { @Test public void testLoadDefaultNettyShuffleServiceFactory() throws FlinkException { Configuration configuration = new Configuration(); - ShuffleServiceFactory shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration); + ShuffleServiceFactory shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration); assertThat( "Loaded shuffle service factory is 
not the default netty implementation", shuffleServiceFactory, @@ -49,7 +49,7 @@ public void testLoadDefaultNettyShuffleServiceFactory() throws FlinkException { public void testLoadCustomShuffleServiceFactory() throws FlinkException { Configuration configuration = new Configuration(); configuration.setString(SHUFFLE_SERVICE_FACTORY_CLASS, "org.apache.flink.runtime.shuffle.ShuffleServiceLoaderTest$CustomShuffleServiceFactory"); - ShuffleServiceFactory shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration); + ShuffleServiceFactory shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration); assertThat( "Loaded shuffle service factory is not the custom test implementation", shuffleServiceFactory, @@ -66,7 +66,12 @@ public void testLoadShuffleServiceFactoryFailure() throws FlinkException { /** * Stub implementation of {@link ShuffleServiceFactory} to test {@link ShuffleServiceLoader} utility. */ - public static class CustomShuffleServiceFactory implements ShuffleServiceFactory { + public static class CustomShuffleServiceFactory implements ShuffleServiceFactory { + @Override + public ShuffleMaster createShuffleMaster(Configuration configuration) { + throw new UnsupportedOperationException(); + } + @Override public ShuffleEnvironment createShuffleEnvironment( ShuffleEnvironmentContext shuffleEnvironmentContext) {