Skip to content

Commit

Permalink
store: Use VidBatcher to batch pruning queries
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Feb 11, 2025
1 parent c4844ce commit 5f648ff
Showing 1 changed file with 61 additions and 75 deletions.
136 changes: 61 additions & 75 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Write, sync::Arc, time::Instant};
use std::{fmt::Write, sync::Arc};

use diesel::{
connection::SimpleConnection,
Expand All @@ -20,7 +20,7 @@ use itertools::Itertools;
use crate::{
catalog, deployment,
relational::{Table, VID_COLUMN},
vid_batcher::{AdaptiveBatchSize, VidRange},
vid_batcher::{VidBatcher, VidRange},
};

use super::{Catalog, Layout, Namespace};
Expand Down Expand Up @@ -86,51 +86,47 @@ impl TablePair {

// Determine the last vid that we need to copy
let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
// The first vid we still need to copy
let mut next_vid = range.min;
while next_vid <= range.max {
let start = Instant::now();
let rows = conn.transaction(|conn| {
// Page through all rows in `src` in batches of `batch_size`
// and copy the ones that are visible to queries at block
// heights between `earliest_block` and `final_block`, but
// whose block_range does not extend past `final_block`
// since they could still be reverted while we copy.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(format!(
"/* controller=prune,phase=final,start_vid={next_vid},batch_size={batch_size} */ \
let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?;

while !batcher.finished() {
let (_, rows) = batcher.step(|start, end| {
conn.transaction(|conn| {
// Page through all rows in `src` in batches of `batch_size`
// and copy the ones that are visible to queries at block
// heights between `earliest_block` and `final_block`, but
// whose block_range does not extend past `final_block`
// since they could still be reverted while we copy.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(format!(
"/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \
insert into {dst}({column_list}) \
select {column_list} from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2, '[]') \
and vid >= $3 and vid < $3 + $4 \
and vid >= $3 and vid <= $4 \
order by vid",
src = self.src.qualified_name,
dst = self.dst.qualified_name,
batch_size = batch_size.size,
batch_size = end - start + 1,
))
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
.execute(conn)
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(start)
.bind::<BigInt, _>(end)
.execute(conn)
.map_err(StoreError::from)
})
})?;
cancel.check_cancel()?;

next_vid += batch_size.size;

batch_size.adapt(start.elapsed());

reporter.prune_batch(
self.src.name.as_str(),
rows,
rows.unwrap_or(0),
PrunePhase::CopyFinal,
next_vid > range.max,
batcher.finished(),
);
}
Ok(())
Expand All @@ -149,46 +145,41 @@ impl TablePair {

// Determine the last vid that we need to copy
let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?;
let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
// The first vid we still need to copy
let mut next_vid = range.min;
while next_vid <= range.max {
let start = Instant::now();
let rows = conn.transaction(|conn| {
while !batcher.finished() {
let (_, rows) = batcher.step(|start, end| {
// Page through all the rows in `src` in batches of
// `batch_size` that are visible to queries at block heights
// starting right after `final_block`.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(format!(
"/* controller=prune,phase=nonfinal,start_vid={next_vid},batch_size={batch_size} */ \
// starting right after `final_block`. The conditions on
// `block_range` are expressed redundantly to make more
// indexes useable
conn.transaction(|conn| {
sql_query(format!(
"/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \
insert into {dst}({column_list}) \
select {column_list} from {src} \
where coalesce(upper(block_range), 2147483647) > $1 \
and block_range && int4range($1, null) \
and vid >= $2 and vid < $2 + $3 \
and vid >= $2 and vid <= $3 \
order by vid",
dst = self.dst.qualified_name,
src = self.src.qualified_name,
batch_size = batch_size.size
))
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
.execute(conn)
.map_err(StoreError::from)
dst = self.dst.qualified_name,
src = self.src.qualified_name,
batch_size = end - start + 1,
))
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(start)
.bind::<BigInt, _>(end)
.execute(conn)
.map_err(StoreError::from)
})
})?;

next_vid += batch_size.size;

batch_size.adapt(start.elapsed());

reporter.prune_batch(
self.src.name.as_str(),
rows,
rows.unwrap_or(0),
PrunePhase::CopyNonfinal,
next_vid > range.max,
batcher.finished(),
);
}
Ok(())
Expand Down Expand Up @@ -421,32 +412,27 @@ impl Layout {
// Delete all entity versions whose range was closed
// before `req.earliest_block`
let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?;
let mut batch_size = AdaptiveBatchSize::new(&table);
let mut next_vid = range.min;
while next_vid <= range.max {
let start = Instant::now();
let rows = sql_query(format!(
"/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \
let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?;

while !batcher.finished() {
let (_, rows) = batcher.step(|start, end| {sql_query(format!(
"/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \
delete from {qname} \
where coalesce(upper(block_range), 2147483647) <= $1 \
and vid >= $2 and vid < $2 + $3",
and vid >= $2 and vid <= $3",
qname = table.qualified_name,
batch_size = batch_size.size
batch_size = end - start + 1
))
.bind::<Integer, _>(req.earliest_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
.execute(conn)?;

next_vid += batch_size.size;

batch_size.adapt(start.elapsed());
.bind::<BigInt, _>(start)
.bind::<BigInt, _>(end)
.execute(conn).map_err(StoreError::from)})?;

reporter.prune_batch(
table.name.as_str(),
rows as usize,
rows.unwrap_or(0),
PrunePhase::Delete,
next_vid > range.max,
batcher.finished(),
);
}
}
Expand Down

0 comments on commit 5f648ff

Please sign in to comment.