Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/crewai/cli/create_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import click

from crewai.telemetry import Telemetry


def create_flow(name):
"""Create a new flow."""
Expand All @@ -15,6 +17,10 @@ def create_flow(name):
click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
return

# Initialize telemetry
telemetry = Telemetry()
telemetry.flow_creation_span(class_name)

# Create directory structure
(project_root / "src" / folder_name).mkdir(parents=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
Expand Down
17 changes: 14 additions & 3 deletions src/crewai/flow/flow.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# flow.py

# flow.py

import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union

from pydantic import BaseModel

from crewai.flow.flow_visualizer import plot_flow
from crewai.telemetry import Telemetry

T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])

Expand Down Expand Up @@ -142,6 +141,8 @@ def __new__(mcs, name, bases, dct):


class Flow(Generic[T], metaclass=FlowMeta):
_telemetry = Telemetry()

_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
_routers: Dict[str, str] = {}
Expand All @@ -161,6 +162,8 @@ def __init__(self):
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs

self._telemetry.flow_creation_span(self.__class__.__name__)

for method_name in dir(self):
if callable(getattr(self, method_name)) and not method_name.startswith(
"__"
Expand Down Expand Up @@ -190,6 +193,10 @@ async def kickoff(self) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")

self._telemetry.flow_execution_span(
self.__class__.__name__, list(self._methods.keys())
)

# Create tasks for all start methods
tasks = [
self._execute_start_method(start_method)
Expand Down Expand Up @@ -270,5 +277,9 @@ async def _execute_single_listener(self, listener: str, result: Any):

traceback.print_exc()

def plot(self, filename: str = "crewai_flow_graph"):
def plot(self, filename: str = "crewai_flow"):
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())
)

plot_flow(self, filename)
69 changes: 55 additions & 14 deletions src/crewai/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import os
import platform
import warnings
from typing import TYPE_CHECKING, Any, Optional
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Optional


@contextmanager
Expand All @@ -21,7 +21,9 @@ def suppress_warnings():


from opentelemetry import trace # noqa: E402
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: E402
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # noqa: E402
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
Expand Down Expand Up @@ -117,9 +119,11 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.prompt_file,
"function_calling_llm": agent.function_calling_llm.model
if agent.function_calling_llm
else "",
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
Expand All @@ -145,9 +149,9 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role
if task.agent
else "None",
"agent_role": (
task.agent.role if task.agent else "None"
),
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
Expand Down Expand Up @@ -184,9 +188,11 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"function_calling_llm": agent.function_calling_llm.model
if agent.function_calling_llm
else "",
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
Expand All @@ -210,9 +216,9 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
"id": str(task.id),
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role
if task.agent
else "None",
"agent_role": (
task.agent.role if task.agent else "None"
),
"agent_key": task.agent.key if task.agent else None,
"tools_names": [
tool.name.casefold()
Expand Down Expand Up @@ -568,3 +574,38 @@ def _add_attribute(self, span, key, value):
return span.set_attribute(key, value)
except Exception:
pass

def flow_creation_span(self, flow_name: str):
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
self._add_attribute(span, "flow_name", flow_name)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass

def flow_plotting_span(self, flow_name: str, node_names: list[str]):
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
self._add_attribute(span, "flow_name", flow_name)
self._add_attribute(span, "node_names", json.dumps(node_names))
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass

def flow_execution_span(self, flow_name: str, node_names: list[str]):
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")
self._add_attribute(span, "flow_name", flow_name)
self._add_attribute(span, "node_names", json.dumps(node_names))
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass