@@ -406,6 +406,8 @@ def __init__(self):
406406 self ._asyncgens = weakref .WeakSet ()
407407 # Set to True when `loop.shutdown_asyncgens` is called.
408408 self ._asyncgens_shutdown_called = False
409+ # Set to True when `loop.shutdown_default_executor` is called.
410+ self ._executor_shutdown_called = False
409411
410412 def __repr__ (self ):
411413 return (
@@ -503,6 +505,10 @@ def _check_closed(self):
503505 if self ._closed :
504506 raise RuntimeError ('Event loop is closed' )
505507
508+ def _check_default_executor (self ):
509+ if self ._executor_shutdown_called :
510+ raise RuntimeError ('Executor shutdown has been called' )
511+
506512 def _asyncgen_finalizer_hook (self , agen ):
507513 self ._asyncgens .discard (agen )
508514 if not self .is_closed ():
@@ -543,6 +549,26 @@ async def shutdown_asyncgens(self):
543549 'asyncgen' : agen
544550 })
545551
552+ async def shutdown_default_executor (self ):
553+ """Schedule the shutdown of the default executor."""
554+ self ._executor_shutdown_called = True
555+ if self ._default_executor is None :
556+ return
557+ future = self .create_future ()
558+ thread = threading .Thread (target = self ._do_shutdown , args = (future ,))
559+ thread .start ()
560+ try :
561+ await future
562+ finally :
563+ thread .join ()
564+
565+ def _do_shutdown (self , future ):
566+ try :
567+ self ._default_executor .shutdown (wait = True )
568+ self .call_soon_threadsafe (future .set_result , None )
569+ except Exception as ex :
570+ self .call_soon_threadsafe (future .set_exception , ex )
571+
546572 def run_forever (self ):
547573 """Run until stop() is called."""
548574 self ._check_closed ()
@@ -632,6 +658,7 @@ def close(self):
632658 self ._closed = True
633659 self ._ready .clear ()
634660 self ._scheduled .clear ()
661+ self ._executor_shutdown_called = True
635662 executor = self ._default_executor
636663 if executor is not None :
637664 self ._default_executor = None
@@ -768,6 +795,8 @@ def run_in_executor(self, executor, func, *args):
768795 self ._check_callback (func , 'run_in_executor' )
769796 if executor is None :
770797 executor = self ._default_executor
798+ # Only check when the default executor is being used
799+ self ._check_default_executor ()
771800 if executor is None :
772801 executor = concurrent .futures .ThreadPoolExecutor ()
773802 self ._default_executor = executor
0 commit comments