Skip to content

Commit

Permalink
store: Use VidRange for pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Feb 11, 2025
1 parent aa1b9fd commit a20b488
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 55 deletions.
65 changes: 13 additions & 52 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,14 @@ use itertools::Itertools;
use crate::{
catalog, deployment,
relational::{Table, VID_COLUMN},
vid_batcher::AdaptiveBatchSize,
vid_batcher::{AdaptiveBatchSize, VidRange},
};

use super::{
index::{load_indexes_from_table, IndexList},
Catalog, Layout, Namespace,
};

// Additions to `Table` that are useful for pruning
impl Table {
/// Return the first and last vid of any entity that is visible in the
/// block range from `first_block` (inclusive) to `last_block`
/// (exclusive)
fn vid_range(
&self,
conn: &mut PgConnection,
first_block: BlockNumber,
last_block: BlockNumber,
) -> Result<(i64, i64), StoreError> {
#[derive(QueryableByName)]
struct VidRange {
#[diesel(sql_type = BigInt)]
min_vid: i64,
#[diesel(sql_type = BigInt)]
max_vid: i64,
}

// Determine the last vid that we need to copy
let VidRange { min_vid, max_vid } = sql_query(format!(
"/* controller=prune,first={first_block},last={last_block} */ \
select coalesce(min(vid), 0) as min_vid, \
coalesce(max(vid), -1) as max_vid from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2)",
src = self.qualified_name,
))
.bind::<Integer, _>(first_block)
.bind::<Integer, _>(last_block)
.get_result::<VidRange>(conn)?;
Ok((min_vid, max_vid))
}
}

/// Utility to copy relevant data out of a source table and into a new
/// destination table and replace the source table with the destination
/// table
Expand Down Expand Up @@ -131,12 +94,12 @@ impl TablePair {
let column_list = self.column_list();

// Determine the last vid that we need to copy
let (min_vid, max_vid) = self.src.vid_range(conn, earliest_block, final_block)?;
let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
// The first vid we still need to copy
let mut next_vid = min_vid;
while next_vid <= max_vid {
let mut next_vid = range.min;
while next_vid <= range.max {
let start = Instant::now();
let rows = conn.transaction(|conn| {
// Page through all rows in `src` in batches of `batch_size`
Expand Down Expand Up @@ -176,7 +139,7 @@ impl TablePair {
self.src.name.as_str(),
rows,
PrunePhase::CopyFinal,
next_vid > max_vid,
next_vid > range.max,
);
}
Ok(())
Expand All @@ -194,14 +157,12 @@ impl TablePair {
let column_list = self.column_list();

// Determine the last vid that we need to copy
let (min_vid, max_vid) = self
.src
.vid_range(conn, final_block + 1, BLOCK_NUMBER_MAX)?;
let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
// The first vid we still need to copy
let mut next_vid = min_vid;
while next_vid <= max_vid {
let mut next_vid = range.min;
while next_vid <= range.max {
let start = Instant::now();
let rows = conn.transaction(|conn| {
// Page through all the rows in `src` in batches of
Expand Down Expand Up @@ -236,7 +197,7 @@ impl TablePair {
self.src.name.as_str(),
rows,
PrunePhase::CopyNonfinal,
next_vid > max_vid,
next_vid > range.max,
);
}
Ok(())
Expand Down Expand Up @@ -468,10 +429,10 @@ impl Layout {
PruningStrategy::Delete => {
// Delete all entity versions whose range was closed
// before `req.earliest_block`
let (min_vid, max_vid) = table.vid_range(conn, 0, req.earliest_block)?;
let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?;
let mut batch_size = AdaptiveBatchSize::new(&table);
let mut next_vid = min_vid;
while next_vid <= max_vid {
let mut next_vid = range.min;
while next_vid <= range.max {
let start = Instant::now();
let rows = sql_query(format!(
"/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \
Expand All @@ -494,7 +455,7 @@ impl Layout {
table.name.as_str(),
rows as usize,
PrunePhase::Delete,
next_vid > max_vid,
next_vid > range.max,
);
}
}
Expand Down
31 changes: 28 additions & 3 deletions store/postgres/src/vid_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use diesel::{
};
use graph::{
env::ENV_VARS,
prelude::{BlockPtr, StoreError},
prelude::{BlockNumber, BlockPtr, StoreError},
util::ogive::Ogive,
};

Expand Down Expand Up @@ -251,9 +251,9 @@ impl VidBatcher {
#[derive(Copy, Clone, QueryableByName)]
pub(crate) struct VidRange {
#[diesel(sql_type = BigInt, column_name = "min_vid")]
min: i64,
pub min: i64,
#[diesel(sql_type = BigInt, column_name = "max_vid")]
max: i64,
pub max: i64,
}

const EMPTY_VID_RANGE: VidRange = VidRange { max: -1, min: 0 };
Expand Down Expand Up @@ -300,6 +300,31 @@ impl VidRange {
.unwrap_or(EMPTY_VID_RANGE);
Ok(vid_range)
}

/// Return the first and last vid of any entity that is visible in the
/// block range from `first_block` (inclusive) to `last_block`
/// (exclusive)
pub fn for_prune(
conn: &mut PgConnection,
src: &Table,
first_block: BlockNumber,
last_block: BlockNumber,
) -> Result<Self, StoreError> {
sql_query(format!(
"/* controller=prune,first={first_block},last={last_block} */ \
select coalesce(min(vid), 0) as min_vid, \
coalesce(max(vid), -1) as max_vid from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2)",
src = src.qualified_name,
))
.bind::<Integer, _>(first_block)
.bind::<Integer, _>(last_block)
.get_result::<VidRange>(conn)
.map_err(StoreError::from)
}
}

#[cfg(test)]
Expand Down

0 comments on commit a20b488

Please sign in to comment.