Skip to content

Commit

Permalink
fix: database insertion for PGVectorStore (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanleecode authored Sep 6, 2024
1 parent ae1149f commit 83d7f41
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
11 changes: 11 additions & 0 deletions .changeset/six-frogs-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"llamaindex": patch
---

Fix database insertion for `PGVectorStore`

It will now:

- Throw an error if there is an insertion error.
- Upsert documents with the same id.
- Add all documents to the database as a single `INSERT` call (inside a transaction).
63 changes: 41 additions & 22 deletions packages/llamaindex/src/storage/vectorStore/PGVectorStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,34 +200,53 @@ export class PGVectorStore
* @returns A list of zero or more id values for the created records.
*/
async add(embeddingResults: BaseNode<Metadata>[]): Promise<string[]> {
  // Nothing to insert: skip the database round-trip entirely.
  if (embeddingResults.length === 0) {
    console.debug("Empty list sent to PGVectorStore::add");
    return [];
  }

  // Number of columns written per row; must match the column list in the
  // INSERT statement below.
  const columnsPerRow = 6;

  const db = await this.getDb();

  // Build the rows before opening the transaction so that the transaction
  // only spans the actual database work.
  const data = this.getDataToInsert(embeddingResults);

  // One "($1, $2, ..., $6)" tuple per row, numbered consecutively across rows
  // so they line up with the flattened parameter array.
  const placeholders = data
    .map((_, rowIndex) => {
      const base = rowIndex * columnsPerRow;
      const slots = Array.from(
        { length: columnsPerRow },
        (_unused, column) => `$${base + column + 1}`,
      );
      return `(${slots.join(", ")})`;
    })
    .join(", ");

  // Upsert: a row whose id already exists is overwritten with the new values.
  // NOTE(review): PostgreSQL rejects an ON CONFLICT DO UPDATE statement that
  // would touch the same row twice, so two entries sharing an id in a single
  // call will fail the whole batch — confirm callers never pass duplicates.
  // NOTE(review): every row contributes six bind parameters to one statement;
  // very large batches may exceed the per-statement parameter limit — confirm.
  const sql = `
    INSERT INTO ${this.schemaName}.${this.tableName}
      (id, external_id, collection, document, metadata, embeddings)
    VALUES ${placeholders}
    ON CONFLICT (id) DO UPDATE SET
      external_id = EXCLUDED.external_id,
      collection = EXCLUDED.collection,
      document = EXCLUDED.document,
      metadata = EXCLUDED.metadata,
      embeddings = EXCLUDED.embeddings
    RETURNING id
  `;

  // NOTE(review): BEGIN/COMMIT/ROLLBACK only form a transaction when every
  // query runs on the same connection — presumably getDb() returns a single
  // client rather than a pool; verify.
  try {
    await db.query("BEGIN");

    const result = await db.query(sql, data.flat());

    await db.query("COMMIT");

    return result.rows.map((row) => row.id as string);
  } catch (error) {
    // A failing ROLLBACK must not hide the error that caused it: report the
    // rollback failure, then rethrow the original error to the caller.
    try {
      await db.query("ROLLBACK");
    } catch (rollbackError) {
      console.error(
        "PGVectorStore::add failed to roll back transaction",
        rollbackError,
      );
    }
    throw error;
  }
}

/**
Expand Down

0 comments on commit 83d7f41

Please sign in to comment.