Graphman copy indexing improvements #5425

Merged
merged 1 commit on Jun 27, 2024
8 changes: 8 additions & 0 deletions graph/src/env/mod.rs
@@ -152,6 +152,10 @@ pub struct EnvVars {
/// Set by the flag `GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES`. On by
/// default.
pub enable_select_by_specific_attributes: bool,
/// Experimental feature.
///
/// Set by the flag `GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION`. Off by default.
pub postpone_attribute_index_creation: bool,
/// Verbose logging of mapping inputs.
///
/// Set by the flag `GRAPH_LOG_TRIGGER_DATA`. Off by
@@ -271,6 +275,8 @@ impl EnvVars {
subgraph_error_retry_ceil: Duration::from_secs(inner.subgraph_error_retry_ceil_in_secs),
subgraph_error_retry_jitter: inner.subgraph_error_retry_jitter,
enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0,
postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0
|| cfg!(debug_assertions),
log_trigger_data: inner.log_trigger_data.0,
explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs),
explorer_lock_threshold: Duration::from_millis(inner.explorer_lock_threshold_in_msec),
@@ -393,6 +399,8 @@ struct Inner {
subgraph_error_retry_jitter: f64,
#[envconfig(from = "GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES", default = "true")]
enable_select_by_specific_attributes: EnvVarBoolean,
#[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")]
postpone_attribute_index_creation: EnvVarBoolean,
#[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")]
log_trigger_data: EnvVarBoolean,
#[envconfig(from = "GRAPH_EXPLORER_TTL", default = "10")]
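As a quick illustration of how an envconfig-backed flag like this behaves, here is a self-contained sketch. The `Example` struct is hypothetical; graph-node's actual `Inner` struct uses the `EnvVarBoolean` wrapper, which accepts more spellings (such as `1`/`0`) than plain `bool` parsing does.

```rust
use envconfig::Envconfig;

// Hypothetical stand-in for the `Inner` struct above.
#[derive(Envconfig)]
struct Example {
    #[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")]
    postpone_attribute_index_creation: bool,
}

fn main() {
    std::env::set_var("GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", "true");
    let example = Example::init_from_env().unwrap();
    // Mirrors the `|| cfg!(debug_assertions)` in the diff: debug builds
    // force the feature on so the test suite always exercises it.
    let effective = example.postpone_attribute_index_creation || cfg!(debug_assertions);
    println!("postpone attribute index creation: {effective}");
}
```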
2 changes: 1 addition & 1 deletion store/postgres/examples/layout.rs
@@ -42,7 +42,7 @@ fn print_delete_all(layout: &Layout) {
}

fn print_ddl(layout: &Layout) {
let ddl = ensure(layout.as_ddl(), "Failed to generate DDL");
let ddl = ensure(layout.as_ddl(None), "Failed to generate DDL");
println!("{}", ddl);
}

1 change: 1 addition & 0 deletions store/postgres/src/catalog.rs
@@ -687,6 +687,7 @@ pub(crate) fn indexes_for_table(

Ok(results.into_iter().map(|i| i.def).collect())
}

pub(crate) fn drop_index(
conn: &mut PgConnection,
schema_name: &str,
54 changes: 49 additions & 5 deletions store/postgres/src/copy.rs
@@ -13,6 +13,7 @@
//! `graph-node` was restarted while the copy was running.
use std::{
convert::TryFrom,
ops::DerefMut,
sync::Arc,
time::{Duration, Instant},
};
@@ -24,8 +25,7 @@ use diesel::{
pg::Pg,
r2d2::{ConnectionManager, PooledConnection},
select,
serialize::Output,
serialize::ToSql,
serialize::{Output, ToSql},
sql_query,
sql_types::{BigInt, Integer},
update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl,
@@ -36,11 +36,13 @@ use graph::{
prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError, ENV_VARS},
schema::EntityType,
};
use itertools::Itertools;

use crate::{
advisory_lock, catalog,
dynds::DataSourcesTable,
primary::{DeploymentId, Site},
relational::index::IndexList,
};
use crate::{connection_pool::ConnectionPool, relational::Layout};
use crate::{relational::Table, relational_queries as rq};
@@ -761,7 +763,7 @@ impl Connection {
Ok(())
}

pub fn copy_data_internal(&mut self) -> Result<Status, StoreError> {
pub fn copy_data_internal(&mut self, index_list: IndexList) -> Result<Status, StoreError> {
let src = self.src.clone();
let dst = self.dst.clone();
let target_block = self.target_block.clone();
@@ -806,6 +808,46 @@
progress.table_finished(&table.batch);
}

// Create the attribute indexes whose creation was postponed at the start of
// the copy/graft operation.
// First, recreate the indexes that existed on the source subgraph.
let conn = self.conn.deref_mut();
for table in state.tables.iter() {
let arr = index_list.indexes_for_table(
&self.dst.site.namespace,
&table.batch.src.name.to_string(),
&table.batch.dst,
true,
true,
)?;

for (_, sql) in arr {
let query = sql_query(format!("{};", sql));
query.execute(conn)?;
}
}

// Second, create the indexes for the new fields, skipping those already
// created in the first step for the fields that also exist on the source.
for table in state.tables.iter() {
let orig_columns = table
.batch
.src
.columns
.iter()
.map(|c| c.name.to_string())
.collect_vec();
for sql in table
.batch
.dst
.create_postponed_indexes(orig_columns)
.into_iter()
{
let query = sql_query(sql);
query.execute(conn)?;
}
}

self.copy_private_data_sources(&state)?;

self.transaction(|conn| state.finished(conn))?;
@@ -820,6 +862,8 @@
/// block is guaranteed to not be subject to chain reorgs. All data up
/// to and including `target_block` will be copied.
///
/// The parameter `index_list` is the list of indexes that exist on `src`.
///
/// The copy logic makes heavy use of the fact that the `vid` and
/// `block_range` of entity versions are related since for two entity
/// versions `v1` and `v2` such that `v1.vid <= v2.vid`, we know that
@@ -828,7 +872,7 @@
/// lower(v1.block_range) => v2.vid > v1.vid` and we can therefore stop
/// the copying of each table as soon as we hit `max_vid = max { v.vid |
/// lower(v.block_range) <= target_block.number }`.
pub fn copy_data(&mut self) -> Result<Status, StoreError> {
pub fn copy_data(&mut self, index_list: IndexList) -> Result<Status, StoreError> {
// We require sole access to the destination site, and that we get a
// consistent view of what has been copied so far. In general, that
// is always true. It can happen though that this function runs when
Expand All @@ -842,7 +886,7 @@ impl Connection {
"Obtaining copy lock (this might take a long time if another process is still copying)"
);
advisory_lock::lock_copying(&mut self.conn, self.dst.site.as_ref())?;
let res = self.copy_data_internal();
let res = self.copy_data_internal(index_list);
advisory_lock::unlock_copying(&mut self.conn, self.dst.site.as_ref())?;
if matches!(res, Ok(Status::Cancelled)) {
warn!(&self.logger, "Copying was cancelled and is incomplete");
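To make the two phases above concrete, here is a minimal sketch of the skip logic in the second phase. The helper below is hypothetical and the DDL shape is assumed; in the PR the actual statements come from `IndexList::indexes_for_table` (phase one) and `Table::create_postponed_indexes` (phase two).

```rust
// Sketch: emit CREATE INDEX statements only for destination columns that
// did not exist on the source table; columns that did exist were already
// covered in phase one, when the source's indexes were recreated.
fn postponed_index_ddl(
    namespace: &str,
    table: &str,
    dst_columns: &[&str],
    src_columns: &[String],
) -> Vec<String> {
    dst_columns
        .iter()
        .filter(|col| !src_columns.iter().any(|src| src == *col))
        .map(|col| {
            format!(
                "create index if not exists attr_{table}_{col} \
                 on {namespace}.{table} using btree (\"{col}\")"
            )
        })
        .collect()
}

#[test]
fn skips_columns_present_on_source() {
    let src = vec!["id".to_string(), "owner".to_string()];
    let ddl = postponed_index_ddl("sgd42", "token", &["id", "owner", "symbol"], &src);
    assert_eq!(ddl.len(), 1);
    assert!(ddl[0].contains("symbol"));
}
```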
33 changes: 27 additions & 6 deletions store/postgres/src/deployment_store.rs
@@ -29,8 +29,8 @@ use lru_time_cache::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use std::collections::{BTreeMap, HashMap};
use std::convert::Into;
use std::ops::Bound;
use std::ops::Deref;
use std::ops::{Bound, DerefMut};
use std::str::FromStr;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::time::{Duration, Instant};
@@ -52,7 +52,7 @@ use crate::deployment::{self, OnSync};
use crate::detail::ErrorDetail;
use crate::dynds::DataSourcesTable;
use crate::primary::DeploymentId;
use crate::relational::index::{CreateIndex, Method};
use crate::relational::index::{CreateIndex, IndexList, Method};
use crate::relational::{Layout, LayoutCache, SqlName, Table};
use crate::relational_queries::FromEntityData;
use crate::{advisory_lock, catalog, retry};
@@ -172,6 +172,11 @@ impl DeploymentStore {
DeploymentStore(Arc::new(store))
}

// The parameter `index_def` is used to copy the index definitions from the
// source subgraph to the destination one. When it is `Some`, the BTree
// attribute indexes are also created later, once the subgraph has synced.
// When it is `None`, all indexes are created up front, with the default
// creation strategy for a new subgraph.
pub(crate) fn create_deployment(
&self,
schema: &InputSchema,
Expand All @@ -180,6 +185,7 @@ impl DeploymentStore {
graft_base: Option<Arc<Layout>>,
replace: bool,
on_sync: OnSync,
index_def: Option<IndexList>,
) -> Result<(), StoreError> {
let mut conn = self.get_conn()?;
conn.transaction(|conn| -> Result<_, StoreError> {
@@ -212,6 +218,7 @@
site.clone(),
schema,
entities_with_causality_region.into_iter().collect(),
index_def,
)?;
// See if we are grafting and check that the graft is permissible
if let Some(base) = graft_base {
@@ -746,6 +753,13 @@ impl DeploymentStore {
.await
}

pub(crate) fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
let store = self.clone();
let mut binding = self.get_conn()?;
let conn = binding.deref_mut();
IndexList::load(conn, site, store)
}

/// Drops an index for a given deployment, concurrently.
pub(crate) async fn drop_index(
&self,
@@ -1483,12 +1497,12 @@
&self,
logger: &Logger,
site: Arc<Site>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity)>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>,
) -> Result<(), StoreError> {
let dst = self.find_layout(site.cheap_clone())?;

// If `graft_src` is `Some`, then there is a pending graft.
if let Some((src, block, src_deployment)) = graft_src {
if let Some((src, block, src_deployment, index_list)) = graft_src {
info!(
logger,
"Initializing graft by copying data from {} to {}",
@@ -1516,7 +1530,7 @@
src_manifest_idx_and_name,
dst_manifest_idx_and_name,
)?;
let status = copy_conn.copy_data()?;
let status = copy_conn.copy_data(index_list)?;
if status == crate::copy::Status::Cancelled {
return Err(StoreError::Canceled);
}
@@ -1588,10 +1602,17 @@
Ok(())
})?;
}

let mut conn = self.get_conn()?;
if ENV_VARS.postpone_attribute_index_creation {
// Check that all indexes are valid and recreate any that are not.
self.load_indexes(site.clone())?
.recreate_invalid_indexes(&mut conn, &dst)?;
}

// Make sure the block pointer is set. This is important for newly
// deployed subgraphs so that we respect the 'startBlock' setting
// the first time the subgraph is started
let mut conn = self.get_conn()?;
conn.transaction(|conn| crate::deployment::initialize_block_ptr(conn, &dst.site))?;
Ok(())
}
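One note on `recreate_invalid_indexes`: Postgres marks an index `INVALID` when a `CREATE INDEX CONCURRENTLY` fails or is interrupted, which is exactly the situation a restarted copy can leave behind. The catalog query below is the standard way to find such indexes; the surrounding diesel wiring is illustrative, not the PR's actual implementation.

```rust
use diesel::{sql_query, sql_types::Text, PgConnection, QueryableByName, RunQueryDsl};

#[derive(QueryableByName)]
struct InvalidIndex {
    #[diesel(sql_type = Text)]
    index_name: String,
}

// pg_index.indisvalid is false for indexes whose concurrent build
// failed or was cut short.
fn invalid_indexes(
    conn: &mut PgConnection,
    namespace: &str,
) -> Result<Vec<String>, diesel::result::Error> {
    let rows = sql_query(
        "select i.relname as index_name \
           from pg_index x \
           join pg_class i on i.oid = x.indexrelid \
           join pg_namespace n on n.oid = i.relnamespace \
          where n.nspname = $1 and not x.indisvalid",
    )
    .bind::<Text, _>(namespace)
    .load::<InvalidIndex>(conn)?;
    Ok(rows.into_iter().map(|r| r.index_name).collect())
}
```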
5 changes: 4 additions & 1 deletion store/postgres/src/relational.rs
@@ -38,6 +38,7 @@ use graph::schema::{
EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema,
};
use graph::slog::warn;
use index::IndexList;
use inflector::Inflector;
use itertools::Itertools;
use lazy_static::lazy_static;
@@ -384,12 +385,13 @@ impl Layout {
site: Arc<Site>,
schema: &InputSchema,
entities_with_causality_region: BTreeSet<EntityType>,
index_def: Option<IndexList>,
) -> Result<Layout, StoreError> {
let catalog =
Catalog::for_creation(conn, site.cheap_clone(), entities_with_causality_region)?;
let layout = Self::new(site, schema, catalog)?;
let sql = layout
.as_ddl()
.as_ddl(index_def)
.map_err(|_| StoreError::Unknown(anyhow!("failed to generate DDL for layout")))?;
conn.batch_execute(&sql)?;
Ok(layout)
@@ -1436,6 +1438,7 @@ pub struct Table {
/// aggregations, this is the object type for a specific interval, like
/// `Stats_hour`, not the overall aggregation type `Stats`.
pub object: EntityType,

/// The name of the database table for this type ('thing'), snakecased
/// version of `object`
pub name: SqlName,
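Finally, a rough, self-contained illustration of what threading `index_def` into `as_ddl` enables. Names and structure here are assumed rather than taken from `relational/ddl.rs`: when an index list is supplied (copy/graft), the attribute indexes are left out of the initial schema and issued later, as shown in `copy.rs` above; with `None` (a fresh deployment), everything is created up front.

```rust
// Hypothetical sketch of Option-driven DDL generation.
fn table_ddl(
    base_ddl: &str,                 // CREATE TABLE plus pk/block-range indexes
    attribute_index_ddl: &[String], // the BTree attribute indexes
    postpone: bool,                 // derived from `index_def.is_some()` and the env flag
) -> String {
    let mut ddl = String::from(base_ddl);
    if !postpone {
        // Fresh deployment: create attribute indexes immediately.
        for stmt in attribute_index_ddl {
            ddl.push_str(stmt);
            ddl.push_str(";\n");
        }
    }
    // When postponing, the caller creates these indexes after the copy
    // finishes, recreating the source's definitions where they exist.
    ddl
}
```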