Concurrent runs leads to duplicate rows #3866

Open
simon-pactum opened this issue Feb 19, 2025 · 5 comments
Labels
Bug (Something isn't working), Engine: Postgres

Comments

@simon-pactum

simon-pactum commented Feb 19, 2025

When two concurrent sqlmesh plans are running, the latter only bails out at the end with something like:

Error: Plan 'e08afcaae9b3446fb09e98e9b38cdf31' is no longer valid for the target environment 'prod'. Expected previous plan ID: '72795b6460e14a9585aa46301749bc11', actual previous plan ID: '23fe369a90c34e789743abbba767791a'. Please recreate the plan and try again

But at that point it's already too late and duplicate rows have been created.

To reproduce:

sqlmesh: 0.158.2

# Config
gateways:
  postgres:
    connection:
      type: postgres
      host: localdb
      user: local
      password: local
      port: 5432
      database: local


default_gateway: postgres

model_defaults:
  dialect: postgres

MODEL (
  name sqlmesh_example.duplicate,
  kind FULL,
);

SELECT
  t.*
FROM
  -- Large enough to be "slow" enough for human input!
  -- Change after having created it once to force it to get recreated
  generate_series(1, 1000000) as t
  -- generate_series(1, 10000000) as t

  1. Run sqlmesh plan, answer y, and let it finish.
  2. Change the model to force a rebuild, e.g. by adding an extra 0 to the generate_series upper bound.
  3. Terminal A: run sqlmesh plan and wait for the prompt; don't answer yet.
  4. Terminal B: run sqlmesh plan and wait for the prompt; don't answer yet.
  5. Terminals A and B: answer y in both in quick succession.
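
The manual steps above could probably be scripted by starting both plans at once. A minimal sketch, assuming sqlmesh is on the PATH, that the model has already been changed (step 2), and that the --auto-apply flag of sqlmesh plan is an acceptable substitute for answering y by hand; run_concurrently is a made-up helper name, and this has not been verified to hit the race every time:

```python
import subprocess


def run_concurrently(commands):
    """Start all commands at once, then wait; returns exit codes in input order."""
    procs = [subprocess.Popen(cmd) for cmd in commands]
    return [proc.wait() for proc in procs]


# Hypothetical usage (needs sqlmesh installed and the config above):
#   plan = ["sqlmesh", "plan", "--auto-apply"]
#   run_concurrently([plan, plan])
```

Starting both processes before waiting on either is the point here; running them one after the other would not overlap the INSERTs.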

Now I have twice as many rows as expected in the prod environment:

# SELECT COUNT(1) FROM sqlmesh_example.duplicate;
  count   
----------
 20000000
(1 row)

This probably happened because both plans ran their INSERT ... at the same time.

I'm not sure whether this is an actual bug or just room for documentation improvement.

Either way, it would be nice if it bailed out with an error before causing issues.

@simon-pactum
Author

simon-pactum commented Feb 19, 2025

One thing I'd appreciate more clarity on is whether it's sufficient to limit concurrency per environment, or whether it has to be limited globally. I suspect globally: all environments share the same physical tables, so two environments may interfere.

EDIT: Concurrency has to be limited globally. The issue can be reproduced as above, but with sqlmesh plan foo and sqlmesh plan bar running concurrently.
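
Until sqlmesh guards against this itself, one possible workaround is to serialize every plan behind a single lock. A minimal sketch, assuming all runs happen on the same Unix host (fcntl is not available on Windows); global_lock, run_serialized, and the lock path /tmp/sqlmesh-plan.lock are made-up names, not part of sqlmesh:

```python
import fcntl
import subprocess
from contextlib import contextmanager


@contextmanager
def global_lock(path="/tmp/sqlmesh-plan.lock"):
    """Hold an exclusive advisory file lock on `path` for the duration of the block."""
    with open(path, "w") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)  # blocks while another holder exists
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)


def run_serialized(cmd, lock_path="/tmp/sqlmesh-plan.lock"):
    """Run `cmd` while holding the lock; returns the command's exit code."""
    with global_lock(lock_path):
        return subprocess.call(cmd)


# Hypothetical usage, regardless of the target environment:
#   run_serialized(["sqlmesh", "plan", "foo"])
```

A file lock does nothing for runs started from different machines; those would need a lock that lives in a shared place, such as the database itself.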

@izeigerman
Member

I believe this happens because the transaction isolation in your postgres instance is set to READ COMMITTED or below. So I think the following happens:

  1. DELETE from the 1st plan
  2. DELETE from the 2nd plan
  3. INSERT from the 1st plan and COMMIT; changes become visible to the 2nd plan
  4. INSERT from the 2nd plan and COMMIT

I believe if you set the default isolation level to at least REPEATABLE READ, it should stop happening.
Closing this issue for now; feel free to reopen it if you still see the problem after trying the suggested remedy.

@simon-pactum
Author

The default isolation level is not the issue; I can reproduce with:

default_transaction_isolation="repeatable read"

It can be reproduced against Postgres running in docker-compose like this:

services:
  localdb:
    image: postgres:17
    command: "postgres -c default_transaction_isolation=\"repeatable read\" -c log_statement=all -c log_connections=true -c log_disconnections=true -c log_line_prefix='%m [%d] [%p]'"
    environment:
      POSTGRES_DB: local
      POSTGRES_USER: local
      POSTGRES_PASSWORD: local

> 3. INSERT from the 1st plan and COMMIT; changes become visible to the 2nd plan
> 4. INSERT from the 2nd plan and COMMIT

I think with more granularity what happens is:

  1. INSERT from 1st plan starts
  2. INSERT from 2nd plan starts
  3. INSERT from 1st plan completes, COMMIT
  4. INSERT from 2nd plan completes, COMMIT
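
The interleaving above can be replayed in a toy model of snapshot visibility. This is only a sketch: Row, Table, Tx, and replay are made-up names, row locks and serialization failures are not modelled, and the table is assumed to start empty, so each DELETE finds nothing to remove. It is not how Postgres is implemented, just enough to show why neither transaction sees the other's rows.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Row:
    value: int
    xmin: int                   # id of the transaction that inserted the row
    xmax: Optional[int] = None  # id of the transaction that deleted it, if any


@dataclass
class Table:
    rows: list = field(default_factory=list)
    committed: set = field(default_factory=set)


class Tx:
    def __init__(self, table, txid, isolation):
        self.table, self.txid, self.isolation = table, txid, isolation
        self._snapshot = None

    def _snap(self):
        # READ COMMITTED takes a fresh snapshot per statement;
        # REPEATABLE READ keeps the one taken by the first statement.
        if self.isolation == "read committed" or self._snapshot is None:
            self._snapshot = set(self.table.committed)
        return self._snapshot

    def _visible(self, row, snap):
        def seen(xid):
            return xid == self.txid or xid in snap
        return seen(row.xmin) and not (row.xmax is not None and seen(row.xmax))

    def delete_all(self):
        snap = self._snap()
        hits = [r for r in self.table.rows if self._visible(r, snap)]
        for r in hits:
            r.xmax = self.txid
        return len(hits)

    def insert(self, values):
        self._snap()
        self.table.rows.extend(Row(v, self.txid) for v in values)

    def commit(self):
        self.table.committed.add(self.txid)


def committed_count(table):
    """Rows whose insert is committed and whose delete (if any) is not."""
    done = table.committed
    return sum(1 for r in table.rows if r.xmin in done and r.xmax not in done)


def replay(isolation, n=5):
    table = Table()
    first, second = Tx(table, 1, isolation), Tx(table, 2, isolation)
    first.delete_all()
    first.insert(range(n))   # 1st plan: INSERT started, not yet committed
    second.delete_all()      # 2nd plan: sees no committed rows to delete
    second.insert(range(n))
    first.commit()
    second.commit()
    return committed_count(table)
```

Under these assumptions replay("read committed") and replay("repeatable read") both return 2 * n, which would be consistent with the doubled row count reported above.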

Here is the full Postgres log (with repeatable read as the default transaction isolation):

postgres log
localdb-1  | 2025-02-26 09:14:59.897 UTC [[unknown]] [78]LOG:  connection received: host=172.22.0.3 port=47846
localdb-1  | 2025-02-26 09:14:59.905 UTC [local] [78]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:14:59.905 UTC [local] [78]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:14:59.910 UTC [local] [78]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.910 UTC [local] [78]LOG:  statement: SELECT current_catalog
localdb-1  | 2025-02-26 09:14:59.910 UTC [local] [78]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.914 UTC [local] [78]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.914 UTC [local] [78]LOG:  statement: SELECT * FROM (SELECT schemaname AS schema_name, tablename AS name, 'TABLE' AS type FROM pg_tables UNION ALL SELECT schemaname AS schema_name, viewname AS name, 'VIEW' AS type FROM pg_views UNION ALL SELECT schemaname AS schema_name, matviewname AS name, 'MATERIALIZED_VIEW' AS type FROM pg_matviews) AS objs WHERE schema_name = 'sqlmesh__sqlmesh_example' AND name IN ('sqlmesh_example__duplicate__2368512693')
localdb-1  | 2025-02-26 09:14:59.920 UTC [local] [78]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.921 UTC [local] [78]LOG:  disconnection: session time: 0:00:00.024 user=local database=local host=172.22.0.3 port=47846
localdb-1  | 2025-02-26 09:14:59.925 UTC [[unknown]] [79]LOG:  connection received: host=172.22.0.3 port=47852
localdb-1  | 2025-02-26 09:14:59.931 UTC [local] [79]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:14:59.931 UTC [local] [79]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:14:59.933 UTC [local] [79]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.934 UTC [local] [79]LOG:  statement: CREATE SCHEMA IF NOT EXISTS "sqlmesh__sqlmesh_example"
localdb-1  | 2025-02-26 09:14:59.934 UTC [local] [79]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.935 UTC [[unknown]] [80]LOG:  connection received: host=172.22.0.3 port=47860
localdb-1  | 2025-02-26 09:14:59.940 UTC [local] [80]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:14:59.940 UTC [local] [80]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:14:59.946 UTC [local] [80]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.946 UTC [local] [80]LOG:  statement: CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" AS SELECT "t".* FROM GENERATE_SERIES(1, 11000001) AS "t" WHERE FALSE LIMIT 0
localdb-1  | 2025-02-26 09:14:59.948 UTC [local] [80]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.950 UTC [local] [80]LOG:  disconnection: session time: 0:00:00.015 user=local database=local host=172.22.0.3 port=47860
localdb-1  | 2025-02-26 09:14:59.953 UTC [local] [76]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.953 UTC [local] [76]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:14:59.954 UTC [local] [76]LOG:  statement: SELECT "name", "identifier" FROM "sqlmesh"."_snapshots" WHERE ("name", "identifier") IN (('"local"."sqlmesh_example"."duplicate"', '1557094982'))
localdb-1  | 2025-02-26 09:14:59.958 UTC [local] [76]LOG:  statement: INSERT INTO "sqlmesh"."_snapshots" ("name", "identifier", "version", "snapshot", "kind_name", "updated_ts", "unpaused_ts", "ttl_ms", "unrestorable") SELECT CAST("name" AS TEXT) AS "name", CAST("identifier" AS TEXT) AS "identifier", CAST("version" AS TEXT) AS "version", CAST("snapshot" AS TEXT) AS "snapshot", CAST("kind_name" AS TEXT) AS "kind_name", CAST("updated_ts" AS BIGINT) AS "updated_ts", CAST("unpaused_ts" AS BIGINT) AS "unpaused_ts", CAST("ttl_ms" AS BIGINT) AS "ttl_ms", CAST("unrestorable" AS BOOLEAN) AS "unrestorable" FROM (VALUES ('"local"."sqlmesh_example"."duplicate"', '1557094982', '2368512693', '{"name":"\"local\".\"sqlmesh_example\".\"duplicate\"","change_category":1,"fingerprint":{"data_hash":"1122495316","metadata_hash":"1755126686","parent_data_hash":"0","parent_metadata_hash":"0"},"previous_versions":[],"dev_table_suffix":"dev","node":{"name":"sqlmesh_example.duplicate","project":"","start":"2025-02-18","cron":"@daily","tags":[],"dialect":"postgres","kind":{"name":"FULL"},"partitioned_by":[],"clustered_by":[],"default_catalog":"local","audits":[],"grains":[],"references":[],"allow_partials":false,"signals":[],"enabled":true,"python_env":{},"jinja_macros":{"packages":{},"root_macros":{},"global_objs":{},"create_builtins_module":"sqlmesh.utils.jinja","top_level_packages":[]},"audit_definitions":{},"mapping_schema":{},"extract_dependencies_from_query":true,"pre_statements":[],"post_statements":[],"on_virtual_update":[],"query":"SELECT\n  t.*\nFROM\n  -- Large enough to be \"slow\" enough for human input!\n  generate_series(1, 11000001) as t","source_type":"sql"},"parents":[],"created_ts":1740561290778,"ttl":"in 1 week","version":"2368512693","migrated":false}', 'FULL', 1740561290778, CAST(NULL AS BIGINT), 604800000, FALSE)) AS "t"("name", "identifier", "version", "snapshot", "kind_name", "updated_ts", "unpaused_ts", "ttl_ms", "unrestorable")
localdb-1  | 2025-02-26 09:14:59.960 UTC [local] [76]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.961 UTC [local] [76]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.961 UTC [local] [76]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:14:59.962 UTC [local] [76]LOG:  statement: SELECT "id", "intervals"."name", "intervals"."identifier", "intervals"."version", "start_ts", "end_ts", "is_dev", "is_removed", "is_pending_restatement" FROM "sqlmesh"."_intervals" AS "intervals" WHERE ("intervals"."name", "intervals"."version") IN (('"local"."sqlmesh_example"."duplicate"', '2368512693')) ORDER BY "intervals"."name", "intervals"."version", "created_ts" NULLS FIRST, "is_removed" NULLS FIRST, "is_pending_restatement" NULLS FIRST
localdb-1  | 2025-02-26 09:14:59.962 UTC [local] [76]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.963 UTC [local] [76]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.963 UTC [local] [76]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:14:59.963 UTC [local] [76]LOG:  statement: SELECT "id", "intervals"."name", "intervals"."identifier", "intervals"."version", "start_ts", "end_ts", "is_dev", "is_removed", "is_pending_restatement" FROM "sqlmesh"."_intervals" AS "intervals" WHERE ("intervals"."name", "intervals"."version") IN (('"local"."sqlmesh_example"."duplicate"', '2368512693')) ORDER BY "intervals"."name", "intervals"."version", "created_ts" NULLS FIRST, "is_removed" NULLS FIRST, "is_pending_restatement" NULLS FIRST
localdb-1  | 2025-02-26 09:14:59.964 UTC [local] [76]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:14:59.967 UTC [[unknown]] [81]LOG:  connection received: host=172.22.0.3 port=47872
localdb-1  | 2025-02-26 09:14:59.971 UTC [local] [81]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:14:59.971 UTC [local] [81]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:14:59.976 UTC [local] [81]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:14:59.976 UTC [local] [81]LOG:  statement: SELECT "attname" AS "column_name", "pg_catalog".format_type("atttypid", "atttypmod") AS "data_type" FROM "pg_catalog"."pg_attribute" JOIN "pg_catalog"."pg_class" ON "pg_class"."oid" = "attrelid" JOIN "pg_catalog"."pg_namespace" ON "pg_namespace"."oid" = "relnamespace" WHERE ("attnum" > 0 AND NOT "attisdropped" AND "relname" = 'sqlmesh_example__duplicate__2368512693') AND "nspname" = 'sqlmesh__sqlmesh_example'
localdb-1  | 2025-02-26 09:14:59.978 UTC [local] [81]LOG:  statement: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'sqlmesh_example__duplicate__2368512693' AND "table_schema" = 'sqlmesh__sqlmesh_example'
localdb-1  | 2025-02-26 09:14:59.980 UTC [local] [81]LOG:  statement: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" WHERE TRUE
localdb-1  | 2025-02-26 09:14:59.980 UTC [local] [81]LOG:  statement: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" ("t") SELECT "t" FROM (SELECT "t".* FROM GENERATE_SERIES(1, 11000001) AS "t") AS "_subquery"
localdb-1  | 2025-02-26 09:15:00.204 UTC [[unknown]] [82]LOG:  connection received: host=172.22.0.4 port=39730
localdb-1  | 2025-02-26 09:15:00.209 UTC [local] [82]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:15:00.209 UTC [local] [82]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:15:00.211 UTC [local] [82]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:00.211 UTC [local] [82]LOG:  statement: SELECT current_catalog
localdb-1  | 2025-02-26 09:15:00.211 UTC [local] [82]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:00.213 UTC [local] [82]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:00.213 UTC [local] [82]LOG:  statement: SELECT * FROM (SELECT schemaname AS schema_name, tablename AS name, 'TABLE' AS type FROM pg_tables UNION ALL SELECT schemaname AS schema_name, viewname AS name, 'VIEW' AS type FROM pg_views UNION ALL SELECT schemaname AS schema_name, matviewname AS name, 'MATERIALIZED_VIEW' AS type FROM pg_matviews) AS objs WHERE schema_name = 'sqlmesh__sqlmesh_example' AND name IN ('sqlmesh_example__duplicate__2368512693')
localdb-1  | 2025-02-26 09:15:00.216 UTC [local] [82]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:00.217 UTC [local] [82]LOG:  disconnection: session time: 0:00:00.013 user=local database=local host=172.22.0.4 port=39730
localdb-1  | 2025-02-26 09:15:00.217 UTC [local] [77]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:00.218 UTC [local] [77]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:00.218 UTC [local] [77]LOG:  statement: SELECT "name", "identifier" FROM "sqlmesh"."_snapshots" WHERE ("name", "identifier") IN (('"local"."sqlmesh_example"."duplicate"', '1557094982'))
localdb-1  | 2025-02-26 09:15:00.219 UTC [local] [77]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:00.220 UTC [local] [77]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:00.220 UTC [local] [77]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:00.220 UTC [local] [77]LOG:  statement: SELECT "id", "intervals"."name", "intervals"."identifier", "intervals"."version", "start_ts", "end_ts", "is_dev", "is_removed", "is_pending_restatement" FROM "sqlmesh"."_intervals" AS "intervals" WHERE ("intervals"."name", "intervals"."version") IN (('"local"."sqlmesh_example"."duplicate"', '2368512693')) ORDER BY "intervals"."name", "intervals"."version", "created_ts" NULLS FIRST, "is_removed" NULLS FIRST, "is_pending_restatement" NULLS FIRST
localdb-1  | 2025-02-26 09:15:00.221 UTC [local] [77]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:00.222 UTC [local] [77]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:00.222 UTC [local] [77]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:00.222 UTC [local] [77]LOG:  statement: SELECT "id", "intervals"."name", "intervals"."identifier", "intervals"."version", "start_ts", "end_ts", "is_dev", "is_removed", "is_pending_restatement" FROM "sqlmesh"."_intervals" AS "intervals" WHERE ("intervals"."name", "intervals"."version") IN (('"local"."sqlmesh_example"."duplicate"', '2368512693')) ORDER BY "intervals"."name", "intervals"."version", "created_ts" NULLS FIRST, "is_removed" NULLS FIRST, "is_pending_restatement" NULLS FIRST
localdb-1  | 2025-02-26 09:15:00.222 UTC [local] [77]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:00.228 UTC [[unknown]] [83]LOG:  connection received: host=172.22.0.4 port=39746
localdb-1  | 2025-02-26 09:15:00.231 UTC [local] [83]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:15:00.231 UTC [local] [83]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:15:00.237 UTC [local] [83]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:00.237 UTC [local] [83]LOG:  statement: SELECT "attname" AS "column_name", "pg_catalog".format_type("atttypid", "atttypmod") AS "data_type" FROM "pg_catalog"."pg_attribute" JOIN "pg_catalog"."pg_class" ON "pg_class"."oid" = "attrelid" JOIN "pg_catalog"."pg_namespace" ON "pg_namespace"."oid" = "relnamespace" WHERE ("attnum" > 0 AND NOT "attisdropped" AND "relname" = 'sqlmesh_example__duplicate__2368512693') AND "nspname" = 'sqlmesh__sqlmesh_example'
localdb-1  | 2025-02-26 09:15:00.241 UTC [local] [83]LOG:  statement: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'sqlmesh_example__duplicate__2368512693' AND "table_schema" = 'sqlmesh__sqlmesh_example'
localdb-1  | 2025-02-26 09:15:00.242 UTC [local] [83]LOG:  statement: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" WHERE TRUE
localdb-1  | 2025-02-26 09:15:00.243 UTC [local] [83]LOG:  statement: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" ("t") SELECT "t" FROM (SELECT "t".* FROM GENERATE_SERIES(1, 11000001) AS "t") AS "_subquery"
localdb-1  | 2025-02-26 09:15:07.534 UTC [] [27]LOG:  checkpoint complete: wrote 8716 buffers (53.2%); 0 WAL file(s) added, 8 removed, 0 recycled; write=239.996 s, sync=0.078 s, total=240.098 s; sync files=40, longest=0.073 s, average=0.002 s; distance=141259 kB, estimate=141259 kB; lsn=3/35E097E0, redo lsn=3/17A3F0E0
localdb-1  | 2025-02-26 09:15:07.922 UTC [] [27]LOG:  checkpoint starting: wal
localdb-1  | 2025-02-26 09:15:13.295 UTC [] [27]LOG:  checkpoint complete: wrote 41 buffers (0.3%); 0 WAL file(s) added, 0 removed, 33 recycled; write=5.190 s, sync=0.105 s, total=5.373 s; sync files=9, longest=0.100 s, average=0.012 s; distance=534282 kB, estimate=534282 kB; lsn=3/56150DC8, redo lsn=3/38401B28
localdb-1  | 2025-02-26 09:15:13.791 UTC [] [27]LOG:  checkpoints are occurring too frequently (6 seconds apart)
localdb-1  | 2025-02-26 09:15:13.791 UTC [] [27]HINT:  Consider increasing the configuration parameter "max_wal_size".
localdb-1  | 2025-02-26 09:15:13.791 UTC [] [27]LOG:  checkpoint starting: wal
localdb-1  | 2025-02-26 09:15:16.703 UTC [local] [81]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.743 UTC [[unknown]] [85]LOG:  connection received: host=172.22.0.3 port=37604
localdb-1  | 2025-02-26 09:15:16.758 UTC [local] [85]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:15:16.758 UTC [local] [85]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:15:16.767 UTC [local] [85]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.767 UTC [local] [85]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:16.773 UTC [local] [85]LOG:  statement: INSERT INTO "sqlmesh"."_intervals" ("id", "created_ts", "name", "identifier", "version", "start_ts", "end_ts", "is_dev", "is_removed", "is_compacted", "is_pending_restatement") SELECT CAST("id" AS TEXT) AS "id", CAST("created_ts" AS BIGINT) AS "created_ts", CAST("name" AS TEXT) AS "name", CAST("identifier" AS TEXT) AS "identifier", CAST("version" AS TEXT) AS "version", CAST("start_ts" AS BIGINT) AS "start_ts", CAST("end_ts" AS BIGINT) AS "end_ts", CAST("is_dev" AS BOOLEAN) AS "is_dev", CAST("is_removed" AS BOOLEAN) AS "is_removed", CAST("is_compacted" AS BOOLEAN) AS "is_compacted", CAST("is_pending_restatement" AS BOOLEAN) AS "is_pending_restatement" FROM (VALUES ('642ff9c2d93744f8902bd5cf2a93c274', 1740561316769, '"local"."sqlmesh_example"."duplicate"', '1557094982', '2368512693', 1739836800000, 1739923200000, FALSE, FALSE, FALSE, FALSE)) AS "t"("id", "created_ts", "name", "identifier", "version", "start_ts", "end_ts", "is_dev", "is_removed", "is_compacted", "is_pending_restatement")
localdb-1  | 2025-02-26 09:15:16.779 UTC [local] [85]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.784 UTC [local] [85]LOG:  disconnection: session time: 0:00:00.040 user=local database=local host=172.22.0.3 port=37604
localdb-1  | 2025-02-26 09:15:16.785 UTC [local] [81]LOG:  disconnection: session time: 0:00:16.818 user=local database=local host=172.22.0.3 port=47872
localdb-1  | 2025-02-26 09:15:16.787 UTC [local] [76]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.787 UTC [local] [76]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:16.788 UTC [local] [76]LOG:  statement: SELECT "name", "identifier" FROM "sqlmesh"."_snapshots" WHERE ("name", "identifier") IN (('"local"."sqlmesh_example"."duplicate"', '1557094982'))
localdb-1  | 2025-02-26 09:15:16.791 UTC [local] [76]LOG:  statement: SELECT "previous_finalized_snapshots", "start_at", "plan_id", "requirements", "catalog_name_override", "snapshots", "suffix_target", "name", "expiration_ts", "finalized_ts", "promoted_snapshot_ids", "normalize_name", "end_at", "previous_plan_id" FROM "sqlmesh"."_environments" WHERE "name" = 'prod' FOR UPDATE
localdb-1  | 2025-02-26 09:15:16.793 UTC [local] [76]LOG:  statement: UPDATE "sqlmesh"."_snapshots" SET "updated_ts" = 1740561316792 WHERE ("name", "identifier") IN (('"local"."sqlmesh_example"."full_model"', '4017356042'))
localdb-1  | 2025-02-26 09:15:16.795 UTC [local] [76]LOG:  statement: DELETE FROM "sqlmesh"."_environments" WHERE "name" = 'prod'
localdb-1  | 2025-02-26 09:15:16.797 UTC [local] [76]LOG:  statement: INSERT INTO "sqlmesh"."_environments" ("name", "snapshots", "start_at", "end_at", "plan_id", "previous_plan_id", "expiration_ts", "finalized_ts", "promoted_snapshot_ids", "suffix_target", "catalog_name_override", "previous_finalized_snapshots", "normalize_name", "requirements") SELECT CAST("name" AS TEXT) AS "name", CAST("snapshots" AS TEXT) AS "snapshots", CAST("start_at" AS TEXT) AS "start_at", CAST("end_at" AS TEXT) AS "end_at", CAST("plan_id" AS TEXT) AS "plan_id", CAST("previous_plan_id" AS TEXT) AS "previous_plan_id", CAST("expiration_ts" AS BIGINT) AS "expiration_ts", CAST("finalized_ts" AS BIGINT) AS "finalized_ts", CAST("promoted_snapshot_ids" AS TEXT) AS "promoted_snapshot_ids", CAST("suffix_target" AS TEXT) AS "suffix_target", CAST("catalog_name_override" AS TEXT) AS "catalog_name_override", CAST("previous_finalized_snapshots" AS TEXT) AS "previous_finalized_snapshots", CAST("normalize_name" AS BOOLEAN) AS "normalize_name", CAST("requirements" AS TEXT) AS "requirements" FROM (VALUES ('prod', '[{"name": "\"local\".\"sqlmesh_example\".\"duplicate\"", "change_category": 1, "fingerprint": {"data_hash": "1122495316", "metadata_hash": "1755126686", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "previous_versions": [], "dev_table_suffix": "dev", "version": "2368512693", "physical_schema": "sqlmesh__sqlmesh_example", "parents": [], "kind_name": "FULL", "node_type": "model"}]', '2025-02-18 00:00:00', '2025-02-19 00:00:00', 'c765a6e102354084ad6d63e95c276c1e', '1109ad4f4f6c4a74b1c61bd407ad90ce', CAST(NULL AS BIGINT), CAST(NULL AS BIGINT), CAST(NULL AS TEXT), 'schema', CAST(NULL AS TEXT), '[{"name": "\"local\".\"sqlmesh_example\".\"full_model\"", "change_category": 1, "fingerprint": {"data_hash": "1740495626", "metadata_hash": "1284020239", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "previous_versions": [{"fingerprint": {"data_hash": "3389658398", "metadata_hash": 
"3683677804", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "version": "1514008781", "change_category": 1, "physical_schema": "sqlmesh__sqlmesh_example", "dev_table_suffix": "dev"}, {"fingerprint": {"data_hash": "907022810", "metadata_hash": "326945199", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "version": "239533169", "change_category": 1, "physical_schema": "sqlmesh__sqlmesh_example", "dev_table_suffix": "dev"}, {"fingerprint": {"data_hash": "2206817699", "metadata_hash": "3269386440", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "version": "365530612", "change_category": 1, "physical_schema": "sqlmesh__sqlmesh_example", "dev_table_suffix": "dev"}, {"fingerprint": {"data_hash": "1515993326", "metadata_hash": "2288455608", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "version": "3981074143", "change_category": 1, "physical_schema": "sqlmesh__sqlmesh_example", "dev_table_suffix": "dev"}, {"fingerprint": {"data_hash": "70227680", "metadata_hash": "1501277351", "parent_data_hash": "0", "parent_metadata_hash": "0"}, "version": "2197897703", "change_category": 1, "physical_schema": "sqlmesh__sqlmesh_example", "dev_table_suffix": "dev"}], "dev_table_suffix": "dev", "version": "2619609257", "physical_schema": "sqlmesh__sqlmesh_example", "parents": [], "kind_name": "FULL", "node_type": "model"}]', TRUE, '{}')) AS "t"("name", "snapshots", "start_at", "end_at", "plan_id", "previous_plan_id", "expiration_ts", "finalized_ts", "promoted_snapshot_ids", "suffix_target", "catalog_name_override", "previous_finalized_snapshots", "normalize_name", "requirements")
localdb-1  | 2025-02-26 09:15:16.798 UTC [local] [76]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.801 UTC [local] [76]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.801 UTC [local] [76]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:16.801 UTC [local] [76]LOG:  statement: SELECT "snapshot", "name", "identifier", "version", "updated_ts", "unpaused_ts", "unrestorable" FROM "sqlmesh"."_snapshots" AS "snapshots" WHERE ("snapshots"."name", "snapshots"."version") IN (('"local"."sqlmesh_example"."duplicate"', '2368512693')) FOR UPDATE
localdb-1  | 2025-02-26 09:15:16.803 UTC [local] [76]LOG:  statement: UPDATE "sqlmesh"."_snapshots" SET "unpaused_ts" = 1739923200000, "updated_ts" = 1740561316802 WHERE ("name", "identifier") IN (('"local"."sqlmesh_example"."duplicate"', '1557094982'))
localdb-1  | 2025-02-26 09:15:16.803 UTC [local] [76]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.805 UTC [local] [79]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.806 UTC [local] [79]LOG:  statement: CREATE SCHEMA IF NOT EXISTS "sqlmesh_example"
localdb-1  | 2025-02-26 09:15:16.806 UTC [local] [79]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.808 UTC [[unknown]] [86]LOG:  connection received: host=172.22.0.3 port=37606
localdb-1  | 2025-02-26 09:15:16.811 UTC [local] [86]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:15:16.811 UTC [local] [86]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:15:16.813 UTC [local] [86]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.813 UTC [local] [86]LOG:  statement: DROP VIEW IF EXISTS "sqlmesh_example"."duplicate" CASCADE
localdb-1  | 2025-02-26 09:15:16.814 UTC [local] [86]LOG:  statement: CREATE VIEW "sqlmesh_example"."duplicate" AS SELECT * FROM "local"."sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693"
localdb-1  | 2025-02-26 09:15:16.824 UTC [local] [86]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.827 UTC [local] [86]LOG:  disconnection: session time: 0:00:00.019 user=local database=local host=172.22.0.3 port=37606
localdb-1  | 2025-02-26 09:15:16.828 UTC [[unknown]] [87]LOG:  connection received: host=172.22.0.3 port=37610
localdb-1  | 2025-02-26 09:15:16.832 UTC [local] [87]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:15:16.832 UTC [local] [87]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:15:16.833 UTC [local] [87]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.833 UTC [local] [87]LOG:  statement: DROP VIEW IF EXISTS "sqlmesh_example"."full_model"
localdb-1  | 2025-02-26 09:15:16.838 UTC [local] [87]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.840 UTC [local] [87]LOG:  disconnection: session time: 0:00:00.011 user=local database=local host=172.22.0.3 port=37610
localdb-1  | 2025-02-26 09:15:16.840 UTC [local] [76]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.841 UTC [local] [76]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:16.841 UTC [local] [76]LOG:  statement: SELECT "plan_id" FROM "sqlmesh"."_environments" WHERE "name" = 'prod' FOR UPDATE
localdb-1  | 2025-02-26 09:15:16.842 UTC [local] [76]LOG:  statement: UPDATE "sqlmesh"."_environments" SET "finalized_ts" = 1740561316841 WHERE "name" = 'prod'
localdb-1  | 2025-02-26 09:15:16.842 UTC [local] [76]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.844 UTC [local] [79]LOG:  disconnection: session time: 0:00:16.918 user=local database=local host=172.22.0.3 port=47852
localdb-1  | 2025-02-26 09:15:16.844 UTC [local] [76]LOG:  disconnection: session time: 0:00:26.076 user=local database=local host=172.22.0.3 port=52388
localdb-1  | 2025-02-26 09:15:16.929 UTC [local] [83]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.958 UTC [[unknown]] [88]LOG:  connection received: host=172.22.0.4 port=54888
localdb-1  | 2025-02-26 09:15:16.961 UTC [local] [88]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
localdb-1  | 2025-02-26 09:15:16.961 UTC [local] [88]LOG:  connection authorized: user=local database=local
localdb-1  | 2025-02-26 09:15:16.962 UTC [local] [88]LOG:  statement: BEGIN
localdb-1  | 2025-02-26 09:15:16.963 UTC [local] [88]LOG:  statement: SELECT 1
localdb-1  | 2025-02-26 09:15:16.968 UTC [local] [88]LOG:  statement: INSERT INTO "sqlmesh"."_intervals" ("id", "created_ts", "name", "identifier", "version", "start_ts", "end_ts", "is_dev", "is_removed", "is_compacted", "is_pending_restatement") SELECT CAST("id" AS TEXT) AS "id", CAST("created_ts" AS BIGINT) AS "created_ts", CAST("name" AS TEXT) AS "name", CAST("identifier" AS TEXT) AS "identifier", CAST("version" AS TEXT) AS "version", CAST("start_ts" AS BIGINT) AS "start_ts", CAST("end_ts" AS BIGINT) AS "end_ts", CAST("is_dev" AS BOOLEAN) AS "is_dev", CAST("is_removed" AS BOOLEAN) AS "is_removed", CAST("is_compacted" AS BOOLEAN) AS "is_compacted", CAST("is_pending_restatement" AS BOOLEAN) AS "is_pending_restatement" FROM (VALUES ('082727e59a2b4d17b08944170b0571b6', 1740561316963, '"local"."sqlmesh_example"."duplicate"', '1557094982', '2368512693', 1739836800000, 1739923200000, FALSE, FALSE, FALSE, FALSE)) AS "t"("id", "created_ts", "name", "identifier", "version", "start_ts", "end_ts", "is_dev", "is_removed", "is_compacted", "is_pending_restatement")
localdb-1  | 2025-02-26 09:15:16.969 UTC [local] [88]LOG:  statement: COMMIT
localdb-1  | 2025-02-26 09:15:16.972 UTC [local] [88]LOG:  disconnection: session time: 0:00:00.014 user=local database=local host=172.22.0.4 port=54888
localdb-1  | 2025-02-26 09:15:16.973 UTC [local] [83]LOG:  disconnection: session time: 0:00:16.745 user=local database=local host=172.22.0.4 port=39746
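
A log in this format can be narrowed down to specific backend processes with a short script. The sketch below assumes the log_line_prefix '%m [%d] [%p]' from the docker-compose command above; parse, open_transactions, and overlaps are made-up helper names, and open_transactions only keeps the last BEGIN/COMMIT pair per process.

```python
import re
from datetime import datetime

# timestamp, [database], [pid], level, message
LINE = re.compile(
    r"(\d{4}-\d\d-\d\d \d\d:\d\d:\d\d\.\d{3}) UTC \[.*?\] \[(\d+)\](\w+):\s+(.*)"
)


def parse(line):
    """Return (timestamp, pid, message) for a log line, or None if it does not match."""
    m = LINE.search(line)
    if m is None:
        return None
    ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S.%f")
    return ts, int(m.group(2)), m.group(4).strip()


def open_transactions(lines, pids):
    """Map each pid in `pids` to the (BEGIN, COMMIT) timestamps of its last transaction."""
    spans = {}
    for rec in filter(None, map(parse, lines)):
        ts, pid, msg = rec
        if pid not in pids:
            continue
        if msg == "statement: BEGIN":
            spans[pid] = (ts, None)
        elif msg == "statement: COMMIT" and pid in spans:
            spans[pid] = (spans[pid][0], ts)
    return spans


def overlaps(a, b):
    """True when two (begin, commit) intervals intersect; both must be committed."""
    return a[0] < b[1] and b[0] < a[1]
```

For the log above, process 81's transaction runs from 09:14:59.976 to 09:15:16.703 and process 83's from 09:15:00.237 to 09:15:16.929, so the two INSERT transactions overlap for almost their whole duration.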

Postgres logs, filtered down to processes 81 and 83 (the ones doing the inserts) for readability. Note 81's "late" COMMIT.
pg.log:localdb-1  | 2025-02-26 09:14:59.967 UTC [[unknown]] [81]LOG:  connection received: host=172.22.0.3 port=47872
pg.log:localdb-1  | 2025-02-26 09:14:59.971 UTC [local] [81]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
pg.log:localdb-1  | 2025-02-26 09:14:59.971 UTC [local] [81]LOG:  connection authorized: user=local database=local
pg.log:localdb-1  | 2025-02-26 09:14:59.976 UTC [local] [81]LOG:  statement: BEGIN
pg.log:localdb-1  | 2025-02-26 09:14:59.976 UTC [local] [81]LOG:  statement: SELECT "attname" AS "column_name", "pg_catalog".format_type("atttypid", "atttypmod") AS "data_type" FROM "pg_catalog"."pg_attribute" JOIN "pg_catalog"."pg_class" ON "pg_class"."oid" = "attrelid" JOIN "pg_catalog"."pg_namespace" ON "pg_namespace"."oid" = "relnamespace" WHERE ("attnum" > 0 AND NOT "attisdropped" AND "relname" = 'sqlmesh_example__duplicate__2368512693') AND "nspname" = 'sqlmesh__sqlmesh_example'
pg.log:localdb-1  | 2025-02-26 09:14:59.978 UTC [local] [81]LOG:  statement: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'sqlmesh_example__duplicate__2368512693' AND "table_schema" = 'sqlmesh__sqlmesh_example'
pg.log:localdb-1  | 2025-02-26 09:14:59.980 UTC [local] [81]LOG:  statement: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" WHERE TRUE
pg.log:localdb-1  | 2025-02-26 09:14:59.980 UTC [local] [81]LOG:  statement: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" ("t") SELECT "t" FROM (SELECT "t".* FROM GENERATE_SERIES(1, 11000001) AS "t") AS "_subquery"
pg.log:localdb-1  | 2025-02-26 09:15:00.228 UTC [[unknown]] [83]LOG:  connection received: host=172.22.0.4 port=39746
pg.log:localdb-1  | 2025-02-26 09:15:00.231 UTC [local] [83]LOG:  connection authenticated: identity="local" method=scram-sha-256 (/var/lib/postgresql/data/pg_hba.conf:128)
pg.log:localdb-1  | 2025-02-26 09:15:00.231 UTC [local] [83]LOG:  connection authorized: user=local database=local
pg.log:localdb-1  | 2025-02-26 09:15:00.237 UTC [local] [83]LOG:  statement: BEGIN
pg.log:localdb-1  | 2025-02-26 09:15:00.237 UTC [local] [83]LOG:  statement: SELECT "attname" AS "column_name", "pg_catalog".format_type("atttypid", "atttypmod") AS "data_type" FROM "pg_catalog"."pg_attribute" JOIN "pg_catalog"."pg_class" ON "pg_class"."oid" = "attrelid" JOIN "pg_catalog"."pg_namespace" ON "pg_namespace"."oid" = "relnamespace" WHERE ("attnum" > 0 AND NOT "attisdropped" AND "relname" = 'sqlmesh_example__duplicate__2368512693') AND "nspname" = 'sqlmesh__sqlmesh_example'
pg.log:localdb-1  | 2025-02-26 09:15:00.241 UTC [local] [83]LOG:  statement: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'sqlmesh_example__duplicate__2368512693' AND "table_schema" = 'sqlmesh__sqlmesh_example'
pg.log:localdb-1  | 2025-02-26 09:15:00.242 UTC [local] [83]LOG:  statement: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" WHERE TRUE
pg.log:localdb-1  | 2025-02-26 09:15:00.243 UTC [local] [83]LOG:  statement: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__duplicate__2368512693" ("t") SELECT "t" FROM (SELECT "t".* FROM GENERATE_SERIES(1, 11000001) AS "t") AS "_subquery"
pg.log:localdb-1  | 2025-02-26 09:15:16.703 UTC [local] [81]LOG:  statement: COMMIT
pg.log:localdb-1  | 2025-02-26 09:15:16.785 UTC [local] [81]LOG:  disconnection: session time: 0:00:16.818 user=local database=local host=172.22.0.3 port=47872
pg.log:localdb-1  | 2025-02-26 09:15:16.929 UTC [local] [83]LOG:  statement: COMMIT
pg.log:localdb-1  | 2025-02-26 09:15:16.973 UTC [local] [83]LOG:  disconnection: session time: 0:00:16.745 user=local database=local host=172.22.0.4 port=39746
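
A toy model of the interleaving above may help explain the doubled row count. This is only a sketch, not SQLMesh or Postgres code: `ToyTable` and `ToyTxn` are hypothetical names. It assumes that a statement cannot see rows another transaction has inserted but not yet committed (as under Postgres's default READ COMMITTED level), and that the table held no committed rows when both plans started.

```python
class ToyTable:
    """Holds committed rows only; a stand-in for the physical table."""

    def __init__(self):
        self.rows = []


class ToyTxn:
    """Hypothetical transaction that only ever sees committed rows."""

    def __init__(self, table, name):
        self.table = table
        self.name = name
        self.doomed = set()  # committed rows this transaction's DELETE saw
        self.pending = []    # rows inserted but not yet committed

    def delete_all(self):
        # Models DELETE ... WHERE TRUE: only rows committed right now are visible.
        self.doomed = set(self.table.rows)

    def insert(self, values):
        # Tag each row with the transaction name so rows stay distinguishable.
        self.pending.extend((self.name, v) for v in values)

    def commit(self):
        kept = [r for r in self.table.rows if r not in self.doomed]
        self.table.rows = kept + self.pending


def run_interleaved(n):
    """Same order as the log: both DELETE + INSERT run before either COMMIT."""
    table = ToyTable()
    a, b = ToyTxn(table, "81"), ToyTxn(table, "83")
    a.delete_all()
    a.insert(range(n))
    b.delete_all()  # 81 has not committed yet, so 83 finds nothing to delete
    b.insert(range(n))
    a.commit()
    b.commit()
    return len(table.rows)


def run_serialized(n):
    """83 starts only after 81 has committed."""
    table = ToyTable()
    for name in ("81", "83"):
        txn = ToyTxn(table, name)
        txn.delete_all()
        txn.insert(range(n))
        txn.commit()
    return len(table.rows)
```

In this model `run_interleaved(n)` leaves `2 * n` rows, while `run_serialized(n)` leaves `n`, because the second DELETE then sees and removes the first transaction's committed rows.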

I don't have permission to reopen the issue, though.

@simon-pactum
Author

@izeigerman Would you mind reopening this one?

@izeigerman izeigerman reopened this Mar 18, 2025
@izeigerman izeigerman added Bug Something isn't working Engine: Postgres labels Mar 18, 2025
@izeigerman
Member

izeigerman commented Mar 18, 2025

I see now, @simon-pactum. I think we could lock the table before doing the insert with:

LOCK TABLE <physical_table> IN EXCLUSIVE MODE
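
As a rough sketch of where such a lock would sit relative to the DELETE and INSERT seen in the logs, the statement order could look like the following. `quote_ident` and `locked_full_refresh` are hypothetical helpers made up for illustration, not SQLMesh APIs, and the sketch assumes the default READ COMMITTED isolation level, where the DELETE takes a fresh snapshot after the lock is granted.

```python
from typing import List


def quote_ident(name: str) -> str:
    """Double-quote a Postgres identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def locked_full_refresh(schema: str, table: str, select_sql: str) -> List[str]:
    """Build the statement sequence for a FULL refresh that locks first.

    Hypothetical helper: it only returns SQL strings and executes nothing.
    """
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    return [
        "BEGIN",
        # EXCLUSIVE conflicts with itself and with the ROW EXCLUSIVE lock that
        # DELETE/INSERT take, so a second writer waits here until the first
        # transaction commits. Plain SELECTs are still allowed.
        f"LOCK TABLE {target} IN EXCLUSIVE MODE",
        f"DELETE FROM {target} WHERE TRUE",
        f"INSERT INTO {target} {select_sql}",
        "COMMIT",
    ]


statements = locked_full_refresh(
    "sqlmesh__sqlmesh_example",
    "sqlmesh_example__duplicate__2368512693",
    'SELECT "t" FROM GENERATE_SERIES(1, 11000001) AS "t"',
)
for statement in statements:
    print(statement + ";")
```

With the lock taken before the DELETE, the second plan's DELETE would only run after the first plan's COMMIT and would therefore remove the rows the first plan inserted, at the cost of blocking concurrent writers for the duration of the refresh.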
