
fix: remove foreign key, make trigger handle deletes #485

Open
wants to merge 12 commits into base: main
238 changes: 176 additions & 62 deletions projects/extension/sql/idempotent/012-vectorizer-int.sql
@@ -126,7 +126,6 @@ begin
, chunk text not null
, embedding @extschema:vector@.vector(%L) storage main not null
, unique (%s, chunk_seq)
, foreign key (%s) references %I.%I (%s) on delete cascade
)
$sql$
, target_schema, target_table
@@ -146,9 +145,6 @@ begin
)
, dimensions
, _pk_cols
, _pk_cols
, source_schema, source_table
, _pk_cols
) into strict _sql
;
execute _sql;
@@ -467,74 +463,128 @@ set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_source_trigger
create or replace function ai._vectorizer_create_source_trigger
( trigger_name pg_catalog.name
, queue_schema pg_catalog.name
-- _build_vectorizer_trigger_definition
create or replace function ai._build_vectorizer_trigger_definition
Comment on lines +466 to +467
Collaborator: I'd probably call this ai._vectorizer_build_trigger_definition so as to fit the pattern.

( queue_schema pg_catalog.name
, queue_table pg_catalog.name
, source_schema pg_catalog.name
, source_table pg_catalog.name
, target_schema pg_catalog.name
, target_table pg_catalog.name
, source_pk pg_catalog.jsonb
) returns void as
) returns pg_catalog.text as
$func$
declare
_sql pg_catalog.text;
_pk_change_check pg_catalog.text;
_delete_statement pg_catalog.text;
_pk_columns pg_catalog.text;
_pk_values pg_catalog.text;
_func_def pg_catalog.text;
begin
-- create the trigger function
-- the trigger function is security definer
-- the owner of the source table is creating the trigger function
-- so the trigger function is run as the owner of the source table
-- who also owns the queue table
-- this means anyone with insert/update on the source is able
-- to enqueue rows in the queue table automatically
-- since the trigger function only does inserts, this should be safe
select pg_catalog.format
( $sql$
create function %I.%I() returns trigger
as $plpgsql$
-- Pre-calculate all the parts we need
select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum)
into strict _pk_columns
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

select pg_catalog.string_agg(pg_catalog.format('new.%I', x.attname), ', ' order by x.attnum)
into strict _pk_values
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

-- Create delete statement for deleted rows
_delete_statement := format('delete from %I.%I where ', target_schema, target_table) operator(pg_catalog.||)
(select string_agg(
quote_ident(attname) operator(pg_catalog.||) ' = $1.' operator(pg_catalog.||) quote_ident(attname),
' and '
Comment on lines +494 to +495
Collaborator: Do you prefer using the concatenate operator instead of the format function? It's a matter of taste, but I find format a little easier to read and understand what the output is supposed to look like. But this works.

)
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name));

-- Create the primary key change check expression
select string_agg(
'old.' operator(pg_catalog.||) quote_ident(attname) operator(pg_catalog.||) ' IS DISTINCT FROM new.' operator(pg_catalog.||) quote_ident(attname),
' OR '
)
into strict _pk_change_check
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);
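To make the generated expressions concrete, here is a standalone sketch of what the aggregation above produces for a hypothetical two-column primary key; the column names `id` and `tenant_id` are assumptions for illustration, not names used by the extension:

```sql
-- Sketch: the primary-key-change check built for a hypothetical
-- source_pk of [{"attnum":1,"attname":"id"},{"attnum":2,"attname":"tenant_id"}].
select string_agg(
         'old.' || quote_ident(attname) || ' IS DISTINCT FROM new.' || quote_ident(attname),
         ' OR ' order by x.attnum
       ) as pk_change_check
from jsonb_to_recordset(
       '[{"attnum": 1, "attname": "id"},
         {"attnum": 2, "attname": "tenant_id"}]'::jsonb
     ) as x(attnum int, attname name);
-- → old.id IS DISTINCT FROM new.id OR old.tenant_id IS DISTINCT FROM new.tenant_id
```

The corresponding `_delete_statement` would read `delete from <target> where id = $1.id and tenant_id = $1.tenant_id`, with `$1` bound to the `old` row at execution time.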

-- Build the complete function definition
_func_def := $def$
begin
insert into %I.%I (%s)
values (%s);
return null;
if (TG_OP = 'DELETE') then
execute '$DELETE_STATEMENT$' using old;
return old;
elsif (TG_OP = 'UPDATE') then
if $PK_CHANGE_CHECK$ then
execute '$DELETE_STATEMENT$' using old;
end if;
Comment on lines +510 to +516
Collaborator: Is there a reason for using EXECUTE here instead of a "plain" DELETE statement like the INSERT statement below?


insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
values ($PK_VALUES$);
return new;
else
Collaborator: Let's go ahead and handle when TG_OP = 'TRUNCATE'. We can just truncate the queue and the target.
Collaborator: Hmm, truncate may require a separate trigger. I'm not sure if you can do a for-each-row trigger on a truncate op. If it requires a separate trigger, I guess let's skip it.
Collaborator: Oh, I bet you could have two triggers with one trigger function.

insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
values ($PK_VALUES$);
return new;
end if;
end;
$plpgsql$ language plpgsql volatile parallel safe security definer
set search_path to pg_catalog, pg_temp
$sql$
, queue_schema, trigger_name
, queue_schema, queue_table
, (
select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum)
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name)
)
, (
select pg_catalog.string_agg(pg_catalog.format('new.%I', x.attname), ', ' order by x.attnum)
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name)
)
) into strict _sql
;
execute _sql;
$def$;

-- revoke all on trigger function from public
select pg_catalog.format
( $sql$
revoke all on function %I.%I() from public
$sql$
, queue_schema, trigger_name
) into strict _sql
;
-- Replace placeholders
_func_def := replace(_func_def, '$DELETE_STATEMENT$', _delete_statement);
_func_def := replace(_func_def, '$PK_CHANGE_CHECK$', _pk_change_check);
_func_def := replace(_func_def, '$QUEUE_SCHEMA$', quote_ident(queue_schema));
_func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table));
_func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns);
_func_def := replace(_func_def, '$PK_VALUES$', _pk_values);
Comment on lines +530 to +535
Collaborator: I'm guessing you chose this over format because format doesn't have named fields? It does have positional fields, but that's still not ideal.


return _func_def;
end;
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp;
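The function returns only the plpgsql body text; callers are expected to splice it into a `create function` statement, as `_vectorizer_create_source_trigger` does below. A minimal sketch of that wrapping, in which `ai.my_trigger`, `q_queue`, and `emb_store` are placeholder identifiers rather than names the extension actually uses:

```sql
-- Sketch only: wraps the builder's output in a trigger-function definition.
do $do$
begin
    execute format(
        'create or replace function ai.%I() returns trigger as $trigger_def$ %s $trigger_def$ '
        'language plpgsql volatile parallel safe security definer',
        'my_trigger',
        ai._build_vectorizer_trigger_definition(
            'ai', 'q_queue',            -- queue_schema, queue_table
            'public', 'emb_store',      -- target_schema, target_table
            '[{"attnum": 1, "attname": "id"}]'::jsonb));  -- source_pk
end
$do$;
```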
-------------------------------------------------------------------------------
Collaborator: nit: add a blank line here

-- _vectorizer_create_source_trigger
create or replace function ai._vectorizer_create_source_trigger
( trigger_name pg_catalog.name -- Name for the trigger
, queue_schema pg_catalog.name -- Schema containing the queue table
, queue_table pg_catalog.name -- Table that will store queued items
, source_schema pg_catalog.name -- Schema containing the watched table
, source_table pg_catalog.name -- Table being watched for changes
, target_schema pg_catalog.name -- Schema containing the target table for deletions
, target_table pg_catalog.name -- Table where corresponding rows should be deleted
, source_pk pg_catalog.jsonb -- JSON describing primary key columns to track
, original_owner pg_catalog.name default null -- Role that should own the trigger
Collaborator: remove unused parameter

) returns void as
$func$
declare
_sql pg_catalog.text;
_pk_columns pg_catalog.text;
_pk_values pg_catalog.text;
_pk_where_clause pg_catalog.text;
_pk_change_check pg_catalog.text;
_delete_statement pg_catalog.text;
Comment on lines +557 to +561
Collaborator: remove unused variables

begin

execute format(
'create or replace function ai.%I() returns trigger as $trigger_def$ %s $trigger_def$ language plpgsql volatile parallel safe security definer',
trigger_name,
ai._build_vectorizer_trigger_definition(queue_schema, queue_table, target_schema, target_table, source_pk)
);

-- Revoke public permissions
_sql := pg_catalog.format(
'revoke all on function %I.%I() from public',
queue_schema, trigger_name
);
execute _sql;

-- create the trigger on the source table
select pg_catalog.format
( $sql$
create trigger %I
after insert or update
on %I.%I
for each row execute function %I.%I();
$sql$
, trigger_name
, source_schema, source_table
, queue_schema, trigger_name
-- Create the trigger
select pg_catalog.format(
$sql$
create trigger %I
after insert or update or delete
on %I.%I
for each row execute function %I.%I()
$sql$,
trigger_name,
source_schema, source_table,
queue_schema, trigger_name
) into strict _sql
;
execute _sql;
@@ -544,6 +594,70 @@ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

do $upgrade_block$
declare
_vec record;
_version text;
Collaborator: remove unused variable

_new_version text := '0.8.1'; -- Version after upgrade
_owner name;
_acl aclitem[];
_debug_acl aclitem[];
Comment on lines +602 to +604
Collaborator: remove unused variables

begin
-- Find all vectorizers with version < 0.8.1
for _vec in (
select
v.id,
v.source_schema,
v.source_table,
v.source_pk,
v.target_schema,
v.target_table,
v.trigger_name,
v.queue_schema,
v.queue_table,
v.config
from ai.vectorizer v
where string_to_array((v.config->>'version'), '.')::int[] < string_to_array('0.8.1', '.')::int[]
Collaborator: Well shit, that's super handy. I didn't know you could compare int[] like that. TIL.
Collaborator: Do you need to handle pre-release tags? e.g. 0.8.1-dev

)
loop
raise notice 'Upgrading trigger function for vectorizer ID % from version %', _vec.id, _vec.config->>'version';

execute format(
'alter extension ai add function ai.%I()',
_vec.trigger_name
);
Comment on lines +625 to +628
Collaborator: Did you verify that adding it to the extension does not change the ownership of the function? I probably would have altered the function to switch owner and then switch back, but this seems like it might be better or at least equally good.


execute format(
'create or replace function ai.%I() returns trigger as $trigger_def$ %s $trigger_def$ language plpgsql volatile parallel safe security definer',
_vec.trigger_name,
ai._build_vectorizer_trigger_definition(_vec.queue_schema, _vec.queue_table, _vec.target_schema, _vec.target_table, _vec.source_pk)
);

execute format(
'drop trigger if exists %I on %I.%I',
_vec.trigger_name, _vec.source_schema, _vec.source_table
);

execute format(
'create trigger %I after insert or update or delete on %I.%I for each row execute function ai.%I()',
_vec.trigger_name, _vec.source_schema, _vec.source_table, _vec.trigger_name
);

execute format(
'alter extension ai drop function ai.%I()',
_vec.trigger_name
);

update ai.vectorizer
set config = jsonb_set(config, '{version}', format('"%s"', _new_version)::jsonb)
where id = _vec.id;
Comment on lines +651 to +653
Collaborator: We need to make sure we don't have any other config upgrades in this release or this may cause the other(s) to fail.


raise info 'Successfully upgraded trigger for vectorizer ID % to version %', _vec.id, _new_version;
end loop;
end;
$upgrade_block$;
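The `int[]` comparison the upgrade filter relies on, and the pre-release caveat raised in review, can be seen in isolation; the version strings here are illustrative:

```sql
-- Standalone sketch of the version filter used in the upgrade block.
-- Integer arrays compare element-by-element, so they order like versions:
select string_to_array('0.8.0', '.')::int[] < string_to_array('0.8.1', '.')::int[];
-- → true

-- Caveat from review: a pre-release tag breaks the cast outright:
-- select string_to_array('0.8.1-dev', '.')::int[];
-- ERROR:  invalid input syntax for type integer: "1-dev"
```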


-------------------------------------------------------------------------------
-- _vectorizer_vector_index_exists
create or replace function ai._vectorizer_vector_index_exists
2 changes: 2 additions & 0 deletions projects/extension/sql/idempotent/013-vectorizer-api.sql
@@ -200,6 +200,8 @@ begin
, queue_table
, _source_schema
, _source_table
, target_schema
, target_table
, _source_pk
);

@@ -0,0 +1,54 @@
do language plpgsql $block$
DECLARE
_vectorizer RECORD;
_constraint_name text;
_sql text;
BEGIN
-- Loop through all vectorizers
FOR _vectorizer IN
SELECT
v.id,
v.target_schema,
v.target_table,
v.source_schema,
v.source_table
FROM ai.vectorizer v
LOOP
-- Find the foreign key constraint for this vectorizer's store table
SELECT conname INTO _constraint_name
FROM pg_constraint c
JOIN pg_class t ON c.conrelid = t.oid
JOIN pg_namespace n ON t.relnamespace = n.oid
JOIN pg_class t2 ON c.confrelid = t2.oid
JOIN pg_namespace n2 ON t2.relnamespace = n2.oid
WHERE n.nspname = _vectorizer.target_schema
AND t.relname = _vectorizer.target_table
AND n2.nspname = _vectorizer.source_schema
AND t2.relname = _vectorizer.source_table
AND c.contype = 'f';

IF _constraint_name IS NOT NULL THEN
-- Build and execute the ALTER TABLE command to drop the constraint
_sql := format(
'ALTER TABLE %I.%I DROP CONSTRAINT %I',
_vectorizer.target_schema,
_vectorizer.target_table,
_constraint_name
);

RAISE NOTICE 'Dropping foreign key constraint % from %.%',
_constraint_name,
_vectorizer.target_schema,
_vectorizer.target_table;

EXECUTE _sql;
ELSE
RAISE NOTICE 'No foreign key constraint found for %.%',
_vectorizer.target_schema,
_vectorizer.target_table;
END IF;
END LOOP;
END;
$block$;

drop function if exists ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb);
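After the migration runs, it can be verified that no foreign key remains on a given store table with a query along these lines; `public` and `my_embeddings` are placeholder names for an actual vectorizer's target schema and table:

```sql
-- Hypothetical verification query: list any foreign keys still defined
-- on a vectorizer store table (expect zero rows after the migration).
select conname
from pg_constraint c
join pg_class t on c.conrelid = t.oid
join pg_namespace n on t.relnamespace = n.oid
where n.nspname = 'public'
  and t.relname = 'my_embeddings'
  and c.contype = 'f';
```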
5 changes: 3 additions & 2 deletions projects/extension/tests/contents/output16.expected
@@ -7,6 +7,7 @@ CREATE EXTENSION
event trigger _vectorizer_handle_drops
function ai.anthropic_generate(text,jsonb,integer,text,text,text,double precision,integer,text,text,text[],double precision,jsonb,jsonb,integer,double precision,boolean)
function ai.anthropic_list_models(text,text,text,boolean)
function ai._build_vectorizer_trigger_definition(name,name,name,name,jsonb)
function ai.chunking_character_text_splitter(name,integer,integer,text,boolean)
function ai.chunking_recursive_character_text_splitter(name,integer,integer,text[],boolean)
function ai.chunk_text_recursively(text,integer,integer,text[],boolean)
@@ -81,7 +82,7 @@ CREATE EXTENSION
function ai._validate_scheduling(jsonb)
function ai._vectorizer_create_dependencies(integer)
function ai._vectorizer_create_queue_table(name,name,jsonb,name[])
function ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb)
function ai._vectorizer_create_source_trigger(name,name,name,name,name,name,name,jsonb,name)
function ai._vectorizer_create_target_table(name,name,jsonb,name,name,integer,name[])
function ai._vectorizer_create_vector_index(name,name,jsonb)
function ai._vectorizer_create_view(name,name,name,name,jsonb,name,name,name[])
@@ -106,7 +107,7 @@ CREATE EXTENSION
table ai.vectorizer_errors
view ai.secret_permissions
view ai.vectorizer_status
(102 rows)
(103 rows)

Table "ai._secret_permissions"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
5 changes: 3 additions & 2 deletions projects/extension/tests/contents/output17.expected
@@ -7,6 +7,7 @@ CREATE EXTENSION
event trigger _vectorizer_handle_drops
function ai.anthropic_generate(text,jsonb,integer,text,text,text,double precision,integer,text,text,text[],double precision,jsonb,jsonb,integer,double precision,boolean)
function ai.anthropic_list_models(text,text,text,boolean)
function ai._build_vectorizer_trigger_definition(name,name,name,name,jsonb)
function ai.chunking_character_text_splitter(name,integer,integer,text,boolean)
function ai.chunking_recursive_character_text_splitter(name,integer,integer,text[],boolean)
function ai.chunk_text_recursively(text,integer,integer,text[],boolean)
@@ -81,7 +82,7 @@ CREATE EXTENSION
function ai._validate_scheduling(jsonb)
function ai._vectorizer_create_dependencies(integer)
function ai._vectorizer_create_queue_table(name,name,jsonb,name[])
function ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb)
function ai._vectorizer_create_source_trigger(name,name,name,name,name,name,name,jsonb,name)
function ai._vectorizer_create_target_table(name,name,jsonb,name,name,integer,name[])
function ai._vectorizer_create_vector_index(name,name,jsonb)
function ai._vectorizer_create_view(name,name,name,name,jsonb,name,name,name[])
@@ -120,7 +121,7 @@ CREATE EXTENSION
type ai.vectorizer_status[]
view ai.secret_permissions
view ai.vectorizer_status
(116 rows)
(117 rows)

Table "ai._secret_permissions"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description