Description
| Field | Value |
| --- | --- |
| Library | httpx 0.28.1 (current latest, all recent versions) |
| Python | Any version |
| Triggered by | Airflow LocalExecutor / CeleryExecutor multiprocessing queue |
Full traceback
When Airflow's LocalExecutor tries to send task results through its multiprocessing
queue, the scheduler crashes trying to pickle the httpx.HTTPStatusError exception.
The scheduler log shows:
scheduler | File ".../airflow/sdk/api/client.py", line 152, in start
scheduler | File ".../httpx/_client.py", line 1218, in patch
scheduler | File ".../tenacity/__init__.py", line 331, in wrapped_f
scheduler | File ".../tenacity/__init__.py", line 470, in __call__
scheduler | File ".../tenacity/__init__.py", line 371, in iter
scheduler | File ".../tenacity/__init__.py", line 413, in exc_check
scheduler | File ".../tenacity/__init__.py", line 184, in reraise
scheduler | raise self._exception
scheduler | File ".../tenacity/__init__.py", line 473, in __call__
scheduler | File ".../airflow/sdk/api/client.py", line 735, in request
scheduler | File ".../httpx/_client.py", line 825, in request
scheduler | File ".../httpx/_client.py", line 914, in send
scheduler | File ".../httpx/_client.py", line 942, in _send_handling_auth
scheduler | File ".../httpx/_client.py", line 999, in _send_handling_redirects
scheduler | File ".../httpx/_client.py", line 982, in _send_handling_redirects
scheduler | File ".../airflow/sdk/api/client.py", line 123, in raise_on_4xx_5xx_with_note
scheduler | return get_json_error(response) or response.raise_for_status()
scheduler | File ".../httpx/_models.py", line 829, in raise_for_status
scheduler | raise HTTPStatusError(message, request=request, response=self)
scheduler | httpx.HTTPStatusError: Server error '500 Internal Server Error' for url
'http://localhost:8080/execution/task-instances/<uuid>/run'
The LocalExecutor then reports the mismatch:
scheduler | [local_executor.py:96] ERROR - uhoh
scheduler | ERROR - Executor LocalExecutor(parallelism=32) reported that the task instance
<TaskInstance: secator_task_httpx.httpx manual__... [queued]> finished with
state failed, but the task instance's state attribute is queued. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
The task log files are empty (0 bytes) because the worker crashes before writing
any output. The task stays in "queued" state from the task instance's perspective,
even though the executor reported it as "failed".
Reproduction
import httpx, pickle

exc = httpx.HTTPStatusError(
    'test',
    request=httpx.Request('GET', 'http://x'),
    response=httpx.Response(500),
)

# The error surfaces when pickle reconstructs the exception:
pickle.loads(pickle.dumps(exc))
# TypeError: HTTPStatusError.__init__() missing required keyword-only arguments
Root cause
HTTPStatusError.__init__ has keyword-only arguments (request, response) that
are not captured in Exception.args. When pickle reconstructs the exception, it
only has the message string from args, and calling HTTPStatusError(message) fails
because the required request and response kwargs are missing.
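The mechanics are visible with the default `BaseException.__reduce__`, which records only `self.args` for reconstruction. A minimal stand-in class (not httpx code; the names are invented for illustration) reproduces the failure with no dependencies:

```python
import pickle

class KwOnlyError(Exception):
    # Stand-in for HTTPStatusError: one positional message plus a
    # required keyword-only argument that never reaches self.args.
    def __init__(self, message, *, request):
        super().__init__(message)
        self.request = request

exc = KwOnlyError("boom", request="fake-request")

# The default reduce records only ('boom',) as constructor args; the
# keyword-only 'request' sits in __dict__ and is never replayed
# through __init__.
print(exc.__reduce__()[1])  # ('boom',)

# dumps() succeeds, but loads() calls KwOnlyError('boom') and fails.
try:
    pickle.loads(pickle.dumps(exc))
except TypeError as err:
    print("reconstruction failed:", err)
```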
In Airflow, this causes the scheduler to crash silently when sending task results
through the LocalExecutor multiprocessing queue. The observable symptom is:
- Task log files are 0 bytes (empty)
- Task stays in "queued" state indefinitely
- Scheduler reports the state mismatch error shown above
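Until the library is fixed, one caller-side mitigation is to verify that a result survives a pickle round-trip before handing it to an IPC queue, and degrade to a plain exception otherwise. A hypothetical sketch (the helper name and fallback behavior are invented here, not Airflow code):

```python
import pickle

def ensure_picklable(result):
    # Round-trip the object through pickle; if serialization or
    # reconstruction fails (as with httpx.HTTPStatusError), fall back
    # to a plain RuntimeError that preserves the message and survives
    # any multiprocessing queue.
    try:
        pickle.loads(pickle.dumps(result))
        return result
    except Exception:
        return RuntimeError(repr(result))
```

A picklable result passes through unchanged; anything that cannot cross the process boundary is replaced by its `repr` wrapped in a `RuntimeError`, so the scheduler sees a failure message instead of crashing.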
Patch
--- a/httpx/_exceptions.py
+++ b/httpx/_exceptions.py
@@ -241,6 +241,24 @@
         self.request = request
         self.response = response
 
+    def __reduce__(self) -> tuple:
+        # Allocate a bare instance via __new__ so the keyword-only
+        # __init__ arguments are never re-invoked during unpickling,
+        # then restore all state (including args) in __setstate__.
+        return (
+            self.__class__.__new__,
+            (self.__class__,),
+            {
+                "args": self.args,
+                "request": self.request,
+                "response": self.response,
+            },
+        )
+
+    def __setstate__(self, state: dict) -> None:
+        self.args = state["args"]
+        self.request = state["request"]
+        self.response = state["response"]
+
 
 class InvalidURL(Exception):
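The reconstruction path can be verified outside httpx with a stand-in class that mirrors the keyword-only signature (a sketch under the assumption that the patched `HTTPStatusError` behaves the same way):

```python
import pickle

class DemoStatusError(Exception):
    # Stand-in mirroring HTTPStatusError's keyword-only signature.
    def __init__(self, message, *, request, response):
        super().__init__(message)
        self.request = request
        self.response = response

    def __reduce__(self):
        # Bypass __init__ entirely: pickle will call
        # cls.__new__(cls) and then pass the dict to __setstate__.
        return (
            self.__class__.__new__,
            (self.__class__,),
            {
                "args": self.args,
                "request": self.request,
                "response": self.response,
            },
        )

    def __setstate__(self, state):
        self.args = state["args"]
        self.request = state["request"]
        self.response = state["response"]

exc = DemoStatusError("500 Server Error", request="req", response="resp")
clone = pickle.loads(pickle.dumps(exc))
print(str(clone), clone.request, clone.response)
```

Bound built-in methods such as `cls.__new__` pickle by reference (as `getattr(cls, '__new__')`), so the reduce tuple itself round-trips cleanly.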