Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed May 17, 2024
1 parent 6287b01 commit 4450622
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 9 deletions.
1 change: 1 addition & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ pub(crate) fn indexes_for_table(

Ok(results.into_iter().map(|i| i.def).collect())
}

pub(crate) fn drop_index(
conn: &mut PgConnection,
schema_name: &str,
Expand Down
30 changes: 27 additions & 3 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use lru_time_cache::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use std::collections::{BTreeMap, HashMap};
use std::convert::Into;
use std::ops::Bound;
use std::ops::Deref;
use std::ops::{Bound, DerefMut};
use std::str::FromStr;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -121,6 +121,11 @@ pub struct StoreInner {
#[derive(Clone, CheapClone)]
pub struct DeploymentStore(Arc<StoreInner>);

#[derive(Debug)]
pub struct IndexList {
indexes: HashMap<String, Vec<CreateIndex>>,
}

#[derive(QueryableByName, Debug)]
struct IndexInfo {
#[diesel(sql_type = Bool)]
Expand Down Expand Up @@ -746,6 +751,25 @@ impl DeploymentStore {
.await
}

pub(crate) fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
let store = self.clone();
let mut binding = self.get_conn()?;
let conn = binding.deref_mut();
let mut list = IndexList {
indexes: HashMap::new(),
};
let schema_name = site.namespace.clone();
let layout = store.layout(conn, site)?;
for (_, table) in &layout.tables {
let table_name = table.name.as_str();
let indexes = catalog::indexes_for_table(conn, schema_name.as_str(), table_name)
.map_err(StoreError::from)?;
let collect: Vec<CreateIndex> = indexes.into_iter().map(CreateIndex::parse).collect();
list.indexes.insert(table_name.to_string(), collect);
}
Ok(list)
}

/// Drops an index for a given deployment, concurrently.
pub(crate) async fn drop_index(
&self,
Expand Down Expand Up @@ -1483,12 +1507,12 @@ impl DeploymentStore {
&self,
logger: &Logger,
site: Arc<Site>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity)>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>,
) -> Result<(), StoreError> {
let dst = self.find_layout(site.cheap_clone())?;

// If `graft_src` is `Some`, then there is a pending graft.
if let Some((src, block, src_deployment)) = graft_src {
if let Some((src, block, src_deployment, _)) = graft_src {
info!(
logger,
"Initializing graft by copying data from {} to {}",
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/src/relational/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ impl CreateIndex {

fn new_parsed(defn: &str) -> Option<CreateIndex> {
let rx = Regex::new(
"create (?P<unique>unique )?index (?P<name>[a-z0-9$_]+) \
on (?P<nsp>sgd[0-9]+)\\.(?P<table>[a-z$_]+) \
"create (?P<unique>unique )?index (?P<name>\"?[a-z0-9$_]+\"?) \
on (?P<nsp>sgd[0-9]+)\\.(?P<table>\"?[a-z0-9$_]+\"?) \
using (?P<method>[a-z]+) \\((?P<columns>.*?)\\)\
( where \\((?P<cond>.*)\\))?\
( with \\((?P<with>.*)\\))?$",
Expand Down
9 changes: 7 additions & 2 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use graph::{
use crate::{
connection_pool::ConnectionPool,
deployment::{OnSync, SubgraphHealth},
primary,
primary::{DeploymentId, Mirror as PrimaryMirror, Site},
deployment_store::IndexList,
primary::{self, DeploymentId, Mirror as PrimaryMirror, Site},
relational::{index::Method, Layout},
writable::WritableStore,
NotificationSender,
Expand Down Expand Up @@ -1212,6 +1212,11 @@ impl SubgraphStoreInner {
let src_store = self.for_site(&site)?;
src_store.load_deployment(site)
}

pub fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
let src_store = self.for_site(&site)?;
src_store.load_indexes(site)
}
}

const STATE_ENS_NOT_CHECKED: u8 = 0;
Expand Down
9 changes: 7 additions & 2 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use graph::{
};
use store::StoredDynamicDataSource;

use crate::deployment_store::DeploymentStore;
use crate::deployment_store::{DeploymentStore, IndexList};
use crate::primary::DeploymentId;
use crate::retry;
use crate::{primary, primary::Site, relational::Layout, SubgraphStore};
Expand Down Expand Up @@ -66,6 +66,10 @@ impl WritableSubgraphStore {
fn find_site(&self, id: DeploymentId) -> Result<Arc<Site>, StoreError> {
self.0.find_site(id)
}

fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
self.0.load_indexes(site)
}
}

#[derive(Copy, Clone)]
Expand Down Expand Up @@ -222,7 +226,8 @@ impl SyncStore {
Some((base_id, base_ptr)) => {
let src = self.store.layout(&base_id)?;
let deployment_entity = self.store.load_deployment(src.site.clone())?;
Some((src, base_ptr, deployment_entity))
let indexes = self.store.load_indexes(src.site.clone())?;
Some((src, base_ptr, deployment_entity, indexes))
}
None => None,
};
Expand Down

0 comments on commit 4450622

Please sign in to comment.