Commit
improve: api doc and error handling
hyacinthus committed Feb 14, 2025
1 parent 68a26cc commit eff80c2
Showing 3 changed files with 279 additions and 179 deletions.
3 changes: 3 additions & 0 deletions app/config/config.py
@@ -68,6 +68,9 @@ def __init__(self):
# Admin
self.admin_auth_enabled = self.load("ADMIN_AUTH_ENABLED", "false") == "true"
self.admin_jwt_secret = self.load("ADMIN_JWT_SECRET")
self.debug_auth_enabled = self.load("DEBUG_AUTH_ENABLED", "false") == "true"
self.debug_username = self.load("DEBUG_USERNAME")
self.debug_password = self.load("DEBUG_PASSWORD")
# API
self.api_auth_enabled = self.load("API_AUTH_ENABLED", "false") == "true"
self.api_jwt_secret = self.load("API_JWT_SECRET")
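A note on the three settings added above: they follow the file's existing pattern of env-driven flags, where a string value is loaded and compared with "true" to get a boolean. Below is a minimal sketch of how a debug-only Basic-auth check could consume them; the load() helper mirrors Config.load and verify_debug_credentials is illustrative, not part of this commit.

import os
import secrets


def load(key: str, default: str | None = None) -> str | None:
    # Hypothetical stand-in for Config.load(): env var lookup with a default.
    return os.environ.get(key, default)


debug_auth_enabled = load("DEBUG_AUTH_ENABLED", "false") == "true"
debug_username = load("DEBUG_USERNAME")
debug_password = load("DEBUG_PASSWORD")


def verify_debug_credentials(username: str, password: str) -> bool:
    # Auth switched off: every caller passes.
    if not debug_auth_enabled:
        return True
    # Enabled but not configured: fail closed.
    if not (debug_username and debug_password):
        return False
    # Constant-time comparison avoids timing side channels.
    return secrets.compare_digest(username, debug_username) and secrets.compare_digest(
        password, debug_password
    )

With this layout, enabling DEBUG_AUTH_ENABLED while leaving the username or password unset denies access rather than silently allowing it.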
202 changes: 112 additions & 90 deletions app/core/engine.py
@@ -332,7 +332,8 @@ async def agent_executor(agent_id: str) -> (CompiledGraph, float):


async def execute_agent(message: ChatMessage, debug: bool = False) -> list[ChatMessage]:
"""Execute an agent with the given prompt and return response lines.
"""
Execute an agent with the given prompt and return response lines.
This function:
1. Configures execution context with thread ID
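For orientation before the next hunk, a hedged sketch of a call site for execute_agent. Only the app.core.engine path is taken from this commit; ChatMessage, AuthorType and XID live elsewhere in the project, and the exact set of required ChatMessage fields (and the AuthorType member used here) are assumptions based on the constructors visible in this diff.

from app.core.engine import execute_agent


async def ask_agent(agent_id: str, chat_id: str, user_id: str, text: str) -> None:
    # Hypothetical incoming message; field names mirror the ChatMessage(...) calls below.
    incoming = ChatMessage(
        id=str(XID()),
        agent_id=agent_id,
        chat_id=chat_id,
        author_id=user_id,
        author_type=AuthorType.API,  # assumed enum member for an external caller
        message=text,
    )
    for line in await execute_agent(incoming, debug=False):
        # Each element is an agent line, a skill-call line, or (after this commit) a system error line.
        print(f"[{line.author_type}] {line.message}")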
@@ -379,106 +380,124 @@ async def execute_agent(message: ChatMessage, debug: bool = False) -> list[ChatMessage]:
)

# run
cached_tool_step = None
async for chunk in executor.astream(
{"messages": [HumanMessage(content=content)]}, stream_config
):
this_time = time.perf_counter()
logger.debug(f"stream chunk: {chunk}", extra={"thread_id": thread_id})
if "agent" in chunk and "messages" in chunk["agent"]:
if len(chunk["agent"]["messages"]) != 1:
logger.error(
"unexpected agent message: " + str(chunk["agent"]["messages"]),
extra={"thread_id": thread_id},
)
msg = chunk["agent"]["messages"][0]
if msg.tool_calls:
# tool calls, save for later use
cached_tool_step = msg
elif msg.content:
# agent message
chat_message = ChatMessage(
try:
cached_tool_step = None
async for chunk in executor.astream(
{"messages": [HumanMessage(content=content)]}, stream_config
):
this_time = time.perf_counter()
logger.debug(f"stream chunk: {chunk}", extra={"thread_id": thread_id})
if "agent" in chunk and "messages" in chunk["agent"]:
if len(chunk["agent"]["messages"]) != 1:
logger.error(
"unexpected agent message: " + str(chunk["agent"]["messages"]),
extra={"thread_id": thread_id},
)
msg = chunk["agent"]["messages"][0]
if msg.tool_calls:
# tool calls, save for later use
cached_tool_step = msg
elif msg.content:
# agent message
chat_message = ChatMessage(
id=str(XID()),
agent_id=message.agent_id,
chat_id=message.chat_id,
author_id=message.agent_id,
author_type=AuthorType.AGENT,
message=msg.content,
input_tokens=msg.usage_metadata.get("input_tokens", 0),
output_tokens=msg.usage_metadata.get("output_tokens", 0),
time_cost=this_time - last,
)
last = this_time
if cold_start_cost > 0:
chat_message.cold_start_cost = cold_start_cost
cold_start_cost = 0
resp.append(chat_message)
await chat_message.save()
else:
logger.error(
"unexpected agent message: " + str(msg),
extra={"thread_id": thread_id},
)
elif "tools" in chunk and "messages" in chunk["tools"]:
if not cached_tool_step:
logger.error(
"unexpected tools message: " + str(chunk["tools"]),
extra={"thread_id": thread_id},
)
continue
skill_calls = []
for msg in chunk["tools"]["messages"]:
if not hasattr(msg, "tool_call_id"):
logger.error(
"unexpected tools message: " + str(chunk["tools"]),
extra={"thread_id": thread_id},
)
continue
for call in cached_tool_step.tool_calls:
if call["id"] == msg.tool_call_id:
skill_call: ChatMessageSkillCall = {
"name": call["name"],
"parameters": call["args"],
"success": True,
}
if msg.status == "error":
skill_call["success"] = False
skill_call["error_message"] = msg.error
else:
if debug:
skill_call["response"] = msg.content
else:
skill_call["response"] = textwrap.shorten(
msg.content, width=100, placeholder="..."
)
skill_calls.append(skill_call)
break
skill_message = ChatMessage(
id=str(XID()),
agent_id=message.agent_id,
chat_id=message.chat_id,
author_id=message.agent_id,
author_type=AuthorType.SKILL,
message="",
skill_calls=skill_calls,
input_tokens=cached_tool_step.usage_metadata.get("input_tokens", 0),
output_tokens=cached_tool_step.usage_metadata.get(
"output_tokens", 0
),
time_cost=this_time - last,
)
last = this_time
if cold_start_cost > 0:
skill_message.cold_start_cost = cold_start_cost
cold_start_cost = 0
cached_tool_step = None
resp.append(skill_message)
await skill_message.save()
elif "memory_manager" in chunk:
pass
else:
logger.error(
"unexpected agent message: " + str(msg),
"unexpected message type: " + str(chunk),
extra={"thread_id": thread_id},
)
elif "tools" in chunk and "messages" in chunk["tools"]:
if not cached_tool_step:
logger.error(
"unexpected tools message: " + str(chunk["tools"]),
extra={"thread_id": thread_id},
)
continue
skill_calls = []
for msg in chunk["tools"]["messages"]:
if not hasattr(msg, "tool_call_id"):
logger.error(
"unexpected tools message: " + str(chunk["tools"]),
extra={"thread_id": thread_id},
)
continue
for call in cached_tool_step.tool_calls:
if call["id"] == msg.tool_call_id:
skill_call: ChatMessageSkillCall = {
"name": call["name"],
"parameters": call["args"],
"success": True,
}
if msg.status == "error":
skill_call["success"] = False
skill_call["error_message"] = msg.error
else:
if debug:
skill_call["response"] = msg.content
else:
skill_call["response"] = textwrap.shorten(
msg.content, width=100, placeholder="..."
)
skill_calls.append(skill_call)
break
skill_message = ChatMessage(
id=str(XID()),
agent_id=message.agent_id,
chat_id=message.chat_id,
author_id=message.agent_id,
author_type=AuthorType.SKILL,
message="",
skill_calls=skill_calls,
input_tokens=cached_tool_step.usage_metadata.get("input_tokens", 0),
output_tokens=cached_tool_step.usage_metadata.get("output_tokens", 0),
time_cost=this_time - last,
)
last = this_time
if cold_start_cost > 0:
skill_message.cold_start_cost = cold_start_cost
cold_start_cost = 0
cached_tool_step = None
resp.append(skill_message)
await skill_message.save()
elif "memory_manager" in chunk:
pass
else:
logger.error(
"unexpected message type: " + str(chunk),
extra={"thread_id": thread_id},
)
except Exception as e:
logger.error(
f"failed to execute agent: {str(e)}", extra={"thread_id": thread_id}
)
error_message = ChatMessage(
id=str(XID()),
agent_id=message.agent_id,
chat_id=message.chat_id,
author_id=message.agent_id,
author_type=AuthorType.SYSTEM,
message=f"Error in agent:\n {str(e)}",
time_cost=time.perf_counter() - start,
)
await error_message.save()
resp.append(error_message)

return resp
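The core of this hunk is that the streaming loop now runs entirely inside try/except, so a mid-stream failure is converted into a saved SYSTEM-authored message and the function still returns a response list. Below is a self-contained sketch of the same pattern under stated assumptions: SimpleMessage and fake_stream are stand-ins, not project code.

import asyncio
import time
from dataclasses import dataclass


@dataclass
class SimpleMessage:
    author_type: str
    message: str
    time_cost: float = 0.0


async def fake_stream():
    # Stand-in for executor.astream(): yields one chunk, then fails mid-stream.
    yield {"agent": {"messages": ["hello"]}}
    raise RuntimeError("tool call exploded")


async def run_agent() -> list[SimpleMessage]:
    resp: list[SimpleMessage] = []
    start = last = time.perf_counter()
    try:
        async for chunk in fake_stream():
            now = time.perf_counter()
            resp.append(SimpleMessage("agent", str(chunk), now - last))
            last = now
    except Exception as e:
        # Mirror of the commit's intent: record the stream error as a system message
        # carrying the total elapsed time, and keep the partial output already collected.
        resp.append(
            SimpleMessage("system", f"Error in agent:\n {e}", time.perf_counter() - start)
        )
    return resp


for m in asyncio.run(run_agent()):
    print(m.author_type, "->", m.message)

Catching Exception this broadly is a deliberate trade-off: one failing skill call no longer discards the whole conversation turn, at the cost of hiding programming errors behind a chat message; the logger.error call in the real code keeps them visible in the logs.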

@@ -489,8 +508,9 @@ async def clean_agent_memory(
clean_agent_memory: bool = False,
clean_skills_memory: bool = False,
debug: bool = False,
):
"""Clean an agent's memory with the given prompt and return response.
) -> str:
"""
Clean an agent's memory with the given prompt and return response.
This function:
1. Cleans the agent's skills data.
@@ -502,6 +522,8 @@ async def clean_agent_memory(
Args:
agent_id (str): Agent ID
thread_id (str): Thread ID for the agent memory cleanup
clean_agent_memory (bool): Whether to clean agent's memory data
clean_skills_memory (bool): Whether to clean skills memory data
debug (bool): Enable debug mode
Returns:
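The signature change above adds an explicit str return type, and the docstring now documents the two clean_* flags. A hedged call-site sketch follows: the import path matches app/core/engine.py from this commit, the keyword names follow the docstring, and the returned string is assumed to be a human-readable status.

from app.core.engine import clean_agent_memory


async def reset_agent(agent_id: str, thread_id: str) -> str:
    # Wipe both the agent's chat memory and its skills data for one thread.
    return await clean_agent_memory(
        agent_id=agent_id,
        thread_id=thread_id,
        clean_agent_memory=True,
        clean_skills_memory=True,
        debug=False,
    )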