Graphman copy indexing improvements
zorancv committed Jun 27, 2024
1 parent 1c6ff19 commit d61f937
Showing 14 changed files with 649 additions and 65 deletions.
8 changes: 8 additions & 0 deletions graph/src/env/mod.rs
@@ -152,6 +152,10 @@ pub struct EnvVars {
/// Set by the flag `GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES`. On by
/// default.
pub enable_select_by_specific_attributes: bool,
/// Experimental feature.
///
/// Set by the flag `GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION`. Off by default.
pub postpone_attribute_index_creation: bool,
/// Verbose logging of mapping inputs.
///
/// Set by the flag `GRAPH_LOG_TRIGGER_DATA`. Off by
@@ -271,6 +275,8 @@ impl EnvVars {
subgraph_error_retry_ceil: Duration::from_secs(inner.subgraph_error_retry_ceil_in_secs),
subgraph_error_retry_jitter: inner.subgraph_error_retry_jitter,
enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0,
postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0
|| cfg!(debug_assertions),
log_trigger_data: inner.log_trigger_data.0,
explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs),
explorer_lock_threshold: Duration::from_millis(inner.explorer_lock_threshold_in_msec),
@@ -393,6 +399,8 @@ struct Inner {
subgraph_error_retry_jitter: f64,
#[envconfig(from = "GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES", default = "true")]
enable_select_by_specific_attributes: EnvVarBoolean,
#[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")]
postpone_attribute_index_creation: EnvVarBoolean,
#[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")]
log_trigger_data: EnvVarBoolean,
#[envconfig(from = "GRAPH_EXPLORER_TTL", default = "10")]
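For illustration only, here is a minimal standalone sketch of how a postpone flag like this can gate attribute-index DDL; the `TableSpec` type and `generate_ddl` helper are made-up names, not graph-node APIs. Note that the hunk above also forces the flag on in debug builds via `cfg!(debug_assertions)`.

// Hypothetical sketch: gate BTree attribute-index creation behind an env flag.
// `TableSpec` and `generate_ddl` are stand-ins, not graph-node types.
struct TableSpec {
    name: String,
    attribute_columns: Vec<String>,
}

fn generate_ddl(table: &TableSpec, postpone_attribute_indexes: bool) -> Vec<String> {
    let mut stmts = vec![format!("create table {} (...);", table.name)];
    if !postpone_attribute_indexes {
        // Default behaviour: create the attribute indexes up front.
        for col in &table.attribute_columns {
            stmts.push(format!(
                "create index attr_{t}_{c} on {t} using btree ({c});",
                t = table.name,
                c = col
            ));
        }
    }
    stmts
}

fn main() {
    // Stand-in for the boolean env var parsing above: off unless enabled,
    // always on in debug builds.
    let postpone = std::env::var("GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION")
        .map(|v| v == "true")
        .unwrap_or(false)
        || cfg!(debug_assertions);
    let table = TableSpec {
        name: "thing".to_string(),
        attribute_columns: vec!["owner".to_string(), "value".to_string()],
    };
    for stmt in generate_ddl(&table, postpone) {
        println!("{stmt}");
    }
}
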
2 changes: 1 addition & 1 deletion store/postgres/examples/layout.rs
@@ -42,7 +42,7 @@ fn print_delete_all(layout: &Layout) {
}

fn print_ddl(layout: &Layout) {
let ddl = ensure(layout.as_ddl(), "Failed to generate DDL");
let ddl = ensure(layout.as_ddl(None), "Failed to generate DDL");
println!("{}", ddl);
}

1 change: 1 addition & 0 deletions store/postgres/src/catalog.rs
@@ -687,6 +687,7 @@ pub(crate) fn indexes_for_table(

Ok(results.into_iter().map(|i| i.def).collect())
}

pub(crate) fn drop_index(
conn: &mut PgConnection,
schema_name: &str,
54 changes: 49 additions & 5 deletions store/postgres/src/copy.rs
@@ -13,6 +13,7 @@
//! `graph-node` was restarted while the copy was running.
use std::{
convert::TryFrom,
ops::DerefMut,
sync::Arc,
time::{Duration, Instant},
};
@@ -24,8 +25,7 @@ use diesel::{
pg::Pg,
r2d2::{ConnectionManager, PooledConnection},
select,
serialize::Output,
serialize::ToSql,
serialize::{Output, ToSql},
sql_query,
sql_types::{BigInt, Integer},
update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl,
@@ -36,11 +36,13 @@ use graph::{
prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError, ENV_VARS},
schema::EntityType,
};
use itertools::Itertools;

use crate::{
advisory_lock, catalog,
dynds::DataSourcesTable,
primary::{DeploymentId, Site},
relational::index::IndexList,
};
use crate::{connection_pool::ConnectionPool, relational::Layout};
use crate::{relational::Table, relational_queries as rq};
@@ -761,7 +763,7 @@ impl Connection {
Ok(())
}

pub fn copy_data_internal(&mut self) -> Result<Status, StoreError> {
pub fn copy_data_internal(&mut self, index_list: IndexList) -> Result<Status, StoreError> {
let src = self.src.clone();
let dst = self.dst.clone();
let target_block = self.target_block.clone();
Expand Down Expand Up @@ -806,6 +808,46 @@ impl Connection {
progress.table_finished(&table.batch);
}

// Create indexes for all the attributes that were postponed at the start of
// the copy/graft operations.
// First recreate the indexes that existed in the original subgraph.
let conn = self.conn.deref_mut();
for table in state.tables.iter() {
let arr = index_list.indexes_for_table(
&self.dst.site.namespace,
&table.batch.src.name.to_string(),
&table.batch.dst,
true,
true,
)?;

for (_, sql) in arr {
let query = sql_query(format!("{};", sql));
query.execute(conn)?;
}
}

// Second create the indexes for the new fields.
// Here we need to skip those created in the first step for the old fields.
for table in state.tables.iter() {
let orig_colums = table
.batch
.src
.columns
.iter()
.map(|c| c.name.to_string())
.collect_vec();
for sql in table
.batch
.dst
.create_postponed_indexes(orig_colums)
.into_iter()
{
let query = sql_query(sql);
query.execute(conn)?;
}
}

self.copy_private_data_sources(&state)?;

self.transaction(|conn| state.finished(conn))?;
@@ -820,6 +862,8 @@ impl Connection {
/// block is guaranteed to not be subject to chain reorgs. All data up
/// to and including `target_block` will be copied.
///
/// The parameter index_list is a list of indexes that exist on the `src`.
///
/// The copy logic makes heavy use of the fact that the `vid` and
/// `block_range` of entity versions are related since for two entity
/// versions `v1` and `v2` such that `v1.vid <= v2.vid`, we know that
Expand All @@ -828,7 +872,7 @@ impl Connection {
/// lower(v1.block_range) => v2.vid > v1.vid` and we can therefore stop
/// the copying of each table as soon as we hit `max_vid = max { v.vid |
/// lower(v.block_range) <= target_block.number }`.
pub fn copy_data(&mut self) -> Result<Status, StoreError> {
pub fn copy_data(&mut self, index_list: IndexList) -> Result<Status, StoreError> {
// We require sole access to the destination site, and that we get a
// consistent view of what has been copied so far. In general, that
// is always true. It can happen though that this function runs when
@@ -842,7 +886,7 @@
"Obtaining copy lock (this might take a long time if another process is still copying)"
);
advisory_lock::lock_copying(&mut self.conn, self.dst.site.as_ref())?;
let res = self.copy_data_internal();
let res = self.copy_data_internal(index_list);
advisory_lock::unlock_copying(&mut self.conn, self.dst.site.as_ref())?;
if matches!(res, Ok(Status::Cancelled)) {
warn!(&self.logger, "Copying was cancelled and is incomplete");
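To make the two-phase index creation in copy_data_internal easier to follow, here is a standalone sketch under assumed names (plan_post_copy_indexes is not a graph-node function): phase one replays the index definitions carried over from the source, phase two adds default BTree indexes only for destination columns the source table did not have, since the old columns were already covered in phase one.

// Hypothetical sketch of post-copy index planning; all names are stand-ins.
use std::collections::HashSet;

fn plan_post_copy_indexes(
    table: &str,
    src_index_sql: &[String], // index definitions carried over from the source
    src_columns: &[String],   // columns that already existed on the source
    dst_columns: &[String],   // all attribute columns on the destination
) -> Vec<String> {
    let mut stmts: Vec<String> = Vec::new();

    // Phase 1: recreate the indexes that existed on the source table.
    for sql in src_index_sql {
        stmts.push(format!("{sql};"));
    }

    // Phase 2: default BTree indexes, but only for columns that are new in
    // the destination schema; the old columns were handled in phase 1.
    let old: HashSet<&str> = src_columns.iter().map(|c| c.as_str()).collect();
    for column in dst_columns {
        if !old.contains(column.as_str()) {
            stmts.push(format!(
                "create index attr_{table}_{column} on {table} using btree ({column});"
            ));
        }
    }
    stmts
}

fn main() {
    let src_sql = vec!["create index attr_thing_owner on thing using btree (owner)".to_string()];
    let src_cols = vec!["owner".to_string()];
    let dst_cols = vec!["owner".to_string(), "score".to_string()];
    for stmt in plan_post_copy_indexes("thing", &src_sql, &src_cols, &dst_cols) {
        println!("{stmt}");
    }
}
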
33 changes: 27 additions & 6 deletions store/postgres/src/deployment_store.rs
@@ -29,8 +29,8 @@ use lru_time_cache::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use std::collections::{BTreeMap, HashMap};
use std::convert::Into;
use std::ops::Bound;
use std::ops::Deref;
use std::ops::{Bound, DerefMut};
use std::str::FromStr;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::time::{Duration, Instant};
@@ -52,7 +52,7 @@ use crate::deployment::{self, OnSync};
use crate::detail::ErrorDetail;
use crate::dynds::DataSourcesTable;
use crate::primary::DeploymentId;
use crate::relational::index::{CreateIndex, Method};
use crate::relational::index::{CreateIndex, IndexList, Method};
use crate::relational::{Layout, LayoutCache, SqlName, Table};
use crate::relational_queries::FromEntityData;
use crate::{advisory_lock, catalog, retry};
@@ -172,6 +172,11 @@ impl DeploymentStore {
DeploymentStore(Arc::new(store))
}

// Parameter index_def is used to copy over the definition of the indexes from the source subgraph
// to the destination one. This happens when it is set to Some. In that case the BTree attribute
// indexes are also created later, once the subgraph has synced. If this parameter is None, all
// indexes are created with the default creation strategy for a new subgraph, right from the
// start.
pub(crate) fn create_deployment(
&self,
schema: &InputSchema,
Expand All @@ -180,6 +185,7 @@ impl DeploymentStore {
graft_base: Option<Arc<Layout>>,
replace: bool,
on_sync: OnSync,
index_def: Option<IndexList>,
) -> Result<(), StoreError> {
let mut conn = self.get_conn()?;
conn.transaction(|conn| -> Result<_, StoreError> {
@@ -212,6 +218,7 @@
site.clone(),
schema,
entities_with_causality_region.into_iter().collect(),
index_def,
)?;
// See if we are grafting and check that the graft is permissible
if let Some(base) = graft_base {
@@ -746,6 +753,13 @@ impl DeploymentStore {
.await
}

pub(crate) fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
let store = self.clone();
let mut binding = self.get_conn()?;
let conn = binding.deref_mut();
IndexList::load(conn, site, store)
}

/// Drops an index for a given deployment, concurrently.
pub(crate) async fn drop_index(
&self,
@@ -1483,12 +1497,12 @@ impl DeploymentStore {
&self,
logger: &Logger,
site: Arc<Site>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity)>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>,
) -> Result<(), StoreError> {
let dst = self.find_layout(site.cheap_clone())?;

// If `graft_src` is `Some`, then there is a pending graft.
if let Some((src, block, src_deployment)) = graft_src {
if let Some((src, block, src_deployment, index_list)) = graft_src {
info!(
logger,
"Initializing graft by copying data from {} to {}",
@@ -1516,7 +1530,7 @@
src_manifest_idx_and_name,
dst_manifest_idx_and_name,
)?;
let status = copy_conn.copy_data()?;
let status = copy_conn.copy_data(index_list)?;
if status == crate::copy::Status::Cancelled {
return Err(StoreError::Canceled);
}
Expand Down Expand Up @@ -1588,10 +1602,17 @@ impl DeploymentStore {
Ok(())
})?;
}

let mut conn = self.get_conn()?;
if ENV_VARS.postpone_attribute_index_creation {
// check if all indexes are valid and recreate them if they aren't
self.load_indexes(site.clone())?
.recreate_invalid_indexes(&mut conn, &dst)?;
}

// Make sure the block pointer is set. This is important for newly
// deployed subgraphs so that we respect the 'startBlock' setting
// the first time the subgraph is started
let mut conn = self.get_conn()?;
conn.transaction(|conn| crate::deployment::initialize_block_ptr(conn, &dst.site))?;
Ok(())
}
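As a rough guide to the control flow these hunks add for grafts and copies, here is a standalone sketch; every type and function in it is a stand-in with simplified signatures, not the real graph-node API. The source's index definitions are loaded once, threaded through deployment creation and the data copy, and, when GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION is in effect, indexes that ended up invalid are rebuilt before the block pointer is initialized.

// Stand-in types mirroring the flow of the diff, not real signatures.
struct IndexList(Vec<String>);

fn load_indexes(src: &str) -> IndexList {
    // In the diff this goes through IndexList::load for the source site.
    IndexList(vec![format!("create index attr_{src}_owner on {src} (owner)")])
}

fn create_deployment(index_def: Option<&IndexList>) {
    // Some(..): defer attribute indexes and reuse the source definitions later.
    // None: create all default indexes as part of the initial DDL.
    println!("deployment created, deferred indexes: {}", index_def.is_some());
}

fn copy_data(index_list: &IndexList) {
    // After the rows are copied, replay the source indexes and then create
    // the postponed indexes for any new columns.
    for sql in &index_list.0 {
        println!("executing: {sql}");
    }
}

fn recreate_invalid_indexes(index_list: &IndexList) {
    // With postponement enabled, indexes that are not valid (for example
    // left behind by an interrupted CREATE INDEX CONCURRENTLY) are rebuilt.
    println!("validated {} indexes", index_list.0.len());
}

fn main() {
    let postpone = true; // stand-in for ENV_VARS.postpone_attribute_index_creation
    let index_list = load_indexes("thing");
    create_deployment(Some(&index_list));
    copy_data(&index_list);
    if postpone {
        recreate_invalid_indexes(&index_list);
    }
}
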
5 changes: 4 additions & 1 deletion store/postgres/src/relational.rs
@@ -38,6 +38,7 @@ use graph::schema::{
EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema,
};
use graph::slog::warn;
use index::IndexList;
use inflector::Inflector;
use itertools::Itertools;
use lazy_static::lazy_static;
@@ -384,12 +385,13 @@ impl Layout {
site: Arc<Site>,
schema: &InputSchema,
entities_with_causality_region: BTreeSet<EntityType>,
index_def: Option<IndexList>,
) -> Result<Layout, StoreError> {
let catalog =
Catalog::for_creation(conn, site.cheap_clone(), entities_with_causality_region)?;
let layout = Self::new(site, schema, catalog)?;
let sql = layout
.as_ddl()
.as_ddl(index_def)
.map_err(|_| StoreError::Unknown(anyhow!("failed to generate DDL for layout")))?;
conn.batch_execute(&sql)?;
Ok(layout)
Expand Down Expand Up @@ -1436,6 +1438,7 @@ pub struct Table {
/// aggregations, this is the object type for a specific interval, like
/// `Stats_hour`, not the overall aggregation type `Stats`.
pub object: EntityType,

/// The name of the database table for this type ('thing'), snakecased
/// version of `object`
pub name: SqlName,
Expand Down
