44 changes: 44 additions & 0 deletions flo_ai/flo_ai/helpers/llm_factory.py
@@ -22,6 +22,7 @@ class LLMFactory:
'ollama',
'vertexai',
'rootflo',
'openai_vllm',
}

@staticmethod
@@ -63,6 +64,14 @@ def create_llm(model_config: Dict[str, Any], **kwargs) -> 'BaseLLM':
... {'provider': 'rootflo', 'model_id': 'model-123'},
... app_key='key', app_secret='secret', issuer='iss', audience='aud'
... )

>>> # OpenAI vLLM with base_url
>>> llm = LLMFactory.create_llm({
... 'provider': 'openai_vllm',
... 'name': 'microsoft/phi-4',
... 'base_url': 'http://localhost:8000/v1',
... 'api_key': 'vllm-key'
... })
"""
provider = model_config.get('provider', 'openai').lower()

@@ -76,6 +85,8 @@ def create_llm(model_config: Dict[str, Any], **kwargs) -> 'BaseLLM':
return LLMFactory._create_rootflo_llm(model_config, **kwargs)
elif provider == 'vertexai':
return LLMFactory._create_vertexai_llm(model_config, **kwargs)
elif provider == 'openai_vllm':
return LLMFactory._create_openai_vllm_llm(model_config, **kwargs)
else:
return LLMFactory._create_standard_llm(provider, model_config, **kwargs)

@@ -134,6 +145,39 @@ def _create_vertexai_llm(model_config: Dict[str, Any], **kwargs) -> 'BaseLLM':
base_url=base_url,
)

@staticmethod
def _create_openai_vllm_llm(model_config: Dict[str, Any], **kwargs) -> 'BaseLLM':
"""Create OpenAI vLLM instance with base_url handling."""
from flo_ai.llm import OpenAIVLLM

model_name = model_config.get('name')
if not model_name:
raise ValueError(
'openai_vllm provider requires "name" parameter in model configuration'
)

# Priority: kwargs > model_config > None
base_url = kwargs.get('base_url') or model_config.get('base_url')
if not base_url:
raise ValueError(
'openai_vllm provider requires "base_url" parameter. '
'Provide it in model_config or as a kwarg.'
)

# Optional parameters
api_key = kwargs.get('api_key') or model_config.get('api_key')
temperature = kwargs.get(
'temperature',
model_config.get('temperature', 0.7),
)

return OpenAIVLLM(
model=model_name,
base_url=base_url,
api_key=api_key,
temperature=temperature,
)

coderabbitai[bot] marked this conversation as resolved.
@staticmethod
def _create_rootflo_llm(model_config: Dict[str, Any], **kwargs) -> 'BaseLLM':
"""Create RootFlo LLM instance with authentication."""
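For reviewers trying the new provider locally, a minimal usage sketch (not part of this PR): the module path is assumed from the file location above, the server URL, model name, and key are placeholders, and the async generate()/get_message_content() calls follow the BaseLLM interface used elsewhere in this diff.

import asyncio
from flo_ai.helpers.llm_factory import LLMFactory  # module path assumed from this diff's file layout

async def main():
    # Mirrors the docstring example: a local vLLM server exposing the OpenAI-compatible API.
    llm = LLMFactory.create_llm({
        'provider': 'openai_vllm',
        'name': 'microsoft/phi-4',               # model served by the vLLM instance (placeholder)
        'base_url': 'http://localhost:8000/v1',  # vLLM's OpenAI-compatible endpoint (placeholder)
        'api_key': 'vllm-key',                   # placeholder; many vLLM deployments accept any token
    })
    response = await llm.generate([{'role': 'user', 'content': 'Say hello'}])
    print(llm.get_message_content(response))

asyncio.run(main())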
227 changes: 148 additions & 79 deletions flo_ai/flo_ai/llm/rootflo_llm.py
@@ -4,10 +4,12 @@
from flo_ai.models.chat_message import ImageMessageContent
import jwt
import httpx
import asyncio
from .base_llm import BaseLLM
from .openai_llm import OpenAI
from .gemini_llm import Gemini
from .anthropic_llm import Anthropic
from .openai_vllm import OpenAIVLLM
from flo_ai.tool.base_tool import Tool


@@ -17,12 +19,13 @@ class LLMProvider(Enum):
OPENAI = 'openai'
GEMINI = 'gemini'
ANTHROPIC = 'anthropic'
VLLM = 'vllm'


class RootFloLLM(BaseLLM):
"""
Proxy LLM class that routes to different SDK implementations based on type.
Acts as a unified interface to OpenAI, Gemini, and Anthropic SDKs via a proxy URL.
Acts as a unified interface to OpenAI, Gemini, Anthropic SDKs and VLLM via a proxy URL.
"""

def __init__(
@@ -50,9 +53,12 @@ def __init__(
access_token: Optional pre-generated access token (if provided, skips JWT generation)
temperature: Temperature parameter for generation
**kwargs: Additional parameters to pass to the underlying SDK

Note:
The actual LLM configuration is fetched lazily on first use (generate/stream)
to avoid blocking HTTP calls during initialization.
"""
# Validate required parameters

if not model_id:
raise ValueError('model_id is required')

@@ -80,88 +86,41 @@ def __init__(
if not app_key:
raise ValueError('app_key is required even when using access_token')

# Use provided access_token or generate JWT token
if access_token:
api_token = access_token
else:
now = datetime.now()
payload = {
'iss': issuer,
'aud': audience,
'iat': int(now.timestamp()),
'exp': int((now + timedelta(seconds=3600)).timestamp()),
'role_id': 'floconsole-service',
'user_id': 'service',
'service_auth': True,
}
service_token = jwt.encode(payload, app_secret, algorithm='HS256')
api_token = f'fc_{service_token}'

# Fetch LLM configuration from API
config = self._fetch_llm_config(base_url, model_id, api_token, app_key)
llm_model = config['llm_model']
llm_type = config['type']

# Map type string to LLMProvider enum
try:
llm_provider = LLMProvider(llm_type.lower())
except ValueError:
raise ValueError(
f'Unsupported LLM provider type from API: {llm_type}. '
f'Supported types: {[p.value for p in LLMProvider]}'
)
# Store initialization parameters for lazy initialization
self._base_url = base_url
self._model_id = model_id
self._app_key = app_key
self._app_secret = app_secret
self._issuer = issuer
self._audience = audience
self._access_token = access_token
self._temperature = temperature
self._kwargs = kwargs

# Lazy initialization state
self._llm = None
self._initialized = False
self._init_lock = asyncio.Lock()

Comment on lines +89 to +104
coderabbitai[bot] (Nov 22, 2025)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Fix race in lazy initialization lock creation

The current double-checked locking pattern can create two different asyncio.Lock instances if two coroutines hit _ensure_initialized concurrently while _init_lock is still None. Each coroutine can then async with a different lock and both proceed to run the one-time initialization concurrently (duplicated config fetch and LLM construction, last writer wins).

Given this lock is lightweight, it’s simpler and safer to create a single instance in __init__ and always use it, instead of lazily creating it inside _ensure_initialized.

Consider a change along these lines:

@@
-        # Lazy initialization state
-        self._llm = None
-        self._initialized = False
-        self._init_lock = None
+        # Lazy initialization state
+        self._llm = None
+        self._initialized = False
+        # Single lock instance to protect one-time async initialization
+        self._init_lock = asyncio.Lock()
@@
-        # Lazily create the lock in an async context
-        if self._init_lock is None:
-            self._init_lock = asyncio.Lock()
-
-        # Acquire lock for initialization
-        async with self._init_lock:
+        # Acquire lock for initialization
+        async with self._init_lock:
             # Double-check: another task might have initialized while we waited
             if self._initialized:
                 return

Please verify this matches your supported Python/asyncio versions’ expectations for creating asyncio.Lock() in __init__.

To double-check the lock semantics for your runtime, you can consult the asyncio docs.

Also applies to: 173-188


🌐 Web query:

Is it safe to construct `asyncio.Lock()` outside a running event loop (e.g., in `__init__`) in Python <your target version>?

💡 Result:

Short answer: No for older Pythons — yes (safe) on modern Python, but prefer lazy creation.

Details:

  • On Python < 3.10 asyncio primitives (Lock, Event, Condition, Future, etc.) were bound to the event loop at construction time; creating asyncio.Lock() outside the intended running loop can attach it to the wrong loop and lead to RuntimeError when used later. [1][2]
  • Starting around Python 3.10 the behaviour was changed/cleaned up (the explicit loop argument was deprecated/removed and the library stopped requiring a loop at construction), so creating primitives before calling asyncio.run() is not the same problem on recent Python versions. [3][4]
  • Recommendation: to be robust across versions, don't construct asyncio primitives at import / in init unless you know which loop they should attach to. Instead initialize them lazily (e.g., self._lock = None and set self._lock = asyncio.Lock() inside an async initializer or on first use), or construct them from inside the running event loop. [2][5]

Sources:

  1. Python docs — asyncio synchronization primitives (shows loop parameter / behaviour). [2]
  2. StackOverflow discussion and examples explaining RuntimeError when Lock created outside running loop. [3]
  3. Python docs / issue notes about deprecating loop parameter and behaviour changes in 3.10+. [6]
  4. Python asyncio policy / event loop docs describing changes in get_event_loop/get_running_loop behaviour. [1]

(References: [1] asyncio policy/event loop docs; [2] asyncio sync docs; [3] StackOverflow; [6] Python issue/changes.)


I need to determine the target Python version for this codebase to verify whether the proposed fix is safe.


🏁 Script executed:

#!/bin/bash
# Search for Python version specifications in common config files
fd -e 'pyproject.toml' -e 'setup.py' -e 'setup.cfg' -e '.python-version' -e 'tox.ini' -e 'README.md' | head -20

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check pyproject.toml if it exists for Python version
if [ -f pyproject.toml ]; then
  echo "=== pyproject.toml ==="; 
  cat pyproject.toml | grep -A5 -B5 -i "python"
fi

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check setup.py if it exists
if [ -f setup.py ]; then
  echo "=== setup.py ===";
  cat setup.py | grep -i "python_requires\|version"
fi

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check for .python-version or similar
if [ -f .python-version ]; then
  echo "=== .python-version ===";
  cat .python-version
fi

Length of output: 0


🏁 Script executed:

#!/bin/bash
# List root directory to see structure
ls -la | head -30

Length of output: 1094


🏁 Script executed:

#!/bin/bash
# Broader search for config files anywhere in repo
fd -i 'setup|pyproject|requirements|tox' --type f | head -20

Length of output: 65


🏁 Script executed:

#!/bin/bash
# Check the actual rootflo_llm.py file for imports and context
cat flo_ai/flo_ai/llm/rootflo_llm.py | head -120

Length of output: 3902


🏁 Script executed:

#!/bin/bash
# Check flo_ai/pyproject.toml for Python version
echo "=== flo_ai/pyproject.toml ===" && cat flo_ai/pyproject.toml

Length of output: 1831


🏁 Script executed:

#!/bin/bash
# Check flo_ai/setup.py for Python version
echo "=== flo_ai/setup.py ===" && cat flo_ai/setup.py

Length of output: 651


🏁 Script executed:

#!/bin/bash
# Search for the helper methods mentioned in the review
rg -n 'format_tool_for_llm|format_tools_for_llm|format_image_in_message' flo_ai/flo_ai/llm/rootflo_llm.py

Length of output: 405


🏁 Script executed:

#!/bin/bash
# Get full view of the _ensure_initialized method and surrounding code
sed -n '170,220p' flo_ai/flo_ai/llm/rootflo_llm.py

Length of output: 1962


Fix race condition in lazy initialization lock and add guards to helper methods

The double-checked locking pattern at lines 182-184 has a race window: two concurrent coroutines can each see _init_lock is None and create two different asyncio.Lock instances, allowing both to proceed past the lock. While Python 3.10+ (your target version per pyproject.toml) permits safe asyncio.Lock() construction in __init__, the official recommendation is still lazy initialization for better practice.

Additionally, format_tool_for_llm, format_tools_for_llm, and format_image_in_message (lines 313–323) call self._llm.* without checking initialization, risking AttributeError if invoked before generate/stream. These methods should guard against uninitialized state.

Suggested approach:

  1. Lock: Create once in __init__:
-        self._init_lock = None
+        # Single lock instance to protect one-time async initialization
+        self._init_lock = asyncio.Lock()

Then remove the lazy lock creation from _ensure_initialized (lines 182–184).

  2. Helper methods: Add an initialization guard:
def format_tool_for_llm(self, tool: 'Tool') -> Dict[str, Any]:
    if not self._initialized:
        raise RuntimeError('LLM not yet initialized; call generate/stream first')
    return self._llm.format_tool_for_llm(tool)

Apply the same guard pattern to format_tools_for_llm and format_image_in_message.

🤖 Prompt for AI Agents
In flo_ai/flo_ai/llm/rootflo_llm.py around lines 89 to 104 and referenced init
code (lines ~182-184 and helper methods ~313-323), avoid creating the init lock
lazily to prevent a race: instantiate self._init_lock = asyncio.Lock() in
__init__ (where other lazy-init fields are set) and remove any on-the-fly lock
creation from _ensure_initialized; additionally, add guards to
format_tool_for_llm, format_tools_for_llm, and format_image_in_message that
raise a clear RuntimeError if self._initialized is False (e.g., "LLM not yet
initialized; call generate/stream first") before delegating to self._llm to
prevent AttributeError when these helpers are called too early.

Member
Please fix

@vizsatiz, I'll provide the fixes for both issues:

Fix 1: Create lock in __init__ to prevent race condition

In the __init__ method, change the lock initialization:

# Lazy initialization state
self._llm = None
self._initialized = False
# Single lock instance to protect one-time async initialization
self._init_lock = asyncio.Lock()

Then in the _ensure_initialized method, remove the lazy lock creation (lines 182-184):

async def _ensure_initialized(self):
    """
    Ensure the LLM is initialized by fetching config on first use.
    Uses double-checked locking to prevent race conditions in concurrent scenarios.
    """
    # Fast path: already initialized
    if self._initialized:
        return

    # Acquire lock for initialization
    async with self._init_lock:
        # Double-check: another task might have initialized while we waited
        if self._initialized:
            return

        # ... rest of the initialization logic

Fix 2: Add initialization guards to helper methods

Update the three helper methods to check initialization state:

def format_tool_for_llm(self, tool: 'Tool') -> Dict[str, Any]:
    """Format a tool for the specific LLM's API"""
    if not self._initialized or self._llm is None:
        raise RuntimeError(
            'RootFloLLM is not initialized yet; call generate() or stream() first.'
        )
    return self._llm.format_tool_for_llm(tool)

def format_tools_for_llm(self, tools: List['Tool']) -> List[Dict[str, Any]]:
    """Format a list of tools for the specific LLM's API"""
    if not self._initialized or self._llm is None:
        raise RuntimeError(
            'RootFloLLM is not initialized yet; call generate() or stream() first.'
        )
    return self._llm.format_tools_for_llm(tools)

def format_image_in_message(self, image: ImageMessageContent) -> str:
    """Format a image in the message"""
    if not self._initialized or self._llm is None:
        raise RuntimeError(
            'RootFloLLM is not initialized yet; call generate() or stream() first.'
        )
    return self._llm.format_image_in_message(image)

These changes ensure thread-safety in the lazy initialization and prevent AttributeError when helper methods are called before the LLM is initialized.

# Will be set during initialization
self.base_url = base_url
self.model_id = model_id
self.llm_provider = None

# Call parent __init__ with minimal parameters
# Actual model will be set during lazy initialization
super().__init__(
model=llm_model, api_key=api_token, temperature=temperature, **kwargs
model='',
api_key='',
temperature=temperature,
**kwargs,
)

self.base_url = base_url
self.model_id = model_id
self.llm_provider = llm_provider

# Construct full URL for LLM inference
full_url = f'{base_url}/v1/llm-inference/{model_id}'

# Prepare custom headers for proxy authentication
custom_headers = {'X-Rootflo-Key': app_key}

# Instantiate appropriate SDK wrapper based on llm_provider
if llm_provider == LLMProvider.OPENAI:
self._llm = OpenAI(
model=llm_model,
base_url=full_url,
api_key=api_token,
temperature=temperature,
custom_headers=custom_headers,
**kwargs,
)
elif llm_provider == LLMProvider.ANTHROPIC:
self._llm = Anthropic(
model=llm_model,
base_url=full_url,
api_key=api_token,
temperature=temperature,
custom_headers=custom_headers,
**kwargs,
)
elif llm_provider == LLMProvider.GEMINI:
# Gemini SDK - pass base_url which will be handled via http_options
self._llm = Gemini(
model=llm_model,
api_key=api_token,
temperature=temperature,
base_url=full_url,
custom_headers=custom_headers,
**kwargs,
)
else:
raise ValueError(f'Unsupported LLM provider: {llm_provider}')

def _fetch_llm_config(
async def _fetch_llm_config_async(
self, base_url: str, model_id: str, api_token: str, app_key: str
) -> Dict[str, Any]:
"""
Fetch LLM configuration from the API endpoint.
Fetch LLM configuration from the API endpoint asynchronously.

Args:
base_url: The base URL of the API server
@@ -182,8 +141,8 @@
}

try:
with httpx.Client() as client:
response = client.get(config_url, headers=headers, timeout=30.0)
async with httpx.AsyncClient() as client:
response = await client.get(config_url, headers=headers, timeout=30.0)
response.raise_for_status()

data = response.json()
@@ -211,6 +170,110 @@ def _fetch_llm_config(
except Exception as e:
raise Exception(f'Failed to fetch LLM config: {str(e)}') from e

async def _ensure_initialized(self):
"""
Ensure the LLM is initialized by fetching config on first use.
Uses double-checked locking to prevent race conditions in concurrent scenarios.
"""
# Fast path: already initialized
if self._initialized:
return

# Acquire lock for initialization
async with self._init_lock:
# Double-check: another task might have initialized while we waited
if self._initialized:
return

# Generate or use provided access token
if self._access_token:
api_token = self._access_token
else:
now = datetime.now()
payload = {
'iss': self._issuer,
'aud': self._audience,
'iat': int(now.timestamp()),
'exp': int((now + timedelta(seconds=3600)).timestamp()),
'role_id': 'floconsole-service',
'user_id': 'service',
'service_auth': True,
}
service_token = jwt.encode(payload, self._app_secret, algorithm='HS256')
api_token = f'fc_{service_token}'

# Fetch LLM configuration from API
config = await self._fetch_llm_config_async(
self._base_url, self._model_id, api_token, self._app_key
)
llm_model = config['llm_model']
llm_type = config['type']

# Map type string to LLMProvider enum
try:
llm_provider = LLMProvider(llm_type.lower())
except ValueError:
raise ValueError(
f'Unsupported LLM provider type from API: {llm_type}. '
f'Supported types: {[p.value for p in LLMProvider]}'
)

# Update instance attributes
self.llm_provider = llm_provider
self.model = llm_model
self.api_key = api_token

# Construct full URL for LLM inference
full_url = f'{self._base_url}/v1/llm-inference/{self._model_id}'

# Prepare custom headers for proxy authentication
custom_headers = {'X-Rootflo-Key': self._app_key}

# Instantiate appropriate SDK wrapper based on llm_provider
if llm_provider == LLMProvider.OPENAI:
self._llm = OpenAI(
model=llm_model,
base_url=full_url,
api_key=api_token,
temperature=self._temperature,
custom_headers=custom_headers,
**self._kwargs,
)
elif llm_provider == LLMProvider.ANTHROPIC:
self._llm = Anthropic(
model=llm_model,
base_url=full_url,
api_key=api_token,
temperature=self._temperature,
custom_headers=custom_headers,
**self._kwargs,
)
elif llm_provider == LLMProvider.GEMINI:
# Gemini SDK - pass base_url which will be handled via http_options
self._llm = Gemini(
Member
Add support for vLLM and Ollama

model=llm_model,
api_key=api_token,
temperature=self._temperature,
base_url=full_url,
custom_headers=custom_headers,
**self._kwargs,
)
elif llm_provider == LLMProvider.VLLM:
# vLLM via OpenAI-compatible API
self._llm = OpenAIVLLM(
model=llm_model,
base_url=full_url,
api_key=api_token,
temperature=self._temperature,
custom_headers=custom_headers,
**self._kwargs,
)
else:
raise ValueError(f'Unsupported LLM provider: {llm_provider}')

# Mark as initialized
self._initialized = True

async def generate(
self,
messages: List[Dict[str, str]],
@@ -219,6 +282,7 @@ async def generate(
**kwargs,
) -> Dict[str, Any]:
"""Generate a response from the LLM"""
await self._ensure_initialized()
return await self._llm.generate(
messages, functions=functions, output_schema=output_schema, **kwargs
)
@@ -230,11 +294,16 @@ async def stream(
**kwargs: Any,
) -> AsyncIterator[Dict[str, Any]]:
"""Generate a streaming response from the LLM"""
await self._ensure_initialized()
async for chunk in self._llm.stream(messages, functions=functions, **kwargs):
yield chunk

def get_message_content(self, response: Any) -> str:
"""Extract message content from response"""
if not getattr(self, '_initialized', False) or self._llm is None:
raise RuntimeError(
'RootFloLLM is not initialized yet; call generate() or stream() first.'
)
return self._llm.get_message_content(response)

def format_tool_for_llm(self, tool: 'Tool') -> Dict[str, Any]:
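For completeness, a caller-side sketch of the new lazy behaviour (not part of this PR): the constructor keywords are taken from the parameters referenced in __init__ above, the endpoint values are placeholders, and it assumes the initialization guards discussed in the review thread are applied.

import asyncio
from flo_ai.llm.rootflo_llm import RootFloLLM  # module path assumed from this diff's file layout

async def main():
    llm = RootFloLLM(
        base_url='https://proxy.example.com',  # placeholder proxy URL
        model_id='model-123',                  # placeholder model id
        app_key='key',
        app_secret='secret',
        issuer='iss',
        audience='aud',
    )
    # No HTTP call has happened yet: the config fetch is deferred to first use, and
    # helpers such as get_message_content() raise RuntimeError instead of
    # AttributeError if called before generate()/stream().

    # The first generate() triggers _ensure_initialized(): JWT generation,
    # config fetch, and construction of the underlying SDK wrapper.
    response = await llm.generate([{'role': 'user', 'content': 'Hello'}])
    print(llm.get_message_content(response))

asyncio.run(main())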