Conversation
| # here we attempt to figure out if we are running on a dask worker | ||
| # if so we use the dask worker _loop as ours, | ||
| # and we register our RPC server | ||
| worker = get_worker() | ||
| self._loop = worker.loop if worker else None | ||
| self.rpc = self.config.rpc_transport() | ||
| self.rpc.serve_module_rpc(self) | ||
| self.rpc.start() | ||
| except ValueError: | ||
| return | ||
| ... | ||
|
|
||
| # assuming we are not running on a dask worker, | ||
| # it's our job to determine or create the event loop | ||
| if not self._loop: | ||
| try: | ||
| self._loop = asyncio.get_running_loop() | ||
| except RuntimeError: | ||
| self._loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(self._loop) |
There was a problem hiding this comment.
This whole thing is done in self._loop = get_loop(). You can delete it all except the three self.rpc lines.
| try: | ||
| logger.info("Sending query through module system...") | ||
| future = agent.query_async("Hello! What skills do you have?") | ||
|
|
||
| # In the module system, the loop should be running | ||
| time.sleep(5) # Wait for processing | ||
|
|
||
| if hasattr(future, "result"): | ||
| result = future.result(timeout=10) | ||
| logger.info(f"Got result: {result}") | ||
| except Exception as e: | ||
| logger.error(f"Query failed: {e}") |
There was a problem hiding this comment.
Since it uses query_async it should probably be an async test, but otherwise you don't need the sleep since result(timeout=10) sleeps for you.
from concurrent.futures import TimeoutError
try:
result = future.result(timeout=10)
except TimeoutError:
pytest.fail("getting result timed out")| {"args": [cnt, 2]}, | ||
| ) | ||
|
|
||
| time.sleep(0.1 * cnt) |
There was a problem hiding this comment.
This should be asyncio.sleep. Using time.sleep just blocks the entire GIL.
| skill_transport: type[SkillCommsSpec] = LCMSkillComms | ||
|
|
||
|
|
||
| _skill_thread_pool = ThreadPoolExecutor(max_workers=50, thread_name_prefix="skill_worker") |
There was a problem hiding this comment.
It would be nicer if this was on the SkillContainer so you could shut it down cleanly with it. But otherwise you can use:
@atexit.register
def _shutdown_skill_pool():
_skill_thread_pool.shutdown(wait=True)There was a problem hiding this comment.
true, fixed, skillcontainers host their pools now
| agent.register_skills(testcontainer) | ||
| agent.start() | ||
| agent.run_implicit_skill("uptime_seconds") | ||
| agent.query_async( |
There was a problem hiding this comment.
This is missing await.
| # In query_async() and query() | ||
| if loop_type == "AsyncIOMainLoop": | ||
| actual_loop = self._loop.asyncio_loop # Get the wrapped asyncio loop | ||
| return asyncio.ensure_future(self.agent_loop(query), loop=actual_loop) |
There was a problem hiding this comment.
Is this the old version?
Now it's:
def query_async(self, query: str):
return asyncio.ensure_future(self.agent_loop(query), loop=self._loop)| future = agent.query_async(query_text) | ||
|
|
||
| # Wait for the result (with timeout) | ||
| await asyncio.wait_for(asyncio.wrap_future(future), timeout=30.0) |
There was a problem hiding this comment.
I don't think you need wrap_future. It's already an asyncio.Future.
| # Use query_async which returns a Future | ||
| future = agent.query_async(query_text) | ||
|
|
||
| # Wait for the result (with timeout) | ||
| await asyncio.wait_for(asyncio.wrap_future(future), timeout=30.0) | ||
|
|
||
| # Get the result | ||
| if future.done(): | ||
| result = future.result() | ||
| logger.info(f"Agent response: {result}") | ||
| return result | ||
| else: | ||
| logger.warning("Query did not complete") | ||
| return "Query timeout" | ||
|
|
||
| except asyncio.TimeoutError: | ||
| logger.error("Query timed out after 30 seconds") | ||
| return "Query timeout" |
There was a problem hiding this comment.
| # Use query_async which returns a Future | |
| future = agent.query_async(query_text) | |
| # Wait for the result (with timeout) | |
| await asyncio.wait_for(asyncio.wrap_future(future), timeout=30.0) | |
| # Get the result | |
| if future.done(): | |
| result = future.result() | |
| logger.info(f"Agent response: {result}") | |
| return result | |
| else: | |
| logger.warning("Query did not complete") | |
| return "Query timeout" | |
| except asyncio.TimeoutError: | |
| logger.error("Query timed out after 30 seconds") | |
| return "Query timeout" | |
| query = agent.query_async(query_text) | |
| result = await asyncio.wait_for(query, timeout=30.0) | |
| logger.info(f"Agent response: {result}") | |
| return result | |
| except asyncio.TimeoutError: | |
| logger.error("Query timed out after 30 seconds") | |
| return "Query timeout" |
wait_for throws TimeoutError or returns the result
|
superseded by #582 |
Can merge and start experimenting, but I'm sure we'll iterate a lot
screencast demo @
https://discord.com/channels/1341146487186391173/1357833666751107174/1407023805523562537