-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: remove foreign key, make trigger handle deletes #485
base: main
Are you sure you want to change the base?
Changes from all commits
c6997e5
56c399e
1888d88
32245ff
6ac8381
94f08cd
1293d44
7696bcd
67f61a5
317f1f4
88aaf3d
4f72146
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,7 +126,6 @@ begin | |
, chunk text not null | ||
, embedding @extschema:vector@.vector(%L) storage main not null | ||
, unique (%s, chunk_seq) | ||
, foreign key (%s) references %I.%I (%s) on delete cascade | ||
) | ||
$sql$ | ||
, target_schema, target_table | ||
|
@@ -146,9 +145,6 @@ begin | |
) | ||
, dimensions | ||
, _pk_cols | ||
, _pk_cols | ||
, source_schema, source_table | ||
, _pk_cols | ||
) into strict _sql | ||
; | ||
execute _sql; | ||
|
@@ -467,74 +463,128 @@ set search_path to pg_catalog, pg_temp | |
; | ||
|
||
------------------------------------------------------------------------------- | ||
-- _vectorizer_create_source_trigger | ||
create or replace function ai._vectorizer_create_source_trigger | ||
( trigger_name pg_catalog.name | ||
, queue_schema pg_catalog.name | ||
-- _build_vectorizer_trigger_definition | ||
create or replace function ai._build_vectorizer_trigger_definition | ||
( queue_schema pg_catalog.name | ||
, queue_table pg_catalog.name | ||
, source_schema pg_catalog.name | ||
, source_table pg_catalog.name | ||
, target_schema pg_catalog.name | ||
, target_table pg_catalog.name | ||
, source_pk pg_catalog.jsonb | ||
) returns void as | ||
) returns pg_catalog.text as | ||
$func$ | ||
declare | ||
_sql pg_catalog.text; | ||
_pk_change_check pg_catalog.text; | ||
_delete_statement pg_catalog.text; | ||
_pk_columns pg_catalog.text; | ||
_pk_values pg_catalog.text; | ||
_func_def pg_catalog.text; | ||
begin | ||
-- create the trigger function | ||
-- the trigger function is security definer | ||
-- the owner of the source table is creating the trigger function | ||
-- so the trigger function is run as the owner of the source table | ||
-- who also owns the queue table | ||
-- this means anyone with insert/update on the source is able | ||
-- to enqueue rows in the queue table automatically | ||
-- since the trigger function only does inserts, this should be safe | ||
select pg_catalog.format | ||
( $sql$ | ||
create function %I.%I() returns trigger | ||
as $plpgsql$ | ||
-- Pre-calculate all the parts we need | ||
select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum) | ||
into strict _pk_columns | ||
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name); | ||
|
||
select pg_catalog.string_agg(pg_catalog.format('new.%I', x.attname), ', ' order by x.attnum) | ||
into strict _pk_values | ||
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name); | ||
|
||
-- Create delete statement for deleted rows | ||
_delete_statement := format('delete from %I.%I where ', target_schema, target_table) operator(pg_catalog.||) | ||
(select string_agg( | ||
quote_ident(attname) operator(pg_catalog.||) ' = $1.' operator(pg_catalog.||) quote_ident(attname), | ||
' and ' | ||
Comment on lines
+494
to
+495
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you prefer using the concatenate operator instead of the |
||
) | ||
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name)); | ||
|
||
-- Create the primary key change check expression | ||
select string_agg( | ||
'old.' operator(pg_catalog.||) quote_ident(attname) operator(pg_catalog.||) ' IS DISTINCT FROM new.' operator(pg_catalog.||) quote_ident(attname), | ||
' OR ' | ||
) | ||
into strict _pk_change_check | ||
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name); | ||
|
||
-- Build the complete function definition | ||
_func_def := $def$ | ||
begin | ||
insert into %I.%I (%s) | ||
values (%s); | ||
return null; | ||
if (TG_OP = 'DELETE') then | ||
execute '$DELETE_STATEMENT$' using old; | ||
return old; | ||
elsif (TG_OP = 'UPDATE') then | ||
if $PK_CHANGE_CHECK$ then | ||
execute '$DELETE_STATEMENT$' using old; | ||
end if; | ||
Comment on lines
+510
to
+516
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason for using EXECUTE here instead of a "plain" DELETE statement like the INSERT statement below? |
||
|
||
insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$) | ||
values ($PK_VALUES$); | ||
return new; | ||
else | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's go ahead and handle when [inline code lost in extraction; from the replies, apparently the TRUNCATE case]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm. Truncate may require a separate trigger. I'm not sure if you can do a [inline code lost in extraction]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, I bet you could have two triggers with one trigger function. |
||
insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$) | ||
values ($PK_VALUES$); | ||
return new; | ||
end if; | ||
end; | ||
$plpgsql$ language plpgsql volatile parallel safe security definer | ||
set search_path to pg_catalog, pg_temp | ||
$sql$ | ||
, queue_schema, trigger_name | ||
, queue_schema, queue_table | ||
, ( | ||
select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum) | ||
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name) | ||
) | ||
, ( | ||
select pg_catalog.string_agg(pg_catalog.format('new.%I', x.attname), ', ' order by x.attnum) | ||
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name) | ||
) | ||
) into strict _sql | ||
; | ||
execute _sql; | ||
$def$; | ||
|
||
-- revoke all on trigger function from public | ||
select pg_catalog.format | ||
( $sql$ | ||
revoke all on function %I.%I() from public | ||
$sql$ | ||
, queue_schema, trigger_name | ||
) into strict _sql | ||
; | ||
-- Replace placeholders | ||
_func_def := replace(_func_def, '$DELETE_STATEMENT$', _delete_statement); | ||
_func_def := replace(_func_def, '$PK_CHANGE_CHECK$', _pk_change_check); | ||
_func_def := replace(_func_def, '$QUEUE_SCHEMA$', quote_ident(queue_schema)); | ||
_func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table)); | ||
_func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns); | ||
_func_def := replace(_func_def, '$PK_VALUES$', _pk_values); | ||
Comment on lines
+530
to
+535
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing you chose this over |
||
|
||
return _func_def; | ||
end; | ||
$func$ language plpgsql stable security invoker | ||
set search_path to pg_catalog, pg_temp; | ||
------------------------------------------------------------------------------- | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: add a blank line here |
||
-- _vectorizer_create_source_trigger | ||
create or replace function ai._vectorizer_create_source_trigger | ||
( trigger_name pg_catalog.name -- Name for the trigger | ||
, queue_schema pg_catalog.name -- Schema containing the queue table | ||
, queue_table pg_catalog.name -- Table that will store queued items | ||
, source_schema pg_catalog.name -- Schema containing the watched table | ||
, source_table pg_catalog.name -- Table being watched for changes | ||
, target_schema pg_catalog.name -- Schema containing the target table for deletions | ||
, target_table pg_catalog.name -- Table where corresponding rows should be deleted | ||
, source_pk pg_catalog.jsonb -- JSON describing primary key columns to track | ||
, original_owner pg_catalog.name default null -- Role that should own the trigger | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove unused parameter |
||
) returns void as | ||
$func$ | ||
declare | ||
_sql pg_catalog.text; | ||
_pk_columns pg_catalog.text; | ||
_pk_values pg_catalog.text; | ||
_pk_where_clause pg_catalog.text; | ||
_pk_change_check pg_catalog.text; | ||
_delete_statement pg_catalog.text; | ||
Comment on lines
+557
to
+561
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove unused variables |
||
begin | ||
|
||
execute format( | ||
'create or replace function ai.%I() returns trigger as $trigger_def$ %s $trigger_def$ language plpgsql volatile parallel safe security definer', | ||
trigger_name, | ||
ai._build_vectorizer_trigger_definition(queue_schema, queue_table, target_schema, target_table, source_pk) | ||
); | ||
|
||
-- Revoke public permissions | ||
_sql := pg_catalog.format( | ||
'revoke all on function %I.%I() from public', | ||
queue_schema, trigger_name | ||
); | ||
execute _sql; | ||
|
||
-- create the trigger on the source table | ||
select pg_catalog.format | ||
( $sql$ | ||
create trigger %I | ||
after insert or update | ||
on %I.%I | ||
for each row execute function %I.%I(); | ||
$sql$ | ||
, trigger_name | ||
, source_schema, source_table | ||
, queue_schema, trigger_name | ||
-- Create the trigger | ||
select pg_catalog.format( | ||
$sql$ | ||
create trigger %I | ||
after insert or update or delete | ||
on %I.%I | ||
for each row execute function %I.%I() | ||
$sql$, | ||
trigger_name, | ||
source_schema, source_table, | ||
queue_schema, trigger_name | ||
) into strict _sql | ||
; | ||
execute _sql; | ||
|
@@ -544,6 +594,70 @@ language plpgsql volatile security invoker | |
set search_path to pg_catalog, pg_temp | ||
; | ||
|
||
-- Upgrade block: for every vectorizer whose recorded config version is older
-- than 0.8.1, rebuild its source-table trigger function and trigger so that
-- deletes (and primary-key changes) on the source table are handled by the
-- trigger, then record the new version in the vectorizer's config.
do $upgrade_block$
declare
    _vec record;
    _new_version text := '0.8.1'; -- Version after upgrade
begin
    -- Find all vectorizers with version < 0.8.1.
    -- Versions are compared as int[] so that e.g. 0.10.0 sorts after 0.8.1.
    -- NOTE(review): the ::int[] cast raises an error for any version string that
    -- is not purely numeric dotted (a reviewer asked about pre-release tags) --
    -- confirm such versions can never be stored in config->>'version'.
    for _vec in (
        select
            v.id,
            v.source_schema,
            v.source_table,
            v.source_pk,
            v.target_schema,
            v.target_table,
            v.trigger_name,
            v.queue_schema,
            v.queue_table,
            v.config
        from ai.vectorizer v
        where pg_catalog.string_to_array((v.config->>'version'), '.')::int[]
            < pg_catalog.string_to_array(_new_version, '.')::int[]
    )
    loop
        -- RAISE uses a bare % as its placeholder; %s would print a stray "s".
        raise notice 'Upgrading trigger function for vectorizer ID % from version %', _vec.id, _vec.config->>'version';

        -- Temporarily make the trigger function a member of the extension,
        -- presumably so that the extension script is allowed to replace it.
        -- The trigger function is addressed in the queue schema, matching how
        -- the trigger and the revoke statement reference it.
        -- NOTE(review): confirm that adding the function to the extension does
        -- not change the ownership of the function.
        execute pg_catalog.format(
            'alter extension ai add function %I.%I()',
            _vec.queue_schema, _vec.trigger_name
        );

        -- Replace the trigger function body with the new definition.
        -- create or replace resets every function attribute, so the fixed
        -- search_path of the security definer function is restated here.
        execute pg_catalog.format(
            'create or replace function %I.%I() returns trigger as $trigger_def$ %s $trigger_def$ language plpgsql volatile parallel safe security definer set search_path to pg_catalog, pg_temp',
            _vec.queue_schema, _vec.trigger_name,
            ai._build_vectorizer_trigger_definition(_vec.queue_schema, _vec.queue_table, _vec.target_schema, _vec.target_table, _vec.source_pk)
        );

        -- Recreate the trigger so that it also fires on delete.
        execute pg_catalog.format(
            'drop trigger if exists %I on %I.%I',
            _vec.trigger_name, _vec.source_schema, _vec.source_table
        );

        execute pg_catalog.format(
            'create trigger %I after insert or update or delete on %I.%I for each row execute function %I.%I()',
            _vec.trigger_name, _vec.source_schema, _vec.source_table,
            _vec.queue_schema, _vec.trigger_name
        );

        -- Detach the trigger function from the extension again.
        execute pg_catalog.format(
            'alter extension ai drop function %I.%I()',
            _vec.queue_schema, _vec.trigger_name
        );

        -- Record the new version in the vectorizer's config.
        -- NOTE(review): make sure no other config upgrade in this release keys
        -- off the old version value, or it may fail after this update.
        update ai.vectorizer
        set config = pg_catalog.jsonb_set(config, '{version}', pg_catalog.to_jsonb(_new_version))
        where id = _vec.id;

        raise info 'Successfully upgraded trigger for vectorizer ID % to version %', _vec.id, _new_version;
    end loop;
end;
$upgrade_block$;
|
||
|
||
------------------------------------------------------------------------------- | ||
-- _vectorizer_vector_index_exists | ||
create or replace function ai._vectorizer_vector_index_exists | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
-- Migration: drop the foreign key constraint(s) that reference each
-- vectorizer's source table from its target (store) table.
do language plpgsql $block$
DECLARE
    _vectorizer RECORD;
    _constraint_names name[];
    _constraint_name name;
BEGIN
    -- Loop through all vectorizers
    FOR _vectorizer IN
        SELECT
            v.id,
            v.target_schema,
            v.target_table,
            v.source_schema,
            v.source_table
        FROM ai.vectorizer v
    LOOP
        -- Collect every foreign key on the target table that references the
        -- source table. The names are gathered up front so that all matching
        -- constraints are dropped, not just the first row returned.
        SELECT pg_catalog.array_agg(c.conname ORDER BY c.conname)
        INTO _constraint_names
        FROM pg_catalog.pg_constraint c
        INNER JOIN pg_catalog.pg_class t ON c.conrelid = t.oid
        INNER JOIN pg_catalog.pg_namespace n ON t.relnamespace = n.oid
        INNER JOIN pg_catalog.pg_class t2 ON c.confrelid = t2.oid
        INNER JOIN pg_catalog.pg_namespace n2 ON t2.relnamespace = n2.oid
        WHERE n.nspname = _vectorizer.target_schema
        AND t.relname = _vectorizer.target_table
        AND n2.nspname = _vectorizer.source_schema
        AND t2.relname = _vectorizer.source_table
        AND c.contype = 'f';

        -- array_agg over zero rows yields NULL
        IF _constraint_names IS NULL THEN
            RAISE NOTICE 'No foreign key constraint found for %.%',
                _vectorizer.target_schema,
                _vectorizer.target_table;
            CONTINUE;
        END IF;

        FOREACH _constraint_name IN ARRAY _constraint_names
        LOOP
            RAISE NOTICE 'Dropping foreign key constraint % from %.%',
                _constraint_name,
                _vectorizer.target_schema,
                _vectorizer.target_table;

            -- Identifiers are quoted with %I to build the ALTER TABLE command
            EXECUTE pg_catalog.format(
                'ALTER TABLE %I.%I DROP CONSTRAINT %I',
                _vectorizer.target_schema,
                _vectorizer.target_table,
                _constraint_name
            );
        END LOOP;
    END LOOP;
END;
$block$;
|
||
-- Drop the six-argument overload of ai._vectorizer_create_source_trigger, if it exists.
drop function if exists ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd probably call this `ai._vectorizer_build_trigger_definition` so as to fit the pattern.