Skip to content

Commit 01b1859

Browse files
authored
feat: raise errors from document search ingest when any document failed (#483)
1 parent ea73883 commit 01b1859

File tree

4 files changed

+100
-10
lines changed

4 files changed

+100
-10
lines changed

packages/ragbits-document-search/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
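- DocumentSearch.ingest now raises IngestExecutionError by default when any errors are encountered during ingestion; pass fail_on_error=False to get the errors back in the returned IngestExecutionResult instead.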
6+
57
## 0.12.0 (2025-03-25)
68

79
### Changed

packages/ragbits-document-search/src/ragbits/document_search/_main.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
IngestStrategy,
2828
SequentialIngestStrategy,
2929
)
30-
from ragbits.document_search.ingestion.strategies.base import IngestExecutionResult
30+
from ragbits.document_search.ingestion.strategies.base import IngestExecutionError, IngestExecutionResult
3131
from ragbits.document_search.retrieval.rephrasers.base import QueryRephraser
3232
from ragbits.document_search.retrieval.rephrasers.noop import NoopQueryRephraser
3333
from ragbits.document_search.retrieval.rerankers.base import Reranker, RerankerOptions
@@ -210,25 +210,36 @@ async def search(self, query: str, config: SearchConfig | None = None) -> Sequen
210210
return outputs.search_results
211211

212212
@traceable
async def ingest(
    self, documents: str | Iterable[DocumentMeta | Document | Source], fail_on_error: bool = True
) -> IngestExecutionResult:
    """
    Run the configured ingest strategy over the given documents.

    Args:
        documents: Either a source-specific URI string, which is expanded through `SourceResolver.resolve`,
            or an iterable of `Document`, `DocumentMeta`, or `Source` objects, which is passed to the
            ingest strategy as-is. Examples of URI formats include:
            - "file:///path/to/files/*.txt"
            - "gcs://bucket/folder/*"
            - "huggingface://dataset/split/row"
        fail_on_error: When True, a non-empty list of failed results is raised as IngestExecutionError.
            When False, failures are only reported through the returned IngestExecutionResult.

    Returns:
        The IngestExecutionResult produced by the ingest strategy.

    Raises:
        IngestExecutionError: If fail_on_error is True and the strategy reported at least one failed document.
    """
    # Only a string needs resolving; an iterable is handed to the strategy untouched.
    if isinstance(documents, str):
        to_ingest = await SourceResolver.resolve(documents)
    else:
        to_ingest = documents

    outcome = await self.ingest_strategy(
        documents=to_ingest,
        vector_store=self.vector_store,
        parser_router=self.parser_router,
        enricher_router=self.enricher_router,
    )

    # Nothing to raise when the caller opted out or every document went through.
    if not fail_on_error or not outcome.failed:
        return outcome

    raise IngestExecutionError(outcome.failed)

packages/ragbits-document-search/src/ragbits/document_search/ingestion/strategies/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ class IngestExecutionResult:
6767
failed: list[IngestDocumentResult] = field(default_factory=list)
6868

6969

70+
class IngestExecutionError(Exception):
    """
    Represents an error that occurred during the documents ingest execution.

    Attributes:
        results: The per-document results handed to the exception by the code that raised it.
    """

    def __init__(self, results: list[IngestDocumentResult]) -> None:
        """
        Initialize the error.

        Args:
            results: The results of the documents that failed during ingestion.
        """
        # Initialize the base Exception explicitly so `args` carries the results like any standard exception.
        super().__init__(results)
        self.results = results

    def __str__(self) -> str:
        """
        Return a short summary rather than the raw repr of every result.
        """
        return f"Ingestion failed for {len(self.results)} document(s)."
77+
78+
7079
class IngestStrategy(WithConstructionConfig, ABC):
7180
"""
7281
Base class for ingest strategies, responsible for orchestrating the tasks required to index the document.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import pytest
2+
3+
from ragbits.core.embeddings.noop import NoopEmbedder
4+
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
5+
from ragbits.document_search import DocumentSearch
6+
from ragbits.document_search.documents.document import Document, DocumentMeta, DocumentType
7+
from ragbits.document_search.documents.element import Element
8+
from ragbits.document_search.ingestion.parsers.base import DocumentParser
9+
from ragbits.document_search.ingestion.parsers.router import DocumentParserRouter
10+
from ragbits.document_search.ingestion.strategies.base import IngestExecutionError
11+
12+
13+
class FailingParser(DocumentParser):
    """Parser stub registered for TXT documents whose `parse` unconditionally raises."""

    supported_document_types = {DocumentType.TXT}

    @classmethod
    async def parse(cls, document: Document) -> list[Element]:
        """
        Fail unconditionally to simulate a parsing error.

        Args:
            document: The document to parse; it is never inspected.

        Raises:
            ValueError: Always.
        """
        message = "This parser always fails"
        raise ValueError(message)
21+
22+
23+
async def test_ingest_fails_on_error():
    """Check that ingest raises IngestExecutionError by default when a document fails to parse."""
    # Route TXT documents to a parser that always raises, so the single document is guaranteed to fail.
    document_search = DocumentSearch(
        vector_store=InMemoryVectorStore(embedder=NoopEmbedder()),
        parser_router=DocumentParserRouter({DocumentType.TXT: FailingParser()}),
    )

    # Create a test document
    document = DocumentMeta.create_text_document_from_literal("Test content")

    # fail_on_error is left at its default (True), so the failure must surface as an exception.
    with pytest.raises(IngestExecutionError) as exc_info:
        await document_search.ingest([document])

    # Verify the error details
    assert len(exc_info.value.results) == 1
    failed_result = exc_info.value.results[0]
    assert failed_result.document_uri == document.id
    assert failed_result.num_elements == 0
    assert failed_result.error is not None
    # `isinstance(x, type(ValueError))` is `isinstance(x, type)` and passes for any class;
    # check against ValueError itself so a wrong exception type is actually caught.
    assert issubclass(failed_result.error.type, ValueError)
    assert failed_result.error.message == "This parser always fails"
45+
46+
47+
async def test_ingest_returns_errors_when_fail_on_error_false():
    """Check that ingest reports failures in its result instead of raising when fail_on_error=False."""
    # Route TXT documents to a parser that always raises, so the single document is guaranteed to fail.
    document_search = DocumentSearch(
        vector_store=InMemoryVectorStore(embedder=NoopEmbedder()),
        parser_router=DocumentParserRouter({DocumentType.TXT: FailingParser()}),
    )

    # Create a test document
    document = DocumentMeta.create_text_document_from_literal("Test content")

    # With fail_on_error=False the call must return normally and carry the failure in the result.
    result = await document_search.ingest([document], fail_on_error=False)

    # Verify the result details
    assert len(result.successful) == 0
    assert len(result.failed) == 1
    failed_result = result.failed[0]
    assert failed_result.document_uri == document.id
    assert failed_result.num_elements == 0
    assert failed_result.error is not None
    # `isinstance(x, type(ValueError))` is `isinstance(x, type)` and passes for any class;
    # check against ValueError itself so a wrong exception type is actually caught.
    assert issubclass(failed_result.error.type, ValueError)
    assert failed_result.error.message == "This parser always fails"

0 commit comments

Comments
 (0)