Commit

wip
zorancv committed May 22, 2024
1 parent 1150d84 commit 2204c6e
Showing 9 changed files with 172 additions and 29 deletions.
1 change: 1 addition & 0 deletions store/postgres/src/catalog.rs
@@ -687,6 +687,7 @@ pub(crate) fn indexes_for_table(

Ok(results.into_iter().map(|i| i.def).collect())
}

pub(crate) fn drop_index(
conn: &mut PgConnection,
schema_name: &str,
67 changes: 56 additions & 11 deletions store/postgres/src/deployment_store.rs
@@ -30,8 +30,9 @@ use lru_time_cache::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use std::collections::{BTreeMap, HashMap};
use std::convert::Into;
use std::ops::Bound;
use std::fmt::Write;
use std::ops::Deref;
use std::ops::{Bound, DerefMut};
use std::str::FromStr;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::time::{Duration, Instant};
@@ -121,14 +122,6 @@ pub struct StoreInner {
#[derive(Clone, CheapClone)]
pub struct DeploymentStore(Arc<StoreInner>);

#[derive(QueryableByName, Debug)]
struct IndexInfo {
#[diesel(sql_type = Bool)]
isvalid: bool,
#[diesel(sql_type = Bool)]
isready: bool,
}

impl Deref for DeploymentStore {
type Target = StoreInner;
fn deref(&self) -> &Self::Target {
@@ -190,6 +183,7 @@ impl DeploymentStore {
replace: bool,
on_sync: OnSync,
is_copy_op: bool,
index_def: Option<IndexList>,
) -> Result<(), StoreError> {
let mut conn = self.get_conn()?;
conn.transaction(|conn| -> Result<_, StoreError> {
@@ -223,6 +217,7 @@
schema,
entities_with_causality_region.into_iter().collect(),
is_copy_op,
index_def,
)?;
// See if we are grafting and check that the graft is permissible
if let Some(base) = graft_base {
@@ -746,6 +741,25 @@ impl DeploymentStore {
.await
}

pub(crate) fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
let store = self.clone();
let mut conn = self.get_conn()?;
let conn = conn.deref_mut();
let mut list = IndexList {
indexes: HashMap::new(),
};
let schema_name = site.namespace.clone();
let layout = store.layout(conn, site)?;
for table in layout.tables.values() {
let table_name = table.name.as_str();
let indexes = catalog::indexes_for_table(conn, schema_name.as_str(), table_name)
.map_err(StoreError::from)?;
let collect: Vec<CreateIndex> = indexes.into_iter().map(CreateIndex::parse).collect();
list.indexes.insert(table_name.to_string(), collect);
}
Ok(list)
}
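// A minimal usage sketch (hypothetical call site; the real wiring lives in
// the graft/copy paths): capture the source deployment's indexes so they
// can be replayed via the new `index_def` parameter when the destination
// schema is created.
//
//     let index_def = src_store.load_indexes(src_site.cheap_clone())?;
//     // ... later passed down as `Some(index_def)` into schema creation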

/// Drops an index for a given deployment, concurrently.
pub(crate) async fn drop_index(
&self,
@@ -1483,12 +1497,20 @@
&self,
logger: &Logger,
site: Arc<Site>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity)>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>,
) -> Result<(), StoreError> {
#[derive(QueryableByName, Debug)]
struct IndexInfo {
#[diesel(sql_type = Bool)]
isvalid: bool,
#[diesel(sql_type = Bool)]
isready: bool,
}

let dst = self.find_layout(site.cheap_clone())?;

// If `graft_src` is `Some`, then there is a pending graft.
if let Some((src, block, src_deployment)) = graft_src {
if let Some((src, block, src_deployment, _)) = graft_src {
info!(
logger,
"Initializing graft by copying data from {} to {}",
@@ -2057,3 +2079,26 @@ impl PruneReporter for OngoingPruneReporter {
)
}
}

#[derive(Debug)]
pub struct IndexList {
indexes: HashMap<String, Vec<CreateIndex>>,
}

impl IndexList {
pub fn can_postpone(ci: &CreateIndex) -> bool {
ci.is_attribute_index() && ci.is_default_index()
}
pub fn indexes_for_table(&self, namespace: String, table_name: String, out: &mut String) {
if let Some(vec) = self.indexes.get(&table_name) {
for ci in vec {
if !ci.is_constraint() && !ci.is_pkey() {
if let Ok(sql) = ci.with_nsp(namespace.clone()).to_sql(false, false) {
writeln!(out, "{};", sql).expect("SQL should be properly formatted")
}
}
}
}
}
}
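// Illustrative example (assumed index set): if `list` records, for table
// `thing`, a parsed `create index thing_id on sgd1.thing using btree (id)`,
// then
//
//     let mut ddl = String::new();
//     list.indexes_for_table("sgd2".to_string(), "thing".to_string(), &mut ddl);
//
// leaves `ddl` with one statement per non-constraint, non-primary-key index,
// re-pointed at the new namespace, roughly:
//
//     create index thing_id on sgd2.thing using btree (id);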
12 changes: 11 additions & 1 deletion store/postgres/src/relational.rs
@@ -49,6 +49,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use crate::deployment_store::IndexList;
use crate::relational_queries::{
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
FindPossibleDeletionsQuery, ReturnedEntityData,
@@ -362,6 +363,7 @@ impl Layout {
let table_name = SqlName::verbatim(POI_TABLE.to_owned());
Table {
object: poi_type.to_owned(),
name_space: SqlName(catalog.site.namespace.to_string()),
qualified_name: SqlName::qualified_name(&catalog.site.namespace, &table_name),
name: table_name,
columns,
@@ -385,12 +387,13 @@
schema: &InputSchema,
entities_with_causality_region: BTreeSet<EntityType>,
is_copy_op: bool,
index_def: Option<IndexList>,
) -> Result<Layout, StoreError> {
let catalog =
Catalog::for_creation(conn, site.cheap_clone(), entities_with_causality_region)?;
let layout = Self::new(site, schema, catalog)?;
let sql = layout
.as_ddl(is_copy_op)
.as_ddl(is_copy_op, index_def)
.map_err(|_| StoreError::Unknown(anyhow!("failed to generate DDL for layout")))?;
conn.batch_execute(&sql)?;
Ok(layout)
@@ -1437,6 +1440,11 @@ pub struct Table {
/// aggregations, this is the object type for a specific interval, like
/// `Stats_hour`, not the overall aggregation type `Stats`.
pub object: EntityType,

/// The database namespace (schema) in which this table lives, e.g. `sgd42`
pub name_space: SqlName,

/// The name of the database table for this type ('thing'), snakecased
/// version of `object`
pub name: SqlName,
@@ -1494,6 +1502,7 @@ impl Table {

let table = Table {
object: defn.cheap_clone(),
name_space: SqlName::verbatim(catalog.site.namespace.to_string()),
name: table_name,
qualified_name,
// Default `is_account_like` to `false`; the caller should call
@@ -1513,6 +1522,7 @@
pub fn new_like(&self, namespace: &Namespace, name: &SqlName) -> Arc<Table> {
let other = Table {
object: self.object.clone(),
name_space: SqlName(namespace.to_string()),
name: name.clone(),
qualified_name: SqlName::qualified_name(namespace, name),
columns: self.columns.clone(),
33 changes: 27 additions & 6 deletions store/postgres/src/relational/ddl.rs
@@ -8,11 +8,11 @@ use graph::{
schema::InputSchema,
};

use crate::block_range::CAUSALITY_REGION_COLUMN;
use crate::relational::{
ColumnType, BLOCK_COLUMN, BLOCK_RANGE_COLUMN, BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE,
VID_COLUMN,
};
use crate::{block_range::CAUSALITY_REGION_COLUMN, deployment_store::IndexList};

use super::{Catalog, Column, Layout, SqlName, Table};

@@ -29,7 +29,11 @@ impl Layout {
///
/// See the unit tests at the end of this file for the actual DDL that
/// gets generated
pub fn as_ddl(&self, is_copy_op: bool) -> Result<String, fmt::Error> {
pub fn as_ddl(
&self,
is_copy_op: bool,
index_def: Option<IndexList>,
) -> Result<String, fmt::Error> {
let mut out = String::new();

// Output enums first so table definitions can reference them
@@ -41,7 +45,13 @@
tables.sort_by_key(|table| table.position);
// Output 'create table' statements for all tables
for table in tables {
table.as_ddl(&self.input_schema, &self.catalog, is_copy_op, &mut out)?;
table.as_ddl(
&self.input_schema,
&self.catalog,
is_copy_op,
index_def.as_ref(),
&mut out,
)?;
}

Ok(out)
@@ -387,12 +397,23 @@ impl Table {
schema: &InputSchema,
catalog: &Catalog,
is_copy_op: bool,
index_def: Option<&IndexList>,
out: &mut String,
) -> fmt::Result {
self.create_table(out)?;
self.create_time_travel_indexes(catalog, out)?;
self.create_attribute_indexes(is_copy_op, out)?;
self.create_aggregate_indexes(schema, out)
if is_copy_op {
if let Some(ind_def) = index_def {
// TODO: only create the initial indexes, not the attribute ones that can be postponed
ind_def.indexes_for_table(self.name_space.to_string(), self.name.to_string(), out);
Ok(())
} else {
panic!("Missinge index definition for copy operation");
}
} else {
self.create_time_travel_indexes(catalog, out)?;
self.create_attribute_indexes(is_copy_op, out)?;
self.create_aggregate_indexes(schema, out)
}
}
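// Sketch of the two call paths (matching the signature above): a fresh
// deployment generates the default indexes, while a copy/graft replays the
// index definitions recorded from the source deployment.
//
//     table.as_ddl(schema, catalog, false, None, &mut out)?;             // fresh
//     table.as_ddl(schema, catalog, true, Some(&index_list), &mut out)?; // copy/graft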

pub fn exclusion_ddl(&self, out: &mut String) -> fmt::Result {
63 changes: 58 additions & 5 deletions store/postgres/src/relational/index.rs
@@ -13,7 +13,7 @@ use crate::relational::{BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE};

use super::VID_COLUMN;

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub enum Method {
Brin,
BTree,
@@ -196,7 +196,7 @@ impl Expr {

/// The condition for a partial index, i.e., the statement after `where ..`
/// in a `create index` statement
#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub enum Cond {
/// The expression `coalesce(upper(block_range), 2147483647) > $number`
Partial(BlockNumber),
@@ -248,7 +248,7 @@
}
}

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub enum CreateIndex {
/// The literal index definition passed to `parse`. This is used when we
/// can't parse a `create index` statement, e.g. because it uses
@@ -354,8 +354,8 @@ impl CreateIndex {

fn new_parsed(defn: &str) -> Option<CreateIndex> {
let rx = Regex::new(
"create (?P<unique>unique )?index (?P<name>[a-z0-9$_]+) \
on (?P<nsp>sgd[0-9]+)\\.(?P<table>[a-z$_]+) \
"create (?P<unique>unique )?index (?P<name>\"?[a-z0-9$_]+\"?) \
on (?P<nsp>sgd[0-9]+)\\.(?P<table>\"?[a-z0-9$_]+\"?) \
using (?P<method>[a-z]+) \\((?P<columns>.*?)\\)\
( where \\((?P<cond>.*)\\))?\
( with \\((?P<with>.*)\\))?$",
@@ -411,6 +411,32 @@ impl CreateIndex {
}
}

pub fn with_nsp(&self, nsp2: String) -> Self {
let s = self.clone();
match s {
CreateIndex::Unknown { defn } => CreateIndex::Unknown { defn },
CreateIndex::Parsed {
unique,
name,
nsp: _,
table,
method,
columns,
cond,
with,
} => CreateIndex::Parsed {
unique,
name,
nsp: nsp2,
table,
method,
columns,
cond,
with,
},
}
}
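// For example (hypothetical index; `parse` taking an owned String is
// assumed from its use in `load_indexes`), re-targeting a definition parsed
// from the source namespace `sgd1` at a destination `sgd2` before turning
// it back into SQL:
//
//     let idx = CreateIndex::parse(
//         "create index thing_id on sgd1.thing using btree (id)".to_string(),
//     );
//     if let Ok(sql) = idx.with_nsp("sgd2".to_string()).to_sql(false, false) {
//         // sql is roughly: create index thing_id on sgd2.thing using btree (id)
//     }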

pub fn is_attribute_index(&self) -> bool {
use CreateIndex::*;
match self {
@@ -524,6 +550,29 @@
}
}

pub fn is_pkey(&self) -> bool {
let suffix = "_pkey";
match self {
CreateIndex::Unknown { defn: _ } => false,
CreateIndex::Parsed {
unique,
name,
columns,
..
} => {
*unique && has_suffix(name, suffix) && columns.len() == 1 && columns[0] == Expr::Vid
}
}
}
pub fn is_constraint(&self) -> bool {
let suffix = format!("{}_excl", BLOCK_RANGE_COLUMN);
println!("CI: {:?}", self);
match self {
CreateIndex::Unknown { defn: _ } => false,
CreateIndex::Parsed { name, .. } => has_suffix(name, &suffix),
}
}

/// Generate a SQL statement that creates this index. If `concurrent` is
/// `true`, make it a concurrent index creation. If `if_not_exists` is
/// `true` add a `if not exists` clause to the index creation.
@@ -558,6 +607,10 @@ impl CreateIndex {
}
}

fn has_suffix(s: &str, suffix: &str) -> bool {
s.ends_with(suffix) || s.starts_with("\"") && s.ends_with(format!("{}\"", suffix).as_str())
}
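// A few illustrative cases for `has_suffix`, which accepts both plain and
// quoted identifiers (matching the relaxed regex above):
//
//     assert!(has_suffix("thing_pkey", "_pkey"));
//     assert!(has_suffix("\"thing_pkey\"", "_pkey"));
//     assert!(!has_suffix("thing_pkey_old", "_pkey"));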

#[test]
fn parse() {
use Method::*;
2 changes: 1 addition & 1 deletion store/postgres/src/relational/prune.rs
@@ -96,7 +96,7 @@ impl TablePair {
} else {
// When pruning, we don't delay index creation, since the
// assumption is that not much data will be inserted.
dst.as_ddl(schema, catalog, false, &mut query)?;
dst.as_ddl(schema, catalog, false, None, &mut query)?;
}
conn.batch_execute(&query)?;
