Commit 13a19e7

Update compression stats when merging chunks
Merge compression chunk size stats when merging chunks. The merged chunk's stats are simply the sum of the stats of all compressed chunks that are merged, which ensures that the aggregate stats do not change due to the merge. Still, the stats might not accurately represent the merged chunk: merging compressed and non-compressed chunks leaves some data uncompressed, and that data is not reflected in the merged chunk's stats.
1 parent 64d10ef commit 13a19e7
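
The rule described above amounts to a field-wise sum over the compression_chunk_size catalog struct. A minimal sketch of that rule (the helper is hypothetical and for illustration only; field names follow FormData_compression_chunk_size as used by this commit, which inlines the same additions directly in merge_relinfos() in tsl/src/chunk.c):

/* Sketch only: accumulate one chunk's stats into the merged chunk's stats. */
static void
ccs_add(FormData_compression_chunk_size *dst, const FormData_compression_chunk_size *src)
{
	dst->compressed_heap_size += src->compressed_heap_size;
	dst->compressed_toast_size += src->compressed_toast_size;
	dst->compressed_index_size += src->compressed_index_size;
	dst->uncompressed_heap_size += src->uncompressed_heap_size;
	dst->uncompressed_toast_size += src->uncompressed_toast_size;
	dst->uncompressed_index_size += src->uncompressed_index_size;
	dst->numrows_pre_compression += src->numrows_pre_compression;
	dst->numrows_post_compression += src->numrows_post_compression;
	dst->numrows_frozen_immediately += src->numrows_frozen_immediately;
}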

File tree

6 files changed: +210 -2 lines changed

.unreleased/pg_7909

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fixes: #7909 Update compression stats when merging chunks

src/ts_catalog/compression_chunk_size.c

Lines changed: 75 additions & 0 deletions
@@ -4,7 +4,10 @@
  * LICENSE-APACHE for a copy of the license.
  */
 #include <postgres.h>
+#include <access/htup_details.h>
+#include <executor/tuptable.h>
 
+#include "export.h"
 #include "scan_iterator.h"
 #include "scanner.h"
 #include "ts_catalog/catalog.h"
@@ -43,3 +46,75 @@ ts_compression_chunk_size_delete(int32 uncompressed_chunk_id)
 
 	return count;
 }
+
+TSDLLEXPORT bool
+ts_compression_chunk_size_get(int32 chunk_id, Form_compression_chunk_size form)
+{
+	ScanIterator iterator =
+		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, AccessExclusiveLock, CurrentMemoryContext);
+	bool found = false;
+
+	Assert(form != NULL);
+
+	init_scan_by_uncompressed_chunk_id(&iterator, chunk_id);
+	ts_scanner_foreach(&iterator)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
+		bool should_free;
+		HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
+		memcpy(form, GETSTRUCT(tuple), sizeof(FormData_compression_chunk_size));
+		found = true;
+		Assert(form->chunk_id == chunk_id);
+
+		if (should_free)
+			heap_freetuple(tuple);
+
+		break;
+	}
+
+	ts_scan_iterator_close(&iterator);
+
+	return found;
+}
+
+TSDLLEXPORT bool
+ts_compression_chunk_size_update(int32 chunk_id, Form_compression_chunk_size form)
+{
+	ScanIterator iterator =
+		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
+	bool found = false;
+	CatalogSecurityContext sec_ctx;
+
+	Assert(form != NULL);
+
+	init_scan_by_uncompressed_chunk_id(&iterator, chunk_id);
+	ts_scanner_foreach(&iterator)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
+		bool should_free;
+		HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
+		HeapTuple copy = heap_copytuple(tuple);
+		Form_compression_chunk_size tupform = (Form_compression_chunk_size) GETSTRUCT(copy);
+
+		/* Don't update chunk IDs so copy from existing tuple */
+		form->chunk_id = tupform->chunk_id;
+		form->compressed_chunk_id = tupform->compressed_chunk_id;
+
+		memcpy(tupform, form, sizeof(FormData_compression_chunk_size));
+		ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
+		ts_catalog_update_tid_only(ti->scanrel, ts_scanner_get_tuple_tid(ti), copy);
+		ts_catalog_restore_user(&sec_ctx);
+		found = true;
+
+		heap_freetuple(copy);
+
+		if (should_free)
+			heap_freetuple(tuple);
+
+		break;
+	}
+
+	ts_scan_iterator_close(&iterator);
+
+	return found;
+}
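
For context, here is a minimal usage sketch of the two new accessors (the caller is hypothetical, chunk_id is assumed to be the uncompressed chunk's catalog ID, and error handling is omitted). Note that ts_compression_chunk_size_update() restores chunk_id and compressed_chunk_id from the existing tuple, so a caller only needs to fill in the size and row-count fields:

/* Sketch only: read-modify-write of a chunk's compression size stats. */
FormData_compression_chunk_size ccs;

if (ts_compression_chunk_size_get(chunk_id, &ccs))
{
	/* Adjust whichever fields should change; the chunk IDs are preserved
	 * by ts_compression_chunk_size_update() itself. */
	ccs.numrows_frozen_immediately += 1;
	ts_compression_chunk_size_update(chunk_id, &ccs);
}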

src/ts_catalog/compression_chunk_size.h

Lines changed: 6 additions & 0 deletions
@@ -8,4 +8,10 @@
 #include <compat/compat.h>
 #include <postgres.h>
 
+#include <ts_catalog/catalog.h>
+
 extern TSDLLEXPORT int ts_compression_chunk_size_delete(int32 uncompressed_chunk_id);
+extern TSDLLEXPORT bool ts_compression_chunk_size_get(int32 chunk_id,
+													  Form_compression_chunk_size form);
+extern TSDLLEXPORT bool ts_compression_chunk_size_update(int32 chunk_id,
+														 Form_compression_chunk_size form);

tsl/src/chunk.c

Lines changed: 50 additions & 0 deletions
@@ -62,6 +62,8 @@
 #include "hypercube.h"
 #include "hypertable.h"
 #include "hypertable_cache.h"
+#include "ts_catalog/catalog.h"
+#include "ts_catalog/compression_chunk_size.h"
 #include "utils.h"
 
 /* Data in a frozen chunk cannot be modified. So any operation
@@ -212,6 +214,7 @@ typedef struct RelationMergeInfo
 {
 	Oid relid;
 	struct VacuumCutoffs cutoffs;
+	FormData_compression_chunk_size ccs;
 	Chunk *chunk;
 	Relation rel;
 	char relpersistence;
@@ -823,6 +826,9 @@ merge_relinfos(RelationMergeInfo *relinfos, int nrelids, int mergeindex)
 											ExclusiveLock);
 	Relation new_rel = table_open(new_relid, AccessExclusiveLock);
 	double total_num_tuples = 0.0;
+	FormData_compression_chunk_size merged_ccs;
+
+	memset(&merged_ccs, 0, sizeof(FormData_compression_chunk_size));
 
 	pg17_workaround_init(new_rel, relinfos, nrelids);
 
@@ -839,6 +845,26 @@ merge_relinfos(RelationMergeInfo *relinfos, int nrelids, int mergeindex)
 			total_num_tuples += num_tuples;
 			relinfo->rel = NULL;
 		}
+
+		/*
+		 * Merge compression chunk size stats.
+		 *
+		 * Simply sum up the stats for all compressed relations that are
+		 * merged. Note that we don't add anything for non-compressed
+		 * relations that are merged because they don't have stats. This is a
+		 * bit weird because the data from uncompressed relations will not be
+		 * reflected in the stats of the merged chunk although the data is
+		 * part of the chunk.
+		 */
+		merged_ccs.compressed_heap_size += relinfo->ccs.compressed_heap_size;
+		merged_ccs.compressed_toast_size += relinfo->ccs.compressed_toast_size;
+		merged_ccs.compressed_index_size += relinfo->ccs.compressed_index_size;
+		merged_ccs.uncompressed_heap_size += relinfo->ccs.uncompressed_heap_size;
+		merged_ccs.uncompressed_toast_size += relinfo->ccs.uncompressed_toast_size;
+		merged_ccs.uncompressed_index_size += relinfo->ccs.uncompressed_index_size;
+		merged_ccs.numrows_post_compression += relinfo->ccs.numrows_post_compression;
+		merged_ccs.numrows_pre_compression += relinfo->ccs.numrows_pre_compression;
+		merged_ccs.numrows_frozen_immediately += relinfo->ccs.numrows_frozen_immediately;
 	}
 
 	pg17_workaround_cleanup(new_rel);
@@ -849,6 +875,22 @@ merge_relinfos(RelationMergeInfo *relinfos, int nrelids, int mergeindex)
 	table_close(new_rel, NoLock);
 	table_close(relRelation, RowExclusiveLock);
 
+	/*
+	 * Update compression chunk size stats, but only if at least one of the
+	 * merged chunks was compressed. In that case the merged metadata should
+	 * be non-zero.
+	 */
+	if (merged_ccs.compressed_heap_size > 0)
+	{
+		/*
+		 * The result relation should always be compressed because we pick the
+		 * first compressed one, if one exists.
+		 */
+
+		Assert(result_minfo->ccs.compressed_heap_size > 0);
+		ts_compression_chunk_size_update(result_minfo->chunk->fd.id, &merged_ccs);
+	}
+
 	return new_relid;
 }
 
@@ -1044,6 +1086,14 @@ chunk_merge_chunks(PG_FUNCTION_ARGS)
 
 			if (mergeindex == -1)
 				mergeindex = i;
+
+			/* Read compression chunk size stats */
+			bool found = ts_compression_chunk_size_get(chunk->fd.id, &relinfo->ccs);
+
+			if (!found)
+				elog(NOTICE,
+					 "missing compression chunk size stats for compressed chunk \"%s\"",
+					 NameStr(chunk->fd.table_name));
 		}
 
 		if (ts_chunk_is_frozen(chunk))

tsl/test/expected/merge_chunks.out

Lines changed: 61 additions & 1 deletion
@@ -283,7 +283,20 @@ select * from chunk_info;
  _hyper_1_5_chunk | heap | (("time" >= 'Wed Jan 03 16:00:00 2024 PST'::timestamp with time zone) AND ("time" < 'Thu Jan 04 16:00:00 2024 PST'::timestamp with time zone))
 (10 rows)
 
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 6 | 8192 | 0 | 32768 | 16384 | 8192 | 16384 | 1 | 1 | 1
+ 3 | 7 | 8192 | 0 | 32768 | 16384 | 8192 | 16384 | 1 | 1 | 1
+(2 rows)
+
 call merge_chunks('{_timescaledb_internal._hyper_1_1_chunk, _timescaledb_internal._hyper_1_2_chunk, _timescaledb_internal._hyper_1_3_chunk}');
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 6 | 16384 | 0 | 65536 | 32768 | 16384 | 32768 | 2 | 2 | 2
+(1 row)
+
 select * from chunk_info;
  chunk | tam | checkconstraint
 ------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------
@@ -627,14 +640,41 @@ select * from partitions;
  _hyper_1_14_chunk | device | 1431655764 | 9223372036854775807
 (24 rows)
 
--- Merge all chunks until only 1 remains
+-- Show which chunks are compressed. Their compression_chunk_size
+-- metadata should be merged.
+select chunk_name from timescaledb_information.chunks
+where is_compressed=true order by chunk_name;
+ chunk_name
+------------------
+ _hyper_1_1_chunk
+ _hyper_1_2_chunk
+(2 rows)
+
+--
+-- Merge all chunks until only 1 remains. Also check that metadata is
+-- merged.
+---
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
  count | sum | round
 --------+---------+---------------
  518406 | 2854401 | 10373952.7510
 (1 row)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk','_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_12_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
  count | sum | round
 --------+---------+---------------
@@ -665,6 +705,13 @@ select * from partitions;
 (18 rows)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_10_chunk','_timescaledb_internal._hyper_1_13_chunk', '_timescaledb_internal._hyper_1_15_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
  count | sum | round
 --------+---------+---------------
@@ -689,6 +736,13 @@ select * from partitions;
 (12 rows)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_11_chunk','_timescaledb_internal._hyper_1_14_chunk', '_timescaledb_internal._hyper_1_16_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
  count | sum | round
 --------+---------+---------------
@@ -707,6 +761,12 @@ select * from partitions;
 (6 rows)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk','_timescaledb_internal._hyper_1_2_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 4857856 | 0 | 8945664 | 32768 | 1359872 | 32768 | 92077 | 96 | 96
+(1 row)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
  count | sum | round
 --------+---------+---------------

tsl/test/sql/merge_chunks.sql

Lines changed: 17 additions & 1 deletion
@@ -167,7 +167,11 @@ select compress_chunk('_timescaledb_internal._hyper_1_3_chunk');
 -- Test merging compressed chunks
 begin;
 select * from chunk_info;
+
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
+
 call merge_chunks('{_timescaledb_internal._hyper_1_1_chunk, _timescaledb_internal._hyper_1_2_chunk, _timescaledb_internal._hyper_1_3_chunk}');
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
 select * from chunk_info;
 select count(*) as num_orphaned_slices from orphaned_slices;
 select * from mergeme;
@@ -291,18 +295,30 @@ alter table _timescaledb_internal._hyper_1_2_chunk set access method hypercore;
 -- Show partitions before merge
 select * from partitions;
 
--- Merge all chunks until only 1 remains
+-- Show which chunks are compressed. Their compression_chunk_size
+-- metadata should be merged.
+select chunk_name from timescaledb_information.chunks
+where is_compressed=true order by chunk_name;
+--
+-- Merge all chunks until only 1 remains. Also check that metadata is
+-- merged.
+---
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk','_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_12_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_10_chunk','_timescaledb_internal._hyper_1_13_chunk', '_timescaledb_internal._hyper_1_15_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_11_chunk','_timescaledb_internal._hyper_1_14_chunk', '_timescaledb_internal._hyper_1_16_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk','_timescaledb_internal._hyper_1_2_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size order by chunk_id;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 select * from chunk_info;
