@@ -727,6 +727,8 @@ class map_async(Stream):
727727 func: async callable
728728 *args :
729729 The arguments to pass to the function.
730+ buffer_size:
731+ The max size of the input buffer, default value is unlimited
730732 **kwargs:
731733 Keyword arguments to pass to func
732734
@@ -747,19 +749,23 @@ class map_async(Stream):
747749 8
748750
749751 """
750- def __init__ (self , upstream , func , * args , ** kwargs ):
752+ def __init__ (self , upstream , func , * args , buffer_size = 0 , ** kwargs ):
751753 self .func = func
752754 stream_name = kwargs .pop ('stream_name' , None )
753755 self .kwargs = kwargs
754756 self .args = args
755- self .input_queue = asyncio .Queue ()
757+ self .input_queue = asyncio .Queue (maxsize = buffer_size )
756758
757759 Stream .__init__ (self , upstream , stream_name = stream_name , ensure_io_loop = True )
758- self .input_task = self .loop . asyncio_loop . create_task (self .input_callback ())
760+ self .input_task = self ._create_task (self .input_callback ())
759761
760762 def update (self , x , who = None , metadata = None ):
761763 coro = self .func (x , * self .args , ** self .kwargs )
762- self .input_queue .put_nowait ((coro , metadata ))
764+ self ._retain_refs (metadata )
765+ return self ._create_task (self .input_queue .put ((coro , metadata )))
766+
767+ def _create_task (self , coro ):
768+ return self .loop .asyncio_loop .create_task (coro )
763769
764770 async def input_callback (self ):
765771 while True :
@@ -771,7 +777,10 @@ async def input_callback(self):
771777 logger .exception (e )
772778 raise
773779 else :
774- self ._emit (result , metadata = metadata )
780+ results = self ._emit (result , metadata = metadata )
781+ if results :
782+ await asyncio .gather (* results )
783+ self ._release_refs (metadata )
775784
776785
777786@Stream .register_api ()
0 commit comments