diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index beeca1943a1..43703a31df0 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -152,6 +152,10 @@ pub struct EnvVars { /// Set by the flag `GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES`. On by /// default. pub enable_select_by_specific_attributes: bool, + /// Experimental feature. + /// + /// Set by the flag `GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION`. Off by default. + pub postpone_attribute_index_creation: bool, /// Verbose logging of mapping inputs. /// /// Set by the flag `GRAPH_LOG_TRIGGER_DATA`. Off by @@ -271,6 +275,8 @@ impl EnvVars { subgraph_error_retry_ceil: Duration::from_secs(inner.subgraph_error_retry_ceil_in_secs), subgraph_error_retry_jitter: inner.subgraph_error_retry_jitter, enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0, + postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0 + || cfg!(debug_assertions), log_trigger_data: inner.log_trigger_data.0, explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs), explorer_lock_threshold: Duration::from_millis(inner.explorer_lock_threshold_in_msec), @@ -393,6 +399,8 @@ struct Inner { subgraph_error_retry_jitter: f64, #[envconfig(from = "GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES", default = "true")] enable_select_by_specific_attributes: EnvVarBoolean, + #[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")] + postpone_attribute_index_creation: EnvVarBoolean, #[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")] log_trigger_data: EnvVarBoolean, #[envconfig(from = "GRAPH_EXPLORER_TTL", default = "10")] diff --git a/store/postgres/examples/layout.rs b/store/postgres/examples/layout.rs index 7556d5383b1..cab97889cba 100644 --- a/store/postgres/examples/layout.rs +++ b/store/postgres/examples/layout.rs @@ -42,7 +42,7 @@ fn print_delete_all(layout: &Layout) { } fn print_ddl(layout: &Layout) { - let ddl = ensure(layout.as_ddl(), "Failed to generate DDL"); + let ddl = ensure(layout.as_ddl(None), "Failed to generate DDL"); println!("{}", ddl); } diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 248fe80aadd..dc73ec6f7f5 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -687,6 +687,7 @@ pub(crate) fn indexes_for_table( Ok(results.into_iter().map(|i| i.def).collect()) } + pub(crate) fn drop_index( conn: &mut PgConnection, schema_name: &str, diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 8fc97c3a038..371e43e49ea 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -13,6 +13,7 @@ //! `graph-node` was restarted while the copy was running.
use std::{ convert::TryFrom, + ops::DerefMut, sync::Arc, time::{Duration, Instant}, }; @@ -24,8 +25,7 @@ use diesel::{ pg::Pg, r2d2::{ConnectionManager, PooledConnection}, select, - serialize::Output, - serialize::ToSql, + serialize::{Output, ToSql}, sql_query, sql_types::{BigInt, Integer}, update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, @@ -36,11 +36,13 @@ use graph::{ prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError, ENV_VARS}, schema::EntityType, }; +use itertools::Itertools; use crate::{ advisory_lock, catalog, dynds::DataSourcesTable, primary::{DeploymentId, Site}, + relational::index::IndexList, }; use crate::{connection_pool::ConnectionPool, relational::Layout}; use crate::{relational::Table, relational_queries as rq}; @@ -761,7 +763,7 @@ impl Connection { Ok(()) } - pub fn copy_data_internal(&mut self) -> Result<Status, StoreError> { + pub fn copy_data_internal(&mut self, index_list: IndexList) -> Result<Status, StoreError> { let src = self.src.clone(); let dst = self.dst.clone(); let target_block = self.target_block.clone(); @@ -806,6 +808,46 @@ impl Connection { progress.table_finished(&table.batch); } + // Create the attribute indexes whose creation was postponed at the start of + // the copy/graft operation. + // First, recreate the indexes that existed in the original subgraph. + let conn = self.conn.deref_mut(); + for table in state.tables.iter() { + let arr = index_list.indexes_for_table( + &self.dst.site.namespace, + &table.batch.src.name.to_string(), + &table.batch.dst, + true, + true, + )?; + + for (_, sql) in arr { + let query = sql_query(format!("{};", sql)); + query.execute(conn)?; + } + } + + // Second, create the indexes for the new fields, skipping the ones that were + // already created in the first step for the old fields. + for table in state.tables.iter() { + let orig_columns = table + .batch + .src + .columns + .iter() + .map(|c| c.name.to_string()) + .collect_vec(); + for sql in table + .batch + .dst + .create_postponed_indexes(orig_columns) + .into_iter() + { + let query = sql_query(sql); + query.execute(conn)?; + } + } + self.copy_private_data_sources(&state)?; self.transaction(|conn| state.finished(conn))?; @@ -820,6 +862,8 @@ impl Connection { /// block is guaranteed to not be subject to chain reorgs. All data up /// to and including `target_block` will be copied. /// + /// The parameter `index_list` is the list of indexes that exist on the `src` deployment. + /// /// The copy logic makes heavy use of the fact that the `vid` and /// `block_range` of entity versions are related since for two entity /// versions `v1` and `v2` such that `v1.vid <= v2.vid`, we know that @@ -828,7 +872,7 @@ /// lower(v1.block_range) => v2.vid > v1.vid` and we can therefore stop /// the copying of each table as soon as we hit `max_vid = max { v.vid | /// lower(v.block_range) <= target_block.number }`. - pub fn copy_data(&mut self) -> Result<Status, StoreError> { + pub fn copy_data(&mut self, index_list: IndexList) -> Result<Status, StoreError> { // We require sole access to the destination site, and that we get a // consistent view of what has been copied so far. In general, that // is always true.
It can happen though that this function runs when @@ -842,7 +886,7 @@ impl Connection { "Obtaining copy lock (this might take a long time if another process is still copying)" ); advisory_lock::lock_copying(&mut self.conn, self.dst.site.as_ref())?; - let res = self.copy_data_internal(); + let res = self.copy_data_internal(index_list); advisory_lock::unlock_copying(&mut self.conn, self.dst.site.as_ref())?; if matches!(res, Ok(Status::Cancelled)) { warn!(&self.logger, "Copying was cancelled and is incomplete"); diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index f9ada4149e1..d8b04faac0b 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -29,8 +29,8 @@ use lru_time_cache::LruCache; use rand::{seq::SliceRandom, thread_rng}; use std::collections::{BTreeMap, HashMap}; use std::convert::Into; -use std::ops::Bound; use std::ops::Deref; +use std::ops::{Bound, DerefMut}; use std::str::FromStr; use std::sync::{atomic::AtomicUsize, Arc, Mutex}; use std::time::{Duration, Instant}; @@ -52,7 +52,7 @@ use crate::deployment::{self, OnSync}; use crate::detail::ErrorDetail; use crate::dynds::DataSourcesTable; use crate::primary::DeploymentId; -use crate::relational::index::{CreateIndex, Method}; +use crate::relational::index::{CreateIndex, IndexList, Method}; use crate::relational::{Layout, LayoutCache, SqlName, Table}; use crate::relational_queries::FromEntityData; use crate::{advisory_lock, catalog, retry}; @@ -172,6 +172,11 @@ impl DeploymentStore { DeploymentStore(Arc::new(store)) } + // The index_def parameter is used to copy the definition of the indexes from the source subgraph + // to the destination one. This happens when it is set to Some. In that case the BTree attribute + // indexes are also created later, once the subgraph has synced. If this parameter is None, all + // indexes are created with the default creation strategy for a new subgraph, right from the + // start. pub(crate) fn create_deployment( &self, schema: &InputSchema, @@ -180,6 +185,7 @@ impl DeploymentStore { graft_base: Option<Arc<Layout>>, replace: bool, on_sync: OnSync, + index_def: Option<IndexList>, ) -> Result<(), StoreError> { let mut conn = self.get_conn()?; conn.transaction(|conn| -> Result<_, StoreError> { @@ -212,6 +218,7 @@ impl DeploymentStore { site.clone(), schema, entities_with_causality_region.into_iter().collect(), + index_def, )?; // See if we are grafting and check that the graft is permissible if let Some(base) = graft_base { @@ -746,6 +753,13 @@ impl DeploymentStore { .await } + pub(crate) fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> { + let store = self.clone(); + let mut binding = self.get_conn()?; + let conn = binding.deref_mut(); + IndexList::load(conn, site, store) + } + /// Drops an index for a given deployment, concurrently. pub(crate) async fn drop_index( &self, @@ -1483,12 +1497,12 @@ impl DeploymentStore { &self, logger: &Logger, site: Arc<Site>, - graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity)>, + graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>, ) -> Result<(), StoreError> { let dst = self.find_layout(site.cheap_clone())?; // If `graft_src` is `Some`, then there is a pending graft.
- if let Some((src, block, src_deployment)) = graft_src { + if let Some((src, block, src_deployment, index_list)) = graft_src { info!( logger, "Initializing graft by copying data from {} to {}", @@ -1516,7 +1530,7 @@ impl DeploymentStore { src_manifest_idx_and_name, dst_manifest_idx_and_name, )?; - let status = copy_conn.copy_data()?; + let status = copy_conn.copy_data(index_list)?; if status == crate::copy::Status::Cancelled { return Err(StoreError::Canceled); } @@ -1588,10 +1602,17 @@ impl DeploymentStore { Ok(()) })?; } + + let mut conn = self.get_conn()?; + if ENV_VARS.postpone_attribute_index_creation { + // check if all indexes are valid and recreate them if they aren't + self.load_indexes(site.clone())? + .recreate_invalid_indexes(&mut conn, &dst)?; + } + // Make sure the block pointer is set. This is important for newly // deployed subgraphs so that we respect the 'startBlock' setting // the first time the subgraph is started - let mut conn = self.get_conn()?; conn.transaction(|conn| crate::deployment::initialize_block_ptr(conn, &dst.site))?; Ok(()) } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index d349875ddbb..3e44c8054a0 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -38,6 +38,7 @@ use graph::schema::{ EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema, }; use graph::slog::warn; +use index::IndexList; use inflector::Inflector; use itertools::Itertools; use lazy_static::lazy_static; @@ -384,12 +385,13 @@ impl Layout { site: Arc, schema: &InputSchema, entities_with_causality_region: BTreeSet, + index_def: Option, ) -> Result { let catalog = Catalog::for_creation(conn, site.cheap_clone(), entities_with_causality_region)?; let layout = Self::new(site, schema, catalog)?; let sql = layout - .as_ddl() + .as_ddl(index_def) .map_err(|_| StoreError::Unknown(anyhow!("failed to generate DDL for layout")))?; conn.batch_execute(&sql)?; Ok(layout) @@ -1436,6 +1438,7 @@ pub struct Table { /// aggregations, this is the object type for a specific interval, like /// `Stats_hour`, not the overall aggregation type `Stats`. pub object: EntityType, + /// The name of the database table for this type ('thing'), snakecased /// version of `object` pub name: SqlName, diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index e33e358f958..aa3aefd3561 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -14,7 +14,7 @@ use crate::relational::{ VID_COLUMN, }; -use super::{Catalog, Column, Layout, SqlName, Table}; +use super::{index::IndexList, Catalog, Column, Layout, SqlName, Table}; // In debug builds (for testing etc.) 
unconditionally create exclusion constraints, in release // builds for production, skip them @@ -29,7 +29,7 @@ impl Layout { /// /// See the unit tests at the end of this file for the actual DDL that /// gets generated - pub fn as_ddl(&self) -> Result { + pub fn as_ddl(&self, index_def: Option) -> Result { let mut out = String::new(); // Output enums first so table definitions can reference them @@ -41,7 +41,12 @@ impl Layout { tables.sort_by_key(|table| table.position); // Output 'create table' statements for all tables for table in tables { - table.as_ddl(&self.input_schema, &self.catalog, &mut out)?; + table.as_ddl( + &self.input_schema, + &self.catalog, + index_def.as_ref(), + &mut out, + )?; } Ok(out) @@ -256,9 +261,58 @@ impl Table { (method, index_expr) } + pub(crate) fn create_postponed_indexes(&self, skip_colums: Vec) -> Vec { + let mut indexing_queries = vec![]; + let columns = self.columns_to_index(); + + for (column_index, column) in columns.enumerate() { + let (method, index_expr) = + Self::calculate_attr_index_method_and_expression(self.immutable, column); + if !column.is_list() + && method == "btree" + && column.name.as_str() != "id" + && !skip_colums.contains(&column.name.to_string()) + { + let sql = format!( + "create index concurrently if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n", + table_index = self.position, + table_name = self.name, + column_name = column.name, + qname = self.qualified_name, + ); + indexing_queries.push(sql); + } + } + indexing_queries + } + fn create_attribute_indexes(&self, out: &mut String) -> fmt::Result { - // Create indexes. + let columns = self.columns_to_index(); + + for (column_index, column) in columns.enumerate() { + let (method, index_expr) = + Self::calculate_attr_index_method_and_expression(self.immutable, column); + + // If `create_gin_indexes` is set to false, we don't create + // indexes on array attributes. Experience has shown that these + // indexes are very expensive to update and can have a very bad + // impact on the write performance of the database, but are + // hardly ever used or needed by queries. + if !column.is_list() || ENV_VARS.store.create_gin_indexes { + write!( + out, + "create index attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n", + table_index = self.position, + table_name = self.name, + column_name = column.name, + qname = self.qualified_name, + )?; + } + } + writeln!(out) + } + fn columns_to_index(&self) -> impl Iterator { // Skip columns whose type is an array of enum, since there is no // good way to index them with Postgres 9.6. Once we move to // Postgres 11, we can enable that (tracked in graph-node issue @@ -282,27 +336,7 @@ impl Table { .filter(not_enum_list) .filter(not_immutable_pk) .filter(not_numeric_list); - - for (column_index, column) in columns.enumerate() { - let (method, index_expr) = - Self::calculate_attr_index_method_and_expression(self.immutable, column); - // If `create_gin_indexes` is set to false, we don't create - // indexes on array attributes. Experience has shown that these - // indexes are very expensive to update and can have a very bad - // impact on the write performance of the database, but are - // hardly ever used or needed by queries. 
- if !column.is_list() || ENV_VARS.store.create_gin_indexes { - write!( - out, - "create index attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n", - table_index = self.position, - table_name = self.name, - column_name = column.name, - qname = self.qualified_name, - )?; - } - } - writeln!(out) + columns } /// If `self` is an aggregation and has cumulative aggregates, create an @@ -353,11 +387,28 @@ impl Table { &self, schema: &InputSchema, catalog: &Catalog, + index_def: Option<&IndexList>, out: &mut String, ) -> fmt::Result { self.create_table(out)?; self.create_time_travel_indexes(catalog, out)?; - self.create_attribute_indexes(out)?; + if index_def.is_some() && ENV_VARS.postpone_attribute_index_creation { + let arr = index_def + .unwrap() + .indexes_for_table( + &catalog.site.namespace, + &self.name.to_string(), + &self, + false, + false, + ) + .map_err(|_| fmt::Error)?; + for (_, sql) in arr { + writeln!(out, "{};", sql).expect("properly formated index statements") + } + } else { + self.create_attribute_indexes(out)?; + } self.create_aggregate_indexes(schema, out) } diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index 65495033315..e9abca2879a 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -1,3 +1,4 @@ +use index::CreateIndex; use itertools::Itertools; use pretty_assertions::assert_eq; @@ -152,34 +153,100 @@ fn test_manual_index_creation_ddl() { ); } +#[test] +fn generate_postponed_indexes() { + let layout = test_layout(THING_GQL); + let table = layout.table(&SqlName::from("Scalar")).unwrap(); + let skip_colums = vec!["id".to_string()]; + let query_vec = table.create_postponed_indexes(skip_colums); + assert!(query_vec.len() == 7); + let queries = query_vec.join(" "); + check_eqv(THING_POSTPONED_INDEXES, &queries) +} +const THING_POSTPONED_INDEXES: &str = r#" +create index concurrently if not exists attr_1_1_scalar_bool + on "sgd0815"."scalar" using btree("bool"); + create index concurrently if not exists attr_1_2_scalar_int + on "sgd0815"."scalar" using btree("int"); + create index concurrently if not exists attr_1_3_scalar_big_decimal + on "sgd0815"."scalar" using btree("big_decimal"); + create index concurrently if not exists attr_1_4_scalar_string + on "sgd0815"."scalar" using btree(left("string", 256)); + create index concurrently if not exists attr_1_5_scalar_bytes + on "sgd0815"."scalar" using btree(substring("bytes", 1, 64)); + create index concurrently if not exists attr_1_6_scalar_big_int + on "sgd0815"."scalar" using btree("big_int"); + create index concurrently if not exists attr_1_7_scalar_color + on "sgd0815"."scalar" using btree("color"); +"#; + +impl IndexList { + fn mock_thing_index_list() -> Self { + let mut indexes: HashMap> = HashMap::new(); + let v1 = vec![ + CreateIndex::parse(r#"create index thing_id_block_range_excl on sgd0815.thing using gist (id, block_range)"#.to_string()), + CreateIndex::parse(r#"create index brin_thing on sgd0815."thing" using brin (lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops)"#.to_string()), + // fixme: enable the index bellow once the parsing of statements is fixed, and BlockRangeUpper in particular (issue #5512) + // CreateIndex::parse(r#"create index thing_block_range_closed on sgd0815."thing" using btree (coalesce(upper(block_range), 2147483647)) where coalesce((upper(block_range), 2147483647) < 
2147483647)"#.to_string()), + CreateIndex::parse(r#"create index attr_0_0_thing_id on sgd0815."thing" using btree (id)"#.to_string()), + CreateIndex::parse(r#"create index attr_0_1_thing_big_thing on sgd0815."thing" using gist (big_thing, block_range)"#.to_string()), + ]; + indexes.insert("thing".to_string(), v1); + let v2 = vec![ + CreateIndex::parse(r#"create index attr_1_0_scalar_id on sgd0815."scalar" using btree (id)"#.to_string(),), + CreateIndex::parse(r#"create index attr_1_1_scalar_bool on sgd0815."scalar" using btree (bool)"#.to_string(),), + CreateIndex::parse(r#"create index attr_1_2_scalar_int on sgd0815."scalar" using btree (int)"#.to_string(),), + CreateIndex::parse(r#"create index attr_1_3_scalar_big_decimal on sgd0815."scalar" using btree (big_decimal)"#.to_string()), + CreateIndex::parse(r#"create index attr_1_4_scalar_string on sgd0815."scalar" using btree (left(string, 256))"#.to_string()), + CreateIndex::parse(r#"create index attr_1_5_scalar_bytes on sgd0815."scalar" using btree (substring(bytes, 1, 64))"#.to_string()), + CreateIndex::parse(r#"create index attr_1_6_scalar_big_int on sgd0815."scalar" using btree (big_int)"#.to_string()), + CreateIndex::parse(r#"create index attr_1_7_scalar_color on sgd0815."scalar" using btree (color)"#.to_string()), + ]; + indexes.insert("scalar".to_string(), v2); + let v3 = vec![CreateIndex::parse( + r#"create index attr_2_0_file_thing_id on sgd0815."file_thing" using btree (id)"# + .to_string(), + )]; + indexes.insert("file_thing".to_string(), v3); + IndexList { indexes } + } +} + #[test] fn generate_ddl() { let layout = test_layout(THING_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); assert_eq!(THING_DDL, &sql); // Use `assert_eq!` to also test the formatting. 
+ let il = IndexList::mock_thing_index_list(); + let layout = test_layout(THING_GQL); + let sql = layout.as_ddl(Some(il)).expect("Failed to generate DDL"); + println!("SQL: {}", sql); + println!("THING_DDL_ON_COPY: {}", THING_DDL_ON_COPY); + check_eqv(THING_DDL_ON_COPY, &sql); + let layout = test_layout(MUSIC_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); check_eqv(MUSIC_DDL, &sql); let layout = test_layout(FOREST_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); check_eqv(FOREST_DDL, &sql); let layout = test_layout(FULLTEXT_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); check_eqv(FULLTEXT_DDL, &sql); let layout = test_layout(FORWARD_ENUM_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); check_eqv(FORWARD_ENUM_SQL, &sql); let layout = test_layout(TS_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); check_eqv(TS_SQL, &sql); let layout = test_layout(LIFETIME_GQL); - let sql = layout.as_ddl().expect("Failed to generate DDL"); + let sql = layout.as_ddl(None).expect("Failed to generate DDL"); check_eqv(LIFETIME_SQL, &sql); } @@ -398,6 +465,76 @@ create index attr_2_0_file_thing_id "#; +const THING_DDL_ON_COPY: &str = r#"create type sgd0815."color" + as enum ('BLUE', 'red', 'yellow'); +create type sgd0815."size" + as enum ('large', 'medium', 'small'); + + create table "sgd0815"."thing" ( + vid bigserial primary key, + block_range int4range not null, + "id" text not null, + "big_thing" text not null + ); + + alter table "sgd0815"."thing" + add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_thing + on "sgd0815"."thing" + using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); +create index thing_block_range_closed + on "sgd0815"."thing"(coalesce(upper(block_range), 2147483647)) + where coalesce(upper(block_range), 2147483647) < 2147483647; +create index attr_0_0_thing_id + on sgd0815."thing" using btree (id); +create index attr_0_1_thing_big_thing + on sgd0815."thing" using gist (big_thing, block_range); + + + create table "sgd0815"."scalar" ( + vid bigserial primary key, + block_range int4range not null, + "id" text not null, + "bool" boolean, + "int" int4, + "big_decimal" numeric, + "string" text, + "bytes" bytea, + "big_int" numeric, + "color" "sgd0815"."color" + ); + + alter table "sgd0815"."scalar" + add constraint scalar_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_scalar + on "sgd0815"."scalar" + using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); +create index scalar_block_range_closed + on "sgd0815"."scalar"(coalesce(upper(block_range), 2147483647)) + where coalesce(upper(block_range), 2147483647) < 2147483647; +create index attr_1_0_scalar_id + on sgd0815."scalar" using btree (id); + + + create table "sgd0815"."file_thing" ( + vid bigserial primary key, + block_range int4range not null, + causality_region int not null, + "id" text not null + ); + + alter table "sgd0815"."file_thing" + add constraint 
file_thing_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_file_thing + on "sgd0815"."file_thing" + using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); +create index file_thing_block_range_closed + on "sgd0815"."file_thing"(coalesce(upper(block_range), 2147483647)) + where coalesce(upper(block_range), 2147483647) < 2147483647; +create index attr_2_0_file_thing_id + on sgd0815."file_thing" using btree (id); +"#; + const BOOKS_GQL: &str = r#"type Author @entity { id: ID! name: String! diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 9458ff55917..64d4d7c83bc 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -1,6 +1,12 @@ //! Parse Postgres index definition into a form that is meaningful for us. +use anyhow::{anyhow, Error}; +use std::collections::HashMap; use std::fmt::{Display, Write}; +use std::sync::Arc; +use diesel::sql_types::{Bool, Text}; +use diesel::{sql_query, Connection, PgConnection, RunQueryDsl}; +use graph::components::store::StoreError; use graph::itertools::Itertools; use graph::prelude::{ lazy_static, @@ -9,11 +15,15 @@ use graph::prelude::{ }; use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; +use crate::catalog; +use crate::command_support::catalog::Site; +use crate::deployment_store::DeploymentStore; +use crate::primary::Namespace; use crate::relational::{BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE}; -use super::VID_COLUMN; +use super::{Layout, Table, VID_COLUMN}; -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum Method { Brin, BTree, @@ -180,6 +190,25 @@ impl Expr { } } + /// Here we check if all the columns expressions of the two indexes are "kind of same". + /// We ignore the operator class of the expression by checking if the string of the + /// original expression is a prexif of the string of the current one. + fn is_same_kind_columns(current: &Vec, orig: &Vec) -> bool { + if orig.len() != current.len() { + return false; + } + for i in 0..orig.len() { + let o = orig[i].to_sql(); + let n = current[i].to_sql(); + + // check that string n starts with o + if n.len() < o.len() || n[0..o.len()] != o { + return false; + } + } + true + } + fn to_sql(&self) -> String { match self { Expr::Column(name) => name.to_string(), @@ -196,7 +225,7 @@ impl Expr { /// The condition for a partial index, i.e., the statement after `where ..` /// in a `create index` statement -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum Cond { /// The expression `coalesce(upper(block_range), 2147483647) > $number` Partial(BlockNumber), @@ -248,7 +277,7 @@ impl Cond { } } -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum CreateIndex { /// The literal index definition passed to `parse`. This is used when we /// can't parse a `create index` statement, e.g. because it uses @@ -354,8 +383,8 @@ impl CreateIndex { fn new_parsed(defn: &str) -> Option { let rx = Regex::new( - "create (?Punique )?index (?P[a-z0-9$_]+) \ - on (?Psgd[0-9]+)\\.(?P[a-z$_]+) \ + "create (?Punique )?index (?P\"?[a-z0-9$_]+\"?) \ + on (?Psgd[0-9]+)\\.(?P
\"?[a-z0-9$_]+\"?) \ using (?P[a-z]+) \\((?P.*?)\\)\ ( where \\((?P.*)\\))?\ ( with \\((?P.*)\\))?$", @@ -411,6 +440,32 @@ impl CreateIndex { } } + fn with_nsp(&self, nsp2: String) -> Result { + let s = self.clone(); + match s { + CreateIndex::Unknown { defn: _ } => Err(anyhow!("Failed to parse the index")), + CreateIndex::Parsed { + unique, + name, + nsp: _, + table, + method, + columns, + cond, + with, + } => Ok(CreateIndex::Parsed { + unique, + name, + nsp: nsp2, + table, + method, + columns, + cond, + with, + }), + } + } + pub fn is_attribute_index(&self) -> bool { use CreateIndex::*; match self { @@ -445,8 +500,7 @@ impl CreateIndex { } } - /// Return `true` if `self` is one of the indexes we create by default - pub fn is_default_index(&self) -> bool { + pub fn is_default_non_attr_index(&self) -> bool { lazy_static! { static ref DEFAULT_INDEXES: Vec = { fn dummy( @@ -487,7 +541,12 @@ impl CreateIndex { }; } - self.is_attribute_index() || DEFAULT_INDEXES.iter().any(|idx| self.is_same_index(idx)) + DEFAULT_INDEXES.iter().any(|idx| self.is_same_index(idx)) + } + + /// Return `true` if `self` is one of the indexes we create by default + pub fn is_default_index(&self) -> bool { + self.is_attribute_index() || self.is_default_non_attr_index() } fn is_same_index(&self, other: &CreateIndex) -> bool { @@ -517,13 +576,125 @@ impl CreateIndex { ) => { unique == o_unique && method == o_method - && columns == o_columns + && Expr::is_same_kind_columns(columns, o_columns) && cond == o_cond && with == o_with } } } + pub fn is_id(&self) -> bool { + // on imutable tables the id constraint is specified at table creation + match self { + CreateIndex::Unknown { .. } => (), + CreateIndex::Parsed { columns, .. } => { + if columns.len() == 1 { + if columns[0].is_id() { + return true; + } + } + } + } + false + } + + pub fn to_postpone(&self) -> bool { + fn has_prefix(s: &str, prefix: &str) -> bool { + s.starts_with(prefix) + || s.ends_with("\"") && s.starts_with(format!("\"{}", prefix).as_str()) + } + match self { + CreateIndex::Unknown { .. } => false, + CreateIndex::Parsed { + name, + columns, + method, + .. + } => { + if *method != Method::BTree { + return false; + } + if columns.len() == 1 && columns[0].is_id() { + return false; + } + has_prefix(name, "attr_") && self.is_attribute_index() + } + } + } + + pub fn name(&self) -> Option { + match self { + CreateIndex::Unknown { .. } => None, + CreateIndex::Parsed { name, .. } => Some(name.clone()), + } + } + + pub fn fields_exist_in_dest<'a>(&self, dest_table: &'a Table) -> bool { + fn column_exists<'a>(it: &mut impl Iterator, column_name: &String) -> bool { + it.any(|c| *c == *column_name) + } + + fn some_column_contained<'a>( + expr: &String, + it: &mut impl Iterator, + ) -> bool { + it.any(|c| expr.contains(c)) + } + + let cols = &mut dest_table.columns.iter().map(|i| i.name.as_str()); + match self { + CreateIndex::Unknown { defn: _ } => return true, + CreateIndex::Parsed { + columns: parsed_cols, + .. 
+ } => { + for c in parsed_cols { + match c { + Expr::Column(column_name) => { + if !column_exists(cols, column_name) { + return false; + } + } + Expr::Prefix(column_name, _) => { + if !column_exists(cols, column_name) { + return false; + } + } + Expr::BlockRange | Expr::BlockRangeLower | Expr::BlockRangeUpper => { + if dest_table.immutable { + return false; + } + } + Expr::Vid => (), + Expr::Block => { + if !column_exists(cols, &"block".to_string()) { + return false; + } + } + Expr::Unknown(expression) => { + if some_column_contained( + expression, + &mut (vec!["block_range"]).into_iter(), + ) && dest_table.immutable + { + return false; + } + if !some_column_contained(expression, cols) + && !some_column_contained( + expression, + &mut (vec!["block_range", "vid"]).into_iter(), + ) + { + return false; + } + } + } + } + } + } + true + } + /// Generate a SQL statement that creates this index. If `concurrent` is /// `true`, make it a concurrent index creation. If `if_not_exists` is /// `true` add a `if not exists` clause to the index creation. @@ -558,6 +729,125 @@ impl CreateIndex { } } +#[derive(Debug)] +pub struct IndexList { + pub(crate) indexes: HashMap>, +} + +impl IndexList { + pub fn load( + conn: &mut PgConnection, + site: Arc, + store: DeploymentStore, + ) -> Result { + let mut list = IndexList { + indexes: HashMap::new(), + }; + let schema_name = site.namespace.clone(); + let layout = store.layout(conn, site)?; + for (_, table) in &layout.tables { + let table_name = table.name.as_str(); + let indexes = catalog::indexes_for_table(conn, schema_name.as_str(), table_name)?; + let collect: Vec = indexes.into_iter().map(CreateIndex::parse).collect(); + list.indexes.insert(table_name.to_string(), collect); + } + Ok(list) + } + + pub fn indexes_for_table( + &self, + namespace: &Namespace, + table_name: &String, + dest_table: &Table, + postponed: bool, + concurrent_if_not_exist: bool, + ) -> Result, String)>, Error> { + let mut arr = vec![]; + if let Some(vec) = self.indexes.get(table_name) { + for ci in vec { + // First we check if the fields do exist in the destination subgraph. + // In case of grafting that is not given. + if ci.fields_exist_in_dest(dest_table) + // Then we check if the index is one of the default indexes not based on + // the attributes. Those will be created anyway and we should skip them. + && !ci.is_default_non_attr_index() + // Then ID based indexes in the immutable tables are also created initially + // and should be skipped. + && !(ci.is_id() && dest_table.immutable) + // Finally we filter by the criteria is the index to be postponed. The ones + // that are not to be postponed we want to create during initial creation of + // the copied subgraph + && postponed == ci.to_postpone() + { + if let Ok(sql) = ci + .with_nsp(namespace.to_string())? + .to_sql(concurrent_if_not_exist, concurrent_if_not_exist) + { + arr.push((ci.name(), sql)) + } + } + } + } + Ok(arr) + } + + pub fn recreate_invalid_indexes( + &self, + conn: &mut PgConnection, + layout: &Layout, + ) -> Result<(), StoreError> { + #[derive(QueryableByName, Debug)] + struct IndexInfo { + #[diesel(sql_type = Bool)] + isvalid: bool, + } + + let namespace = &layout.catalog.site.namespace; + for table in layout.tables.values() { + for (ind_name, create_query) in + self.indexes_for_table(namespace, &table.name.to_string(), table, true, true)? 
+ { + if let Some(index_name) = ind_name { + let table_name = table.name.clone(); + let query = r#" + SELECT x.indisvalid AS isvalid + FROM pg_index x + JOIN pg_class c ON c.oid = x.indrelid + JOIN pg_class i ON i.oid = x.indexrelid + LEFT JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE (c.relkind = ANY (ARRAY ['r'::"char", 'm'::"char", 'p'::"char"])) + AND (i.relkind = ANY (ARRAY ['i'::"char", 'I'::"char"])) + AND (n.nspname = $1) + AND (c.relname = $2) + AND (i.relname = $3);"#; + let ii_vec = sql_query(query) + .bind::(namespace.to_string()) + .bind::(table_name) + .bind::(index_name.clone()) + .get_results::(conn)? + .into_iter() + .map(|ii| ii.into()) + .collect::>(); + assert!(ii_vec.len() <= 1); + if ii_vec.len() == 0 || !ii_vec[0].isvalid { + // if a bad index exist lets first drop it + if ii_vec.len() > 0 { + let drop_query = sql_query(format!( + "DROP INDEX {}.{};", + namespace.to_string(), + index_name + )); + conn.transaction(|conn| drop_query.execute(conn))?; + } + sql_query(create_query).execute(conn)?; + } + } + } + } + Ok(()) + } +} + #[test] fn parse() { use Method::*; diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 9813bd73120..6b5fcdc6940 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -94,7 +94,9 @@ impl TablePair { if catalog::table_exists(conn, dst_nsp.as_str(), &dst.name)? { writeln!(query, "truncate table {};", dst.qualified_name)?; } else { - dst.as_ddl(schema, catalog, &mut query)?; + // In case of pruning we don't do delayed creation of indexes, + // as the asumption is that there is not that much data inserted. + dst.as_ddl(schema, catalog, None, &mut query)?; } conn.batch_execute(&query)?; diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 528079071a4..7216dc993b5 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -39,9 +39,11 @@ use graph::{ use crate::{ connection_pool::ConnectionPool, deployment::{OnSync, SubgraphHealth}, - primary, - primary::{DeploymentId, Mirror as PrimaryMirror, Site}, - relational::{index::Method, Layout}, + primary::{self, DeploymentId, Mirror as PrimaryMirror, Site}, + relational::{ + index::{IndexList, Method}, + Layout, + }, writable::WritableStore, NotificationSender, }; @@ -553,7 +555,7 @@ impl SubgraphStoreInner { // if the deployment already exists, we don't need to perform any copying // so we can set graft_base to None // if it doesn't exist, we need to copy the graft base to the new deployment - let graft_base = if !exists { + let graft_base_layout = if !exists { let graft_base = deployment .graft_base .as_ref() @@ -574,13 +576,25 @@ impl SubgraphStoreInner { .stores .get(&site.shard) .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; + + let index_def = if let Some(graft) = &graft_base.clone() { + if let Some(site) = self.sites.get(graft) { + Some(deployment_store.load_indexes(site)?) 
+ } else { + None + } + } else { + None + }; + deployment_store.create_deployment( schema, deployment, site.clone(), - graft_base, + graft_base_layout, replace, OnSync::None, + index_def, )?; let exists_and_synced = |id: &DeploymentHash| { @@ -642,6 +656,7 @@ impl SubgraphStoreInner { src_loc ))); } + let index_def = src_store.load_indexes(src.clone())?; // Transmogrify the deployment into a new one let deployment = DeploymentCreate { @@ -671,6 +686,7 @@ impl SubgraphStoreInner { Some(graft_base), false, on_sync, + Some(index_def), )?; let mut pconn = self.primary_conn()?; @@ -1214,6 +1230,11 @@ impl SubgraphStoreInner { let src_store = self.for_site(&site)?; src_store.load_deployment(site) } + + pub fn load_indexes(&self, site: Arc) -> Result { + let src_store = self.for_site(&site)?; + src_store.load_indexes(site) + } } const STATE_ENS_NOT_CHECKED: u8 = 0; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index c014bbe4c70..ee7a5e4754f 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -36,6 +36,7 @@ use store::StoredDynamicDataSource; use crate::deployment_store::DeploymentStore; use crate::primary::DeploymentId; +use crate::relational::index::IndexList; use crate::retry; use crate::{primary, primary::Site, relational::Layout, SubgraphStore}; @@ -66,6 +67,10 @@ impl WritableSubgraphStore { fn find_site(&self, id: DeploymentId) -> Result, StoreError> { self.0.find_site(id) } + + fn load_indexes(&self, site: Arc) -> Result { + self.0.load_indexes(site) + } } #[derive(Copy, Clone)] @@ -222,7 +227,8 @@ impl SyncStore { Some((base_id, base_ptr)) => { let src = self.store.layout(&base_id)?; let deployment_entity = self.store.load_deployment(src.site.clone())?; - Some((src, base_ptr, deployment_entity)) + let indexes = self.store.load_indexes(src.site.clone())?; + Some((src, base_ptr, deployment_entity, indexes)) } None => None, }; diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs index aa6d3fa1795..fe366b34509 100644 --- a/store/test-store/tests/postgres/relational.rs +++ b/store/test-store/tests/postgres/relational.rs @@ -477,7 +477,7 @@ fn create_schema(conn: &mut PgConnection) -> Layout { let query = format!("create schema {}", NAMESPACE.as_str()); conn.batch_execute(&query).unwrap(); - Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new()) + Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new(), None) .expect("Failed to create relational schema") } diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs index 8d3d4329fae..b7b8f36b7d7 100644 --- a/store/test-store/tests/postgres/relational_bytes.rs +++ b/store/test-store/tests/postgres/relational_bytes.rs @@ -151,7 +151,7 @@ fn create_schema(conn: &mut PgConnection) -> Layout { NAMESPACE.clone(), NETWORK_NAME.to_string(), ); - Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new()) + Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new(), None) .expect("Failed to create relational schema") }
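
For a quick illustration of what the new `to_postpone` helper defers when `GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION=true`, here is a minimal sketch (not part of the diff) written in the same `CreateIndex::parse` style as `mock_thing_index_list` above; the module and test names are placeholders. Only plain BTree attribute indexes on non-`id` columns are postponed until after the copy/graft finishes, while GiST/BRIN indexes and `id` indexes are still created up front.

// Illustrative only; would live next to the existing tests in
// store/postgres/src/relational/index.rs.
#[cfg(test)]
mod postpone_sketch {
    use super::CreateIndex;

    #[test]
    fn postpone_decision() {
        // BTree attribute index on a regular column: deferred until after sync.
        let btree_attr = CreateIndex::parse(
            r#"create index attr_1_2_scalar_int on sgd0815."scalar" using btree (int)"#.to_string(),
        );
        // GiST attribute index (attribute plus block_range): created when the table is created.
        let gist_attr = CreateIndex::parse(
            r#"create index attr_0_1_thing_big_thing on sgd0815."thing" using gist (big_thing, block_range)"#.to_string(),
        );
        // BTree index on the id column: also created up front.
        let id_attr = CreateIndex::parse(
            r#"create index attr_1_0_scalar_id on sgd0815."scalar" using btree (id)"#.to_string(),
        );

        assert!(btree_attr.to_postpone());
        assert!(!gist_attr.to_postpone());
        assert!(!id_attr.to_postpone());
    }
}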