
Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Dec 10, 2023

What changes were proposed in this pull request?

This PR aims to fix Spark Master's recovery process to update a worker's status from UNKNOWN to ALIVE when it receives a RegisterWorker message from that worker.
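Roughly, the change is of this shape in the Master's RegisterWorker handling; this is a simplified sketch for illustration, not the exact diff:

```scala
// Simplified sketch of the duplicate-registration branch in Master's
// RegisterWorker handling (illustrative, not the exact diff of this PR).
case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory,
    workerWebUiUrl, masterAddress, resources) =>
  if (state == RecoveryState.STANDBY) {
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // A recovered worker that re-registers is no longer left in UNKNOWN;
    // it is marked ALIVE so that it does not block completeRecovery.
    if (idToWorker(id).state == WorkerState.UNKNOWN) {
      logInfo(s"Worker has been re-registered: $id")
      idToWorker(id).state = WorkerState.ALIVE
    }
    workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, duplicate = true))
  } else {
    // ... normal first-time registration path ...
  }
```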

Why are the changes needed?

This only happens during the recovery.

  • Master already has the recovered worker information in memory with UNKNOWN status.
  • Worker sends RegisterWorker message correctly.
  • Master keeps the worker's status as UNKNOWN and replies to the worker with a RegisteredWorker message that has the duplicate flag set.
  • Since the Worker receives a response like the following, it considers itself registered and will not try to reconnect.
23/12/09 23:49:57 INFO Worker: Retrying connection to master (attempt # 3)
23/12/09 23:49:57 INFO Worker: Connecting to master ...:7077...
23/12/09 23:50:04 INFO TransportClientFactory: Successfully created connection to master...:7077 after 7089 ms (0 ms spent in bootstraps)
23/12/09 23:50:04 WARN Worker: Duplicate registration at master spark://...
23/12/09 23:50:04 INFO Worker: Successfully registered with master spark://...

The UNKNOWN-status workers block the recovery process and cause a long delay.

```scala
private def canCompleteRecovery =
  workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
    apps.count(_.state == ApplicationState.UNKNOWN) == 0
```

After the delay, master simply kills them all.

```scala
// Kill off any workers and apps that didn't respond to us.
workers.filter(_.state == WorkerState.UNKNOWN).foreach(
  removeWorker(_, "Not responding for recovery"))
```

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This case is a little hard to cover with a unit test.
Tested manually.

  • Master
23/12/10 04:58:30 WARN OneWayOutboxMessage: Failed to send one-way RPC.
java.io.IOException: Connecting to /***:1024 timed out (10000 ms)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:291)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
23/12/10 04:58:54 INFO Master: Registering worker ***:1024 with 2 cores, 23.0 GiB RAM
23/12/10 04:58:54 INFO Master: Worker has been re-registered: worker-20231210045613-***-1024
  • Worker
23/12/10 04:58:45 INFO Worker: Retrying connection to master (attempt # 5)
23/12/10 04:58:45 INFO Worker: Connecting to master master:7077...
23/12/10 04:58:54 INFO TransportClientFactory: Successfully created connection to master/...:7077 after 63957 ms (0 ms spent in bootstraps)
23/12/10 04:58:54 WARN Worker: Duplicate registration at master spark://master:7077
23/12/10 04:58:54 INFO Worker: Successfully registered with master spark://master:7077
23/12/10 04:58:54 INFO Worker: WorkerWebUI is available at https://...-1***-1024
23/12/10 04:58:54 INFO Worker: Worker cleanup enabled; old application directories will be deleted in: /data/spark

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the CORE label Dec 10, 2023
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-46346][CORE] Fix Master to update worker from UNKNOWN to ALIVE on RegisterWorker msg [SPARK-46346][CORE] Fix Master to update a worker from UNKNOWN to ALIVE on RegisterWorker msg Dec 10, 2023
@dongjoon-hyun
Member Author

Could you review this PR, @viirya ?

Member

@viirya viirya left a comment

Hmm, the worker's state is set to UNKNOWN in beginRecovery, right?

After that, the master will send MasterChanged to the worker. Then the worker will send WorkerSchedulerStateResponse back to the master. As the master receives WorkerSchedulerStateResponse, it will set the worker's state to ALIVE.

I think this is the whole recovery process for a worker.
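For reference, this handshake looks roughly like the following on the Master side; a heavily simplified sketch, not exact Master.scala code:

```scala
// 1) beginRecovery: recovered workers start in UNKNOWN and are asked to re-sync.
for (worker <- storedWorkers) {
  registerWorker(worker)
  worker.state = WorkerState.UNKNOWN
  worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
}

// 2) Only when the worker replies with WorkerSchedulerStateResponse does the
//    Master flip it back to ALIVE (and re-attach its executors and drivers).
case resp: WorkerSchedulerStateResponse =>
  idToWorker.get(resp.id) match {
    case Some(worker) =>
      worker.state = WorkerState.ALIVE
      // ... re-attach known executors and drivers to this worker ...
    case None =>
      logWarning(s"Scheduler state from unknown worker: ${resp.id}")
  }
```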

I am not sure when the worker sends RegisterWorker to the recovering master. Before the master sends MasterChanged to the worker, isn't the worker still using the old master address? Why does it send RegisterWorker to the new master before changing masters?

@dongjoon-hyun
Member Author

dongjoon-hyun commented Dec 10, 2023

Yes, that's the normal case that we expect. The problem is that the master only tries once in that process.

In an EKS environment, the Master pod's network setup takes some time when there are thousands of Worker pods. The Master's error message in the PR description is evidence of what I mentioned. For example, in a 1k-worker case, 40~50 workers frequently remain in UNKNOWN status, @viirya.

23/12/10 04:58:30 WARN OneWayOutboxMessage: Failed to send one-way RPC.
java.io.IOException: Connecting to /***:1024 timed out (10000 ms)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:291)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

For the question below, the Worker has retry logic controlled by the spark.worker.(initial|max)RegistrationRetries configurations. The re-registration happens when the Worker fails to send a heartbeat message.

I am not sure when the worker sends RegisterWorker to the recovering master.

For the following question, we use a K8s service, master, which provides a mapping to the specific pod. And spark.worker.preferConfiguredMasterAddress=true tells the Worker to always use the service name.

Before the master sends MasterChanged to the worker, isn't the worker still using the old master address? Why does it send RegisterWorker to the new master before changing masters?

For example, in the PR description, the following shows that the Worker is trying to connect to master:7077, where master is the K8s service name. And it has already tried 5 times.

23/12/10 04:58:45 INFO Worker: Retrying connection to master (attempt # 5)
23/12/10 04:58:45 INFO Worker: Connecting to master master:7077...
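For context, a worker-side configuration along the lines described above might look like the following; the property names are expanded from the spark.worker.(initial|max)RegistrationRetries pattern mentioned above, and the values are purely illustrative, not recommendations:

```
# spark-defaults.conf (illustrative values)
spark.worker.preferConfiguredMasterAddress  true
spark.worker.initialRegistrationRetries     6
spark.worker.maxRegistrationRetries         16
```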

@viirya
Member

viirya commented Dec 10, 2023

In an EKS environment, the Master pod's network setup takes some time when there are thousands of Worker pods. The Master's error message in the PR description is evidence of what I mentioned. For example, in a 1k-worker case, 40~50 workers frequently remain in UNKNOWN status

Hmm, so in the network failure case the master cannot send MasterChanged correctly to the worker, and the worker can remain in UNKNOWN status in the master. Okay, it makes sense.

For the following question, we use a K8s service, master, which provides a mapping to the specific pod. And spark.worker.preferConfiguredMasterAddress=true tells the Worker to always use the service name.

As long as the service mapping works, the worker will still send RegisterWorker to the recovering master. For a worker in UNKNOWN status, changing its status from UNKNOWN to ALIVE can get it registered with the recovering master.

But the worker doesn't send WorkerSchedulerStateResponse to the master as expected (because it doesn't receive MasterChanged correctly). When the recovering master receives WorkerSchedulerStateResponse, it looks like it has some important steps to do, like adding executors to applications, etc.

I'm wondering, is it okay to skip WorkerSchedulerStateResponse and all these steps?

@dongjoon-hyun
Member Author

For this part, you are right. The driver recovery and app recovery are two additional separate issues which we need to address later.

But the worker doesn't send WorkerSchedulerStateResponse to the master as expected (because it doesn't receive MasterChanged correctly). When the recovering master receives WorkerSchedulerStateResponse, it looks like it has some important steps to do, like adding executors to applications, etc.
I'm wondering, is it okay to skip WorkerSchedulerStateResponse and all these steps?

This PR focuses only on Worker recovery and there is no regression from the Driver or App perspective because they will be deleted like the existing behavior, @viirya .
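For context, completeRecovery already handles unclaimed applications and drivers roughly as follows; this is a simplified sketch, not exact code:

```scala
// Existing completeRecovery() behavior (sketch, unchanged by this PR):
// apps that never responded are finished, and drivers without a worker are
// either re-launched (if supervised) or removed.
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

drivers.filter(_.worker.isEmpty).foreach { d =>
  logWarning(s"Driver ${d.id} was not found after master recovery")
  if (d.desc.supervise) {
    relaunchDriver(d)
  } else {
    removeDriver(d.id, DriverState.ERROR, None)
  }
}
```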

@dongjoon-hyun
Member Author

For the record, I'm also working on that part internally.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Dec 10, 2023

BTW, masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq)) is already sent when the Worker receives this Duplicate registration message, @viirya. See the Worker code below.

```scala
if (duplicate) {
  logWarning(s"Duplicate registration at master $preferredMasterAddress")
}
logInfo(s"Successfully registered with master $preferredMasterAddress")
registered = true
changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwardMessageScheduler.scheduleAtFixedRate(
  () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
  0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
if (CLEANUP_ENABLED) {
  logInfo(
    s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
  forwardMessageScheduler.scheduleAtFixedRate(
    () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
    CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}
val execs = executors.values.map { e =>
  new ExecutorDescription(e.appId, e.execId, e.rpId, e.cores, e.memory, e.state)
}
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
```

```scala
/**
 * A worker will send this message to the master when it registers with the master. Then the
 * master will compare them with the executors and drivers in the master and tell the worker to
 * kill the unknown executors and drivers.
 */
case class WorkerLatestState(
    id: String,
    executors: Seq[ExecutorDescription],
    driverIds: Seq[String]) extends DeployMessage
```

@viirya
Member

viirya commented Dec 10, 2023

BTW, masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq)) is already sent when the Worker receives this Duplicate registration message.

WorkerLatestState still looks different from WorkerSchedulerStateResponse in its processing logic inside the Master. I think they are different, and WorkerLatestState cannot replace WorkerSchedulerStateResponse's role during recovery.
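Roughly, the Master only uses WorkerLatestState to tell the worker to kill executors/drivers the Master does not recognize, rather than re-attaching state the way WorkerSchedulerStateResponse does. A simplified sketch, not exact code:

```scala
// Sketch of the Master's WorkerLatestState handling: it only prunes executors
// and drivers unknown to the Master; it does not re-attach state or change the
// worker's status, unlike WorkerSchedulerStateResponse.
case WorkerLatestState(workerId, executors, driverIds) =>
  idToWorker.get(workerId) match {
    case Some(worker) =>
      for (exec <- executors) {
        val known = worker.executors.exists {
          case (_, e) => e.application.id == exec.appId && e.id == exec.execId
        }
        if (!known) {
          // Master doesn't recognize this executor, so tell the worker to kill it.
          worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
        }
      }
      for (driverId <- driverIds if !worker.drivers.contains(driverId)) {
        // Master doesn't recognize this driver, so tell the worker to kill it.
        worker.endpoint.send(KillDriver(driverId))
      }
    case None =>
      logWarning(s"Worker state from unknown worker: $workerId")
  }
```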

This PR focuses only on Worker recovery and there is no regression from the Driver or App perspective because they will be deleted like the existing behavior.

Okay. This sounds correct. As this PR proposes to address the case where MasterChanged is not sent/received correctly, WorkerSchedulerStateResponse is not processed either, as a consequence. It just sets the worker from UNKNOWN to ALIVE to bring it back.

WorkerSchedulerStateResponse adds valid executors and drivers to the worker. And you are right that drivers without a worker will be deleted, so it is not a regression. For executors, I don't see completeRecovery explicitly delete executors that don't belong to any worker. Although workers in UNKNOWN status will be deleted, their executors are not updated (because WorkerSchedulerStateResponse is not sent). I don't know how Spark will handle such a case, but it seems not to be a regression either.

@viirya
Member

viirya commented Dec 10, 2023

The driver recovery and app recovery are two additional separate issues which we need to address later.

Yeah, ideally they should be addressed too, although currently it looks like they are not regressions.

@viirya
Member

viirya commented Dec 10, 2023

Btw, I found a test "master correctly recover the application" in MasterSuite which does some tests on master recovery. For example, it tests master behavior by sending WorkerSchedulerStateResponse for a fake worker. Maybe we can do a test like that.

I.e., instead of WorkerSchedulerStateResponse, we can send a RegisterWorker message, and check the fake worker's status.

@dongjoon-hyun
Member Author

Btw, I found a test "master correctly recover the application" in MasterSuite which does some tests on master recovery. For example, it tests master behavior by sending WorkerSchedulerStateResponse for a fake worker. Maybe we can do a test like that.

I.e., instead of WorkerSchedulerStateResponse, we can send a RegisterWorker message, and check the fake worker's status.

Thank you, @viirya . I will start the test coverage improvement for this part.

@dongjoon-hyun dongjoon-hyun deleted the SPARK-46346 branch December 10, 2023 19:20
@dongjoon-hyun
Member Author

Hi, @viirya. I made a PR to refactor for unit testing in this area.

@viirya
Member

viirya commented Dec 10, 2023

Got it. Thank you @dongjoon-hyun for improving test coverage.

dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024