Commit 624b6e7

docs: update Document Search how-to (#455)
1 parent b4c064b

11 files changed, +537 -579 lines changed

docs/api_reference/document_search/documents.md (+15 -1)
@@ -8,4 +8,18 @@

 ::: ragbits.document_search.documents.element.Element

-::: ragbits.document_search.documents.sources.Source
+::: ragbits.document_search.documents.sources.Source
+
+::: ragbits.document_search.documents.sources.AzureBlobStorageSource
+
+::: ragbits.document_search.documents.sources.GCSSource
+
+::: ragbits.document_search.documents.sources.GitSource
+
+::: ragbits.document_search.documents.sources.HuggingFaceSource
+
+::: ragbits.document_search.documents.sources.LocalFileSource
+
+::: ragbits.document_search.documents.sources.S3Source
+
+::: ragbits.document_search.documents.sources.WebSource
@@ -0,0 +1,361 @@
# How-To: Ingest Documents

The Ragbits document ingest pipeline consists of four main steps: loading, parsing, enrichment, and indexing. All of these steps can be orchestrated using different strategies, depending on the expected load.

## Loading sources

Before a document can be processed, it must be defined and downloaded. In Ragbits, there are a few ways to do this: you can specify a source URI, a source instance, the document metadata, or the document itself.
=== "URI"

```python
from ragbits.document_search import DocumentSearch

document_search = DocumentSearch(...)

await document_search.ingest("s3://")
```

=== "Source"

```python
from ragbits.document_search.documents.sources import WebSource
from ragbits.document_search import DocumentSearch

document_search = DocumentSearch(...)

await document_search.ingest([WebSource(...), ...])
```

=== "Metadata"

```python
from ragbits.document_search.documents.document import DocumentMeta
from ragbits.document_search import DocumentSearch

document_search = DocumentSearch(...)

await document_search.ingest([DocumentMeta.from_local_path(...), ...])
```

=== "Document"

```python
from ragbits.document_search.documents.document import Document
from ragbits.document_search import DocumentSearch

document_search = DocumentSearch(...)

await document_search.ingest([Document(...), ...])
```
### Supported sources

The following sources are currently supported by Ragbits:

| Source | URI Schema | Class |
|-|-|-|
| Azure Blob Storage | `azure://https://account_name.blob.core.windows.net/<container-name>|<blob-name>` | [`AzureBlobStorageSource`][ragbits.document_search.documents.sources.AzureBlobStorageSource] |
| Google Cloud Storage | `gcs://<bucket-name>/<prefix>` | [`GCSSource`][ragbits.document_search.documents.sources.GCSSource] |
| Git | `git://<https-url>|<ssh-url>` | [`GitSource`][ragbits.document_search.documents.sources.GitSource] |
| Hugging Face | `huggingface://<dataset-path>/<split>/<row>` | [`HuggingFaceSource`][ragbits.document_search.documents.sources.HuggingFaceSource] |
| Local file | `file://<file-path>|<blob-pattern>` | [`LocalFileSource`][ragbits.document_search.documents.sources.LocalFileSource] |
| Amazon S3 | `s3://<bucket-name>/<prefix>` | [`S3Source`][ragbits.document_search.documents.sources.S3Source] |
| Web | `web://<https-url>` | [`WebSource`][ragbits.document_search.documents.sources.WebSource] |
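Switching between backends is then just a matter of passing a URI that matches one of the schemas above. A short sketch (the bucket, dataset, and path names below are placeholders):

```python
from ragbits.document_search import DocumentSearch

document_search = DocumentSearch(...)

# Google Cloud Storage: every object under the given prefix
await document_search.ingest("gcs://my-bucket/reports/")

# Local files matching a glob pattern
await document_search.ingest("file:///data/docs/*.pdf")

# A single row from a Hugging Face dataset split
await document_search.ingest("huggingface://my-org/my-dataset/train/0")
```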
To define a new source, extend the [`Source`][ragbits.document_search.documents.sources.Source] class.

```python
from pathlib import Path
from typing import ClassVar, Self

from ragbits.document_search.documents.sources import Source


class CustomSource(Source):
    """
    Custom source that downloads a file from the web.
    """

    protocol: ClassVar[str] = "custom"
    source_url: str
    ...

    @property
    def id(self) -> str:
        """
        Source unique identifier.
        """
        return f"{self.protocol}:{self.source_url}"

    @classmethod
    async def from_uri(cls, uri: str) -> list[Self]:
        """
        Create source instances from a URI path.

        Args:
            uri: The URI path.

        Returns:
            The list of sources.
        """
        return [cls(...), ...]

    async def fetch(self) -> Path:
        """
        Download the file for the given URL.

        Returns:
            The local path to the downloaded file.
        """
        ...
        return Path(f"/tmp/{self.source_url}")
```
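Once defined, the custom source can be ingested like any built-in one. A minimal sketch, reusing the `document_search` instance from the earlier examples (the URL below is a placeholder):

```python
await document_search.ingest([CustomSource(source_url="https://example.com/report.pdf")])
```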
## Parsing documents

Depending on the document type, different parsers operate in the background to convert the document into a list of elements. Ragbits primarily relies on the [`unstructured`](https://github.com/Unstructured-IO/unstructured) library, which supports parsing and chunking for most common document formats (e.g., PDF, Markdown, DOC, JPG).

To define a new parser, extend the [`DocumentParser`][ragbits.document_search.ingestion.parsers.base.DocumentParser] class.

```python
from bs4 import BeautifulSoup
from ragbits.document_search.documents.document import Document, DocumentType
from ragbits.document_search.documents.element import Element, TextElement
from ragbits.document_search.ingestion.parsers import DocumentParser


class HTMLDocumentParser(DocumentParser):
    """
    Parser that uses Beautiful Soup to process HTML documents.
    """

    supported_document_types = {DocumentType.HTML}

    async def parse(self, document: Document) -> list[Element]:
        """
        Parse the HTML document using Beautiful Soup.

        Args:
            document: The document to parse.

        Returns:
            The list of elements extracted from the document.
        """
        dom = BeautifulSoup(document.local_path.read_text(), "html.parser")
        ...
        return [
            TextElement(document_meta=document.metadata, ...),
            ...
        ]
```

To apply the new parser, define a [`DocumentParserRouter`][ragbits.document_search.ingestion.parsers.DocumentParserRouter] and assign it to the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance.

```python
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentType
from ragbits.document_search.ingestion.parsers import DocumentParserRouter

parser_router = DocumentParserRouter({
    DocumentType.HTML: HTMLDocumentParser(),
    ...
})
document_search = DocumentSearch(parser_router=parser_router, ...)
```
## Enriching elements

After parsing the document, the resulting elements can optionally be enriched. Element enrichers generate additional information about elements, such as text summaries or image descriptions. Most enrichers are lightweight wrappers around LLMs that process elements in a specific format. By default, Ragbits enriches image elements with descriptions using the preferred VLM.

To define a new enricher, extend the [`ElementEnricher`][ragbits.document_search.ingestion.enrichers.base.ElementEnricher] class.

```python
from ragbits.document_search.documents.element import TextElement
from ragbits.document_search.ingestion.enrichers import ElementEnricher


class TextElementEnricher(ElementEnricher[TextElement]):
    """
    Enricher that summarizes text elements using an LLM.
    """

    async def enrich(self, elements: list[TextElement]) -> list[TextElement]:
        """
        Enrich text elements with the text summary.

        Args:
            elements: The text elements to be enriched.

        Returns:
            The list of enriched text elements.
        """
        # `llm` and `TextSummarizerPrompt` are placeholders for an LLM client
        # and a summarization prompt defined elsewhere in your project.
        responses = await llm.generate(TextSummarizerPrompt(...))
        ...
        return [
            TextElement(
                document_meta=element.document_meta,
                content=...,
            ),
            ...
        ]
```

To apply the new enricher, define an [`ElementEnricherRouter`][ragbits.document_search.ingestion.enrichers.ElementEnricherRouter] and assign it to the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance.

```python
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.element import TextElement
from ragbits.document_search.ingestion.enrichers import ElementEnricherRouter

enricher_router = ElementEnricherRouter({
    TextElement: TextElementEnricher(),
    ...
})
document_search = DocumentSearch(enricher_router=enricher_router, ...)
```
## Indexing elements

At the end of the ingestion process, elements are indexed into the vector database. First, the vector store is scanned to identify and remove any existing elements from sources that are about to be ingested. Then, the new elements are inserted, ensuring that only the latest versions of the sources remain. Indexing is performed in batches, allowing all elements from a batch of documents to be processed in a single request to the database, which improves efficiency and speeds up the process.
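In practice this makes re-ingestion safe to repeat: ingesting the same source again replaces its previously indexed elements instead of duplicating them. A small sketch, reusing the `document_search` instance and the stub `s3://` URI from the earlier examples:

```python
# First ingest: elements from the source are parsed, enriched, and inserted.
await document_search.ingest("s3://")

# Re-ingest of the same source: its existing elements are removed first,
# then the fresh ones are inserted, so only the latest version stays indexed.
await document_search.ingest("s3://")
```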
## Orchestrating ingest tasks

Running an ingest pipeline can be time-consuming, depending on your expected load. Ragbits offers three built-in ingest strategies that you can use out of the box for your workload, or you can implement a custom strategy to suit your needs.
=== "Sequential"
224+
225+
```python
226+
from ragbits.document_search import DocumentSearch
227+
from ragbits.document_search.ingestion.strategies import SequentialIngestStrategy
228+
229+
ingest_strategy = SequentialIngestStrategy()
230+
document_search = DocumentSearch(ingest_strategy=ingest_strategy, ...)
231+
232+
await document_search.ingest("s3://")
233+
```
234+
235+
The default ingest strategy in Ragbits is [`SequentialIngestStrategy`][ragbits.document_search.ingestion.strategies.SequentialIngestStrategy]. This strategy processes documents one by one, waiting for each document to be processed before moving on to the next. Although it's the simplest and most straightforward strategy, it may be slow when processing a large number of documents.
236+
237+
=== "Batched"
238+
239+
```python
240+
from ragbits.document_search import DocumentSearch
241+
from ragbits.document_search.ingestion.strategies import BatchedIngestStrategy
242+
243+
ingest_strategy = BatchedIngestStrategy(batch_size=10)
244+
document_search = DocumentSearch(ingest_strategy=ingest_strategy, ...)
245+
246+
await document_search.ingest("s3://")
247+
```
248+
249+
If you need to process documents simultaneously, you can use the [`BatchedIngestStrategy`][ragbits.document_search.ingestion.strategies.BatchedIngestStrategy] strategy. This strategy uses Python built-in `asyncio` to process documents concurrently, making it faster than the [`SequentialIngestStrategy`][ragbits.document_search.ingestion.strategies.SequentialIngestStrategy] strategy, especially with large document volumes.
250+
251+
=== "Ray Distributed"
252+
253+
```python
254+
from ragbits.document_search import DocumentSearch
255+
from ragbits.document_search.ingestion.strategies import RayDistributedIngestStrategy
256+
257+
ingest_strategy = RayDistributedIngestStrategy(cpu_batch_size=1, io_batch_size=5)
258+
document_search = DocumentSearch(ingest_strategy=ingest_strategy, ...)
259+
260+
await document_search.ingest("s3://")
261+
```
262+
263+
If you need even better performance, you can use the [`RayDistributedIngestStrategy`][ragbits.document_search.ingestion.strategies.RayDistributedIngestStrategy] strategy. By default, when run outside of a Ray cluster, the Ray Core library will parallelize the processing of documents on the local machine, using available CPU cores.
264+
265+
When run inside a Ray cluster, the Ray Core library will parallelize the processing of documents across the nodes in the cluster. There are several ways of sending documents to the Ray cluster for processing, but using the Ray Jobs API is by far the most recommended one.

To use the Ray Jobs API, prepare the processing script and the documents to be processed, then submit the job to the Ray cluster. Make sure to replace `<cluster_address>` with the address of your Ray cluster and adjust the `entrypoint` and `runtime_env` parameters to match your setup.

```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://<cluster_address>:8265")
client.submit_job(
    entrypoint="python script.py",
    runtime_env={
        "working_dir": "./",
        "pip": [
            "ragbits-core",
            "ragbits-document-search[ray]"
        ]
    },
)
```
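The `script.py` entrypoint referenced above is the processing script you prepare yourself; Ragbits does not generate it. A minimal sketch assembled from the earlier examples in this guide (the `DocumentSearch` configuration and the stub `s3://` URI are placeholders to adapt to your setup):

```python
# script.py
import asyncio

from ragbits.document_search import DocumentSearch
from ragbits.document_search.ingestion.strategies import RayDistributedIngestStrategy


async def main() -> None:
    document_search = DocumentSearch(
        # Configure the rest (embedder, vector store, parsers, ...) as in the previous sections.
        ingest_strategy=RayDistributedIngestStrategy(),
    )
    await document_search.ingest("s3://")


if __name__ == "__main__":
    asyncio.run(main())
```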
Ray Jobs is also available as a CLI. You can submit a job using the following command:

```bash
ray job submit \
    --address http://<cluster_address>:8265 \
    --runtime-env-json '{"pip": ["ragbits-core", "ragbits-document-search[ray]"]}' \
    --working-dir . \
    -- python script.py
```

There are also other ways to submit jobs to the Ray cluster. For more information, please refer to the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/index.html).
To define a new ingest strategy, extend the [`IngestStrategy`][ragbits.document_search.ingestion.strategies.IngestStrategy] class.

```python
import asyncio
from collections.abc import Iterable

from ragbits.core.vector_stores import VectorStore
from ragbits.document_search.documents.document import Document, DocumentMeta
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.enrichers import ElementEnricherRouter
from ragbits.document_search.ingestion.parsers import DocumentParserRouter
from ragbits.document_search.ingestion.strategies import (
    IngestDocumentResult,
    IngestError,
    IngestExecutionResult,
    IngestStrategy,
)


class DelayedIngestStrategy(IngestStrategy):
    """
    Ingest strategy that processes documents in sequence, one at a time, with a small delay.
    """

    async def __call__(
        self,
        documents: Iterable[DocumentMeta | Document | Source],
        vector_store: VectorStore,
        parser_router: DocumentParserRouter,
        enricher_router: ElementEnricherRouter,
    ) -> IngestExecutionResult:
        """
        Ingest documents sequentially one by one with a small delay.

        Args:
            documents: The documents to ingest.
            vector_store: The vector store to store document chunks.
            parser_router: The document parser router to use.
            enricher_router: The intermediate element enricher router to use.

        Returns:
            The ingest execution result.
        """
        results = IngestExecutionResult()

        for document in documents:
            try:
                # Parse
                parsed_elements = await self._call_with_error_handling(self._parse_document, ...)

                # Enrich
                enriched_elements = await self._call_with_error_handling(self._enrich_elements, ...)

                # Index
                await self._call_with_error_handling(self._remove_elements, ...)
                await self._call_with_error_handling(self._insert_elements, ...)

                # Artificial delay
                await asyncio.sleep(1)

            except Exception as exc:
                results.failed.append(IngestDocumentResult(error=IngestError.from_exception(exc), ...))
            else:
                results.successful.append(IngestDocumentResult(...))

        return results
```
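Like the built-in strategies, a custom strategy is applied by passing it to the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance. A minimal sketch, following the same pattern as the tabs above:

```python
from ragbits.document_search import DocumentSearch

ingest_strategy = DelayedIngestStrategy()
# The remaining DocumentSearch configuration (embedder, vector store, ...) is omitted here.
document_search = DocumentSearch(ingest_strategy=ingest_strategy)

await document_search.ingest("s3://")
```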
