Skip to content

Commit

Permalink
small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Jun 13, 2024
1 parent ca9fec8 commit 5a4acef
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 62 deletions.
62 changes: 3 additions & 59 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use detail::DeploymentDetail;
use diesel::connection::SimpleConnection;
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::sql_types::{Bool, Text};
use diesel::{prelude::*, sql_query};
use graph::anyhow::Context;
use graph::blockchain::block_stream::FirehoseCursor;
Expand Down Expand Up @@ -1484,14 +1483,6 @@ impl DeploymentStore {
site: Arc<Site>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>,
) -> Result<(), StoreError> {
#[derive(QueryableByName, Debug)]
struct IndexInfo {
#[diesel(sql_type = Bool)]
isvalid: bool,
#[diesel(sql_type = Bool)]
isready: bool,
}

let dst = self.find_layout(site.cheap_clone())?;

// If `graft_src` is `Some`, then there is a pending graft.
Expand Down Expand Up @@ -1596,58 +1587,11 @@ impl DeploymentStore {
})?;
}

// check if all indexes are valid and recreate them if they aren't
let mut conn = self.get_conn()?;

if ENV_VARS.postpone_attribute_index_creation {
let index_list = self.load_indexes(site.clone())?;
for table in dst.tables.values() {
for (ind_name, create_query) in index_list.indexes_for_table(
&site.namespace,
&table.name.to_string(),
table,
true,
true,
) {
if let Some(index_name) = ind_name {
let table_name = table.name.clone();
let query = r#"
SELECT x.indisvalid AS isvalid,
x.indisready AS isready
FROM pg_index x
JOIN pg_class c ON c.oid = x.indrelid
JOIN pg_class i ON i.oid = x.indexrelid
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE (c.relkind = ANY (ARRAY ['r'::"char", 'm'::"char", 'p'::"char"]))
AND (i.relkind = ANY (ARRAY ['i'::"char", 'I'::"char"]))
AND (n.nspname = $1)
AND (c.relname = $2)
AND (i.relname = $3);"#;
let ii_vec = sql_query(query)
.bind::<Text, _>(site.namespace.to_string())
.bind::<Text, _>(table_name)
.bind::<Text, _>(index_name.clone())
.get_results::<IndexInfo>(&mut conn)?
.into_iter()
.map(|ii| ii.into())
.collect::<Vec<IndexInfo>>();
assert!(ii_vec.len() <= 1);
// Check if the index is valid. If either isvalid or isready flag of pg_index table
// isn't true, drop it and recreate it.
if ii_vec.len() == 0 || !ii_vec[0].isvalid || !ii_vec[0].isready {
if ii_vec.len() > 0 {
let drop_query = sql_query(format!(
"DROP INDEX {}.{};",
site.namespace.to_string(),
index_name
));
conn.transaction(|conn| drop_query.execute(conn))?;
}
sql_query(create_query).execute(&mut conn)?;
}
}
}
}
// check if all indexes are valid and recreate them if they aren't
self.load_indexes(site.clone())?
.recreate_invalid_indexes(&mut conn, &dst)?;
}

// Make sure the block pointer is set. This is important for newly
Expand Down
65 changes: 62 additions & 3 deletions store/postgres/src/relational/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::collections::HashMap;
use std::fmt::{Display, Write};
use std::sync::Arc;

use diesel::PgConnection;
use diesel::sql_types::{Bool, Text};
use diesel::{sql_query, Connection, PgConnection, RunQueryDsl};
use graph::components::store::StoreError;
use graph::itertools::Itertools;
use graph::prelude::{
Expand All @@ -16,9 +17,10 @@ use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
use crate::catalog;
use crate::command_support::catalog::Site;
use crate::deployment_store::DeploymentStore;
use crate::primary::Namespace;
use crate::relational::{BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE};

use super::{Table, VID_COLUMN};
use super::{Layout, Table, VID_COLUMN};

#[derive(Clone, Debug, PartialEq)]
pub enum Method {
Expand Down Expand Up @@ -753,7 +755,7 @@ impl IndexList {

pub fn indexes_for_table(
&self,
namespace: &crate::primary::Namespace,
namespace: &Namespace,
table_name: &String,
dest_table: &Table,
postponed: bool,
Expand All @@ -778,6 +780,63 @@ impl IndexList {
}
arr
}
pub fn recreate_invalid_indexes(
&self,
conn: &mut PgConnection,
layout: &Layout,
) -> Result<(), StoreError> {
#[derive(QueryableByName, Debug)]
struct IndexInfo {
#[diesel(sql_type = Bool)]
isvalid: bool,
#[diesel(sql_type = Bool)]
isready: bool,
}

let namespace = &layout.catalog.site.namespace;
for table in layout.tables.values() {
for (ind_name, create_query) in
self.indexes_for_table(namespace, &table.name.to_string(), table, true, true)
{
if let Some(index_name) = ind_name {
let table_name = table.name.clone();
let query = r#"
SELECT x.indisvalid AS isvalid,
x.indisready AS isready
FROM pg_index x
JOIN pg_class c ON c.oid = x.indrelid
JOIN pg_class i ON i.oid = x.indexrelid
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE (c.relkind = ANY (ARRAY ['r'::"char", 'm'::"char", 'p'::"char"]))
AND (i.relkind = ANY (ARRAY ['i'::"char", 'I'::"char"]))
AND (n.nspname = $1)
AND (c.relname = $2)
AND (i.relname = $3);"#;
let ii_vec = sql_query(query)
.bind::<Text, _>(namespace.to_string())
.bind::<Text, _>(table_name)
.bind::<Text, _>(index_name.clone())
.get_results::<IndexInfo>(conn)?
.into_iter()
.map(|ii| ii.into())
.collect::<Vec<IndexInfo>>();
assert!(ii_vec.len() <= 1);
if ii_vec.len() == 0 || !ii_vec[0].isvalid || !ii_vec[0].isready {
if ii_vec.len() > 0 {
let drop_query = sql_query(format!(
"DROP INDEX {}.{};",
namespace.to_string(),
index_name
));
conn.transaction(|conn| drop_query.execute(conn))?;
}
sql_query(create_query).execute(conn)?;
}
}
}
}
Ok(())
}
}

#[test]
Expand Down

0 comments on commit 5a4acef

Please sign in to comment.