From 234f6beff316e2e438ff7439f16c0e786cb1c60a Mon Sep 17 00:00:00 2001 From: TranswarpHippo Date: Tue, 31 Oct 2023 11:46:56 +0800 Subject: [PATCH 1/3] Add a new vector hippo for chatgpt-retrieval-plugin --- datastore/factory.py | 5 + datastore/providers/hippo_datastore.py | 252 ++++++++++++++++++ docs/providers/hippo/setup.md | 19 ++ poetry.lock | 25 +- pyproject.toml | 1 + .../providers/hippo/test_hippo_datastore.py | 1 + 6 files changed, 299 insertions(+), 4 deletions(-) create mode 100644 datastore/providers/hippo_datastore.py create mode 100644 docs/providers/hippo/setup.md create mode 100644 tests/datastore/providers/hippo/test_hippo_datastore.py diff --git a/datastore/factory.py b/datastore/factory.py index dd4e9b538..67b9363de 100644 --- a/datastore/factory.py +++ b/datastore/factory.py @@ -11,6 +11,11 @@ async def get_datastore() -> DataStore: from datastore.providers.chroma_datastore import ChromaDataStore return ChromaDataStore() + case "hippo": + from datastore.providers.hippo_datastore import HippoDataStore + + return HippoDataStore() + case "llama": from datastore.providers.llama_datastore import LlamaDataStore diff --git a/datastore/providers/hippo_datastore.py b/datastore/providers/hippo_datastore.py new file mode 100644 index 000000000..253397820 --- /dev/null +++ b/datastore/providers/hippo_datastore.py @@ -0,0 +1,252 @@ +from __future__ import annotations + +import os +from typing import Any, Dict, List, Optional +from tenacity import retry, wait_random_exponential, stop_after_attempt +import asyncio +from loguru import logger +from transwarp_hippo_api.hippo_client import HippoClient, HippoField +from transwarp_hippo_api.hippo_type import HippoType, IndexType, MetricType + +from datastore.datastore import DataStore +from models.models import ( + DocumentChunk, + DocumentChunkMetadata, + DocumentChunkWithScore, + DocumentMetadataFilter, + QueryResult, + QueryWithEmbedding, + Source, +) +from services.date import to_unix_timestamp + +# Read environment variables for hippo configuration + +HIPPO_TABLE = os.environ.get("HIPPO_TABLE") +HIPPO_DATABASE = os.environ.get("HIPPO_DATABASE") +HIPPO_HOST = os.environ.get("HIPPO_HOST") +HIPPO_PORT = os.environ.get("HIPPO_PORT") +HIPPO_USER = os.environ.get("HIPPO_USER") +HIPPO_PASSWORD = os.environ.get("HIPPO_PASSWORD") +EMBEDDING_FIELD = os.environ.get("EMBEDDING_FIELD") + +if HIPPO_DATABASE is None: + HIPPO_DATABASE = "default" + +if HIPPO_USER is None: + HIPPO_USER = "shiva" + +if HIPPO_PASSWORD is None: + HIPPO_PASSWORD = "shiva" + +if EMBEDDING_FIELD is None: + EMBEDDING_FIELD = "vector" + +assert HIPPO_TABLE is not None +assert HIPPO_HOST is not None +assert HIPPO_PORT is not None + +# Set the batch size for upserting vectors to Pinecone +UPSERT_BATCH_SIZE = 100 + + +class HippoDataStore(DataStore): + def __init__(self): + # 创建HippoClient + self.client = HippoClient([HIPPO_HOST + ":" + HIPPO_PORT]) + self.table = None + if self.client.check_table_exists(HIPPO_TABLE, HIPPO_DATABASE): + # Connect to an existing table with the specified name + try: + logger.info(f"Connecting to existing table {HIPPO_TABLE} in {HIPPO_DATABASE}") + self.table = self.client.get_table(HIPPO_TABLE, HIPPO_DATABASE) + logger.info(f"Connected to table {HIPPO_TABLE} in {HIPPO_DATABASE} successfully") + except Exception as e: + logger.error(f"Error connecting to table {HIPPO_TABLE} in {HIPPO_DATABASE}: {e}") + raise e + else: + fields_to_table = list(DocumentChunkMetadata.__fields__.keys()) + # Create a new index with the specified name, dimension, and 
metadata configuration + try: + logger.info( + f"Creating table {HIPPO_TABLE} in {HIPPO_DATABASE} with metadata config {fields_to_table}" + ) + field = [ + HippoField("pk", True, HippoType.STRING), + # dimensionality of OpenAI ada v2 embeddings + HippoField(EMBEDDING_FIELD, False, HippoType.FLOAT_VECTOR, type_params={"dimension": 1536}), + HippoField("text", False, HippoType.STRING), + HippoField("document_id", False, HippoType.STRING), + HippoField("source_id", False, HippoType.STRING), + HippoField("source", False, HippoType.STRING), + HippoField("url", False, HippoType.STRING), + HippoField("created_at", False, HippoType.STRING), + HippoField("author", False, HippoType.STRING), + ] + self.table = self.client.create_table(name=HIPPO_TABLE, fields=field, auto_id=True, + database_name=HIPPO_DATABASE) + + self.table.create_index(EMBEDDING_FIELD, "vector_index", IndexType.IVF_FLAT, MetricType.L2, + nlist=10) + self.table.activate_index("vector_index") + logger.info(f"Table {HIPPO_TABLE} in {HIPPO_DATABASE} created successfully") + except Exception as e: + logger.error(f"Error creating table {HIPPO_TABLE}: {e}") + raise e + + async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: + """Upsert chunks into the datastore. + + Args: + chunks (Dict[str, List[DocumentChunk]]): A list of DocumentChunks to insert + + Raises: + e: Error in upserting data. + + Returns: + List[str]: The document_id's that were inserted. + """ + # The doc id's to return for the upsert + doc_ids: List[str] = [] + for doc_id, chunk_list in chunks.items(): + doc_ids.append(doc_id) + for chunk in chunk_list: + values = self._get_values(chunk) + + id = values["id"] + text = values["text"] + embedding = values["embedding"] + source = values["source"] + source_id = values["source_id"] + url = values["url"] + create_at = values["created_at"] + author = values["author"] + document_id = values["document_id"] + + print(f"id:{id}") + print(f"text:{text}") + print(f"embedding:{embedding}") + print(f"source:{source}") + print(f"source_id:{source_id}") + print(f"url:{url}") + print(f"create_at:{create_at}") + print(f"author:{author}") + print(f"document_id:{document_id}") + + result = self.table.insert_rows( + [ + [id], + [embedding], + [text], + [source], + [source_id], + [url], + [create_at], + [author], + [document_id], + ] + ) + if not result: + raise Exception("Inserting data failed") + else: + logger.info(f"id: {id}") + logger.info(f"text: {text}") + logger.info(f"embedding: {embedding}") + logger.info(f"source: {source}") + logger.info(f"source_id: {source_id}") + logger.info(f"url: {url}") + logger.info(f"create_at: {create_at}") + logger.info(f"author: {author}") + logger.info(f"document_id: {document_id}") + + return doc_ids + + def _get_values(self, chunk: DocumentChunk) -> List[any] | None: + """Convert the chunk into a list of values to insert whose indexes align with fields. + + Args: + chunk (DocumentChunk): The chunk to convert. + + Returns: + List (any): The values to insert. 
+ """ + + # Convert DocumentChunk and its sub models to dict + values = chunk.dict() + # Unpack the metadata into the same dict + meta = values.pop("metadata") + values.update(meta) + + # Convert date to int timestamp form + if values["created_at"]: + values["created_at"] = to_unix_timestamp(values["created_at"]) + + # If source exists, change from Source object to the string value it holds + if values["source"]: + values["source"] = values["source"].value + + return values + + @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3)) + async def _query(self, queries: List[QueryWithEmbedding]) -> List[QueryResult]: + + # Define a helper coroutine that performs a single query and returns a QueryResult + async def _single_query(query: QueryWithEmbedding) -> QueryResult: + print(query) + logger.debug(f"Query: {query.query}") + fields = [] + for field in self.client.get_table_schema(HIPPO_TABLE, HIPPO_DATABASE).get("fields"): + fields.append(field.get("name")) + try: + query_response = self.table.query( + search_field=EMBEDDING_FIELD, + vectors=[query.embedding], + output_fields=fields, + topk=query.top_k + ) + except Exception as e: + logger.error(f"Error querying table: {e}") + raise e + + try: + query_results: List[DocumentChunkWithScore] = [] + score_col = "text" + "%scores" + count = 0 + + print(fields) + print(EMBEDDING_FIELD) + fields.remove(EMBEDDING_FIELD) + for items in zip(*[query_response[0][field] for field in fields]): + meta = {field: value for field, value in zip(fields, items)} + score = query_response[0][score_col][count] + id = meta.pop("document_id") + text = meta.pop("text") + chunk = DocumentChunkWithScore( + id=id, + score=score, + text=text, + metadata=DocumentChunkMetadata(**meta), + ) + query_results.append(chunk) + return QueryResult(query=query.query, results=query_results) + except Exception as e: + logger.error("Failed to query, error: {}".format(e)) + return QueryResult(query=query.query, results=[]) + + # Use asyncio.gather to run multiple _single_query coroutines concurrently and collect their results + results: List[QueryResult] = await asyncio.gather( + *[_single_query(query) for query in queries] + ) + + return results + + async def delete(self, ids: Optional[List[str]] = None, filter: Optional[DocumentMetadataFilter] = None, + delete_all: Optional[bool] = None) -> bool: + if delete_all: + self.client.delete_table(HIPPO_TABLE, HIPPO_DATABASE) + else: + for documentId in ids: + expr = f"document_id = {documentId} " + result = self.table.delete_rows_by_query(expr=expr) + if not result: + raise Exception("Deleting data failed") diff --git a/docs/providers/hippo/setup.md b/docs/providers/hippo/setup.md new file mode 100644 index 000000000..8d74dd200 --- /dev/null +++ b/docs/providers/hippo/setup.md @@ -0,0 +1,19 @@ +# Hippo + +## Deploying the Database + +You can deploy and manage Hippo using Docker Compose, K8's Operator Follow the instructions [here](https://www.transwarp.cn/starwarp) to get started. 
+ +**Environment Variables:** + +| Name | Required | Description | +|----------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------| +| `DATASTORE` | Yes | Datastore name, set to `hippo` | +| `BEARER_TOKEN` | Yes | Your bearer token | +| `OPENAI_API_KEY` | Yes | Your OpenAI API key | +| `HIPPO_DATABASE` | Optional | Hippo database name, defaults to default +| `HIPPO_TABLE` | Yes | Hippo table name, defaults to a random UUID | +| `HIPPO_HOST` | Yes | Hippo host IP, | +| `HIPPO_PORT` | Yes | Hippo port, | +| `HIPPO_USER` | Optional | Hippo username if RBAC is enabled, defaults to `shiva` | +| `HIPPO_PASSWORD` | Optional | Hippo password if required, defaults to `shiva` | diff --git a/poetry.lock b/poetry.lock index b60c492e6..0c9aa5e81 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "aiohttp" @@ -1177,6 +1177,7 @@ files = [ {file = "greenlet-2.0.2-cp27-cp27m-win32.whl", hash = "sha256:6c3acb79b0bfd4fe733dff8bc62695283b57949ebcca05ae5c129eb606ff2d74"}, {file = "greenlet-2.0.2-cp27-cp27m-win_amd64.whl", hash = "sha256:283737e0da3f08bd637b5ad058507e578dd462db259f7f6e4c5c365ba4ee9343"}, {file = "greenlet-2.0.2-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d27ec7509b9c18b6d73f2f5ede2622441de812e7b1a80bbd446cb0633bd3d5ae"}, + {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d967650d3f56af314b72df7089d96cda1083a7fc2da05b375d2bc48c82ab3f3c"}, {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:30bcf80dda7f15ac77ba5af2b961bdd9dbc77fd4ac6105cee85b0d0a5fcf74df"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26fbfce90728d82bc9e6c38ea4d038cba20b7faf8a0ca53a9c07b67318d46088"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9190f09060ea4debddd24665d6804b995a9c122ef5917ab26e1566dcc712ceeb"}, @@ -1185,6 +1186,7 @@ files = [ {file = "greenlet-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:76ae285c8104046b3a7f06b42f29c7b73f77683df18c49ab5af7983994c2dd91"}, {file = "greenlet-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:2d4686f195e32d36b4d7cf2d166857dbd0ee9f3d20ae349b6bf8afc8485b3645"}, {file = "greenlet-2.0.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c4302695ad8027363e96311df24ee28978162cdcdd2006476c43970b384a244c"}, + {file = "greenlet-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d4606a527e30548153be1a9f155f4e283d109ffba663a15856089fb55f933e47"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c48f54ef8e05f04d6eff74b8233f6063cb1ed960243eacc474ee73a2ea8573ca"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1846f1b999e78e13837c93c778dcfc3365902cfb8d1bdb7dd73ead37059f0d0"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a06ad5312349fec0ab944664b01d26f8d1f05009566339ac6f63f56589bc1a2"}, @@ -1214,6 +1216,7 @@ files = [ {file = "greenlet-2.0.2-cp37-cp37m-win32.whl", hash = "sha256:3f6ea9bd35eb450837a3d80e77b517ea5bc56b4647f5502cd28de13675ee12f7"}, {file = "greenlet-2.0.2-cp37-cp37m-win_amd64.whl", hash = 
"sha256:7492e2b7bd7c9b9916388d9df23fa49d9b88ac0640db0a5b4ecc2b653bf451e3"}, {file = "greenlet-2.0.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:b864ba53912b6c3ab6bcb2beb19f19edd01a6bfcbdfe1f37ddd1778abfe75a30"}, + {file = "greenlet-2.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1087300cf9700bbf455b1b97e24db18f2f77b55302a68272c56209d5587c12d1"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:ba2956617f1c42598a308a84c6cf021a90ff3862eddafd20c3333d50f0edb45b"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3a569657468b6f3fb60587e48356fe512c1754ca05a564f11366ac9e306526"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8eab883b3b2a38cc1e050819ef06a7e6344d4a990d24d45bc6f2cf959045a45b"}, @@ -1222,6 +1225,7 @@ files = [ {file = "greenlet-2.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0ef99cdbe2b682b9ccbb964743a6aca37905fda5e0452e5ee239b1654d37f2a"}, {file = "greenlet-2.0.2-cp38-cp38-win32.whl", hash = "sha256:b80f600eddddce72320dbbc8e3784d16bd3fb7b517e82476d8da921f27d4b249"}, {file = "greenlet-2.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:4d2e11331fc0c02b6e84b0d28ece3a36e0548ee1a1ce9ddde03752d9b79bba40"}, + {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8512a0c38cfd4e66a858ddd1b17705587900dd760c6003998e9472b77b56d417"}, {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:88d9ab96491d38a5ab7c56dd7a3cc37d83336ecc564e4e8816dbed12e5aaefc8"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:561091a7be172ab497a3527602d467e2b3fbe75f9e783d8b8ce403fa414f71a6"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971ce5e14dc5e73715755d0ca2975ac88cfdaefcaab078a284fea6cfabf866df"}, @@ -1380,6 +1384,19 @@ files = [ hpack = ">=4.0,<5" hyperframe = ">=6.0,<7" +[[package]] +name = "hippo-api" +version = "1.1.0rc3" +description = "Transwarp Hippo API" +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "hippo_api-1.1.0rc3-py3-none-any.whl", hash = "sha256:68f8da5e3cbb3e06617b6bc41d321200809a83d6dec1bd96eabcdf5110ff076b"}, +] + +[package.dependencies] +pyyaml = ">=6.0" + [[package]] name = "hnswlib" version = "0.7.0" @@ -2281,8 +2298,8 @@ files = [ [package.dependencies] numpy = [ - {version = ">=1.21.0", markers = "python_version >= \"3.10\""}, {version = ">=1.23.2", markers = "python_version >= \"3.11\""}, + {version = ">=1.21.0", markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -3367,7 +3384,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\""} +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""} typing-extensions = ">=4.2.0" [package.extras] @@ -4218,4 +4235,4 @@ postgresql = ["psycopg2cffi"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "39179f3602509004d328d1fb7a48068c86f8e93ab0660cf18c7d7f85018cacf7" +content-hash = 
"99bd74c04ec12bf2fd8ae9a89f273b6274b74c54acaefcdc1d463ccb81b9b761" diff --git a/pyproject.toml b/pyproject.toml index 814eeaa65..a94df68c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ pgvector = "^0.1.7" psycopg2cffi = {version = "^2.9.0", optional = true} loguru = "^0.7.0" elasticsearch = "8.8.2" +hippo-api = "1.1.0rc3" [tool.poetry.scripts] start = "server.main:start" diff --git a/tests/datastore/providers/hippo/test_hippo_datastore.py b/tests/datastore/providers/hippo/test_hippo_datastore.py new file mode 100644 index 000000000..99f108930 --- /dev/null +++ b/tests/datastore/providers/hippo/test_hippo_datastore.py @@ -0,0 +1 @@ +# todo \ No newline at end of file From 94527ae44bcd27dd8a5eaad30613f0f5f0899466 Mon Sep 17 00:00:00 2001 From: TranswarpHippo Date: Mon, 13 Nov 2023 16:06:12 +0800 Subject: [PATCH 2/3] Add a new vector hippo for chatgpt-retrieval-plugin --- README.md | 3 +++ docs/providers/hippo/setup.md | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 56653b064..8c1f09780 100644 --- a/README.md +++ b/README.md @@ -325,6 +325,9 @@ For more detailed instructions on setting up and using each vector database prov [Milvus](https://milvus.io/) is an open-source, cloud-native vector database that scales to billions of vectors. It is the open-source version of Zilliz and shares many of its features, such as various indexing algorithms, distance metrics, scalar filtering, time travel searches, rollback with snapshots, multi-language SDKs, storage and compute separation, and cloud scalability. For detailed setup instructions, refer to [`/docs/providers/milvus/setup.md`](/docs/providers/milvus/setup.md). +#### Hippo +[Hippo](http://hippo.transwarp.io/) is a proprietary enterprise cloud native distributed vector database, supporting storing, indexing and managing massive vector data, delivering accelerated solutions for many areas, such as vector similarity search and clustering of dense vectors. Hippo ensures high availability, high performance and easy scale-out/in, supports vector search index, and delivers a set of functionality including data sharding, partitioning, data persistence, incremental data ingestion, vector/scalar filtering in hybrid search, enabling enterprises to perform real-time query, search, and candidate generation against massive vector data. + #### Qdrant [Qdrant](https://qdrant.tech/) is a vector database capable of storing documents and vector embeddings. It offers both self-hosted and managed [Qdrant Cloud](https://cloud.qdrant.io/) deployment options, providing flexibility for users with different requirements. For detailed setup instructions, refer to [`/docs/providers/qdrant/setup.md`](/docs/providers/qdrant/setup.md). diff --git a/docs/providers/hippo/setup.md b/docs/providers/hippo/setup.md index 8d74dd200..2b05a8389 100644 --- a/docs/providers/hippo/setup.md +++ b/docs/providers/hippo/setup.md @@ -2,7 +2,7 @@ ## Deploying the Database -You can deploy and manage Hippo using Docker Compose, K8's Operator Follow the instructions [here](https://www.transwarp.cn/starwarp) to get started. +You can deploy and manage Hippo using hippo cloud Follow the instructions [here](http://hippo.transwarp.io/) to get started. 
**Environment Variables:** From 06a9c7210c7d47bc079edef6d5db9c438ed96522 Mon Sep 17 00:00:00 2001 From: TranswarpHippo Date: Fri, 24 Nov 2023 11:27:59 +0800 Subject: [PATCH 3/3] Add a new vector hippo for chatgpt-retrieval-plugin --- datastore/providers/hippo_datastore.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datastore/providers/hippo_datastore.py b/datastore/providers/hippo_datastore.py index 253397820..513141ec5 100644 --- a/datastore/providers/hippo_datastore.py +++ b/datastore/providers/hippo_datastore.py @@ -212,7 +212,6 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult: query_results: List[DocumentChunkWithScore] = [] score_col = "text" + "%scores" count = 0 - print(fields) print(EMBEDDING_FIELD) fields.remove(EMBEDDING_FIELD)
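The new `tests/datastore/providers/hippo/test_hippo_datastore.py` is still a `# todo` placeholder. Below is a minimal integration-style sketch of what it could cover, assuming a reachable Hippo instance and the environment variables from the setup document; the test name, document ids, and fixture are illustrative and not part of the patch:

```python
import pytest

from datastore.providers.hippo_datastore import HippoDataStore
from models.models import DocumentChunk, DocumentChunkMetadata, QueryWithEmbedding

DIM = 1536  # dimension the provider uses when it creates the table


@pytest.fixture
def hippo_datastore() -> HippoDataStore:
    return HippoDataStore()


@pytest.mark.asyncio
async def test_upsert_query_delete(hippo_datastore: HippoDataStore):
    chunk = DocumentChunk(
        id="doc1_chunk1",
        text="The quick brown fox jumps over the lazy dog.",
        metadata=DocumentChunkMetadata(document_id="doc1"),
        embedding=[0.1] * DIM,  # illustrative vector, not a real embedding
    )

    # _upsert returns the document ids it inserted.
    assert await hippo_datastore._upsert({"doc1": [chunk]}) == ["doc1"]

    results = await hippo_datastore._query(
        [QueryWithEmbedding(query="quick fox", top_k=1, embedding=[0.1] * DIM)]
    )
    assert results and results[0].results, "expected at least one hit"

    # delete_all drops the whole table, leaving a clean slate for other tests.
    await hippo_datastore.delete(delete_all=True)
```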