Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for cosmos db cosmosdb #258

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from telegram.constants import ParseMode, ChatAction

import config
import database
import cosmosdb as database
import openai_utils


Expand Down
2 changes: 2 additions & 0 deletions bot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# config parameters
telegram_token = config_yaml["telegram_token"]
openai_api_key = config_yaml["openai_api_key"]
# Azure Cosmos DB settings: bot/cosmosdb.py passes the connection string to
# CosmosClient.from_connection_string and the database id to
# get_database_client. Both are read with [] (no default is supplied).
cosmosdb_connection_string = config_yaml["cosmosdb_connection_string"]
cosmosdb_database_id = config_yaml["cosmosdb_database_id"]
# Read with .get(), so True is used when the key is not present.
use_chatgpt_api = config_yaml.get("use_chatgpt_api", True)
# Per config.example.yml: if empty, the bot is available to anyone.
allowed_telegram_usernames = config_yaml["allowed_telegram_usernames"]
# Per config.example.yml: a new dialog starts after this timeout (in seconds).
new_dialog_timeout = config_yaml["new_dialog_timeout"]
Expand Down
165 changes: 165 additions & 0 deletions bot/cosmosdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from typing import Optional, Any, List, Dict
from datetime import datetime
import uuid
from azure.cosmos import CosmosClient, PartitionKey
import time
import config

class Database:
    """Azure Cosmos DB storage for bot users and their dialogs.

    Two containers of the configured database are used:

    * ``user``   -- one document per user; ``id`` is ``str(user_id)`` and
      ``_id`` keeps the original integer id.
    * ``dialog`` -- one document per dialog; ``id`` is a UUID string.

    Datetimes are stored as ISO-8601 strings (``datetime.isoformat()``).
    Lookups are cross-partition queries, so no assumption is made about the
    containers' partition keys.
    """

    def __init__(self):
        """Connect using ``config.cosmosdb_connection_string`` / ``config.cosmosdb_database_id``."""
        self.client = CosmosClient.from_connection_string(config.cosmosdb_connection_string)
        self.database = self.client.get_database_client(config.cosmosdb_database_id)
        self.user_container = self.database.get_container_client("user")
        self.dialog_container = self.database.get_container_client("dialog")

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _get_item_by_id(container, item_id):
        """Return the first document in *container* whose ``id`` equals ``str(item_id)``.

        Returns ``[]`` (falsy) when nothing matches, which is the value the
        public ``get_*_by_id`` methods have always returned for a miss.
        """
        query = "SELECT * FROM c WHERE c.id = @item_id"
        parameters = [{"name": "@item_id", "value": str(item_id)}]
        items = container.query_items(
            query=query, parameters=parameters, enable_cross_partition_query=True
        )
        return next(iter(items), [])

    def _require_user(self, user_id: int):
        """Return the user document or raise ``ValueError`` if it does not exist."""
        user = self.get_user_by_id(user_id)
        if not user:
            raise ValueError(f"User {user_id} does not exist")
        return user

    def generate_sequential_uuid(self):
        """Return a ``uuid.UUID`` derived from the current time.

        The value is ``uuid5(NAMESPACE_DNS, <time in 100 ns ticks>)``.
        NOTE: despite the name, uuid5 is a SHA-1 hash, so the results are
        *not* sortable by creation time, and two calls within the same clock
        tick yield the same id.
        """
        timestamp = int(time.time() * 10000000)
        # uuid.NAMESPACE_DNS == UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
        return uuid.uuid5(uuid.NAMESPACE_DNS, str(timestamp))

    # -------------------------------------------------------------------- users

    def check_if_user_exists(self, user_id: int, raise_exception: bool = False):
        """Tell whether a user document with ``_id == user_id`` exists.

        Args:
            user_id: Integer user id.
            raise_exception: When true, a missing user raises instead of
                returning ``False``.

        Returns:
            ``True`` if the user exists, ``False`` otherwise.

        Raises:
            ValueError: The user does not exist and *raise_exception* is set.

        Errors raised by the Cosmos client itself are propagated unchanged
        rather than being reported as "user does not exist".
        """
        query = "SELECT VALUE COUNT(1) FROM c WHERE c._id = @user_id"
        parameters = [{"name": "@user_id", "value": user_id}]
        counts = self.user_container.query_items(
            query=query, parameters=parameters, enable_cross_partition_query=True
        )
        if next(iter(counts), 0) > 0:
            return True
        if raise_exception:
            raise ValueError(f"User {user_id} does not exist")
        return False

    def add_new_user(
        self,
        user_id: int,
        chat_id: int,
        username: str = "",
        first_name: str = "",
        last_name: str = "",
    ):
        """Create the user document with default settings; no-op if it already exists."""
        if self.check_if_user_exists(user_id):
            return

        # One timestamp so first_seen and last_interaction agree exactly.
        now = datetime.now().isoformat()
        user_dict = {
            "id": str(user_id),
            "_id": user_id,
            "chat_id": chat_id,
            "username": username,
            "first_name": first_name,
            "last_name": last_name,
            "last_interaction": now,
            "first_seen": now,
            "current_dialog_id": None,
            "current_chat_mode": "assistant",
            "current_model": config.models["available_text_models"][0],
            "n_used_tokens": {},
            "n_generated_images": 0,
            "n_transcribed_seconds": 0.0,  # voice message transcription
        }
        self.user_container.create_item(body=user_dict)

    def get_user_by_id(self, user_id: int):
        """Return the user document, or ``[]`` when the user does not exist."""
        return self._get_item_by_id(self.user_container, user_id)

    def get_user_attribute(self, user_id: int, key: str):
        """Return ``user[key]``, or ``None`` if the user or the key is missing.

        ``last_interaction`` is stored as an ISO string and is returned as a
        ``datetime``; every other attribute is returned as stored.
        """
        user = self.get_user_by_id(user_id)
        if not user:
            return None
        value = user.get(key)
        if key == "last_interaction" and isinstance(value, str):
            # fromisoformat also accepts timestamps without a fractional part,
            # which isoformat() emits whenever microsecond == 0.
            return datetime.fromisoformat(value)
        return value

    def set_user_attribute(self, user_id: int, key: str, value: Any):
        """Set ``user[key] = value`` and persist the document.

        ``datetime`` values are converted to ISO strings before storing.

        Raises:
            ValueError: The user does not exist.
        """
        user = self._require_user(user_id)
        if isinstance(value, datetime):
            value = value.isoformat()
        user[key] = value
        self.user_container.upsert_item(body=user)

    def update_n_used_tokens(self, user_id: int, model: str, n_input_tokens: int, n_output_tokens: int):
        """Add the given token counts to the user's per-model usage totals.

        Raises:
            ValueError: The user does not exist.
        """
        user = self._require_user(user_id)

        usage = user.get("n_used_tokens") or {}
        model_usage = usage.setdefault(model, {"n_input_tokens": 0, "n_output_tokens": 0})
        model_usage["n_input_tokens"] += n_input_tokens
        model_usage["n_output_tokens"] += n_output_tokens

        user["n_used_tokens"] = usage
        self.user_container.upsert_item(body=user)

    def set_last_interaction_now(self, user_id: int):
        """Record the current local time as the user's last interaction."""
        self.set_user_attribute(user_id, "last_interaction", datetime.now())

    def get_last_interaction(self, user_id: int):
        """Return the user's last interaction as a ``datetime`` (``None`` if unknown)."""
        # get_user_attribute already converts the stored ISO string.
        return self.get_user_attribute(user_id, "last_interaction")

    # ------------------------------------------------------------------ dialogs

    def start_new_dialog(self, user_id: int):
        """Create an empty dialog for the user and make it the current one.

        The dialog inherits the user's current chat mode and model.

        Returns:
            The new dialog id (string).

        Raises:
            ValueError: The user does not exist.
        """
        user = self._require_user(user_id)

        dialog_id = str(self.generate_sequential_uuid())
        dialog_dict = {
            "id": dialog_id,
            "_id": dialog_id,
            "user_id": user_id,
            "chat_mode": user.get("current_chat_mode"),
            "start_time": datetime.now().isoformat(),
            "model": user.get("current_model"),
            "messages": [],
        }

        # add new dialog
        self.dialog_container.create_item(body=dialog_dict)

        # update user's current dialog
        user["current_dialog_id"] = dialog_id
        self.user_container.upsert_item(body=user)

        return dialog_id

    def get_dialog_by_id(self, dialog_id: Optional[str]):
        """Return the dialog document, or ``[]`` when it does not exist."""
        if dialog_id is None:
            return []
        return self._get_item_by_id(self.dialog_container, dialog_id)

    def get_dialog_messages(self, user_id: int, dialog_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return the messages of *dialog_id* (default: the user's current dialog).

        Returns an empty list when the dialog cannot be found.
        """
        if not dialog_id:
            dialog_id = self.get_user_attribute(user_id, "current_dialog_id")
        dialog = self.get_dialog_by_id(dialog_id)
        return dialog["messages"] if dialog else []

    def set_dialog_messages(self, user_id: int, messages: List[Dict[str, Any]], dialog_id: Optional[str] = None):
        """Replace the messages of *dialog_id* (default: the user's current dialog).

        ``datetime`` values under a message's ``"date"`` key are stored as ISO
        strings; dates that are already strings (e.g. messages previously read
        back from the container) are stored unchanged. The caller's list and
        dicts are not modified.

        Raises:
            ValueError: The dialog does not exist.
        """
        if not dialog_id:
            dialog_id = self.get_user_attribute(user_id, "current_dialog_id")
        dialog = self.get_dialog_by_id(dialog_id)
        if not dialog:
            raise ValueError(f"Dialog {dialog_id} does not exist")

        stored_messages = []
        for message in messages:
            stored = dict(message)  # shallow copy: leave the caller's dict intact
            if isinstance(stored.get("date"), datetime):
                stored["date"] = stored["date"].isoformat()
            stored_messages.append(stored)

        dialog["messages"] = stored_messages
        self.dialog_container.upsert_item(body=dialog)
2 changes: 2 additions & 0 deletions config/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ openai_api_key: ""
use_chatgpt_api: true
allowed_telegram_usernames: [] # if empty, the bot is available to anyone. pass a username string to allow it and/or user ids as integers
new_dialog_timeout: 600 # new dialog starts after timeout (in seconds)
cosmosdb_connection_string: ""
cosmosdb_database_id: ""
return_n_generated_images: 1
n_chat_modes_per_page: 5
enable_message_streaming: true # if set, messages will be shown to user word-by-word
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ tiktoken>=0.3.0
PyYAML==6.0
pymongo==4.3.3
python-dotenv==0.21.0
pydub==0.25.1
pydub==0.25.1
azure-cosmos==4.3.1