Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a where clause to support rows deletion #326

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 123 additions & 8 deletions bin/pg_repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,16 @@ static bool is_superuser(void);
static void check_tablespace(void);
static bool preliminary_checks(char *errbuf, size_t errsize);
static bool is_requested_relation_exists(char *errbuf, size_t errsize);
static void repack_all_databases(const char *order_by);
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
static void repack_all_databases(const char *order_by, const char *where_clause);
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize, const char *where_clause);
static void repack_one_table(repack_table *table, const char *order_by);
static bool repack_table_indexes(PGresult *index_details);
static bool repack_all_indexes(char *errbuf, size_t errsize);
static void repack_cleanup(bool fatal, const repack_table *table);
static void repack_cleanup_callback(bool fatal, void *userdata);
static void repack_cleanup_index(bool fatal, void *userdata);
static bool rebuild_indexes(const repack_table *table);
static bool validate_where_clause(PGconn *conn, const char *table_name, const char *where_clause, char *errbuf, size_t errsize);

static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col);
Expand All @@ -248,6 +249,7 @@ static SimpleStringList parent_table_list = {NULL, NULL};
static SimpleStringList table_list = {NULL, NULL};
static SimpleStringList schema_list = {NULL, NULL};
static char *orderby = NULL;
static char *where_clause = NULL;
static char *tablespace = NULL;
static bool moveidx = false;
static SimpleStringList r_index = {NULL, NULL};
Expand Down Expand Up @@ -282,6 +284,7 @@ static pgut_option options[] =
{ 'l', 'c', "schema", &schema_list },
{ 'b', 'n', "no-order", &noorder },
{ 'b', 'N', "dry-run", &dryrun },
{ 's', 'X', "where-clause", &where_clause },
{ 's', 'o', "order-by", &orderby },
{ 's', 's', "tablespace", &tablespace },
{ 'b', 'S', "moveidx", &moveidx },
Expand Down Expand Up @@ -397,11 +400,11 @@ main(int argc, char *argv[])
ereport(ERROR,
(errcode(EINVAL),
errmsg("cannot repack specific schema(s) in all databases")));
repack_all_databases(orderby);
repack_all_databases(orderby, where_clause);
}
else
{
if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
if (!repack_one_database(orderby, errbuf, sizeof(errbuf), where_clause))
ereport(ERROR,
(errcode(ERROR), errmsg("%s failed with error: %s", PROGRAM_NAME, errbuf)));
}
Expand Down Expand Up @@ -691,7 +694,7 @@ is_requested_relation_exists(char *errbuf, size_t errsize){
* Call repack_one_database for each database.
*/
static void
repack_all_databases(const char *orderby)
repack_all_databases(const char *orderby, const char *where_clause)
{
PGresult *result;
int i;
Expand All @@ -715,7 +718,7 @@ repack_all_databases(const char *orderby)
elog(INFO, "repacking database \"%s\"", dbname);
if (!dryrun)
{
ret = repack_one_database(orderby, errbuf, sizeof(errbuf));
ret = repack_one_database(orderby, errbuf, sizeof(errbuf), where_clause);
if (!ret)
elog(INFO, "database \"%s\" skipped: %s", dbname, errbuf);
}
Expand Down Expand Up @@ -747,7 +750,7 @@ getoid(PGresult *res, int row, int col)
* Call repack_one_table for the target tables or each table in a database.
*/
static bool
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
repack_one_database(const char *orderby, char *errbuf, size_t errsize, const char *where_clause)
{
bool ret = false;
PGresult *res = NULL;
Expand Down Expand Up @@ -944,11 +947,34 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.sql_pop = getstr(res, i, c++);
table.dest_tablespace = getstr(res, i, c++);

/* Validate WHERE clause if provided */
if (where_clause && where_clause[0])
{
if (num > 1) {
ereport(ERROR,
(errcode(E_PG_COMMAND),
errmsg("where-clause can only be used when repacking a single table.")));
continue;
}
if (!validate_where_clause(connection, table.target_name, where_clause, errbuf, errsize))
{
ereport(ERROR,
(errcode(E_PG_COMMAND),
errmsg("%s", errbuf)));
continue;
}
}

/* Craft Copy SQL */
initStringInfo(&copy_sql);
appendStringInfoString(&copy_sql, table.copy_data);
if (!orderby)

/* delete rows mode */
if (where_clause) {
appendStringInfoString(&copy_sql, " WHERE ");
appendStringInfoString(&copy_sql, where_clause);
}
if (!orderby)
{
if (ckey != NULL)
{
Expand Down Expand Up @@ -2445,8 +2471,97 @@ pgut_help(bool details)
printf(" -Z, --no-analyze don't analyze at end\n");
printf(" -k, --no-superuser-check skip superuser checks in client\n");
printf(" -C, --exclude-extension don't repack tables which belong to specific extension\n");
printf(" -X, --where=WHERE_CLAUSE (delete rows mode) keep only specific rows in the table as per condition specified.\n");
printf(" Note: where-clause can only be used when repacking a single table\n");
printf(" --no-error-on-invalid-index repack even though invalid index is found\n");
printf(" --error-on-invalid-index don't repack when invalid index is found, deprecated, as this is the default behavior now\n");
printf(" --apply-count number of tuples to apply in one transaction during replay\n");
printf(" --switch-threshold switch tables when that many tuples are left to catchup\n");
}

/*
* Validate a WHERE clause by running EXPLAIN SELECT 1 with the clause
* Returns true if the WHERE clause is valid, false otherwise
*/
static bool
validate_where_clause(PGconn *conn, const char *table_name, const char *where_clause, char *errbuf, size_t errsize)
{
bool valid = false;
PGresult *res = NULL;
StringInfoData sql;

if (!where_clause || !where_clause[0])
{
if (errbuf)
snprintf(errbuf, errsize, "empty WHERE clause is not valid");
return false; /* Empty where clause */
}

initStringInfo(&sql);

/* Build EXPLAIN query to validate the WHERE clause */
appendStringInfo(&sql, "EXPLAIN SELECT 1 FROM %s WHERE %s",
table_name, where_clause);

res = PQexec(conn, sql.data);

if (PQresultStatus(res) != PGRES_TUPLES_OK)
/* Clean up the error message. Remove duplicate ERROR: prefix as well as remove postgres LINE: error */
{
if (errbuf)
{
const char *pg_error = PQerrorMessage(conn);
char cleaned_error[1024] = {0}; // Buffer for cleaned error message

if (strstr(pg_error, "ERROR:") != NULL)
{
const char *error_start = pg_error;
const char *error_prefix = "ERROR: ";
const char *line_part;

if (strncmp(error_start, error_prefix, strlen(error_prefix)) == 0)
error_start += strlen(error_prefix);

/* Grab the actual error message ends (before LINE: part) */
line_part = strstr(error_start, "LINE ");
if (line_part != NULL)
{
size_t len = line_part - error_start;
char *end;

strncpy(cleaned_error, error_start, len);
cleaned_error[len] = '\0';

/* Trim text */
end = cleaned_error + len - 1;
while (end > cleaned_error && (*end == ' ' || *end == '\n' || *end == '\r'))
*end-- = '\0';
}
else
{
/* Edge case: No LINE: message from postgres */
strncpy(cleaned_error, error_start, sizeof(cleaned_error) - 1);
cleaned_error[sizeof(cleaned_error) - 1] = '\0';
}
}
else
{
/* Edge case: Unparsable error message, use the original message as it is*/
strncpy(cleaned_error, pg_error, sizeof(cleaned_error) - 1);
cleaned_error[sizeof(cleaned_error) - 1] = '\0';
}

snprintf(errbuf, errsize, "invalid WHERE clause: %s", cleaned_error);
}
valid = false;
}
else
{
valid = true;
}

CLEARPGRES(res);
termStringInfo(&sql);

return valid;
}
24 changes: 24 additions & 0 deletions doc/pg_repack.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Options:
--error-on-invalid-index don't repack when invalid index is found, deprecated, as this is the default behavior now
--apply-count number of tuples to apply in one trasaction during replay
--switch-threshold switch tables when that many tuples are left to catchup
-X CONDITION, --where-clause=CONDITION Remove certain live rows from the table. Use this option to clean up rows that are not needed anymore.

Connection options:
-d, --dbname=DBNAME database to connect
Expand Down Expand Up @@ -170,6 +171,16 @@ Reorg Options
``-o COLUMNS [,...]``, ``--order-by=COLUMNS [,...]``
Perform an online CLUSTER ordered by the specified columns.

``-X CONDITION``, ``--where-clause=CONDITION``
Remove certain live rows from the table. Use this option to clean up rows that are not needed anymore.
table. The condition is a SQL where-clause that is applied to the table being
repacked. Note that the condition specifies which rows WILL remain in the table
after repacking, not which rows will be deleted.
--where-clause="deleted_at IS NOT NULL" will lead to the repacked table removing all
rows where deleted_at is NULL at the time of repack start.
This option can only be used when repacking a single table and will fail if multiple
tables would be repacked.

``-n``, ``--no-order``
Perform an online VACUUM FULL. Since version 1.2 this is the default for
non-clustered tables.
Expand Down Expand Up @@ -331,6 +342,19 @@ Move the specified index to tablespace ``tbs``::

$ pg_repack -d test --index idx --tablespace tbs

Select only rows where id > 10 in table ``foo`` (note that the condition specifies
which rows will remain in the table after repacking, not which rows will be deleted). This would delete all rows where id <= 10::

$ pg_repack -d test --table foo --where-clause="id > 10"

Select only rows where value > 100 in table ``bar`` in column foo. This would delete all rows where foo <= 100::

$ pg_repack -d test --table bar --where-clause="foo > 100"

Select only rows where create_date is within the last year::

$ pg_repack -d test --table baz --where-clause="create_date > (current_date - interval '1 year')"


Diagnostics
-----------
Expand Down
2 changes: 1 addition & 1 deletion regress/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
# Test suite
#

REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger where-clause

USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
88 changes: 88 additions & 0 deletions regress/expected/where-clause.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
SET client_min_messages = warning;
-- Create a test table with some data
CREATE TABLE tbl_where_clause (
id int PRIMARY KEY,
value text,
active boolean,
deleted_at timestamp
);
-- Insert some test data
INSERT INTO tbl_where_clause VALUES (1, 'one', true, NULL);
INSERT INTO tbl_where_clause VALUES (2, 'two', true, NULL);
INSERT INTO tbl_where_clause VALUES (3, 'three', false, NULL);
INSERT INTO tbl_where_clause VALUES (4, 'four', false, NULL);
INSERT INTO tbl_where_clause VALUES (5, 'five', true, NULL);
-- Check initial data
SELECT * FROM tbl_where_clause ORDER BY id;
id | value | active | deleted_at
----+-------+--------+------------
1 | one | t |
2 | two | t |
3 | three | f |
4 | four | f |
5 | five | t |
(5 rows)

-- Run pg_repack with where clause to only repack active rows
\! pg_repack --dbname=contrib_regression --table=tbl_where_clause --where-clause="active = true"
INFO: repacking table "public.tbl_where_clause"
-- Verify data after repack
SELECT * FROM tbl_where_clause ORDER BY id;
id | value | active | deleted_at
----+-------+--------+------------
1 | one | t |
2 | two | t |
5 | five | t |
(3 rows)

-- Insert more data to verify the where clause worked correctly
INSERT INTO tbl_where_clause VALUES (6, 'six', true, NULL);
INSERT INTO tbl_where_clause VALUES (7, 'seven', false, NULL);
-- Run pg_repack with a different where clause
\! pg_repack --dbname=contrib_regression --table=tbl_where_clause --where-clause="id > 3"
INFO: repacking table "public.tbl_where_clause"
-- Verify data after second repack
SELECT * FROM tbl_where_clause ORDER BY id;
id | value | active | deleted_at
----+-------+--------+------------
5 | five | t |
6 | six | t |
7 | seven | f |
(3 rows)

-- Test with where clause and order-by together
\! pg_repack --dbname=contrib_regression --table=tbl_where_clause --where-clause="active = true" --order-by="value"
INFO: repacking table "public.tbl_where_clause"
-- Verify data after repack with order-by
SELECT * FROM tbl_where_clause ORDER BY id;
id | value | active | deleted_at
----+-------+--------+------------
5 | five | t |
6 | six | t |
(2 rows)

INSERT INTO tbl_where_clause VALUES (8, 'eight', true, NULL);
INSERT INTO tbl_where_clause VALUES (9, 'nine', false, '2023-01-04 10:00:00');
-- Test with deleted_at is null where clause (keep only non-deleted rows)
\! pg_repack --dbname=contrib_regression --table=tbl_where_clause --where-clause="deleted_at IS NULL"
INFO: repacking table "public.tbl_where_clause"
-- Verify data after repack with deleted_at IS NULL
SELECT * FROM tbl_where_clause ORDER BY id;
id | value | active | deleted_at
----+-------+--------+------------
5 | five | t |
6 | six | t |
8 | eight | t |
(3 rows)

-- Test with non-existent column in where clause (should fail)
\! pg_repack --dbname=contrib_regression --table=tbl_where_clause --where-clause="non_existent_column = true"
ERROR: invalid WHERE clause: column "non_existent_column" does not exist
-- Test with special character in column name without proper quoting (should fail)
\! pg_repack --dbname=contrib_regression --table=tbl_where_clause --where-clause="column with space = 'test'"
ERROR: invalid WHERE clause: syntax error at or near "column"
-- Test for repackaing whole database with a where clause (should fail)
\! pg_repack --dbname=contrib_regression --where-clause="id > 3"
ERROR: where-clause can only be used when repacking a single table.
-- Clean up
DROP TABLE tbl_where_clause;
Loading