diff --git a/CHANGELOG.md b/CHANGELOG.md index d9fc76a..a9a776a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,22 @@ # Changelog +## 2025-01-09 + +### New Features +- Add IntentKitSkill abstract class, for now, it has a skill store interface out of the box +- Use skill store in Twitter skills, fetch skills will store the last processed tweet ID, prevent duplicate processing +- CDP Skills Filter in Agent, choose the skills you want only, the less skills, the better performance + +### Improvements +- Add a document for skill contributors: [How to add a new skill](docs/contributing/skills.md) + +## 2025-01-08 + +### New Features +- Add `prompt_append` to Agent, it will be appended to the entire prompt as system role, it has stronger priority +- When you use web debug mode, you can see the entire prompt sent to the AI model +- You can use new query param `thread` to debug any conversation thread + ## 2025-01-07 ### New Features diff --git a/README.md b/README.md index bfafe59..56a6e5c 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ This project is currently in alpha stage and is not recommended for production u - 🤖 Multiple Agent Support - 🔄 Autonomous Agent Management -- 🔗 Blockchain Integration (CDP for now, will add more) +- 🔗 Blockchain Integration (EVM for now, will add more) - 🐦 Social Media Integration (Twitter,Telegram for now, will add more) - 🛠️ Extensible Skill System -- 🔌 Extensible Plugin System +- 🔌 Extensible Plugin System (WIP) ## Quick Start @@ -142,6 +142,21 @@ See `example.env` for all available options. Contributions are welcome! Please read our [Contributing Guidelines](CONTRIBUTING.md) before submitting a pull request. +### Contribute Skills + +If you want to add a skill collection, follow these steps: + +1. Create a new skill collection in the `skills/` directory +2. Implement the skill interface +3. Register the skill in `skill/YOUR_SKILL_COLLECTION/__init__.py` + +If you want to add a new skill, follow these steps: + +1. Create a new skill in the `skills/common/` directory +2. Register the skill in `skills/common/__init__.py` + +See the [Skill Development Guide](docs/contributing/skills.md) for more information. + ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. \ No newline at end of file diff --git a/abstracts/skill.py b/abstracts/skill.py new file mode 100644 index 0000000..0c17baa --- /dev/null +++ b/abstracts/skill.py @@ -0,0 +1,87 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + +from langchain_core.tools import BaseTool + + +class IntentKitSkill(BaseTool): + """Abstract base class for IntentKit skills. + Will have predefined abilities. + """ + + agent_id: str + store: "SkillStoreABC" + + +class SkillStoreABC(ABC): + """Abstract base class for skill data storage operations. + + This class defines the interface for interacting with skill-related data + for both agents and threads. + """ + + @abstractmethod + def get_agent_skill_data( + self, agent_id: str, skill: str, key: str + ) -> Optional[Dict[str, Any]]: + """Get skill data for an agent. + + Args: + agent_id: ID of the agent + skill: Name of the skill + key: Data key + + Returns: + Dictionary containing the skill data if found, None otherwise + """ + pass + + @abstractmethod + def save_agent_skill_data( + self, agent_id: str, skill: str, key: str, data: Dict[str, Any] + ) -> None: + """Save or update skill data for an agent. + + Args: + agent_id: ID of the agent + skill: Name of the skill + key: Data key + data: JSON data to store + """ + pass + + @abstractmethod + def get_thread_skill_data( + self, thread_id: str, skill: str, key: str + ) -> Optional[Dict[str, Any]]: + """Get skill data for a thread. + + Args: + thread_id: ID of the thread + skill: Name of the skill + key: Data key + + Returns: + Dictionary containing the skill data if found, None otherwise + """ + pass + + @abstractmethod + def save_thread_skill_data( + self, + thread_id: str, + agent_id: str, + skill: str, + key: str, + data: Dict[str, Any], + ) -> None: + """Save or update skill data for a thread. + + Args: + thread_id: ID of the thread + agent_id: ID of the agent that owns this thread + skill: Name of the skill + key: Data key + data: JSON data to store + """ + pass diff --git a/app/config/config.py b/app/config/config.py index 869bce6..48ab8bf 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -63,6 +63,9 @@ def __init__(self): self.debug_resp = ( self.load("DEBUG_RESP", "false") == "true" ) # Agent response with thought steps and time cost + self.debug_checkpoint = ( + self.load("DEBUG_CHECKPOINT", "false") == "true" + ) # log with checkpoint # Internal self.internal_base_url = self.load("INTERNAL_BASE_URL", "http://intent-api") # Admin diff --git a/app/core/engine.py b/app/core/engine.py index e384cf9..d9f842d 100644 --- a/app/core/engine.py +++ b/app/core/engine.py @@ -33,8 +33,9 @@ from abstracts.graph import AgentState from app.config.config import config from app.core.graph import create_agent +from app.core.skill import SkillStore from app.models.agent import Agent -from app.models.db import get_coon, get_engine +from app.models.db import get_coon, get_engine, get_session from skill_sets import get_skill_set from skills.common import get_common_skill from skills.crestal import get_crestal_skill @@ -115,6 +116,9 @@ def initialize_agent(aid): # ==== Load skills tools: list[BaseTool] = [] + # init skill store first + skill_store = SkillStore(get_session) + # Configure CDP Agentkit Langchain Extension. if agent.cdp_enabled: values = { @@ -133,7 +137,13 @@ def initialize_agent(aid): db.commit() # Initialize CDP Agentkit Toolkit and get tools. cdp_toolkit = CdpToolkit.from_cdp_agentkit_wrapper(agentkit) - tools.extend(cdp_toolkit.get_tools()) + cdp_tools = cdp_toolkit.get_tools() + # Filter the tools to only include the ones that in agent.cdp_skills. + if agent.cdp_skills and len(agent.cdp_skills) > 0: + cdp_tools = [ + tool for tool in cdp_tools if tool.name in agent.cdp_skills + ] + tools.extend(cdp_tools) # Twitter skills if ( @@ -143,7 +153,11 @@ def initialize_agent(aid): ): twitter_client = tweepy.Client(**agent.twitter_config) for skill in agent.twitter_skills: - tools.append(get_twitter_skill(skill, twitter_client)) + try: + s = get_twitter_skill(skill, twitter_client, skill_store, aid) + tools.append(s) + except Exception as e: + logger.warning(e) # Crestal skills if agent.crestal_skills: @@ -186,7 +200,7 @@ def formatted_prompt(state: AgentState): tools=tools, checkpointer=memory, state_modifier=formatted_prompt, - debug=config.debug, + debug=config.debug_checkpoint, ) diff --git a/app/core/skill.py b/app/core/skill.py new file mode 100644 index 0000000..c3f3150 --- /dev/null +++ b/app/core/skill.py @@ -0,0 +1,99 @@ +from typing import Any, Callable, Dict, Optional + +from abstracts.skill import SkillStoreABC +from app.models.skill import AgentSkillData, ThreadSkillData + + +class SkillStore(SkillStoreABC): + """Implementation of skill data storage operations. + + This class provides concrete implementations for storing and retrieving + skill-related data for both agents and threads using SQLModel-based storage. + + Args: + get_session: A callable that returns a database session + """ + + def __init__(self, get_session: Callable[[], Any]) -> None: + self._get_session = get_session + + def get_agent_skill_data( + self, agent_id: str, skill: str, key: str + ) -> Optional[Dict[str, Any]]: + """Get skill data for an agent. + + Args: + agent_id: ID of the agent + skill: Name of the skill + key: Data key + + Returns: + Dictionary containing the skill data if found, None otherwise + """ + with self._get_session() as session: + return AgentSkillData.get(agent_id, skill, key, session) + + def save_agent_skill_data( + self, agent_id: str, skill: str, key: str, data: Dict[str, Any] + ) -> None: + """Save or update skill data for an agent. + + Args: + agent_id: ID of the agent + skill: Name of the skill + key: Data key + data: JSON data to store + """ + with self._get_session() as session: + skill_data = AgentSkillData( + agent_id=agent_id, + skill=skill, + key=key, + data=data, + ) + skill_data.save(session) + session.commit() + + def get_thread_skill_data( + self, thread_id: str, skill: str, key: str + ) -> Optional[Dict[str, Any]]: + """Get skill data for a thread. + + Args: + thread_id: ID of the thread + skill: Name of the skill + key: Data key + + Returns: + Dictionary containing the skill data if found, None otherwise + """ + with self._get_session() as session: + return ThreadSkillData.get(thread_id, skill, key, session) + + def save_thread_skill_data( + self, + thread_id: str, + agent_id: str, + skill: str, + key: str, + data: Dict[str, Any], + ) -> None: + """Save or update skill data for a thread. + + Args: + thread_id: ID of the thread + agent_id: ID of the agent that owns this thread + skill: Name of the skill + key: Data key + data: JSON data to store + """ + with self._get_session() as session: + skill_data = ThreadSkillData( + thread_id=thread_id, + agent_id=agent_id, + skill=skill, + key=key, + data=data, + ) + skill_data.save(session) + session.commit() diff --git a/app/models/agent.py b/app/models/agent.py index d73656b..fb19690 100644 --- a/app/models/agent.py +++ b/app/models/agent.py @@ -30,19 +30,21 @@ class Agent(SQLModel, table=True): # if cdp_enabled, will load cdp skills # if the cdp_skills is empty, will load all cdp_enabled: bool = Field(default=False) - cdp_skills: Optional[List[str]] = Field(sa_column=Column(JSONB, nullable=True)) + cdp_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) cdp_wallet_data: Optional[str] cdp_network_id: Optional[str] # if twitter_enabled, the twitter_entrypoint will be enabled, twitter_config will be checked - twitter_enabled: bool = Field(default=False) + twitter_enabled: bool = Field(default=False) # TODO: to be deprecated + twitter_entrypoint_enabled: bool = Field(default=False) # TODO: add for future use twitter_config: Optional[dict] = Field(sa_column=Column(JSONB, nullable=True)) # twitter skills require config, but not require twitter_enabled flag. # As long as twitter_skills is not empty, the corresponding skills will be loaded. twitter_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) - telegram_enabled: bool = Field(default=False) + # if telegram_enabled, the telegram_entrypoint will be enabled, telegram_config will be checked + telegram_enabled: bool = Field(default=False) # TODO: to be deprecated + telegram_entrypoint_enabled: bool = Field(default=False) # TODO: add for future use telegram_config: Optional[dict] = Field(sa_column=Column(JSONB, nullable=True)) - # twitter skills require config, but not require twitter_enabled flag. - # As long as twitter_skills is not empty, the corresponding skills will be loaded. + # telegram skills not used for now telegram_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) # crestal skills crestal_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) diff --git a/app/models/db.py b/app/models/db.py index 43fdb64..3908a0e 100644 --- a/app/models/db.py +++ b/app/models/db.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager from typing import Generator from urllib.parse import quote_plus @@ -69,6 +70,31 @@ def get_db() -> Generator[Session, None, None]: yield session +@contextmanager +def get_session() -> Session: + """Get a database session using a context manager. + + This function is designed to be used with the 'with' statement, + ensuring proper session cleanup. + + Returns: + Session: A SQLModel session that will be automatically closed + + Example: + ```python + with get_session() as session: + # use session here + session.query(...) + # session is automatically closed + ``` + """ + session = Session(engine) + try: + yield session + finally: + session.close() + + def get_coon_str() -> str: return conn_str diff --git a/app/models/skill.py b/app/models/skill.py new file mode 100644 index 0000000..597dd28 --- /dev/null +++ b/app/models/skill.py @@ -0,0 +1,166 @@ +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +from sqlalchemy import Column, DateTime, func +from sqlalchemy.dialects.postgresql import JSONB +from sqlmodel import Field, Session, SQLModel, select + + +class AgentSkillData(SQLModel, table=True): + """Model for storing skill-specific data for agents. + + This model uses a composite primary key of (agent_id, skill, key) to store + skill-specific data for agents in a flexible way. + + Attributes: + agent_id: ID of the agent this data belongs to + skill: Name of the skill this data is for + key: Key for this specific piece of data + data: JSON data stored for this key + """ + + __tablename__ = "agent_skill_data" + + agent_id: str = Field(primary_key=True) + skill: str = Field(primary_key=True) + key: str = Field(primary_key=True) + data: Dict[str, Any] = Field(sa_column=Column(JSONB, nullable=True)) + created_at: datetime | None = Field( + default_factory=lambda: datetime.now(timezone.utc), + sa_type=DateTime(timezone=True), + sa_column_kwargs={"server_default": func.now()}, + nullable=False, + ) + updated_at: datetime | None = Field( + default_factory=lambda: datetime.now(timezone.utc), + sa_type=DateTime(timezone=True), + sa_column_kwargs={ + "onupdate": lambda: datetime.now(timezone.utc), + }, + nullable=False, + ) + + @classmethod + def get( + cls, agent_id: str, skill: str, key: str, db: Session + ) -> Optional[Dict[str, Any]]: + """Get skill data for an agent. + + Args: + agent_id: ID of the agent + skill: Name of the skill + key: Data key + db: Database session + + Returns: + Dictionary containing the skill data if found, None otherwise + """ + result = db.exec( + select(cls).where( + cls.agent_id == agent_id, + cls.skill == skill, + cls.key == key, + ) + ).first() + return result.data if result else None + + def save(self, db: Session) -> None: + """Save or update skill data. + + Args: + db: Database session + """ + existing = db.exec( + select(self.__class__).where( + self.__class__.agent_id == self.agent_id, + self.__class__.skill == self.skill, + self.__class__.key == self.key, + ) + ).first() + if existing: + existing.data = self.data + db.add(existing) + else: + db.add(self) + + +class ThreadSkillData(SQLModel, table=True): + """Model for storing skill-specific data for threads. + + This model uses a composite primary key of (thread_id, skill, key) to store + skill-specific data for threads in a flexible way. It also includes agent_id + as a required field for tracking ownership. + + Attributes: + thread_id: ID of the thread this data belongs to + agent_id: ID of the agent that owns this thread + skill: Name of the skill this data is for + key: Key for this specific piece of data + data: JSON data stored for this key + """ + + __tablename__ = "thread_skill_data" + + thread_id: str = Field(primary_key=True) + skill: str = Field(primary_key=True) + key: str = Field(primary_key=True) + agent_id: str = Field(nullable=False) + data: Dict[str, Any] = Field(sa_column=Column(JSONB, nullable=True)) + created_at: datetime | None = Field( + default_factory=lambda: datetime.now(timezone.utc), + sa_type=DateTime(timezone=True), + sa_column_kwargs={"server_default": func.now()}, + nullable=False, + ) + updated_at: datetime | None = Field( + default_factory=lambda: datetime.now(timezone.utc), + sa_type=DateTime(timezone=True), + sa_column_kwargs={ + "onupdate": lambda: datetime.now(timezone.utc), + }, + nullable=False, + ) + + @classmethod + def get( + cls, thread_id: str, skill: str, key: str, db: Session + ) -> Optional[Dict[str, Any]]: + """Get skill data for a thread. + + Args: + thread_id: ID of the thread + skill: Name of the skill + key: Data key + db: Database session + + Returns: + Dictionary containing the skill data if found, None otherwise + """ + result = db.exec( + select(cls).where( + cls.thread_id == thread_id, + cls.skill == skill, + cls.key == key, + ) + ).first() + return result.data if result else None + + def save(self, db: Session) -> None: + """Save or update skill data. + + Args: + db: Database session + """ + existing = db.exec( + select(self.__class__).where( + self.__class__.thread_id == self.thread_id, + self.__class__.skill == self.skill, + self.__class__.key == self.key, + ) + ).first() + if existing: + existing.data = self.data + existing.agent_id = self.agent_id + db.add(existing) + else: + db.add(self) diff --git a/docs/contributing/skills.md b/docs/contributing/skills.md new file mode 100644 index 0000000..1746e6b --- /dev/null +++ b/docs/contributing/skills.md @@ -0,0 +1,201 @@ +# Building Skills for IntentKit + +This guide will help you create new skills for IntentKit. Skills are the building blocks that give agents their capabilities. + +## Overview + +A skill in IntentKit is a specialized tool that inherits from `IntentKitSkill` (which extends LangChain's `BaseTool`). Each skill provides specific functionality that agents can use to interact with external services or perform specific tasks. + +## Basic Structure + +Every skill consists of at least two components: + +1. A skill class that inherits from `IntentKitSkill` +2. Input/Output models using Pydantic + +### Example Structure +```python +from pydantic import BaseModel +from abstracts.skill import IntentKitSkill + +class MySkillInput(BaseModel): + """Input parameters for your skill""" + param1: str + param2: int = 10 # with default value + +class MySkillOutput(BaseModel): + """Output format for your skill""" + result: str + error: str | None = None + +class MySkill(IntentKitSkill): + name: str = "my_skill_name" + description: str = "Description of what your skill does" + args_schema: Type[BaseModel] = MySkillInput + + def _run(self, param1: str, param2: int = 10) -> MySkillOutput: + try: + # Your skill implementation here + result = f"Processed {param1} {param2} times" + return MySkillOutput(result=result) + except Exception as e: + return MySkillOutput(result="", error=str(e)) + + async def _arun(self, param1: str, param2: int = 10) -> MySkillOutput: + """Async implementation if needed""" + return await self._run(param1, param2) +``` + +## Key Components + +### 1. Input/Output Models + +- Use Pydantic models to define input parameters and output structure +- Input model will be used as `args_schema` in your skill +- Output model ensures consistent return format + +### 2. Skill Class + +Required attributes: +- `name`: Unique identifier for your skill +- `description`: Clear description of what the skill does +- `args_schema`: Pydantic model for input validation + +Required methods: +- `_run`: Synchronous implementation of your skill +- `_arun`: Asynchronous implementation (if needed) + +### 3. State Management + +Skills have access to persistent storage through `self.store`, which implements `SkillStoreABC`: + +```python +# Store agent-specific data +self.store.save_agent_skill_data( + self.agent_id, # automatically provided + self.name, # your skill name + "key_name", # custom key for your data + {"data": "value"} # any JSON-serializable data +) + +# Retrieve agent-specific data +data = self.store.get_agent_skill_data( + self.agent_id, + self.name, + "key_name" +) + +# Store thread-specific data +self.store.save_thread_skill_data( + thread_id, # provided in context + self.agent_id, + self.name, + "key_name", + {"data": "value"} +) + +# Retrieve thread-specific data +data = self.store.get_thread_skill_data( + thread_id, + self.name, + "key_name" +) +``` + +## Best Practices + +1. **Error Handling** + - Always wrap your main logic in try-except blocks + - Return errors through your output model rather than raising exceptions + - Provide meaningful error messages + +2. **Documentation** + - Add detailed docstrings to your skill class and methods + - Document input parameters and return values + - Include usage examples in docstrings + +3. **Input Validation** + - Use Pydantic models to validate inputs + - Set appropriate field types and constraints + - Provide default values when sensible + +4. **State Management** + - Use `self.store` for persistent data storage + - Keep agent-specific data separate from thread-specific data + - Use meaningful keys for stored data + +5. **Async Support** + - Implement `_arun` for skills that perform I/O operations + - Use async libraries when available + - Maintain consistent behavior between sync and async implementations + +## Example: Twitter Timeline Skill + +Here's a real-world example of a skill that fetches tweets from a user's timeline: + +```python +class TwitterGetTimelineInput(BaseModel): + """Empty input model as no parameters needed""" + pass + +class Tweet(BaseModel): + """Model representing a Twitter tweet""" + id: str + text: str + author_id: str + created_at: datetime + referenced_tweets: list[dict] | None = None + attachments: dict | None = None + +class TwitterGetTimelineOutput(BaseModel): + tweets: list[Tweet] + error: str | None = None + +class TwitterGetTimeline(TwitterBaseTool): + name: str = "twitter_get_timeline" + description: str = "Get tweets from the authenticated user's timeline" + args_schema: Type[BaseModel] = TwitterGetTimelineInput + + def _run(self) -> TwitterGetTimelineOutput: + try: + # Get last processed tweet ID from storage + last = self.store.get_agent_skill_data(self.agent_id, self.name, "last") + since_id = last.get("since_id") if last else None + + # Fetch timeline + timeline = self.client.get_home_timeline(...) + + # Process and return results + result = [Tweet(...) for tweet in timeline.data] + + # Update storage with newest tweet ID + if timeline.meta: + self.store.save_agent_skill_data( + self.agent_id, + self.name, + "last", + {"since_id": timeline.meta["newest_id"]} + ) + + return TwitterGetTimelineOutput(tweets=result) + except Exception as e: + return TwitterGetTimelineOutput(tweets=[], error=str(e)) +``` + +## Testing Your Skill + +1. Create unit tests in the `tests/skills` directory +2. Test both success and error cases +3. Mock external services and dependencies +4. Verify state management behavior +5. Test both sync and async implementations + +## Contributing + +1. Create your skill in the `skills/` directory +2. Follow the structure of existing skills +3. Add comprehensive tests +4. Update documentation +5. Submit a pull request + +For more examples, check the existing skills in the `skills/` directory. diff --git a/skills/twitter/__init__.py b/skills/twitter/__init__.py index fcae006..cf5bc66 100644 --- a/skills/twitter/__init__.py +++ b/skills/twitter/__init__.py @@ -2,6 +2,7 @@ from tweepy import Client +from abstracts.skill import SkillStoreABC from skills.twitter.base import TwitterBaseTool from skills.twitter.get_mentions import TwitterGetMentions from skills.twitter.get_timeline import TwitterGetTimeline @@ -9,14 +10,16 @@ from skills.twitter.reply_tweet import TwitterReplyTweet -def get_twitter_skill(name: str, client: Client) -> TwitterBaseTool: +def get_twitter_skill( + name: str, client: Client, store: SkillStoreABC, agent_id: str +) -> TwitterBaseTool: if name == "get_mentions": - return TwitterGetMentions(client=client) + return TwitterGetMentions(client=client, store=store, agent_id=agent_id) elif name == "post_tweet": - return TwitterPostTweet(client=client) + return TwitterPostTweet(client=client, store=store, agent_id=agent_id) elif name == "reply_tweet": - return TwitterReplyTweet(client=client) + return TwitterReplyTweet(client=client, store=store, agent_id=agent_id) elif name == "get_timeline": - return TwitterGetTimeline(client=client) + return TwitterGetTimeline(client=client, store=store, agent_id=agent_id) else: raise ValueError(f"Unknown Twitter skill: {name}") diff --git a/skills/twitter/base.py b/skills/twitter/base.py index bb52cff..e53fe9e 100644 --- a/skills/twitter/base.py +++ b/skills/twitter/base.py @@ -1,14 +1,29 @@ +from datetime import datetime from typing import Type -from langchain_core.tools import BaseTool -from pydantic import BaseModel +from pydantic import BaseModel, Field from tweepy import Client +from abstracts.skill import IntentKitSkill, SkillStoreABC -class TwitterBaseTool(BaseTool): + +class Tweet(BaseModel): + """Model representing a Twitter tweet.""" + + id: str + text: str + author_id: str + created_at: datetime + referenced_tweets: list[dict] | None = None + attachments: dict | None = None + + +class TwitterBaseTool(IntentKitSkill): """Base class for Twitter tools.""" client: Client - name: str - description: str + name: str = Field(description="The name of the tool") + description: str = Field(description="A description of what the tool does") args_schema: Type[BaseModel] + agent_id: str = Field(description="The ID of the agent") + store: SkillStoreABC = Field(description="The skill store for persisting data") diff --git a/skills/twitter/get_mentions.py b/skills/twitter/get_mentions.py index b567c7b..49735a6 100644 --- a/skills/twitter/get_mentions.py +++ b/skills/twitter/get_mentions.py @@ -3,13 +3,18 @@ from pydantic import BaseModel -from skills.twitter.base import TwitterBaseTool +from .base import Tweet, TwitterBaseTool class TwitterGetMentionsInput(BaseModel): """Input for TwitterGetMentions tool.""" +class TwitterGetMentionsOutput(BaseModel): + mentions: list[Tweet] + error: str | None = None + + class TwitterGetMentions(TwitterBaseTool): """Tool for getting mentions from Twitter. @@ -22,25 +27,26 @@ class TwitterGetMentions(TwitterBaseTool): args_schema: The schema for the tool's input arguments. """ - prev_since_id: str | None = None name: str = "twitter_get_mentions" description: str = "Get tweets that mention the authenticated user" args_schema: Type[BaseModel] = TwitterGetMentionsInput - def _run(self) -> str: + def _run(self) -> TwitterGetMentionsOutput: """Run the tool to get mentions. Returns: - str: A formatted string containing the mentions data. + TwitterGetMentionsOutput: A structured output containing the mentions data. Raises: Exception: If there's an error accessing the Twitter API. """ try: - # Get mentions using tweepy client + # get since id from store + last = self.store.get_agent_skill_data(self.agent_id, self.name, "last") + last = last or {} max_results = 10 - since_id = self.prev_since_id - if self.prev_since_id: + since_id = last.get("since_id") + if since_id: max_results = 100 # Always get mentions for the last day @@ -53,32 +59,47 @@ def _run(self) -> str: max_results=max_results, since_id=since_id, start_time=start_time, - tweet_fields=["created_at", "author_id", "text"], + expansions=[ + "referenced_tweets.id", + "attachments.media_keys", + ], + tweet_fields=[ + "created_at", + "author_id", + "text", + "referenced_tweets", + "attachments", + ], ) - if not mentions.data: - return "No mentions found." + result = [] + if mentions.data: + for tweet in mentions.data: + mention = Tweet( + id=str(tweet.id), + text=tweet.text, + author_id=str(tweet.author_id), + created_at=tweet.created_at, + referenced_tweets=tweet.referenced_tweets + if hasattr(tweet, "referenced_tweets") + else None, + attachments=tweet.attachments + if hasattr(tweet, "attachments") + else None, + ) + result.append(mention) # Update the previous since_id for the next request if mentions.meta: - self.prev_since_id = mentions.meta.get("newest_id") - - # Format the mentions into a readable string - result = [] - for tweet in mentions.data: - result.append( - f"Tweet ID: {tweet.id}\n" - f"Created at: {tweet.created_at}\n" - f"Author ID: {tweet.author_id}\n" - f"Text: {tweet.text}\n" - ) + last["since_id"] = mentions.meta.get("newest_id") + self.store.save_agent_skill_data(self.agent_id, self.name, "last", last) - return "\n".join(result) + return TwitterGetMentionsOutput(mentions=result) except Exception as e: - return f"Error retrieving mentions: {str(e)}" + return TwitterGetMentionsOutput(mentions=[], error=str(e)) - async def _arun(self) -> str: + async def _arun(self) -> TwitterGetMentionsOutput: """Async implementation of the tool. This tool doesn't have a native async implementation, so we call the sync version. diff --git a/skills/twitter/get_timeline.py b/skills/twitter/get_timeline.py index 3499426..db73624 100644 --- a/skills/twitter/get_timeline.py +++ b/skills/twitter/get_timeline.py @@ -1,14 +1,20 @@ +from datetime import datetime, timedelta, timezone from typing import Type from pydantic import BaseModel -from skills.twitter.base import TwitterBaseTool +from .base import Tweet, TwitterBaseTool class TwitterGetTimelineInput(BaseModel): """Input for TwitterGetTimeline tool.""" +class TwitterGetTimelineOutput(BaseModel): + tweets: list[Tweet] + error: str | None = None + + class TwitterGetTimeline(TwitterBaseTool): """Tool for getting the user's timeline from Twitter. @@ -21,54 +27,76 @@ class TwitterGetTimeline(TwitterBaseTool): args_schema: The schema for the tool's input arguments. """ - prev_timestamp: str | None = None name: str = "twitter_get_timeline" description: str = "Get tweets from the authenticated user's timeline" args_schema: Type[BaseModel] = TwitterGetTimelineInput - def _run(self) -> str: + def _run(self) -> TwitterGetTimelineOutput: """Run the tool to get timeline tweets. Returns: - str: A formatted string containing the timeline tweets data. + TwitterGetTimelineOutput: A structured output containing the timeline tweets data. Raises: Exception: If there's an error accessing the Twitter API. """ try: - # Get timeline tweets using tweepy client - max_results = 100 - timeline = self.client.get_home_timeline( - max_results=max_results, - start_time=self.prev_timestamp, - tweet_fields=["created_at", "author_id", "text"], - ) + # get since id from store + last = self.store.get_agent_skill_data(self.agent_id, self.name, "last") + last = last or {} + max_results = 10 + since_id = last.get("since_id") + if since_id: + max_results = 100 - if not timeline.data: - return "No tweets found." + # Always get timeline for the last day + start_time = datetime.now(timezone.utc) - timedelta(days=1) - # Update the previous timestamp for the next request - self.prev_timestamp = ( - max(tweet.created_at for tweet in timeline.data) - if timeline.data - else None + timeline = self.client.get_home_timeline( + max_results=max_results, + since_id=since_id, + start_time=start_time, + expansions=[ + "referenced_tweets.id", + "attachments.media_keys", + ], + tweet_fields=[ + "created_at", + "author_id", + "text", + "referenced_tweets", + "attachments", + ], ) - # Format the tweets into a readable string result = [] - for tweet in timeline.data: - result.append( - f"Tweet ID: {tweet.id}\n" - f"Created at: {tweet.created_at}\n" - f"Text: {tweet.text}\n" - ) - - return "\n".join(result) + if timeline.data: + for tweet in timeline.data: + tweet_obj = Tweet( + id=str(tweet.id), + text=tweet.text, + author_id=str(tweet.author_id), + created_at=tweet.created_at, + referenced_tweets=tweet.referenced_tweets + if hasattr(tweet, "referenced_tweets") + else None, + attachments=tweet.attachments + if hasattr(tweet, "attachments") + else None, + ) + result.append(tweet_obj) + + # Update the since_id in store for the next request + if timeline.meta: + last["since_id"] = timeline.meta.get("newest_id") + self.store.save_agent_skill_data(self.agent_id, self.name, "last", last) + + return TwitterGetTimelineOutput(tweets=result) except Exception as e: - return f"Error retrieving timeline: {str(e)}" + return TwitterGetTimelineOutput(tweets=[], error=str(e)) - async def _arun(self) -> str: + async def _arun(self) -> TwitterGetTimelineOutput: """Async implementation of the tool. This tool doesn't have a native async implementation, so we call the sync version.