Skip to content

feat: confluence updater #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7738475
feat: add endpoints for updating and switching collections in the vec…
a-klos Mar 18, 2025
3df45cd
fix: change type hint for identification parameter in delete_document…
a-klos Mar 18, 2025
f3d0b0a
feat: add update endpoint for confluence content and adjust identific…
a-klos Mar 18, 2025
dcb9487
refactor: update documentation and comments to reflect 'Stackit RAG' …
a-klos Mar 18, 2025
40aa34b
chore: Add duplication endpoint to openapi spec
a-klos Mar 18, 2025
f2f8991
chore: add endpoint for collection duplication
a-klos Mar 18, 2025
9f1bd9e
feat: implement abstract base classes for collection switcher and dup…
a-klos Mar 18, 2025
76f7175
feat: Add upload request model and update related API endpoints
a-klos Apr 25, 2025
c58486e
feat: Update upload API calls to use UploadRequest model in Confluenc…
a-klos Apr 25, 2025
357b758
feat: Refactor information piece upload methods to use UploadRequest …
a-klos Apr 25, 2025
4fa6fbf
feat: Update delete method in VectorDatabase to accept collection nam…
a-klos Apr 25, 2025
482ea43
feat: Enhance collection duplicator and switcher with detailed docstr…
a-klos Apr 25, 2025
e5b694e
feat: Introduce ConfluenceHandler and refactor DefaultConfluenceLoade…
a-klos Apr 25, 2025
57b31e5
fix: imports
a-klos Apr 28, 2025
8cfa550
feat: Update ConfluenceHandler and DefaultConfluenceUpdater to suppor…
a-klos Apr 28, 2025
9f3a670
Update dependencies and add comprehensive tests for collection manage…
a-klos Apr 29, 2025
ae43766
feat: Add SparseEmbedderSettings and integrate sparse embedder into D…
a-klos Apr 29, 2025
f6eb00b
feat: Update QdrantDatabase instantiation to include sparse embedder
a-klos Apr 29, 2025
4007d22
feat: Update ConfluenceUpdater usage in AdminApi and adjust collectio…
a-klos Apr 30, 2025
decb7af
Refactor code for improved readability and consistency
a-klos Apr 30, 2025
be4d207
feat: Update JSON serialization in models to use model_dump_json for …
a-klos Apr 30, 2025
a5d45b9
fix: Correct typo in parameter name for upload_information_piece meth…
a-klos Apr 30, 2025
f3cc2a1
feat: Refactor collection name handling in QdrantDatabase; add _get_a…
a-klos May 5, 2025
0e1f872
feat: Add max_pages parameter to ConfluenceParameters and related set…
a-klos May 5, 2025
74f45e7
feat: Add max_pages parameter to ConfluenceParameters; update related…
a-klos May 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
.vscode
*notes.md
node_modules/
# macOS Finder metadata
.DS_Store
**/.DS_Store
15 changes: 15 additions & 0 deletions admin-api-lib/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ paths:
tags:
- admin
summary: Loading confluence to the vector db
/confluence/update:
post:
description: Extracts the confluence space and triggers a collection switch in
the vector database. The key value store will not be updated during the operation.
operationId: update_confluence
responses:
"200":
description: Confluence content updated
"423":
description: Update is already in process.
"500":
description: Internal server error.
summary: Update the confluence content in the vectordatabase
tags:
- admin
components:
schemas:
status:
Expand Down
337 changes: 280 additions & 57 deletions admin-api-lib/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""confluence_updater module

Defines an abstract base class for asynchronously updating content from Confluence.
"""

from abc import ABC, abstractmethod


class ConfluenceUpdater(ABC):
    """Interface for async Confluence content updates."""

    @abstractmethod
    async def aupdate_from_confluence(self) -> None:
        """Asynchronously fetch and update data from Confluence.

        Returns
        -------
        None
        """
16 changes: 16 additions & 0 deletions admin-api-lib/src/admin_api_lib/apis/admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ async def load_confluence_post() -> None:
return await BaseAdminApi.subclasses[0]().load_confluence_post()


@router.post(
    "/confluence/update",
    responses={
        200: {"description": "Confluence content updated"},
        423: {"description": "Update is already in process."},
        500: {"description": "Internal server error."},
    },
    tags=["admin"],
    summary="Update the confluence content in the vectordatabase",
    response_model_by_alias=True,
)
async def update_confluence() -> None:
    """
    Extract the Confluence space and trigger a collection switch in the vector database.

    Delegates to the first registered ``BaseAdminApi`` subclass.

    Returns
    -------
    None
    """
    return await BaseAdminApi.subclasses[0]().update_confluence()


@router.post(
"/upload_documents",
responses={
Expand Down
5 changes: 5 additions & 0 deletions admin-api-lib/src/admin_api_lib/apis/admin_api_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ async def load_confluence_post(
None
"""

async def update_confluence(
    self,
) -> None:
    """
    Extract the Confluence space and trigger a collection switch in the vector database.

    The key value store will not be updated during the operation.

    Returns
    -------
    None
    """

async def upload_documents_post(
self,
body: UploadFile,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Module for the DefaultConfluenceLoader class."""

import logging
from asyncio import run
import threading

from fastapi import HTTPException, status
from langchain_core.documents import Document

from admin_api_lib.api_endpoints.document_deleter import DocumentDeleter
from admin_api_lib.chunker.chunker import Chunker
from admin_api_lib.extractor_api_client.openapi_client.api.extractor_api import (
ExtractorApi,
)
from admin_api_lib.impl.key_db.file_status_key_value_store import (
FileStatusKeyValueStore,
)
from admin_api_lib.impl.mapper.confluence_settings_mapper import (
ConfluenceSettingsMapper,
)
from admin_api_lib.impl.mapper.informationpiece2document import (
InformationPiece2Document,
)
from admin_api_lib.impl.settings.confluence_settings import ConfluenceSettings
from admin_api_lib.information_enhancer.information_enhancer import InformationEnhancer
from admin_api_lib.models.status import Status
from admin_api_lib.rag_backend_client.openapi_client.api.rag_api import RagApi
from admin_api_lib.utils.utils import sanitize_document_name
from admin_api_lib.rag_backend_client.openapi_client.models.upload_request import (
UploadRequest,
)

logger = logging.getLogger(__name__)


class ConfluenceHandler:
    """Shared pipeline for loading Confluence spaces into the RAG backend.

    For each configured space (one per index in the settings lists) the
    handler extracts pages via the extractor API, enhances and chunks the
    resulting documents, and uploads the produced information pieces to the
    RAG backend, tracking per-document progress in the key-value store.
    """

    def __init__(
        self,
        extractor_api: ExtractorApi,
        settings: ConfluenceSettings,
        information_mapper: InformationPiece2Document,
        rag_api: RagApi,
        key_value_store: FileStatusKeyValueStore,
        information_enhancer: InformationEnhancer,
        chunker: Chunker,
        document_deleter: DocumentDeleter,
        settings_mapper: ConfluenceSettingsMapper,
    ):
        """
        Initialize the ConfluenceHandler with the provided dependencies.

        Parameters
        ----------
        extractor_api : ExtractorApi
            The API for extracting information.
        settings : ConfluenceSettings
            The settings for Confluence.
        information_mapper : InformationPiece2Document
            The mapper for information pieces to langchain documents.
        rag_api : RagApi
            The API client for interacting with the RAG backend system.
        key_value_store : FileStatusKeyValueStore
            The key-value store to store file names and the corresponding file statuses.
        information_enhancer : InformationEnhancer
            The enhancer for information pieces.
        chunker : Chunker
            The chunker for breaking down documents into chunks.
        document_deleter : DocumentDeleter
            The deleter for documents from S3 Storage and Vector Database.
        settings_mapper : ConfluenceSettingsMapper
            The mapper to map the Confluence settings to confluence parameters.
        """
        self._extractor_api = extractor_api
        self._rag_api = rag_api
        self._settings = settings
        self._key_value_store = key_value_store
        self._information_mapper = information_mapper
        self._information_enhancer = information_enhancer
        self._chunker = chunker
        self._document_deleter = document_deleter
        self._settings_mapper = settings_mapper

    async def _process_confluence(self, index: int) -> list:
        """Extract, enhance and chunk a single Confluence space.

        Parameters
        ----------
        index : int
            Position of the space in the settings lists (url, document_name, ...).

        Returns
        -------
        list
            RAG information pieces ready for upload.

        Raises
        ------
        HTTPException
            With status 500 if any pipeline step fails; the document status
            is set to ERROR first.
        """
        logger.info("Loading from Confluence %s", self._settings.url[index])
        self._sanitize_document_name(index=index)

        params = self._settings_mapper.map_settings_to_params(self._settings, index)
        try:
            self._key_value_store.upsert(self._settings.document_name[index], Status.PROCESSING)
            information_pieces = self._extractor_api.extract_from_confluence_post(params)
            documents = [self._information_mapper.extractor_information_piece2document(x) for x in information_pieces]
            documents = await self._aenhance_langchain_documents(documents)
            chunked_documents = self._chunker.chunk(documents)
            return [self._information_mapper.document2rag_information_piece(doc) for doc in chunked_documents]
        except Exception as e:
            self._key_value_store.upsert(self._settings.document_name[index], Status.ERROR)
            logger.error("Error while loading from Confluence: %s", str(e))
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR, f"Error loading from Confluence: {str(e)}"
            ) from e

    async def _update_vector_db(self, results: dict, use_latest_collection: bool | None = None) -> None:
        """Delete previous pieces and upload the new ones, space by space.

        Parameters
        ----------
        results : dict
            Mapping of settings index to the RAG information pieces produced
            by ``_process_confluence``.
        use_latest_collection : bool | None
            Forwarded to deletion and upload so both target the same collection.
        """
        for idx in sorted(results.keys()):
            rag_information_pieces = results[idx]
            await self._delete_previous_information_pieces(index=idx, use_latest_collection=use_latest_collection)
            self._key_value_store.upsert(self._settings.document_name[idx], Status.UPLOADING)
            self._upload_information_pieces(
                rag_information_pieces, index=idx, use_latest_collection=use_latest_collection
            )

    async def _aload_from_confluence(self, use_latest_collection: bool | None = None) -> None:
        """Load all configured Confluence spaces concurrently, then upload the results.

        Each space is processed in its own thread; ``asyncio.run`` gives every
        thread a fresh event loop for the async extraction pipeline.
        """
        threads: list[threading.Thread] = []
        # NOTE: values are RAG information pieces (output of
        # document2rag_information_piece), not langchain Documents.
        results: dict[int, list] = {}
        errors: dict[int, BaseException] = {}

        def worker(idx: int) -> None:
            try:
                results[idx] = run(self._process_confluence(idx))
            except BaseException as e:
                # Without this, an exception would die with the thread and the
                # failed space would be silently skipped during upload.
                errors[idx] = e

        for idx in range(len(self._settings.url)):
            t = threading.Thread(target=worker, args=(idx,))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        if errors:
            # Surface the first failure (lowest index) to the caller.
            raise errors[min(errors.keys())]

        await self._update_vector_db(results, use_latest_collection=use_latest_collection)

    async def _aenhance_langchain_documents(self, documents: list[Document]):
        """Run the information enhancer over the documents, logging failures."""
        try:
            return await self._information_enhancer.ainvoke(documents)
        except Exception:
            # logger.exception records the traceback at ERROR level.
            logger.exception("Exception occurred while enhancing confluence langchain document")
            raise

    async def _delete_previous_information_pieces(self, index=0, use_latest_collection: bool | None = None):
        """Best-effort delete of previously uploaded pieces for this document.

        Deletion failures are logged but deliberately do not abort the upload.
        """
        try:
            await self._document_deleter.adelete_document(
                self._settings.document_name[index], use_latest_collection=use_latest_collection
            )
        except HTTPException as e:
            logger.error(
                (
                    "Error while trying to delete documents with id: %s before uploading %s."
                    "NOTE: Still continuing with upload."
                ),
                self._settings.document_name[index],
                e,
            )

    def _upload_information_pieces(self, rag_api_documents, index=0, use_latest_collection: bool | None = None) -> None:
        """Upload the pieces to the RAG backend and update the document status.

        Raises
        ------
        HTTPException
            With status 500 if the upload fails; the status is set to ERROR first.
        """
        try:
            self._rag_api.upload_information_piece(
                UploadRequest(information_pieces=rag_api_documents, use_latest_collection=use_latest_collection)
            )
            self._key_value_store.upsert(self._settings.document_name[index], Status.READY)
            logger.info("Confluence loaded successfully")
        except Exception as e:
            self._key_value_store.upsert(self._settings.document_name[index], Status.ERROR)
            logger.error("Error while uploading Confluence to the database: %s", str(e))
            raise HTTPException(500, f"Error loading from Confluence: {str(e)}") from e

    def _sanitize_document_name(self, index) -> None:
        """Derive a sanitized document name (falling back to the URL) and store it back in settings."""
        document_name = self._settings.document_name[index] or self._settings.url[index]
        # Strip the scheme so the name is stable regardless of http/https.
        document_name = document_name.replace("http://", "").replace("https://", "")

        self._settings.document_name[index] = sanitize_document_name(document_name)
13 changes: 13 additions & 0 deletions admin-api-lib/src/admin_api_lib/dependency_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from admin_api_lib.impl.api_endpoints.default_confluence_loader import (
DefaultConfluenceLoader,
)
from admin_api_lib.impl.api_endpoints.default_confluence_updater import DefaultConfluenceUpdater
from admin_api_lib.impl.api_endpoints.default_document_deleter import (
DefaultDocumentDeleter,
)
Expand Down Expand Up @@ -176,6 +177,18 @@ class DependencyContainer(DeclarativeContainer):
document_deleter=document_deleter,
settings_mapper=confluence_settings_mapper,
)
confluence_updater = Singleton(
DefaultConfluenceUpdater,
extractor_api=document_extractor,
rag_api=rag_api,
key_value_store=key_value_store,
settings=confluence_settings,
information_enhancer=information_enhancer,
information_mapper=information_mapper,
chunker=chunker,
document_deleter=document_deleter,
settings_mapper=confluence_settings_mapper,
)
document_reference_retriever = Singleton(DefaultDocumentReferenceRetriever, file_service=file_service)
document_uploader = Singleton(
DefaultDocumentUploader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re # noqa: F401
from typing import Any, ClassVar, Dict, List, Optional, Set

from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictStr
from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictStr, StrictInt
from typing import Any, ClassVar, Dict, List, Optional
from admin_api_lib.extractor_api_client.openapi_client.models.key_value_pair import KeyValuePair
from typing import Optional, Set
Expand Down Expand Up @@ -49,6 +49,7 @@ class ConfluenceParameters(BaseModel):
confluence_kwargs: Optional[List[KeyValuePair]] = Field(
default=None, description="Additional kwargs like verify_ssl"
)
max_pages: Optional[StrictInt] = Field(default=None, description="Defines the maximum number of pages extracted.")
__properties: ClassVar[List[str]] = [
"url",
"token",
Expand All @@ -58,6 +59,7 @@ class ConfluenceParameters(BaseModel):
"keep_newlines",
"document_name",
"confluence_kwargs",
"max_pages",
]

model_config = ConfigDict(
Expand Down Expand Up @@ -130,6 +132,7 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"confluence_kwargs": [KeyValuePair.from_dict(_item) for _item in obj["confluence_kwargs"]]
if obj.get("confluence_kwargs") is not None
else None,
"max_pages": obj.get("max_pages"),
}
)
return _obj
21 changes: 21 additions & 0 deletions admin-api-lib/src/admin_api_lib/impl/admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import Depends, Request, Response, UploadFile

from admin_api_lib.api_endpoints.confluence_loader import ConfluenceLoader
from admin_api_lib.api_endpoints.confluence_updater import ConfluenceUpdater
from admin_api_lib.api_endpoints.document_deleter import DocumentDeleter
from admin_api_lib.api_endpoints.document_reference_retriever import (
DocumentReferenceRetriever,
Expand Down Expand Up @@ -104,6 +105,26 @@ async def load_confluence_post(
"""
await confluence_loader.aload_from_confluence()

@inject
async def update_confluence(
    self,
    confluence_updater: ConfluenceUpdater = Depends(Provide[DependencyContainer.confluence_updater]),
) -> None:
    """
    Asynchronously update the documents related to the configured Confluence spaces.

    Parameters
    ----------
    confluence_updater : ConfluenceUpdater
        The ConfluenceUpdater instance to use for updating the documents. This is injected by
        dependency injection (default is Depends(Provide[DependencyContainer.confluence_updater])).

    Returns
    -------
    None
    """
    await confluence_updater.aupdate_from_confluence()

@inject
async def document_reference_id_get(
self,
Expand Down
Loading
Loading