> registeredTaskManagers;
+ private final ShuffleMaster<?> shuffleMaster;
+
// -------- Mutable fields ---------
private SchedulerNG schedulerNG;
@@ -207,7 +210,8 @@ public JobMaster(
OnCompletionActions jobCompletionActions,
FatalErrorHandler fatalErrorHandler,
ClassLoader userCodeLoader,
- SchedulerNGFactory schedulerNGFactory) throws Exception {
+ SchedulerNGFactory schedulerNGFactory,
+ ShuffleMaster<?> shuffleMaster) throws Exception {
super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
@@ -253,6 +257,8 @@ public JobMaster(
this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
+ this.shuffleMaster = checkNotNull(shuffleMaster);
+
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
this.jobStatusListener = null;
@@ -277,7 +283,8 @@ private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJob
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
- jobMasterConfiguration.getSlotRequestTimeout());
+ jobMasterConfiguration.getSlotRequestTimeout(),
+ shuffleMaster);
}
//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index c89ecbdda3dbb..60260b064cf83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
/**
* Default implementation of the {@link JobMasterServiceFactory}.
@@ -57,6 +58,8 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
private final SchedulerNGFactory schedulerNGFactory;
+ private final ShuffleMaster<?> shuffleMaster;
+
public DefaultJobMasterServiceFactory(
JobMasterConfiguration jobMasterConfiguration,
SlotPoolFactory slotPoolFactory,
@@ -67,7 +70,8 @@ public DefaultJobMasterServiceFactory(
HeartbeatServices heartbeatServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
- SchedulerNGFactory schedulerNGFactory) {
+ SchedulerNGFactory schedulerNGFactory,
+ ShuffleMaster<?> shuffleMaster) {
this.jobMasterConfiguration = jobMasterConfiguration;
this.slotPoolFactory = slotPoolFactory;
this.schedulerFactory = schedulerFactory;
@@ -78,6 +82,7 @@ public DefaultJobMasterServiceFactory(
this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory;
this.fatalErrorHandler = fatalErrorHandler;
this.schedulerNGFactory = schedulerNGFactory;
+ this.shuffleMaster = shuffleMaster;
}
@Override
@@ -100,6 +105,7 @@ public JobMaster createJobMasterService(
jobCompletionActions,
fatalErrorHandler,
userCodeClassloader,
- schedulerNGFactory);
+ schedulerNGFactory,
+ shuffleMaster);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 427734a325438..301bd11f2e0a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -52,7 +53,8 @@ public DefaultScheduler(
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final Time slotRequestTimeout) throws Exception {
+ final Time slotRequestTimeout,
+ final ShuffleMaster<?> shuffleMaster) throws Exception {
super(
log,
@@ -68,7 +70,8 @@ public DefaultScheduler(
new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
blobWriter,
jobManagerJobMetricGroup,
- slotRequestTimeout);
+ slotRequestTimeout,
+ shuffleMaster);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 71f88020b4839..ead7b9f3fe7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -27,6 +27,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -52,7 +53,8 @@ public SchedulerNG createInstance(
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final Time slotRequestTimeout) throws Exception {
+ final Time slotRequestTimeout,
+ final ShuffleMaster<?> shuffleMaster) throws Exception {
return new DefaultScheduler(
log,
@@ -67,7 +69,8 @@ public SchedulerNG createInstance(
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
- slotRequestTimeout);
+ slotRequestTimeout,
+ shuffleMaster);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index 2cbe7d05a8570..71021ab9accef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -69,6 +69,7 @@
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -142,7 +143,8 @@ public LegacyScheduler(
final RestartStrategyFactory restartStrategyFactory,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final Time slotRequestTimeout) throws Exception {
+ final Time slotRequestTimeout,
+ final ShuffleMaster<?> shuffleMaster) throws Exception {
this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
@@ -169,12 +171,14 @@ public LegacyScheduler(
this.blobWriter = checkNotNull(blobWriter);
this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
- this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
+ this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster));
}
- private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
+ private ExecutionGraph createAndRestoreExecutionGraph(
+ JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+ ShuffleMaster<?> shuffleMaster) throws Exception {
- ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);
+ ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster);
final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
@@ -193,7 +197,9 @@ private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup c
return newExecutionGraph;
}
- private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
+ private ExecutionGraph createExecutionGraph(
+ JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+ ShuffleMaster<?> shuffleMaster) throws JobExecutionException, JobException {
return ExecutionGraphBuilder.buildGraph(
null,
jobGraph,
@@ -208,7 +214,8 @@ private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobM
currentJobManagerJobMetricGroup,
blobWriter,
slotRequestTimeout,
- log);
+ log,
+ shuffleMaster);
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
index 12fb5573f70fd..1f22fa9193dbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -61,7 +62,8 @@ public SchedulerNG createInstance(
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final Time slotRequestTimeout) throws Exception {
+ final Time slotRequestTimeout,
+ final ShuffleMaster<?> shuffleMaster) throws Exception {
return new LegacyScheduler(
log,
@@ -77,6 +79,7 @@ public SchedulerNG createInstance(
restartStrategyFactory,
blobWriter,
jobManagerJobMetricGroup,
- slotRequestTimeout);
+ slotRequestTimeout,
+ shuffleMaster);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index edd439aca5a6b..56bc044c23990 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -27,6 +27,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -51,6 +52,7 @@ SchedulerNG createInstance(
Time rpcTimeout,
BlobWriter blobWriter,
JobManagerJobMetricGroup jobManagerJobMetricGroup,
- Time slotRequestTimeout) throws Exception;
+ Time slotRequestTimeout,
+ ShuffleMaster<?> shuffleMaster) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index f7108b6b474d1..bde82eb69e81f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -33,7 +33,7 @@
*/
public class ShuffleEnvironmentContext {
private final Configuration configuration;
- private final ResourceID location;
+ private final ResourceID taskExecutorResourceId;
private final long maxJvmHeapMemory;
private final boolean localCommunicationOnly;
private final InetAddress hostAddress;
@@ -43,7 +43,7 @@ public class ShuffleEnvironmentContext {
public ShuffleEnvironmentContext(
Configuration configuration,
- ResourceID location,
+ ResourceID taskExecutorResourceId,
long maxJvmHeapMemory,
boolean localCommunicationOnly,
InetAddress hostAddress,
@@ -51,7 +51,7 @@ public ShuffleEnvironmentContext(
MetricGroup parentMetricGroup,
IOManager ioManager) {
this.configuration = checkNotNull(configuration);
- this.location = checkNotNull(location);
+ this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
this.maxJvmHeapMemory = maxJvmHeapMemory;
this.localCommunicationOnly = localCommunicationOnly;
this.hostAddress = checkNotNull(hostAddress);
@@ -64,8 +64,8 @@ public Configuration getConfiguration() {
return configuration;
}
- public ResourceID getLocation() {
- return location;
+ public ResourceID getTaskExecutorResourceId() {
+ return taskExecutorResourceId;
}
public long getMaxJvmHeapMemory() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java
new file mode 100644
index 0000000000000..6786e4a1c50e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+/**
+ * Interface for shuffle service factory implementations.
+ *
+ * <p>This component is a light-weight factory for {@link ShuffleMaster} and {@link ShuffleEnvironment}.
+ *
+ * @param <SD> partition shuffle descriptor used for producer/consumer deployment and their data exchange.
+ * @param <P> type of provided result partition writers
+ * @param <G> type of provided input gates
+ */
+public interface ShuffleServiceFactory<SD extends ShuffleDescriptor, P extends ResultPartitionWriter, G extends InputGate> {
+
+	/**
+	 * Factory method to create a specific {@link ShuffleMaster} implementation.
+	 *
+	 * @param configuration Flink configuration
+	 * @return shuffle master implementation
+	 */
+	ShuffleMaster<SD> createShuffleMaster(Configuration configuration);
+
+	/**
+	 * Factory method to create a specific local {@link ShuffleEnvironment} implementation.
+	 *
+	 * @param shuffleEnvironmentContext local context
+	 * @return local shuffle service environment implementation
+	 */
+	ShuffleEnvironment<P, G> createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java
new file mode 100644
index 0000000000000..5da78fb32edab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import static org.apache.flink.runtime.shuffle.ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS;
+
+/**
+ * Utility to load the pluggable {@link ShuffleServiceFactory} implementations.
+ */
+public enum ShuffleServiceLoader {
+ ;
+
+ public static ShuffleServiceFactory, ?, ?> loadShuffleServiceFactory(Configuration configuration) throws FlinkException {
+ String shuffleServiceClassName = configuration.getString(SHUFFLE_SERVICE_FACTORY_CLASS);
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ return InstantiationUtil.instantiate(
+ shuffleServiceClassName,
+ ShuffleServiceFactory.class,
+ classLoader);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java
new file mode 100644
index 0000000000000..b610def748571
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Options to configure shuffle service.
+ */
+@SuppressWarnings("WeakerAccess")
+public class ShuffleServiceOptions {
+
+	private ShuffleServiceOptions() {
+	}
+
+	/**
+	 * The full class name of the shuffle service factory implementation to be used by the cluster.
+	 */
+	public static final ConfigOption<String> SHUFFLE_SERVICE_FACTORY_CLASS = ConfigOptions
+		.key("shuffle-service-factory.class")
+		.defaultValue("org.apache.flink.runtime.io.network.NettyShuffleServiceFactory")
+		.withDescription("The full class name of the shuffle service factory implementation to be used by the cluster. " +
+			"The default implementation uses Netty for network communication and local memory as well disk space " +
+			"to store results on a TaskExecutor.");
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 5d6207614ac01..fd9ebfac9257b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -30,11 +30,11 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -241,7 +241,7 @@ public static TaskManagerServices fromConfiguration(
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
- final ShuffleEnvironment shuffleEnvironment = createShuffleEnvironment(
+ final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
taskManagerMetricGroup,
@@ -304,11 +304,11 @@ public static TaskManagerServices fromConfiguration(
taskEventDispatcher);
}
- private static ShuffleEnvironment createShuffleEnvironment(
+ private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
TaskEventDispatcher taskEventDispatcher,
MetricGroup taskManagerMetricGroup,
- IOManager ioManager) {
+ IOManager ioManager) throws FlinkException {
final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
taskManagerServicesConfiguration.getConfiguration(),
@@ -320,7 +320,9 @@ private static ShuffleEnvironment createShuffleEnvironment(
taskManagerMetricGroup,
ioManager);
- return NettyShuffleEnvironment.fromShuffleContext(shuffleEnvironmentContext);
+ return ShuffleServiceLoader
+ .loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
+ .createShuffleEnvironment(shuffleEnvironmentContext);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 24e0b85deea8c..74c72438d036f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -37,6 +37,7 @@
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -118,7 +119,8 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
timeout,
- log);
+ log,
+ NettyShuffleMaster.INSTANCE);
assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 85d3258093493..15643a9056678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -60,6 +60,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -822,7 +823,8 @@ private ExecutionGraph createExecutionGraph(Configuration configuration) throws
new UnregisteredMetricsGroup(),
blobWriter,
timeout,
- LoggerFactory.getLogger(getClass()));
+ LoggerFactory.getLogger(getClass()),
+ NettyShuffleMaster.INSTANCE);
}
private static final class ExecutionStageMatcher extends TypeSafeMatcher> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
index 4b22f69c8083c..306a835b3cfff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -30,6 +30,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
@@ -77,7 +78,8 @@ public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout(),
- TEST_LOGGER);
+ TEST_LOGGER,
+ NettyShuffleMaster.INSTANCE);
for (JobVertex jv : jobVertices) {
assertThat(jv.getParallelism(), is(initialParallelism));
@@ -106,7 +108,8 @@ public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout(),
- TEST_LOGGER);
+ TEST_LOGGER,
+ NettyShuffleMaster.INSTANCE);
for (JobVertex jv : jobVertices) {
assertThat(jv.getParallelism(), is(1));
@@ -135,7 +138,8 @@ public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout(),
- TEST_LOGGER);
+ TEST_LOGGER,
+ NettyShuffleMaster.INSTANCE);
for (JobVertex jv : jobVertices) {
assertThat(jv.getParallelism(), is(scaleUpParallelism));
@@ -177,7 +181,8 @@ public void testExecutionGraphConstructionFailsRescaleDopExceedMaxParallelism()
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout(),
- TEST_LOGGER);
+ TEST_LOGGER,
+ NettyShuffleMaster.INSTANCE);
fail("Building the ExecutionGraph with a parallelism higher than the max parallelism should fail.");
} catch (JobException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index ca3c31bc3484f..4b5c98f7f7a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -48,6 +48,7 @@
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -589,7 +590,8 @@ private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slot
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
timeout,
- log);
+ log,
+ NettyShuffleMaster.INSTANCE);
}
private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 5ba8263aff53f..f0924fe62afbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -41,6 +41,7 @@
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -409,7 +410,8 @@ public static ExecutionGraph createExecutionGraph(
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
timeout,
- TEST_LOGGER);
+ TEST_LOGGER,
+ NettyShuffleMaster.INSTANCE);
}
public static JobVertex createNoOpVertex(int parallelism) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index bb593f18824f9..5b8ef04ae7f02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -40,6 +40,7 @@
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -224,7 +225,8 @@ private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
timeout,
- log);
+ log,
+ NettyShuffleMaster.INSTANCE);
}
private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index 6990858f5f8ab..92e659cb2ce24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -36,6 +36,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -642,6 +643,7 @@ private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobExcepti
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
timeout,
- log);
+ log,
+ NettyShuffleMaster.INSTANCE);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 2269646a7c3c7..9f0a9ce0289e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -118,7 +118,7 @@ public NettyShuffleEnvironmentBuilder setIOManager(IOManager ioManager) {
}
public NettyShuffleEnvironment build() {
- return NettyShuffleEnvironment.create(
+ return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
new NettyShuffleEnvironmentConfiguration(
numNetworkBuffers,
networkBufferSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 58de718813f74..aa1252dc81185 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -98,6 +98,7 @@
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
@@ -292,7 +293,8 @@ public void testDeclineCheckpointInvocationWithUserException() throws Exception
new TestingOnCompletionActions(),
testingFatalErrorHandler,
JobMasterTest.class.getClassLoader(),
- schedulerNGFactory) {
+ schedulerNGFactory,
+ NettyShuffleMaster.INSTANCE) {
@Override
public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
@@ -1402,7 +1404,8 @@ public void testTriggerSavepointTimeout() throws Exception {
new TestingOnCompletionActions(),
testingFatalErrorHandler,
JobMasterTest.class.getClassLoader(),
- schedulerNGFactory) {
+ schedulerNGFactory,
+ NettyShuffleMaster.INSTANCE) {
@Override
public CompletableFuture triggerSavepoint(
@@ -1860,7 +1863,8 @@ private JobMaster createJobMaster(
onCompletionActions,
testingFatalErrorHandler,
JobMasterTest.class.getClassLoader(),
- schedulerNGFactory);
+ schedulerNGFactory,
+ NettyShuffleMaster.INSTANCE);
}
private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java
new file mode 100644
index 0000000000000..c7b99561a713f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.util.FlinkException;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.shuffle.ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite for {@link ShuffleServiceLoader} utility.
+ */
+public class ShuffleServiceLoaderTest {
+
+ @Test // an empty Configuration is expected to yield the Netty factory
+ public void testLoadDefaultNettyShuffleServiceFactory() throws FlinkException {
+ Configuration configuration = new Configuration();
+ ShuffleServiceFactory<?, ?, ?> shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
+ assertThat(
+ "Loaded shuffle service factory is not the default netty implementation",
+ shuffleServiceFactory,
+ instanceOf(NettyShuffleServiceFactory.class));
+ }
+
+ @Test // the configured class name is expected to select the nested stub factory
+ public void testLoadCustomShuffleServiceFactory() throws FlinkException {
+ Configuration configuration = new Configuration();
+ configuration.setString(SHUFFLE_SERVICE_FACTORY_CLASS, "org.apache.flink.runtime.shuffle.ShuffleServiceLoaderTest$CustomShuffleServiceFactory"); // '$' form names the nested class below
+ ShuffleServiceFactory<?, ?, ?> shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
+ assertThat(
+ "Loaded shuffle service factory is not the custom test implementation",
+ shuffleServiceFactory,
+ instanceOf(CustomShuffleServiceFactory.class));
+ }
+
+ @Test(expected = FlinkException.class) // passes only if loading throws FlinkException
+ public void testLoadShuffleServiceFactoryFailure() throws FlinkException {
+ Configuration configuration = new Configuration();
+ configuration.setString(SHUFFLE_SERVICE_FACTORY_CLASS, "org.apache.flink.runtime.shuffle.UnavailableShuffleServiceFactory"); // no such class is declared in this test file
+ ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
+ }
+
+ /**
+ * Stub implementation of {@link ShuffleServiceFactory} to test {@link ShuffleServiceLoader} utility; both factory methods throw because the tests above only inspect the loaded factory's type.
+ */
+ public static class CustomShuffleServiceFactory implements ShuffleServiceFactory<ShuffleDescriptor, ResultPartitionWriter, InputGate> { // NOTE(review): type arguments reconstructed (ShuffleDescriptor assumed from this package) - confirm
+ @Override
+ public ShuffleMaster<ShuffleDescriptor> createShuffleMaster(Configuration configuration) {
+ throw new UnsupportedOperationException(); // stub: not exercised by the tests in this class
+ }
+
+ @Override
+ public ShuffleEnvironment<ResultPartitionWriter, InputGate> createShuffleEnvironment(
+ ShuffleEnvironmentContext shuffleEnvironmentContext) {
+ throw new UnsupportedOperationException(); // stub: not exercised by the tests in this class
+ }
+ }
+}