From eff80c204983bde954ff64517764113cce8ed0ca Mon Sep 17 00:00:00 2001 From: Muninn Date: Fri, 14 Feb 2025 12:48:13 +0800 Subject: [PATCH] improve: api doc and error handling --- app/config/config.py | 3 + app/core/engine.py | 202 +++++++++++++++++--------------- app/entrypoints/web.py | 253 ++++++++++++++++++++++++++--------------- 3 files changed, 279 insertions(+), 179 deletions(-) diff --git a/app/config/config.py b/app/config/config.py index 121602d..bf70c56 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -68,6 +68,9 @@ def __init__(self): # Admin self.admin_auth_enabled = self.load("ADMIN_AUTH_ENABLED", "false") == "true" self.admin_jwt_secret = self.load("ADMIN_JWT_SECRET") + self.debug_auth_enabled = self.load("DEBUG_AUTH_ENABLED", "false") == "true" + self.debug_username = self.load("DEBUG_USERNAME") + self.debug_password = self.load("DEBUG_PASSWORD") # API self.api_auth_enabled = self.load("API_AUTH_ENABLED", "false") == "true" self.api_jwt_secret = self.load("API_JWT_SECRET") diff --git a/app/core/engine.py b/app/core/engine.py index ca3af86..f9731c8 100644 --- a/app/core/engine.py +++ b/app/core/engine.py @@ -332,7 +332,8 @@ async def agent_executor(agent_id: str) -> (CompiledGraph, float): async def execute_agent(message: ChatMessage, debug: bool = False) -> list[ChatMessage]: - """Execute an agent with the given prompt and return response lines. + """ + Execute an agent with the given prompt and return response lines. This function: 1. Configures execution context with thread ID @@ -379,106 +380,124 @@ async def execute_agent(message: ChatMessage, debug: bool = False) -> list[ChatM ) # run - cached_tool_step = None - async for chunk in executor.astream( - {"messages": [HumanMessage(content=content)]}, stream_config - ): - this_time = time.perf_counter() - logger.debug(f"stream chunk: {chunk}", extra={"thread_id": thread_id}) - if "agent" in chunk and "messages" in chunk["agent"]: - if len(chunk["agent"]["messages"]) != 1: - logger.error( - "unexpected agent message: " + str(chunk["agent"]["messages"]), - extra={"thread_id": thread_id}, - ) - msg = chunk["agent"]["messages"][0] - if msg.tool_calls: - # tool calls, save for later use - cached_tool_step = msg - elif msg.content: - # agent message - chat_message = ChatMessage( + try: + cached_tool_step = None + async for chunk in executor.astream( + {"messages": [HumanMessage(content=content)]}, stream_config + ): + this_time = time.perf_counter() + logger.debug(f"stream chunk: {chunk}", extra={"thread_id": thread_id}) + if "agent" in chunk and "messages" in chunk["agent"]: + if len(chunk["agent"]["messages"]) != 1: + logger.error( + "unexpected agent message: " + str(chunk["agent"]["messages"]), + extra={"thread_id": thread_id}, + ) + msg = chunk["agent"]["messages"][0] + if msg.tool_calls: + # tool calls, save for later use + cached_tool_step = msg + elif msg.content: + # agent message + chat_message = ChatMessage( + id=str(XID()), + agent_id=message.agent_id, + chat_id=message.chat_id, + author_id=message.agent_id, + author_type=AuthorType.AGENT, + message=msg.content, + input_tokens=msg.usage_metadata.get("input_tokens", 0), + output_tokens=msg.usage_metadata.get("output_tokens", 0), + time_cost=this_time - last, + ) + last = this_time + if cold_start_cost > 0: + chat_message.cold_start_cost = cold_start_cost + cold_start_cost = 0 + resp.append(chat_message) + await chat_message.save() + else: + logger.error( + "unexpected agent message: " + str(msg), + extra={"thread_id": thread_id}, + ) + elif "tools" in chunk and "messages" in chunk["tools"]: + if not cached_tool_step: + logger.error( + "unexpected tools message: " + str(chunk["tools"]), + extra={"thread_id": thread_id}, + ) + continue + skill_calls = [] + for msg in chunk["tools"]["messages"]: + if not hasattr(msg, "tool_call_id"): + logger.error( + "unexpected tools message: " + str(chunk["tools"]), + extra={"thread_id": thread_id}, + ) + continue + for call in cached_tool_step.tool_calls: + if call["id"] == msg.tool_call_id: + skill_call: ChatMessageSkillCall = { + "name": call["name"], + "parameters": call["args"], + "success": True, + } + if msg.status == "error": + skill_call["success"] = False + skill_call["error_message"] = msg.error + else: + if debug: + skill_call["response"] = msg.content + else: + skill_call["response"] = textwrap.shorten( + msg.content, width=100, placeholder="..." + ) + skill_calls.append(skill_call) + break + skill_message = ChatMessage( id=str(XID()), agent_id=message.agent_id, chat_id=message.chat_id, author_id=message.agent_id, - author_type=AuthorType.AGENT, - message=msg.content, - input_tokens=msg.usage_metadata.get("input_tokens", 0), - output_tokens=msg.usage_metadata.get("output_tokens", 0), + author_type=AuthorType.SKILL, + message="", + skill_calls=skill_calls, + input_tokens=cached_tool_step.usage_metadata.get("input_tokens", 0), + output_tokens=cached_tool_step.usage_metadata.get( + "output_tokens", 0 + ), time_cost=this_time - last, ) last = this_time if cold_start_cost > 0: - chat_message.cold_start_cost = cold_start_cost + skill_message.cold_start_cost = cold_start_cost cold_start_cost = 0 - resp.append(chat_message) - await chat_message.save() + cached_tool_step = None + resp.append(skill_message) + await skill_message.save() + elif "memory_manager" in chunk: + pass else: logger.error( - "unexpected agent message: " + str(msg), + "unexpected message type: " + str(chunk), extra={"thread_id": thread_id}, ) - elif "tools" in chunk and "messages" in chunk["tools"]: - if not cached_tool_step: - logger.error( - "unexpected tools message: " + str(chunk["tools"]), - extra={"thread_id": thread_id}, - ) - continue - skill_calls = [] - for msg in chunk["tools"]["messages"]: - if not hasattr(msg, "tool_call_id"): - logger.error( - "unexpected tools message: " + str(chunk["tools"]), - extra={"thread_id": thread_id}, - ) - continue - for call in cached_tool_step.tool_calls: - if call["id"] == msg.tool_call_id: - skill_call: ChatMessageSkillCall = { - "name": call["name"], - "parameters": call["args"], - "success": True, - } - if msg.status == "error": - skill_call["success"] = False - skill_call["error_message"] = msg.error - else: - if debug: - skill_call["response"] = msg.content - else: - skill_call["response"] = textwrap.shorten( - msg.content, width=100, placeholder="..." - ) - skill_calls.append(skill_call) - break - skill_message = ChatMessage( - id=str(XID()), - agent_id=message.agent_id, - chat_id=message.chat_id, - author_id=message.agent_id, - author_type=AuthorType.SKILL, - message="", - skill_calls=skill_calls, - input_tokens=cached_tool_step.usage_metadata.get("input_tokens", 0), - output_tokens=cached_tool_step.usage_metadata.get("output_tokens", 0), - time_cost=this_time - last, - ) - last = this_time - if cold_start_cost > 0: - skill_message.cold_start_cost = cold_start_cost - cold_start_cost = 0 - cached_tool_step = None - resp.append(skill_message) - await skill_message.save() - elif "memory_manager" in chunk: - pass - else: - logger.error( - "unexpected message type: " + str(chunk), - extra={"thread_id": thread_id}, - ) + except Exception as e: + logger.error( + f"failed to execute agent: {str(e)}", extra={"thread_id": thread_id} + ) + error_message = ChatMessage( + id=str(XID()), + agent_id=message.agent_id, + chat_id=message.chat_id, + author_id=message.agent_id, + author_type=AuthorType.SYSTEM, + message=f"Error in agent:\n {str(e)}", + time_cost=time.perf_counter() - start, + ) + await error_message.save() + resp.append(error_message) return resp @@ -489,8 +508,9 @@ async def clean_agent_memory( clean_agent_memory: bool = False, clean_skills_memory: bool = False, debug: bool = False, -): - """Clean an agent's memory with the given prompt and return response. +) -> str: + """ + Clean an agent's memory with the given prompt and return response. This function: 1. Cleans the agents skills data. @@ -502,6 +522,8 @@ async def clean_agent_memory( Args: agent_id (str): Agent ID thread_id (str): Thread ID for the agent memory cleanup + clean_agent_memory (bool): Whether to clean agent's memory data + clean_skills_memory (bool): Whether to clean skills memory data debug (bool): Enable debug mode Returns: diff --git a/app/entrypoints/web.py b/app/entrypoints/web.py index 361faad..bb7af0a 100644 --- a/app/entrypoints/web.py +++ b/app/entrypoints/web.py @@ -1,11 +1,21 @@ """IntentKit Web API Router.""" import logging +import secrets from typing import List from epyxid import XID -from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request +from fastapi import ( + APIRouter, + Depends, + HTTPException, + Path, + Query, + Request, + status, +) from fastapi.responses import PlainTextResponse +from fastapi.security import HTTPBasic, HTTPBasicCredentials from langchain_core.messages import BaseMessage from sqlmodel import desc, select from sqlmodel.ext.asyncio.session import AsyncSession @@ -25,48 +35,93 @@ chat_router = APIRouter() chat_router_readonly = APIRouter() +# Add security scheme +security = HTTPBasic() + + +# Add credentials checker +def verify_debug_credentials(credentials: HTTPBasicCredentials = Depends(security)): + from app.config import config + + if not config.debug_auth_enabled: + return None + + if not config.debug_username or not config.debug_password: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Debug credentials not configured", + ) + + is_username_correct = secrets.compare_digest( + credentials.username.encode("utf8"), config.debug_username.encode("utf8") + ) + is_password_correct = secrets.compare_digest( + credentials.password.encode("utf8"), config.debug_password.encode("utf8") + ) + + if not (is_username_correct and is_password_correct): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Basic"}, + ) + return credentials.username + @chat_router.get( - "/{agent_id}/chats/{chat_id}", tags=["Debug"], response_model=list[BaseMessage] + "/debug/{agent_id}/chats/{chat_id}", + tags=["Debug"], + response_model=List[BaseMessage], + dependencies=[Depends(verify_debug_credentials)], + operation_id="debug_chat_history", + summary="Chat History", ) async def chat_history( agent_id: str = Path(..., description="Agent id"), chat_id: str = Path(..., description="Chat id"), -): +) -> List[BaseMessage]: return await thread_stats(agent_id, chat_id) -@chat_router.get("/{aid}/chat", tags=["Debug"], response_class=PlainTextResponse) +@chat_router.get( + "/{aid}/chat", tags=["Debug"], response_class=PlainTextResponse, deprecated=True +) +@chat_router.get( + "/debug/{aid}/chat", + tags=["Debug"], + response_class=PlainTextResponse, + dependencies=[Depends(verify_debug_credentials)], + operation_id="debug_chat", + summary="Chat", +) async def debug_chat( request: Request, - aid: str = Path(..., description="Agent id"), + aid: str = Path(..., description="Agent ID"), q: str = Query(None, description="Query string"), debug: bool = Query(None, description="Enable debug mode"), thread: str = Query(None, description="Thread ID for conversation tracking"), -): +) -> str: """Debug mode: Chat with an AI agent. - This endpoint: + **Process Flow:** 1. Validates agent quota 2. Creates a thread-specific context 3. Executes the agent with the query 4. Updates quota usage - Args: - - request: FastAPI request object - - aid: Agent ID - - q: User's input query - - debug: Enable debug mode - - thread: Thread ID for conversation tracking - - Returns: - - str: Formatted chat response - - Raises: - - HTTPException: - - 404: Agent not found - - 429: Quota exceeded - - 500: Internal server error + **Parameters:** + * `aid` - Agent ID + * `q` - User's input query + * `debug` - Enable debug mode (show whole skill response) + * `thread` - Thread ID for conversation tracking + + **Returns:** + * `str` - Formatted chat response + + **Raises:** + * `404` - Agent not found + * `429` - Quota exceeded + * `500` - Internal server error """ if not q: raise HTTPException(status_code=400, detail="Query string cannot be empty") @@ -131,7 +186,11 @@ async def debug_chat( @chat_router_readonly.get( - "/agents/{aid}/chat/history", tags=["Chat"], response_model=List[ChatMessage] + "/agents/{aid}/chat/history", + tags=["Chat"], + response_model=List[ChatMessage], + operation_id="get_chat_history", + summary="Chat History", ) async def get_chat_history( aid: str = Path(..., description="Agent ID"), @@ -140,17 +199,15 @@ async def get_chat_history( ) -> List[ChatMessage]: """Get chat history for a specific chat. - Args: - - aid: Agent ID - - chat_id: Chat ID to get history for - - db: Database session + **Parameters:** + * `aid` - Agent ID + * `chat_id` - Chat ID to get history for - Returns: - - List[ChatMessage]: List of chat messages, ordered by creation time ascending + **Returns:** + * `List[ChatMessage]` - List of chat messages, ordered by creation time ascending - Raises: - - HTTPException: - - 404: Agent not found + **Raises:** + * `404` - Agent not found """ # Get agent and check if exists result = await db.exec(select(Agent).where(Agent.id == aid)) @@ -173,8 +230,14 @@ async def get_chat_history( return messages -@chat_router.get("/agents/{aid}/chat/retry", tags=["Chat"], response_model=ChatMessage) -async def retry_chat( +@chat_router.get( + "/agents/{aid}/chat/retry", + tags=["Chat"], + response_model=ChatMessage, + deprecated=True, + summary="Retry Chat", +) +async def retry_chat_deprecated( aid: str = Path(..., description="Agent ID"), chat_id: str = Query(..., description="Chat ID to retry last message"), db: AsyncSession = Depends(get_db), @@ -184,19 +247,17 @@ async def retry_chat( If the last message is from the agent, return it directly. If the last message is from a user, generate a new agent response. - Args: - - aid: Agent ID - - chat_id: Chat ID to retry - - db: Database session + **Parameters:** + * `aid` - Agent ID + * `chat_id` - Chat ID to retry - Returns: - - ChatMessage: Agent's response message + **Returns:** + * `ChatMessage` - Agent's response message - Raises: - - HTTPException: - - 404: Agent not found or no messages found - - 429: Quota exceeded - - 500: Internal server error + **Raises:** + * `404` - Agent not found or no messages found + * `429` - Quota exceeded + * `500` - Internal server error """ # Get agent and check if exists agent = await Agent.get(aid) @@ -236,13 +297,17 @@ async def retry_chat( return error_message # If last message is from user, generate a new agent response - return await create_chat() + return await create_chat_deprecated() @chat_router.put( - "/agents/{aid}/public/chat/retry", tags=["Chat"], response_model=list[ChatMessage] + "/agents/{aid}/chat/retry/v2", + tags=["Chat"], + response_model=list[ChatMessage], + operation_id="retry_chat", + summary="Retry Chat", ) -async def retry_public_chat( +async def retry_chat( aid: str = Path(..., description="Agent ID"), chat_id: str = Query(..., description="Chat ID to retry last message"), db: AsyncSession = Depends(get_db), @@ -252,19 +317,17 @@ async def retry_public_chat( If the last message is from the agent, return it directly. If the last message is from a user, generate a new agent response. - Args: - - aid: Agent ID - - chat_id: Chat ID to retry - - db: Database session + **Parameters:** + * `aid` - Agent ID + * `chat_id` - Chat ID to retry - Returns: - - ChatMessage: Agent's response message + **Returns:** + * `List[ChatMessage]` - List of chat messages including the retried response - Raises: - - HTTPException: - - 404: Agent not found or no messages found - - 429: Quota exceeded - - 500: Internal server error + **Raises:** + * `404` - Agent not found or no messages found + * `429` - Quota exceeded + * `500` - Internal server error """ # Get agent and check if exists agent = await Agent.get(aid) @@ -304,37 +367,43 @@ async def retry_public_chat( return [last_message, error_message] # If last message is from user, generate a new agent response - return await create_public_chat() + return await create_chat() @chat_router.post( - "/agents/{aid}/chat", tags=["Chat"], response_model=ChatMessage, deprecated=True + "/agents/{aid}/chat", + tags=["Chat"], + response_model=list[ChatMessage], + deprecated=True, + summary="Chat", ) -async def create_chat( +async def create_chat_deprecated( request: ChatMessageRequest, aid: str = Path(..., description="Agent ID"), -) -> ChatMessage: - """Create a chat message and get agent's response. +) -> list[ChatMessage]: + """Create a private chat message and get agent's response. - This endpoint: + **Process Flow:** 1. Validates agent quota 2. Creates a thread-specific context 3. Executes the agent with the query 4. Updates quota usage 5. Saves both input and output messages - Args: - - request: Chat message request - - aid: Agent ID + > **Note:** This is for internal/private use and may have additional features or fewer + > restrictions compared to the public endpoint. - Returns: - - ChatMessage: Agent's response message + **Parameters:** + * `aid` - Agent ID + * `request` - Chat message request object - Raises: - - HTTPException: - - 404: Agent not found - - 429: Quota exceeded - - 500: Internal server error + **Returns:** + * `List[ChatMessage]` - List of chat messages including both user input and agent response + + **Raises:** + * `404` - Agent not found + * `429` - Quota exceeded + * `500` - Internal server error """ # Get agent and validate quota agent = await Agent.get(aid) @@ -364,7 +433,7 @@ async def create_chat( # Update quota await quota.add_message() - return response_messages[-1] + return response_messages except Exception as e: if isinstance(e, HTTPException): @@ -373,33 +442,39 @@ async def create_chat( @chat_router.post( - "/agents/{aid}/public/chat", tags=["Chat"], response_model=list[ChatMessage] + "/agents/{aid}/chat/v2", + tags=["Chat"], + response_model=list[ChatMessage], + operation_id="chat", + summary="Chat", ) -async def create_public_chat( +async def create_chat( request: ChatMessageRequest, aid: str = Path(..., description="Agent ID"), ) -> list[ChatMessage]: """Create a chat message and get agent's response. - This endpoint: + **Process Flow:** 1. Validates agent quota 2. Creates a thread-specific context 3. Executes the agent with the query 4. Updates quota usage 5. Saves both input and output messages - Args: - - request: Chat message request - - aid: Agent ID + > **Note:** This is the public-facing endpoint with appropriate rate limiting + > and security measures. + + **Parameters:** + * `aid` - Agent ID + * `request` - Chat message request object - Returns: - - ChatMessage: Agent's response message + **Returns:** + * `List[ChatMessage]` - List of chat messages including both user input and agent response - Raises: - - HTTPException: - - 404: Agent not found - - 429: Quota exceeded - - 500: Internal server error + **Raises:** + * `404` - Agent not found + * `429` - Quota exceeded + * `500` - Internal server error """ # Get agent and validate quota agent = await Agent.get(aid)