diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 742d994d2df..0e82c1bfcc7 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -1,4 +1,4 @@ -use std::{fmt::Write, sync::Arc, time::Instant}; +use std::{fmt::Write, sync::Arc}; use diesel::{ connection::SimpleConnection, @@ -20,7 +20,7 @@ use itertools::Itertools; use crate::{ catalog, deployment, relational::{Table, VID_COLUMN}, - vid_batcher::{AdaptiveBatchSize, VidRange}, + vid_batcher::{VidBatcher, VidRange}, }; use super::{Catalog, Layout, Namespace}; @@ -86,51 +86,47 @@ impl TablePair { // Determine the last vid that we need to copy let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?; - - let mut batch_size = AdaptiveBatchSize::new(&self.src); - // The first vid we still need to copy - let mut next_vid = range.min; - while next_vid <= range.max { - let start = Instant::now(); - let rows = conn.transaction(|conn| { - // Page through all rows in `src` in batches of `batch_size` - // and copy the ones that are visible to queries at block - // heights between `earliest_block` and `final_block`, but - // whose block_range does not extend past `final_block` - // since they could still be reverted while we copy. - // The conditions on `block_range` are expressed redundantly - // to make more indexes useable - sql_query(format!( - "/* controller=prune,phase=final,start_vid={next_vid},batch_size={batch_size} */ \ + let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?; + + while !batcher.finished() { + let (_, rows) = batcher.step(|start, end| { + conn.transaction(|conn| { + // Page through all rows in `src` in batches of `batch_size` + // and copy the ones that are visible to queries at block + // heights between `earliest_block` and `final_block`, but + // whose block_range does not extend past `final_block` + // since they could still be reverted while we copy. + // The conditions on `block_range` are expressed redundantly + // to make more indexes useable + sql_query(format!( + "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ where lower(block_range) <= $2 \ and coalesce(upper(block_range), 2147483647) > $1 \ and coalesce(upper(block_range), 2147483647) <= $2 \ and block_range && int4range($1, $2, '[]') \ - and vid >= $3 and vid < $3 + $4 \ + and vid >= $3 and vid <= $4 \ order by vid", src = self.src.qualified_name, dst = self.dst.qualified_name, - batch_size = batch_size.size, + batch_size = end - start + 1, )) - .bind::(earliest_block) - .bind::(final_block) - .bind::(next_vid) - .bind::(&batch_size) - .execute(conn) + .bind::(earliest_block) + .bind::(final_block) + .bind::(start) + .bind::(end) + .execute(conn) + .map_err(StoreError::from) + }) })?; cancel.check_cancel()?; - next_vid += batch_size.size; - - batch_size.adapt(start.elapsed()); - reporter.prune_batch( self.src.name.as_str(), - rows, + rows.unwrap_or(0), PrunePhase::CopyFinal, - next_vid > range.max, + batcher.finished(), ); } Ok(()) @@ -149,46 +145,41 @@ impl TablePair { // Determine the last vid that we need to copy let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?; + let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?; - let mut batch_size = AdaptiveBatchSize::new(&self.src); - // The first vid we still need to copy - let mut next_vid = range.min; - while next_vid <= range.max { - let start = Instant::now(); - let rows = conn.transaction(|conn| { + while !batcher.finished() { + let (_, rows) = batcher.step(|start, end| { // Page through all the rows in `src` in batches of // `batch_size` that are visible to queries at block heights - // starting right after `final_block`. - // The conditions on `block_range` are expressed redundantly - // to make more indexes useable - sql_query(format!( - "/* controller=prune,phase=nonfinal,start_vid={next_vid},batch_size={batch_size} */ \ + // starting right after `final_block`. The conditions on + // `block_range` are expressed redundantly to make more + // indexes useable + conn.transaction(|conn| { + sql_query(format!( + "/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ where coalesce(upper(block_range), 2147483647) > $1 \ and block_range && int4range($1, null) \ - and vid >= $2 and vid < $2 + $3 \ + and vid >= $2 and vid <= $3 \ order by vid", - dst = self.dst.qualified_name, - src = self.src.qualified_name, - batch_size = batch_size.size - )) - .bind::(final_block) - .bind::(next_vid) - .bind::(&batch_size) - .execute(conn) - .map_err(StoreError::from) + dst = self.dst.qualified_name, + src = self.src.qualified_name, + batch_size = end - start + 1, + )) + .bind::(final_block) + .bind::(start) + .bind::(end) + .execute(conn) + .map_err(StoreError::from) + }) })?; - next_vid += batch_size.size; - - batch_size.adapt(start.elapsed()); - reporter.prune_batch( self.src.name.as_str(), - rows, + rows.unwrap_or(0), PrunePhase::CopyNonfinal, - next_vid > range.max, + batcher.finished(), ); } Ok(()) @@ -421,32 +412,27 @@ impl Layout { // Delete all entity versions whose range was closed // before `req.earliest_block` let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?; - let mut batch_size = AdaptiveBatchSize::new(&table); - let mut next_vid = range.min; - while next_vid <= range.max { - let start = Instant::now(); - let rows = sql_query(format!( - "/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \ + let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?; + + while !batcher.finished() { + let (_, rows) = batcher.step(|start, end| {sql_query(format!( + "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ delete from {qname} \ where coalesce(upper(block_range), 2147483647) <= $1 \ - and vid >= $2 and vid < $2 + $3", + and vid >= $2 and vid <= $3", qname = table.qualified_name, - batch_size = batch_size.size + batch_size = end - start + 1 )) .bind::(req.earliest_block) - .bind::(next_vid) - .bind::(&batch_size) - .execute(conn)?; - - next_vid += batch_size.size; - - batch_size.adapt(start.elapsed()); + .bind::(start) + .bind::(end) + .execute(conn).map_err(StoreError::from)})?; reporter.prune_batch( table.name.as_str(), - rows as usize, + rows.unwrap_or(0), PrunePhase::Delete, - next_vid > range.max, + batcher.finished(), ); } }