2 changes: 1 addition & 1 deletion .flake8
@@ -1,5 +1,5 @@
[flake8]
ignore = E203, E402, E501, E731, E741, W503, W605, E722
ignore = E203, E402, E501, E731, E741, W503, W605, E722, E231, W604, E702, E226, E221, E713, E271
max-line-length = 119

# E402: module level import not at top of file
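Note: the codes newly added to the ignore list are pycodestyle/flake8 whitespace and style checks (W604, the backtick-deprecation rule, only applies to Python 2 sources). An illustrative snippet, not taken from this PR, showing what each of the others would normally flag:

# Hypothetical examples of lines that would trigger each newly ignored check:
pair = (1,2)             # E231: missing whitespace after ','
area = 2*3 + 4           # E226: missing whitespace around arithmetic operator
x     = 1                # E221: multiple spaces before operator
a = 1; b = 2             # E702: multiple statements on one line (semicolon)
found = not 3 in [1, 2]  # E713: test for membership should be '3 not in [1, 2]'
both = x and  a          # E271: multiple spaces after keyword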
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -7,7 +7,7 @@ default_stages:
# - manual # Run in CI
repos:
- repo: https://github.com/psf/black.git
rev: 22.8.0
rev: 25.1.0
hooks:
- id: black
files: \.(py|pyi)$
@@ -18,7 +18,7 @@ repos:
hooks:
- id: isort
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
rev: 7.0.0
hooks:
- id: flake8
# Code linting
@@ -29,7 +29,13 @@
ids_len = seq_lens[i, 0]
input_ids[i, 0:ids_len] = np.random.randint(1, 10, seq_lens[i, 0], "int64")

(x_remove_padding, cum_offsets_out, padding_offset, cu_seqlens_q, cu_seqlens_k,) = get_padding_offset(
(
x_remove_padding,
cum_offsets_out,
padding_offset,
cu_seqlens_q,
cu_seqlens_k,
) = get_padding_offset(
paddle.to_tensor(input_ids),
paddle.to_tensor(cum_offset),
paddle.to_tensor(token_num),
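Note: the test above builds a padded [batch, max_len] input_ids tensor from per-row seq_lens and checks the padding-removal bookkeeping returned by get_padding_offset. A rough numpy sketch of the idea (names and exact output conventions are assumptions, not the custom op's contract):

import numpy as np

def remove_padding_reference(input_ids, seq_lens):
    # Concatenate each row's real tokens, dropping the padded tail.
    bsz = input_ids.shape[0]
    lens = seq_lens.reshape(-1)
    x_remove_padding = np.concatenate([input_ids[i, : lens[i]] for i in range(bsz)])
    # Cumulative sequence lengths, e.g. [0, len0, len0 + len1, ...]
    cu_seqlens = np.concatenate([[0], np.cumsum(lens)]).astype("int32")
    return x_remove_padding, cu_seqlens

ids = np.zeros((2, 4), dtype="int64")
ids[0, :3] = [5, 7, 9]
ids[1, :2] = [2, 4]
tokens, cu = remove_padding_reference(ids, np.array([[3], [2]]))
# tokens -> [5 7 9 2 4], cu -> [0 3 5]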
5 changes: 4 additions & 1 deletion fastdeploy/cache_manager/prefix_cache_manager.py
@@ -473,7 +473,10 @@ def request_block_ids(self, task, block_size, dec_token_num, *args):
current_time = time.time()
self._update_matched_node_info(req_id, match_block_node, current_time)
# 2. prepare cache
(gpu_recv_block_ids, gpu_extra_block_ids,) = self._prepare_cache(
(
gpu_recv_block_ids,
gpu_extra_block_ids,
) = self._prepare_cache(
req_id,
input_ids,
block_size,
10 changes: 3 additions & 7 deletions fastdeploy/distributed/custom_all_reduce/cuda_wrapper.py
@@ -113,10 +113,7 @@ class CudaRTLibrary:
Function(
"cudaStreamIsCapturing",
cudaError_t,
[
cudaStream_t,
ctypes.POINTER(cudaStreamCaptureStatus)
]
[cudaStream_t, ctypes.POINTER(cudaStreamCaptureStatus)],
),
]

@@ -197,9 +194,8 @@ def cudaIpcOpenMemHandle(self, handle: cudaIpcMemHandle_t) -> ctypes.c_void_p:
self.funcs["cudaIpcOpenMemHandle"](ctypes.byref(devPtr), handle, cudaIpcMemLazyEnablePeerAccess)
)
return devPtr

def cudaStreamIsCapturing(self, stream: cudaStream_t) -> ctypes.c_int:
is_capturing = ctypes.c_int()
self.CUDART_CHECK(
self.funcs["cudaStreamIsCapturing"](stream, is_capturing)
)
self.CUDART_CHECK(self.funcs["cudaStreamIsCapturing"](stream, is_capturing))
return is_capturing
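Note: the wrapper above binds the CUDA runtime's cudaStreamIsCapturing through ctypes. A minimal standalone sketch of the same call, assuming libcudart.so is loadable and a CUDA device is present (error handling omitted; this is not FastDeploy code):

import ctypes

libcudart = ctypes.CDLL("libcudart.so")
libcudart.cudaStreamIsCapturing.restype = ctypes.c_int  # cudaError_t
libcudart.cudaStreamIsCapturing.argtypes = [
    ctypes.c_void_p,               # cudaStream_t
    ctypes.POINTER(ctypes.c_int),  # cudaStreamCaptureStatus *
]

status = ctypes.c_int()
# Query the default (NULL) stream; 0 corresponds to cudaStreamCaptureStatusNone.
err = libcudart.cudaStreamIsCapturing(None, ctypes.byref(status))
print(err, status.value)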
4 changes: 2 additions & 2 deletions fastdeploy/engine/args_utils.py
@@ -559,8 +559,8 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
"--ips",
type=lambda s: s.split(",") if s else None,
default=EngineArgs.ips,
help=
"IP addresses of all nodes participating in distributed inference.")
help="IP addresses of all nodes participating in distributed inference.",
)

# Performance tuning parameters group
perf_group = parser.add_argument_group("Performance Tuning")
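Note: the reformatted --ips option above keeps the same lambda parser; a tiny standalone illustration of its behavior (a throwaway argparse parser, not FastDeploy's FlexibleArgumentParser):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--ips", type=lambda s: s.split(",") if s else None, default=None)

print(parser.parse_args(["--ips", "192.168.0.1,192.168.0.2"]).ips)  # ['192.168.0.1', '192.168.0.2']
print(parser.parse_args([]).ips)                                    # None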
5 changes: 2 additions & 3 deletions fastdeploy/entrypoints/engine_client.py
@@ -41,7 +41,7 @@ def __init__(
mm_processor_kwargs,
enable_mm=False,
reasoning_parser=None,
data_parallel_size=1
data_parallel_size=1,
):
input_processor = InputPreprocessor(
tokenizer,
@@ -55,8 +55,7 @@ def __init__(
self.data_processor = input_processor.create_processor()
self.max_model_len = max_model_len
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
array_size = min(
max_chips_per_node, tensor_parallel_size * data_parallel_size)
array_size = min(max_chips_per_node, tensor_parallel_size * data_parallel_size)
self.worker_healthy_live_recorded_time_array = np.zeros(shape=[array_size], dtype=np.int32)
self.worker_healthy_live_signal = IPCSignal(
name="worker_healthy_live_signal",
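Note: the collapsed min(...) line sizes the worker health-signal array; a worked example with illustrative values, not taken from any particular deployment:

max_chips_per_node = 8                        # non-Iluvatar platform
tensor_parallel_size, data_parallel_size = 8, 2
array_size = min(max_chips_per_node, tensor_parallel_size * data_parallel_size)
assert array_size == 8                        # capped at one node's chip count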
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/openai/api_server.py
@@ -113,7 +113,7 @@ async def lifespan(app: FastAPI):
args.mm_processor_kwargs,
args.enable_mm,
args.reasoning_parser,
args.data_parallel_size
args.data_parallel_size,
)
app.state.dynamic_load_weight = args.dynamic_load_weight
chat_handler = OpenAIServingChat(engine_client, pid, args.ips)
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/openai/protocol.py
@@ -478,7 +478,7 @@ class ChatCompletionRequest(BaseModel):
top_p: Optional[float] = None
top_k: Optional[int] = None
min_p: Optional[float] = None
user: Optional[str] = None
user: Optional[str] = None
metadata: Optional[dict] = None
extra_body: Optional[dict] = None
return_token_ids: Optional[bool] = False
27 changes: 16 additions & 11 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -19,9 +19,10 @@
import traceback
import uuid
from typing import List, Optional
import numpy as np

import aiozmq
import msgpack
import numpy as np
from aiozmq import zmq

from fastdeploy.entrypoints.openai.protocol import (
@@ -151,7 +152,9 @@ async def chat_completion_stream_generator(
if request.metadata is not None:
enable_thinking = request.metadata.get("enable_thinking")
include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
enable_return_token_ids = request.return_token_ids or (request.extra_body is not None and request.extra_body.get('return_token_ids', False))
enable_return_token_ids = request.return_token_ids or (
request.extra_body is not None and request.extra_body.get("return_token_ids", False)
)
while num_choices > 0:
try:
raw_data = await asyncio.wait_for(dealer.read(), timeout=10)
@@ -193,13 +196,13 @@
choice = ChatCompletionResponseStreamChoice(
index=i,
delta=DeltaMessage(
role="assistant",
content="",
reasoning_content="",
role="assistant",
content="",
reasoning_content="",
tool_calls=None,
prompt_token_ids=None,
completion_token_ids=None,
)
),
)
if enable_return_token_ids:
choice.delta.prompt_token_ids = list(prompt_token_ids)
@@ -238,10 +241,10 @@

previous_num_tokens += len(output["token_ids"])
delta_message = DeltaMessage(
content=delta_text,
reasoning_content=output.get("reasoning_content"), \
content=delta_text,
reasoning_content=output.get("reasoning_content"),
prompt_token_ids=None,
completion_token_ids=None,
completion_token_ids=None,
tool_calls=output.get("tool_call_content", []),
)

@@ -329,7 +332,9 @@ async def chat_completion_full_generator(
final_res = None
enable_thinking = None
include_stop_str_in_output = False
enable_return_token_ids = request.return_token_ids or (request.extra_body is not None and request.extra_body.get('return_token_ids', False))
enable_return_token_ids = request.return_token_ids or (
request.extra_body is not None and request.extra_body.get("return_token_ids", False)
)
try:
dealer = await aiozmq.create_zmq_stream(zmq.DEALER, connect=f"ipc:///dev/shm/router_{self.pid}.ipc")
dealer.write([b"", request_id.encode("utf-8")])
@@ -403,7 +408,7 @@
reasoning_content=output.get("reasoning_content"),
tool_calls=output.get("tool_call_content"),
prompt_token_ids=prompt_token_ids if enable_return_token_ids else None,
completion_token_ids=completion_token_ids if enable_return_token_ids else None,
completion_token_ids=(completion_token_ids if enable_return_token_ids else None),
)
logprobs_full_res = None
if logprob_contents:
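Note: the multi-line enable_return_token_ids expression above behaves exactly as before the reformat; a small truth-table check with plain values standing in for the pydantic request fields:

def resolve_return_token_ids(return_token_ids, extra_body):
    return return_token_ids or (extra_body is not None and extra_body.get("return_token_ids", False))

assert resolve_return_token_ids(True, None) is True
assert resolve_return_token_ids(False, {"return_token_ids": True}) is True
assert resolve_return_token_ids(False, {"return_token_ids": False}) is False
assert resolve_return_token_ids(False, None) is False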
54 changes: 32 additions & 22 deletions fastdeploy/entrypoints/openai/serving_completion.py
@@ -18,9 +18,10 @@
import time
import uuid
from typing import List
import numpy as np

import aiozmq
import msgpack
import numpy as np
from aiozmq import zmq

from fastdeploy.engine.request import RequestOutput
@@ -48,7 +49,6 @@ def __init__(self, engine_client, pid, ips):
else:
self.master_ip = self.master_ip.split(",")[0]


def _check_master(self):
if self.master_ip is None:
return True
@@ -238,7 +238,9 @@
model=model_name,
choices=choices,
)
enable_return_token_ids = request.return_token_ids or (request.extra_body is not None and request.extra_body.get('return_token_ids', False))
enable_return_token_ids = request.return_token_ids or (
request.extra_body is not None and request.extra_body.get("return_token_ids", False)
)
current_waiting_time = 0
while num_choices > 0:
try:
@@ -267,12 +269,16 @@
id=request_id,
created=created_time,
model=model_name,
choices=[CompletionResponseStreamChoice(
index=idx,
text="",
prompt_token_ids=list(prompt_batched_token_ids[idx]) if enable_return_token_ids else None,
completion_token_ids=None,
)]
choices=[
CompletionResponseStreamChoice(
index=idx,
text="",
prompt_token_ids=(
list(prompt_batched_token_ids[idx]) if enable_return_token_ids else None
),
completion_token_ids=None,
)
],
)
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
first_iteration[idx] = False
@@ -286,15 +292,17 @@

output = res["outputs"]

choices.append(CompletionResponseStreamChoice(
index=idx,
text=output["text"],
prompt_token_ids=None,
completion_token_ids=output.get("token_ids") if enable_return_token_ids else None,
tool_calls=output.get("tool_call_content"),
reasoning_content=output.get("reasoning_content"),
arrival_time=arrival_time
))
choices.append(
CompletionResponseStreamChoice(
index=idx,
text=output["text"],
prompt_token_ids=None,
completion_token_ids=(output.get("token_ids") if enable_return_token_ids else None),
tool_calls=output.get("tool_call_content"),
reasoning_content=output.get("reasoning_content"),
arrival_time=arrival_time,
)
)
if res["finished"]:
if request.max_tokens is None or output_tokens[idx] + 1 != request.max_tokens:
chunk.choices[0].finish_reason = "stop"
@@ -353,12 +361,14 @@ def request_output_to_completion_response(
created_time: int,
model_name: str,
prompt_batched_token_ids: list(),
completion_batched_token_ids: list()
completion_batched_token_ids: list(),
) -> CompletionResponse:
choices: List[CompletionResponseChoice] = []
num_prompt_tokens = 0
num_generated_tokens = 0
enable_return_token_ids = request.return_token_ids or (request.extra_body is not None and request.extra_body.get('return_token_ids', False))
enable_return_token_ids = request.return_token_ids or (
request.extra_body is not None and request.extra_body.get("return_token_ids", False)
)

for idx in range(len(final_res_batch)):
final_res = final_res_batch[idx]
@@ -385,8 +395,8 @@
index=len(choices),
text=output_text,
prompt_token_ids=prompt_token_ids if enable_return_token_ids else None,
completion_token_ids=completion_token_ids if enable_return_token_ids else None,
reasoning_content=output.get('reasoning_content'),
completion_token_ids=(completion_token_ids if enable_return_token_ids else None),
reasoning_content=output.get("reasoning_content"),
tool_calls=output.get("tool_call_content"),
logprobs=None,
finish_reason=None,
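Note: the stream generator in this file frames each chunk as a server-sent event via model_dump_json (pydantic v2, as used in the hunks above); a minimal stand-in where Chunk is a hypothetical model, not FastDeploy's CompletionStreamResponse:

from pydantic import BaseModel

class Chunk(BaseModel):
    id: str
    text: str

chunk = Chunk(id="cmpl-1", text="hello")
sse_frame = f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
# 'data: {"id":"cmpl-1","text":"hello"}\n\n'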
10 changes: 4 additions & 6 deletions fastdeploy/input/ernie_processor.py
@@ -99,8 +99,7 @@ def process_request(self, request, max_model_len=None, **kwargs):

if request.prompt_token_ids is None or len(request.prompt_token_ids) == 0:
if request.prompt is None and request.messages is None:
raise ValueError(
f"The request should have `prompt_token_ids`, `prompt` or `messages`: {request}.")
raise ValueError(f"The request should have `prompt_token_ids`, `prompt` or `messages`: {request}.")
if request.prompt is not None:
prompt = request.prompt if request.prompt is not None else request.messages[0]
prompt = prompt[0] if isinstance(prompt, list) else prompt
@@ -164,8 +163,8 @@ def process_request_dict(self, request, max_model_len=None):
req_id = request.get("request_id", None)
data_processor_logger.info(f"req_id:{req_id}, tokens:{tokens}, token_ids: {token_ids}")
else:
request['prompt_token_ids'] = self.messages2ids(request)
if len(request['prompt_token_ids']) == 0:
request["prompt_token_ids"] = self.messages2ids(request)
if len(request["prompt_token_ids"]) == 0:
raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs")

# truncate prompts that exceed the length limit
@@ -246,8 +245,7 @@ def process_response_dict_normal(self, response_dict, **kwargs):
if is_end:
full_text = previous_texts + delta_text
if enable_thinking and self.reasoning_parser:
reasoning_content, text = self.reasoning_parser.extract_reasoning_content(
full_text, response_dict)
reasoning_content, text = self.reasoning_parser.extract_reasoning_content(full_text, response_dict)
response_dict["outputs"]["text"] = text
response_dict["outputs"]["reasoning_content"] = reasoning_content
else:
3 changes: 2 additions & 1 deletion fastdeploy/input/mm_processor/process.py
@@ -507,5 +507,6 @@ def apply_chat_template(self, request):
tokens = self.tokenizer.tokenize(prompt_token_str)
token_ids = self.tokenizer.convert_tokens_to_ids(tokens)
data_processor_logger.info(
f"req_id:{request.get('request_id', ''),} tokens: {tokens}, token_ids: {token_ids}")
f"req_id:{request.get('request_id', ''), } tokens: {tokens}, token_ids: {token_ids}"
)
return token_ids
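Note: in the reformatted log line above, the trailing comma inside the replacement field makes the expression a one-element tuple, so the request id is rendered in tuple form; a quick demonstration with a made-up id:

req_id = "abc"
print(f"req_id:{req_id, } tokens: [1, 2]")
# -> req_id:('abc',) tokens: [1, 2]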
21 changes: 8 additions & 13 deletions fastdeploy/input/text_processor.py
@@ -239,9 +239,7 @@ def process_request(self, request, max_model_len=None, **kwargs):
task["enable_thinking"] = kwargs.get("enable_thinking", True)
request.prompt_token_ids = self.messages2ids(task)
else:
raise ValueError(
f"The request should have `input_ids`, `text` or `messages`: {request}."
)
raise ValueError(f"The request should have `input_ids`, `text` or `messages`: {request}.")
if len(request.prompt_token_ids) == 0:
raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs")
if request.get("max_tokens") is None:
@@ -281,18 +279,16 @@ def process_request_dict(self, request, max_model_len=None, **kwargs):

data_processor_logger.info(f"Processing request {request}")
# processing prompt_token_ids
if not request.get('prompt_token_ids'):
if 'prompt' in request:
request['prompt_token_ids'] = self.text2ids(request['prompt'], max_model_len).tolist()
elif 'messages' in request:
if not request.get("prompt_token_ids"):
if "prompt" in request:
request["prompt_token_ids"] = self.text2ids(request["prompt"], max_model_len).tolist()
elif "messages" in request:
if self.tokenizer.chat_template is None:
raise ValueError("This model does not support chat_template.")
request["prompt_token_ids"] = self.messages2ids(request)
else:
raise ValueError(
f"Request must contain 'prompt_token_ids', 'prompt', or 'messages': {request}"
)
if len(request['prompt_token_ids']) == 0:
raise ValueError(f"Request must contain 'prompt_token_ids', 'prompt', or 'messages': {request}")
if len(request["prompt_token_ids"]) == 0:
raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs")
if request.get("max_tokens") is None:
request["max_tokens"] = max(1, max_model_len - len(request["prompt_token_ids"]))
@@ -357,8 +353,7 @@ def process_response_dict_normal(self, response_dict, **kwargs):
if is_end:
full_text = previous_texts + delta_text
if enable_thinking and self.reasoning_parser:
reasoning_content, text = self.reasoning_parser.extract_reasoning_content(
full_text, response_dict)
reasoning_content, text = self.reasoning_parser.extract_reasoning_content(full_text, response_dict)
response_dict["outputs"]["text"] = text
response_dict["outputs"]["reasoning_content"] = reasoning_content
else:
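Note: the process_request_dict hunks above (both the Ernie and text processors) share the same input-resolution order; a simplified restatement with the tokenizer calls stubbed out as parameters (a sketch, not the production method):

def resolve_prompt_token_ids(request: dict, text2ids, messages2ids) -> list:
    # Order of precedence: explicit token ids, then raw prompt text, then chat messages.
    if not request.get("prompt_token_ids"):
        if "prompt" in request:
            request["prompt_token_ids"] = text2ids(request["prompt"])
        elif "messages" in request:
            request["prompt_token_ids"] = messages2ids(request)
        else:
            raise ValueError(f"Request must contain 'prompt_token_ids', 'prompt', or 'messages': {request}")
    if len(request["prompt_token_ids"]) == 0:
        raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs")
    return request["prompt_token_ids"]

# Example: resolve from raw text, with an identity-style stand-in for the tokenizer hook.
print(resolve_prompt_token_ids({"prompt": "hi"}, text2ids=lambda s: [101, 102], messages2ids=None))
# [101, 102]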