diff --git a/projects/extension/sql/idempotent/012-vectorizer-int.sql b/projects/extension/sql/idempotent/012-vectorizer-int.sql
index 3e5e96214..565977785 100644
--- a/projects/extension/sql/idempotent/012-vectorizer-int.sql
+++ b/projects/extension/sql/idempotent/012-vectorizer-int.sql
@@ -461,7 +461,6 @@ $func$ language plpgsql volatile security invoker
 set search_path to pg_catalog, pg_temp
 ;
 
-
 -------------------------------------------------------------------------------
 -- _vectorizer_create_source_trigger
 create or replace function ai._vectorizer_create_source_trigger
@@ -508,41 +507,44 @@ begin
     into strict _pk_change_check
     from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);
 
-    -- Create the trigger function with direct string construction
-    _sql := format(
-        $sql$
-        create function %I.%I() returns trigger as
-        $plpgsql$
-        begin
-            if (TG_OP = 'DELETE') then
-                execute %L using old;
-                return old;
-            elsif (TG_OP = 'UPDATE') then
-                if %s then
-                    execute %L using old;
-                end if;
-
-                insert into %I.%I (%s)
-                values (%s);
-                return new;
-            else -- INSERT
-                insert into %I.%I (%s)
-                values (%s);
-                return new;
+    -- Create the trigger function from a template. The $...$ placeholders are
+    -- substituted below; the delete statement is substituted as a quoted string
+    -- literal, so the template must not wrap $delete_statement$ in quotes.
+    _sql := $create_func$
+    create function $QUEUE_SCHEMA$.$TRIGGER_NAME$() returns trigger as
+    $trigger_func$
+    begin
+        if (TG_OP = 'DELETE') then
+            execute $delete_statement$ using old;
+            return old;
+        elsif (TG_OP = 'UPDATE') then
+            if $pk_change_check$ then
+                execute $delete_statement$ using old;
             end if;
-        end;
-        $plpgsql$ language plpgsql volatile parallel safe security definer
-        set search_path to pg_catalog, pg_temp
-        $sql$,
-        queue_schema, trigger_name,     -- Function name (1,2)
-        _delete_statement,              -- DELETE case (3)
-        _pk_change_check,               -- UPDATE check condition (4)
-        _delete_statement,              -- UPDATE delete statement (5)
-        queue_schema, queue_table,      -- Queue table for UPDATE (6,7)
-        _pk_columns, _pk_values,        -- Columns and values for UPDATE (8,9)
-        queue_schema, queue_table,      -- Queue table for INSERT (10,11)
-        _pk_columns, _pk_values         -- Columns and values for INSERT (12,13)
-    );
+
+            insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
+            values ($PK_VALUES$);
+            return new;
+        else -- INSERT
+            insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
+            values ($PK_VALUES$);
+            return new;
+        end if;
+    end;
+    $trigger_func$ language plpgsql volatile parallel safe security definer
+    set search_path to pg_catalog, pg_temp
+    $create_func$;
+
+    -- Replace the placeholders with actual values. Identifiers go through
+    -- quote_ident and the delete statement through quote_literal (what the former
+    -- format %L did), so a quote inside the statement cannot end the literal early.
+    _sql := replace(_sql, '$QUEUE_SCHEMA$', quote_ident(queue_schema));
+    _sql := replace(_sql, '$TRIGGER_NAME$', quote_ident(trigger_name));
+    _sql := replace(_sql, '$delete_statement$', quote_literal(_delete_statement));
+    _sql := replace(_sql, '$pk_change_check$', _pk_change_check);
+    _sql := replace(_sql, '$QUEUE_TABLE$', quote_ident(queue_table));
+    _sql := replace(_sql, '$PK_COLUMNS$', _pk_columns);
+    _sql := replace(_sql, '$PK_VALUES$', _pk_values);
 
     execute _sql;
 
@@ -573,6 +575,74 @@ language plpgsql volatile security invoker
 set search_path to pg_catalog, pg_temp
 ;
 
+-------------------------------------------------------------------------------
+-- Upgrade block: rebuild the source trigger of existing vectorizers.
+-- For every vectorizer whose config version matches 0.8.x, the trigger and its
+-- function are dropped and recreated through
+-- ai._vectorizer_create_source_trigger, and the config version is stamped
+-- with _new_version so the row is not selected again on a later run.
+do $upgrade_block$
+declare
+    _vec record;
+    _new_version text := '0.9.0'; -- Version stamped into config after the upgrade
+begin
+    -- Only vectorizers stamped with a 0.8.x version are selected; rows with
+    -- another version, or with no version key at all, are left untouched.
+    for _vec in (
+        select
+            v.id,
+            v.source_schema,
+            v.source_table,
+            v.source_pk,
+            v.target_schema,
+            v.target_table,
+            v.trigger_name,
+            v.queue_schema,
+            v.queue_table,
+            v.config
+        from ai.vectorizer v
+        where (v.config->>'version') ~ '^0\.8\.'
+    )
+    loop
+        raise notice 'Upgrading trigger for vectorizer ID % from version %', _vec.id, _vec.config->>'version';
+
+        -- Drop existing trigger
+        execute pg_catalog.format(
+            'drop trigger if exists %I on %I.%I',
+            _vec.trigger_name,
+            _vec.source_schema,
+            _vec.source_table
+        );
+
+        -- Drop existing trigger function
+        execute pg_catalog.format(
+            'drop function if exists %I.%I()',
+            _vec.queue_schema,
+            _vec.trigger_name
+        );
+
+        -- Create new trigger
+        perform ai._vectorizer_create_source_trigger(
+            _vec.trigger_name,
+            _vec.queue_schema,
+            _vec.queue_table,
+            _vec.source_schema,
+            _vec.source_table,
+            _vec.target_schema,
+            _vec.target_table,
+            _vec.source_pk
+        );
+
+        -- Update the version in the config (to_jsonb builds a correctly escaped JSON string)
+        update ai.vectorizer
+        set config = pg_catalog.jsonb_set(config, '{version}', pg_catalog.to_jsonb(_new_version))
+        where id = _vec.id;
+
+        raise notice 'Successfully upgraded trigger for vectorizer ID % to version %', _vec.id, _new_version;
+    end loop;
+end;
+$upgrade_block$;
+
 -------------------------------------------------------------------------------
 -- _vectorizer_vector_index_exists
 create or replace function ai._vectorizer_vector_index_exists
diff --git a/projects/extension/sql/incremental/018-add-delete-handling-to-trigger.sql b/projects/extension/sql/incremental/018-add-delete-handling-to-trigger.sql
deleted file mode 100644
index 9af5b5a78..000000000
--- a/projects/extension/sql/incremental/018-add-delete-handling-to-trigger.sql
+++ /dev/null
@@ -1,165 +0,0 @@
-do language plpgsql $block$
-DECLARE
-    _vectorizer RECORD;
-    _function_body text;
-    _pk_cols text;
-    _sql text;
-BEGIN
-    -- Loop through all vectorizers
-    FOR _vectorizer IN
-        SELECT
-            v.id,
-            v.target_schema,
-            v.target_table,
-            v.source_schema,
-            v.source_table,
-            v.queue_schema,
-            v.queue_table,
-            v.trigger_name,
-            v.source_pk
-        FROM ai.vectorizer v
-    LOOP
-        -- Get the PK columns for the WHERE clause
-        SELECT pg_catalog.string_agg(
-            pg_catalog.format('%I = $1.%I', x.attname, x.attname),
-            ' AND '
-            order by x.pknum
-        )
- INTO STRICT _pk_cols - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(pknum int, attname name); - - -- Drop the old trigger - EXECUTE format( - 'DROP TRIGGER IF EXISTS %I ON %I.%I', - _vectorizer.trigger_name, - _vectorizer.source_schema, - _vectorizer.source_table - ); - - -- Drop the old trigger function - EXECUTE format( - 'DROP FUNCTION IF EXISTS %I.%I()', - _vectorizer.queue_schema, - _vectorizer.trigger_name - ); - - -- Create the new trigger function with updated logic - SELECT format( - $sql$ - CREATE FUNCTION %I.%I() RETURNS trigger - AS $trg$ - BEGIN - IF (TG_OP = 'DELETE') THEN - -- Delete all chunks in target table related to this row - EXECUTE pg_catalog.format( - 'DELETE FROM %I.%I WHERE %s', - %L, %L, %L - ) USING OLD; - RETURN OLD; - ELSIF (TG_OP = 'UPDATE') THEN - -- If PK has changed, delete old chunks and queue the new row - IF %s THEN - -- Delete old chunks - EXECUTE pg_catalog.format( - 'DELETE FROM %I.%I WHERE %s', - %L, %L, %L - ) USING OLD; - - -- Queue the new row for processing - INSERT INTO %I.%I (%s) - VALUES (%s); - ELSE - -- If only non-PK columns changed, just queue the new row - INSERT INTO %I.%I (%s) - VALUES (%s); - END IF; - RETURN NEW; - ELSIF (TG_OP = 'INSERT') THEN - -- Queue the new row - INSERT INTO %I.%I (%s) - VALUES (%s); - RETURN NEW; - END IF; - RETURN NULL; - END; - $trg$ LANGUAGE plpgsql VOLATILE PARALLEL SAFE SECURITY DEFINER - SET search_path = pg_catalog, pg_temp; - $sql$, - _vectorizer.queue_schema, - _vectorizer.trigger_name, - -- Parameters for the dynamic delete query - _vectorizer.target_schema, - _vectorizer.target_table, - _pk_cols, - -- PK change detection condition - ( - SELECT pg_catalog.string_agg( - pg_catalog.format('OLD.%I IS DISTINCT FROM NEW.%I', x.attname, x.attname), - ' OR ' - ORDER BY x.pknum - ) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(pknum int, attname name) - ), - -- Target params for second delete - _vectorizer.target_schema, - _vectorizer.target_table, - 
_pk_cols, - -- Queue table parameters for PK change insert - _vectorizer.queue_schema, - _vectorizer.queue_table, - ( - SELECT pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' ORDER BY x.attnum) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(attnum int, attname name) - ), - ( - SELECT pg_catalog.string_agg(pg_catalog.format('NEW.%I', x.attname), ', ' ORDER BY x.attnum) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(attnum int, attname name) - ), - -- Queue table parameters for normal update insert - _vectorizer.queue_schema, - _vectorizer.queue_table, - ( - SELECT pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' ORDER BY x.attnum) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(attnum int, attname name) - ), - ( - SELECT pg_catalog.string_agg(pg_catalog.format('NEW.%I', x.attname), ', ' ORDER BY x.attnum) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(attnum int, attname name) - ), - -- Queue table parameters for insert - _vectorizer.queue_schema, - _vectorizer.queue_table, - ( - SELECT pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' ORDER BY x.attnum) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(attnum int, attname name) - ), - ( - SELECT pg_catalog.string_agg(pg_catalog.format('NEW.%I', x.attname), ', ' ORDER BY x.attnum) - FROM pg_catalog.jsonb_to_recordset(_vectorizer.source_pk) x(attnum int, attname name) - ) - ) INTO _sql; - - -- Create the new trigger function - EXECUTE _sql; - - -- Revoke all on trigger function from public - EXECUTE format( - 'REVOKE ALL ON FUNCTION %I.%I() FROM PUBLIC', - _vectorizer.queue_schema, - _vectorizer.trigger_name - ); - - -- Create the new trigger - EXECUTE format( - 'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON %I.%I FOR EACH ROW EXECUTE FUNCTION %I.%I()', - _vectorizer.trigger_name, - _vectorizer.source_schema, - _vectorizer.source_table, - _vectorizer.queue_schema, - _vectorizer.trigger_name - ); - 
- RAISE NOTICE 'Updated trigger % for vectorizer %', _vectorizer.trigger_name, _vectorizer.id; - END LOOP; -END; -$block$; \ No newline at end of file diff --git a/projects/extension/tests/contents/output16.expected b/projects/extension/tests/contents/output16.expected index 8966bcb6a..8e5e3d30c 100644 --- a/projects/extension/tests/contents/output16.expected +++ b/projects/extension/tests/contents/output16.expected @@ -81,7 +81,7 @@ CREATE EXTENSION function ai._validate_scheduling(jsonb) function ai._vectorizer_create_dependencies(integer) function ai._vectorizer_create_queue_table(name,name,jsonb,name[]) - function ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb) + function ai._vectorizer_create_source_trigger(name,name,name,name,name,name,name,jsonb) function ai._vectorizer_create_target_table(name,name,jsonb,name,name,integer,name[]) function ai._vectorizer_create_vector_index(name,name,jsonb) function ai._vectorizer_create_view(name,name,name,name,jsonb,name,name,name[]) diff --git a/projects/extension/tests/contents/output17.expected b/projects/extension/tests/contents/output17.expected index 5d58ffa79..e5c4ce935 100644 --- a/projects/extension/tests/contents/output17.expected +++ b/projects/extension/tests/contents/output17.expected @@ -81,7 +81,7 @@ CREATE EXTENSION function ai._validate_scheduling(jsonb) function ai._vectorizer_create_dependencies(integer) function ai._vectorizer_create_queue_table(name,name,jsonb,name[]) - function ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb) + function ai._vectorizer_create_source_trigger(name,name,name,name,name,name,name,jsonb) function ai._vectorizer_create_target_table(name,name,jsonb,name,name,integer,name[]) function ai._vectorizer_create_vector_index(name,name,jsonb) function ai._vectorizer_create_view(name,name,name,name,jsonb,name,name,name[]) diff --git a/projects/extension/tests/privileges/function.expected b/projects/extension/tests/privileges/function.expected 
index 90843a852..30065c861 100644 --- a/projects/extension/tests/privileges/function.expected +++ b/projects/extension/tests/privileges/function.expected @@ -52,10 +52,10 @@ f | bob | execute | no | ai | _vectorizer_create_queue_table(queue_schema name, queue_table name, source_pk jsonb, grant_to name[]) f | fred | execute | no | ai | _vectorizer_create_queue_table(queue_schema name, queue_table name, source_pk jsonb, grant_to name[]) f | jill | execute | YES | ai | _vectorizer_create_queue_table(queue_schema name, queue_table name, source_pk jsonb, grant_to name[]) - f | alice | execute | YES | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, source_pk jsonb) - f | bob | execute | no | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, source_pk jsonb) - f | fred | execute | no | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, source_pk jsonb) - f | jill | execute | YES | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, source_pk jsonb) + f | alice | execute | YES | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, target_schema name, target_table name, source_pk jsonb) + f | bob | execute | no | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, target_schema name, target_table name, source_pk jsonb) + f | fred | execute | no | ai | _vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, target_schema name, target_table name, source_pk jsonb) + f | jill | execute | YES | ai | 
_vectorizer_create_source_trigger(trigger_name name, queue_schema name, queue_table name, source_schema name, source_table name, target_schema name, target_table name, source_pk jsonb)
 f | alice | execute | YES | ai | _vectorizer_create_target_table(source_schema name, source_table name, source_pk jsonb, target_schema name, target_table name, dimensions integer, grant_to name[])
 f | bob | execute | no | ai | _vectorizer_create_target_table(source_schema name, source_table name, source_pk jsonb, target_schema name, target_table name, dimensions integer, grant_to name[])
 f | fred | execute | no | ai | _vectorizer_create_target_table(source_schema name, source_table name, source_pk jsonb, target_schema name, target_table name, dimensions integer, grant_to name[])
diff --git a/projects/extension/tests/upgrade/test_upgrade.py b/projects/extension/tests/upgrade/test_upgrade.py
index a399732d4..3d3042490 100644
--- a/projects/extension/tests/upgrade/test_upgrade.py
+++ b/projects/extension/tests/upgrade/test_upgrade.py
@@ -222,3 +222,113 @@ def test_production_version_upgrade_path():
         Path(__file__).parent.absolute().joinpath("upgrade1.snapshot").read_text()
     )
     assert upgrade0 == upgrade1
+
+
+def test_vectorizer_trigger_upgrade():
+    """Upgrading from 0.8.0 must rebuild the vectorizer's source trigger.
+
+    Creates a vectorizer at 0.8.0, upgrades the extension to 0.8.1-dev, then
+    checks that the trigger function was replaced, that the vectorizer config
+    version was bumped, and that the new trigger queues inserted rows and
+    changed primary keys.
+    """
+    # Source text of the trigger function attached under the vectorizer's trigger name.
+    trigger_source_query = """
+        SELECT p.prosrc
+        FROM pg_proc p
+        JOIN pg_trigger t ON t.tgfoid = p.oid
+        JOIN ai.vectorizer v ON v.trigger_name = t.tgname
+        WHERE v.id = %s
+    """
+
+    create_user(USER)
+    create_user(OTHER_USER)
+    create_database("trigger_upgrade")
+
+    # Create extension at 0.8.0
+    create_extension("trigger_upgrade", "0.8.0")
+    assert check_version("trigger_upgrade") == "0.8.0"
+
+    # Create a test table and vectorizer
+    with psycopg.connect(db_url(user=USER, dbname="trigger_upgrade"), autocommit=True) as con:
+        with con.cursor() as cur:
+            # Create test table
+            cur.execute("""
+                CREATE TABLE public.upgrade_test (
+                    id int primary key,
+                    content text not null
+                )
+            """)
+
+            # Create vectorizer
+            cur.execute("""
+                SELECT ai.create_vectorizer(
+                    'public.upgrade_test'::regclass,
+                    embedding=>ai.embedding_openai('text-embedding-3-small', 768),
+                    chunking=>ai.chunking_character_text_splitter('content'),
+                    scheduling=>ai.scheduling_none()
+                )
+            """)
+            vectorizer_id = cur.fetchone()[0]
+
+            # Get the trigger function definition before upgrade
+            cur.execute(trigger_source_query, (vectorizer_id,))
+            old_trigger_def = cur.fetchone()[0]
+
+            # Verify old trigger doesn't handle PK changes
+            # (compared case-insensitively so SQL keyword casing does not matter)
+            assert "IS DISTINCT FROM" not in old_trigger_def.upper()
+
+            # Upgrade to the new version
+            update_extension("trigger_upgrade", "0.8.1-dev")
+            assert check_version("trigger_upgrade") == "0.8.1-dev"
+
+            # Get the new trigger function definition
+            cur.execute(trigger_source_query, (vectorizer_id,))
+            new_trigger_def = cur.fetchone()[0]
+
+            # Verify new trigger has the PK change handling
+            assert "IS DISTINCT FROM" in new_trigger_def.upper()
+
+            # Check that the version in the config was updated
+            cur.execute("""
+                SELECT config->>'version'
+                FROM ai.vectorizer
+                WHERE id = %s
+            """, (vectorizer_id,))
+            version = cur.fetchone()[0]
+            assert version == "0.9.0"
+
+            # Test trigger functionality
+            # Insert a row
+            cur.execute("INSERT INTO public.upgrade_test VALUES (1, 'test content')")
+
+            # Verify row was queued
+            cur.execute("""
+                SELECT EXISTS (
+                    SELECT 1
+                    FROM ai._vectorizer_q_1
+                    WHERE id = 1
+                )
+            """)
+            assert cur.fetchone()[0]
+
+            # Update with PK change
+            cur.execute("UPDATE public.upgrade_test SET id = 2 WHERE id = 1")
+
+            # Verify old row was deleted from target and new PK was queued
+            # NOTE(review): nothing in this test writes embeddings, so the NOT EXISTS
+            # half presumably holds trivially — consider seeding the store first.
+            cur.execute("""
+                SELECT NOT EXISTS (
+                    SELECT 1
+                    FROM upgrade_test_embedding_store
+                    WHERE id = 1
+                )
+                AND EXISTS (
+                    SELECT 1
+                    FROM ai._vectorizer_q_1
+                    WHERE id = 2
+                )
+            """)
+            assert cur.fetchone()[0]