diff --git a/.changeset/six-frogs-thank.md b/.changeset/six-frogs-thank.md
new file mode 100644
index 0000000000..9485a0338d
--- /dev/null
+++ b/.changeset/six-frogs-thank.md
@@ -0,0 +1,11 @@
+---
+"llamaindex": patch
+---
+
+Fix database insertion for `PGVectorStore`
+
+It will now:
+
+- throw an error if there is an insertion error.
+- Upsert documents with the same id.
+- add all documents to the database as a single `INSERT` call (inside a transaction).
diff --git a/packages/llamaindex/src/storage/vectorStore/PGVectorStore.ts b/packages/llamaindex/src/storage/vectorStore/PGVectorStore.ts
index 5c426bd6bc..f4bfff31f3 100644
--- a/packages/llamaindex/src/storage/vectorStore/PGVectorStore.ts
+++ b/packages/llamaindex/src/storage/vectorStore/PGVectorStore.ts
@@ -200,34 +200,53 @@ export class PGVectorStore
    * @returns A list of zero or more id values for the created records.
    */
   async add(embeddingResults: BaseNode[]): Promise<string[]> {
-    if (embeddingResults.length == 0) {
+    if (embeddingResults.length === 0) {
       console.debug("Empty list sent to PGVectorStore::add");
-      return Promise.resolve([]);
+      return [];
     }
 
-    const sql: string = `INSERT INTO ${this.schemaName}.${this.tableName}
-                            (id, external_id, collection, document, metadata, embeddings)
-                            VALUES ($1, $2, $3, $4, $5, $6)`;
-
     const db = await this.getDb();
-    const data = this.getDataToInsert(embeddingResults);
 
-    const ret: string[] = [];
-    for (let index = 0; index < data.length; index++) {
-      const params = data[index];
-      try {
-        const result = await db.query(sql, params);
-        if (result.rows.length) {
-          const id = result.rows[0].id as string;
-          ret.push(id);
-        }
-      } catch (err) {
-        const msg = `${err}`;
-        console.log(msg, err);
-      }
+    try {
+      await db.query("BEGIN");
+
+      const data = this.getDataToInsert(embeddingResults);
+
+      const placeholders = data
+        .map(
+          (_, index) =>
+            `($${index * 6 + 1}, ` +
+            `$${index * 6 + 2}, ` +
+            `$${index * 6 + 3}, ` +
+            `$${index * 6 + 4}, ` +
+            `$${index * 6 + 5}, ` +
+            `$${index * 6 + 6})`,
+        )
+        .join(", ");
+
+      const sql = `
+        INSERT INTO ${this.schemaName}.${this.tableName}
+          (id, external_id, collection, document, metadata, embeddings)
+        VALUES ${placeholders}
+        ON CONFLICT (id) DO UPDATE SET
+          external_id = EXCLUDED.external_id,
+          collection = EXCLUDED.collection,
+          document = EXCLUDED.document,
+          metadata = EXCLUDED.metadata,
+          embeddings = EXCLUDED.embeddings
+        RETURNING id
+      `;
+
+      const flattenedParams = data.flat();
+      const result = await db.query(sql, flattenedParams);
+
+      await db.query("COMMIT");
+
+      return result.rows.map((row) => row.id as string);
+    } catch (error) {
+      await db.query("ROLLBACK");
+      throw error;
     }
-
-    return Promise.resolve(ret);
   }
 
   /**