From daa15bb33a75cff7718d9e46396619441c864954 Mon Sep 17 00:00:00 2001 From: Sriniketh J Date: Sat, 31 May 2025 02:06:41 +0530 Subject: [PATCH 1/5] feat: add run_async support for OllamaDocumentEmbedder --- .../embedders/ollama/document_embedder.py | 71 +++++++++++++++++-- .../ollama/tests/test_document_embedder.py | 17 +++++ 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py index 8d2f5f5059..37982e8a67 100644 --- a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py +++ b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py @@ -3,7 +3,7 @@ from haystack import Document, component from tqdm import tqdm -from ollama import Client +from ollama import AsyncClient, Client @component @@ -75,6 +75,16 @@ def __init__( self._client = Client(host=self.url, timeout=self.timeout) + def _prepare_input(self, documents: List[Document]) -> List[Document]: + if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): + msg = ( + "OllamaDocumentEmbedder expects a list of Documents as input." + "In case you want to embed a list of strings, please use the OllamaTextEmbedder." + ) + raise TypeError(msg) + + return documents + def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]: """ Prepares the texts to embed by concatenating the Document text with the metadata fields to embed. @@ -115,6 +125,24 @@ def _embed_batch( return all_embeddings + async def _embed_batch_async( + self, texts_to_embed: List[str], batch_size: int, generation_kwargs: Optional[Dict[str, Any]] = None + ): + """ + Internal method to embed a batch of texts asynchronously. + """ + self._client = AsyncClient(host=self.url, timeout=self.timeout) + all_embeddings = [] + + for i in tqdm( + range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings" + ): + batch = texts_to_embed[i : i + batch_size] + result = await self._client.embed(model=self.model, input=batch, options=generation_kwargs) + all_embeddings.extend(result["embeddings"]) + + return all_embeddings + @component.output_types(documents=List[Document], meta=Dict[str, Any]) def run(self, documents: List[Document], generation_kwargs: Optional[Dict[str, Any]] = None): """ @@ -130,12 +158,7 @@ def run(self, documents: List[Document], generation_kwargs: Optional[Dict[str, A - `documents`: Documents with embedding information attached - `meta`: The metadata collected during the embedding process """ - if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): - msg = ( - "OllamaDocumentEmbedder expects a list of Documents as input." - "In case you want to embed a list of strings, please use the OllamaTextEmbedder." - ) - raise TypeError(msg) + documents = self._prepare_input(documents=documents) generation_kwargs = generation_kwargs or self.generation_kwargs @@ -148,3 +171,37 @@ def run(self, documents: List[Document], generation_kwargs: Optional[Dict[str, A doc.embedding = emb return {"documents": documents, "meta": {"model": self.model}} + + @component.output_types(documents=List[Document], meta=Dict[str, Any]) + async def run_async(self, documents: List[Document], generation_kwargs: Optional[Dict[str, Any]] = None): + """ + Asynchronously run an Ollama Model to compute embeddings of the provided documents. + + :param documents: + Documents to be converted to an embedding. + :param generation_kwargs: + Optional arguments to pass to the Ollama generation endpoint, such as temperature, + top_p, etc. See the + [Ollama docs](https://github.com/jmorganca/ollama/blob/main/docs/modelfile.md#valid-parameters-and-values). + :returns: A dictionary with the following keys: + - `documents`: Documents with embedding information attached + - `meta`: The metadata collected during the embedding process + """ + + documents = self._prepare_input(documents=documents) + + if not documents: + # return early if we were passed an empty list + return {"documents": [], "meta": {}} + + generation_kwargs = generation_kwargs or self.generation_kwargs + + texts_to_embed = self._prepare_texts_to_embed(documents=documents) + embeddings = await self._embed_batch_async( + texts_to_embed=texts_to_embed, batch_size=self.batch_size, generation_kwargs=generation_kwargs + ) + + for doc, emb in zip(documents, embeddings): + doc.embedding = emb + + return {"documents": documents, "meta": {"model": self.model}} diff --git a/integrations/ollama/tests/test_document_embedder.py b/integrations/ollama/tests/test_document_embedder.py index 7d972e8985..009117240c 100644 --- a/integrations/ollama/tests/test_document_embedder.py +++ b/integrations/ollama/tests/test_document_embedder.py @@ -50,6 +50,23 @@ def test_run(self): Document(content="Llamas have been used as pack animals for centuries, especially in South America."), ] result = embedder.run(list_of_docs) + + assert result["meta"]["model"] == "nomic-embed-text" + documents = result["documents"] + assert len(documents) == 3 + assert all(isinstance(element, float) for document in documents for element in document.embedding) + + @pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async(self): + embedder = OllamaDocumentEmbedder(model="nomic-embed-text", batch_size=2) + list_of_docs = [ + Document(content="Llamas are amazing animals known for their soft wool and gentle demeanor."), + Document(content="The Andes mountains are the natural habitat of many llamas."), + Document(content="Llamas have been used as pack animals for centuries, especially in South America."), + ] + result = await embedder.run_async(list_of_docs) + assert result["meta"]["model"] == "nomic-embed-text" documents = result["documents"] assert len(documents) == 3 From a5b7c406b979c162f1824e763a850c557d6bc7d4 Mon Sep 17 00:00:00 2001 From: Sriniketh J Date: Wed, 4 Jun 2025 02:29:24 +0530 Subject: [PATCH 2/5] fix: use one client per doc embedder class obj --- .../components/embedders/ollama/document_embedder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py index 37982e8a67..d15446ee4b 100644 --- a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py +++ b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py @@ -74,6 +74,7 @@ def __init__( self.prefix = prefix self._client = Client(host=self.url, timeout=self.timeout) + self._async_client = AsyncClient(host=self.url, timeout=self.timeout) def _prepare_input(self, documents: List[Document]) -> List[Document]: if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): @@ -131,14 +132,13 @@ async def _embed_batch_async( """ Internal method to embed a batch of texts asynchronously. """ - self._client = AsyncClient(host=self.url, timeout=self.timeout) all_embeddings = [] for i in tqdm( range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings" ): batch = texts_to_embed[i : i + batch_size] - result = await self._client.embed(model=self.model, input=batch, options=generation_kwargs) + result = await self._async_client.embed(model=self.model, input=batch, options=generation_kwargs) all_embeddings.extend(result["embeddings"]) return all_embeddings From be4ae4fecd20f015de8592b4a5efc2eb540d725c Mon Sep 17 00:00:00 2001 From: Sriniketh J Date: Fri, 6 Jun 2025 20:35:06 +0530 Subject: [PATCH 3/5] docs: address review comments --- .../components/embedders/ollama/document_embedder.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py index d15446ee4b..6bfc5bdbc2 100644 --- a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py +++ b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py @@ -77,6 +77,9 @@ def __init__( self._async_client = AsyncClient(host=self.url, timeout=self.timeout) def _prepare_input(self, documents: List[Document]) -> List[Document]: + """ + Prepares the list of documents to embed by appropriate validation. + """ if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): msg = ( "OllamaDocumentEmbedder expects a list of Documents as input." @@ -160,6 +163,10 @@ def run(self, documents: List[Document], generation_kwargs: Optional[Dict[str, A """ documents = self._prepare_input(documents=documents) + if not documents: + # return early if we were passed an empty list + return {"documents": [], "meta": {}} + generation_kwargs = generation_kwargs or self.generation_kwargs texts_to_embed = self._prepare_texts_to_embed(documents=documents) From 7b8ca260b637b8e7724e333b8ae1ec5f9373ec5b Mon Sep 17 00:00:00 2001 From: Sriniketh J Date: Mon, 9 Jun 2025 14:30:39 +0530 Subject: [PATCH 4/5] fix: better async handling --- .../embedders/ollama/document_embedder.py | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py index 6bfc5bdbc2..9b24349517 100644 --- a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py +++ b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py @@ -1,3 +1,4 @@ +import asyncio from typing import Any, Dict, List, Optional from haystack import Document, component @@ -137,12 +138,21 @@ async def _embed_batch_async( """ all_embeddings = [] - for i in tqdm( - range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings" - ): - batch = texts_to_embed[i : i + batch_size] - result = await self._async_client.embed(model=self.model, input=batch, options=generation_kwargs) - all_embeddings.extend(result["embeddings"]) + batches = [texts_to_embed[i : i + batch_size] for i in range(0, len(texts_to_embed), batch_size)] + + tasks = [ + self._async_client.embed( + model=self.model, + input=batch, + options=generation_kwargs, + ) + for batch in batches + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + for _idx, res in enumerate(results): + all_embeddings.extend(res["embeddings"]) return all_embeddings From 07141808e049ebe87ee5f9d2bcc27df5c9dbfe73 Mon Sep 17 00:00:00 2001 From: Sriniketh J Date: Mon, 9 Jun 2025 21:09:43 +0530 Subject: [PATCH 5/5] fix: type checking --- .../components/embedders/ollama/document_embedder.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py index 9b24349517..d13ec2c9c7 100644 --- a/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py +++ b/integrations/ollama/src/haystack_integrations/components/embedders/ollama/document_embedder.py @@ -151,7 +151,10 @@ async def _embed_batch_async( results = await asyncio.gather(*tasks, return_exceptions=True) - for _idx, res in enumerate(results): + for idx, res in enumerate(results): + if isinstance(res, BaseException): + err_msg = f"Embedding batch {idx} raised an exception." + raise RuntimeError(err_msg) all_embeddings.extend(res["embeddings"]) return all_embeddings