Commit eb21756
Update compression stats when merging chunks
Merge compression chunk size stats when merging chunks. The merged chunk's stats are simply the sum of the stats of all compressed chunks that are merged, which ensures that the aggregate stats do not change due to the merge. Still, the stats might not accurately represent the merged chunk: merging both compressed and non-compressed chunks leaves some data uncompressed, and that data is not reflected in the merged chunk's stats.
1 parent bc97125 commit eb21756
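
To make the invariant concrete: summing the rows of the compression size catalog before and after a merge should yield the same totals, since the merged chunk's row replaces the rows of the chunks it absorbed. A minimal SQL sketch of that check (the chunk names in the merge call are placeholders for whatever chunks are being merged):

-- Aggregate stats before the merge
select sum(compressed_heap_size)      as compressed_heap,
       sum(uncompressed_heap_size)    as uncompressed_heap,
       sum(numrows_pre_compression)   as rows_pre,
       sum(numrows_post_compression)  as rows_post
  from _timescaledb_catalog.compression_chunk_size;

-- Merge two chunks (placeholder chunk names)
call merge_chunks('{_timescaledb_internal._hyper_1_1_chunk, _timescaledb_internal._hyper_1_2_chunk}');

-- Running the same aggregates after the merge should return identical totals,
-- because the merged chunk's stats row is the sum of the rows it replaced.
select sum(compressed_heap_size)      as compressed_heap,
       sum(uncompressed_heap_size)    as uncompressed_heap,
       sum(numrows_pre_compression)   as rows_pre,
       sum(numrows_post_compression)  as rows_post
  from _timescaledb_catalog.compression_chunk_size;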

File tree

5 files changed: +207 -2 lines changed

src/ts_catalog/compression_chunk_size.c

Lines changed: 75 additions & 0 deletions
@@ -4,7 +4,10 @@
  * LICENSE-APACHE for a copy of the license.
  */
 #include <postgres.h>
+#include <access/htup_details.h>
+#include <executor/tuptable.h>
 
+#include "export.h"
 #include "scan_iterator.h"
 #include "scanner.h"
 #include "ts_catalog/catalog.h"
@@ -43,3 +46,75 @@ ts_compression_chunk_size_delete(int32 uncompressed_chunk_id)
 
 	return count;
 }
+
+TSDLLEXPORT bool
+ts_compression_chunk_size_get(int32 chunk_id, Form_compression_chunk_size form)
+{
+	ScanIterator iterator =
+		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, AccessExclusiveLock, CurrentMemoryContext);
+	bool found = false;
+
+	Assert(form != NULL);
+
+	init_scan_by_uncompressed_chunk_id(&iterator, chunk_id);
+	ts_scanner_foreach(&iterator)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
+		bool should_free;
+		HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
+		memcpy(form, GETSTRUCT(tuple), sizeof(FormData_compression_chunk_size));
+		found = true;
+		Assert(form->chunk_id == chunk_id);
+
+		if (should_free)
+			heap_freetuple(tuple);
+
+		break;
+	}
+
+	ts_scan_iterator_close(&iterator);
+
+	return found;
+}
+
+TSDLLEXPORT bool
+ts_compression_chunk_size_update(int32 chunk_id, Form_compression_chunk_size form)
+{
+	ScanIterator iterator =
+		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
+	bool found = false;
+	CatalogSecurityContext sec_ctx;
+
+	Assert(form != NULL);
+
+	init_scan_by_uncompressed_chunk_id(&iterator, chunk_id);
+	ts_scanner_foreach(&iterator)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
+		bool should_free;
+		HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
+		HeapTuple copy = heap_copytuple(tuple);
+		Form_compression_chunk_size tupform = (Form_compression_chunk_size) GETSTRUCT(copy);
+
+		/* Don't update chunk IDs so copy from existing tuple */
+		form->chunk_id = tupform->chunk_id;
+		form->compressed_chunk_id = tupform->compressed_chunk_id;
+
+		memcpy(tupform, form, sizeof(FormData_compression_chunk_size));
+		ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
+		ts_catalog_update_tid_only(ti->scanrel, ts_scanner_get_tuple_tid(ti), copy);
+		ts_catalog_restore_user(&sec_ctx);
+		found = true;
+
+		heap_freetuple(copy);
+
+		if (should_free)
+			heap_freetuple(tuple);
+
+		break;
+	}
+
+	ts_scan_iterator_close(&iterator);
+
+	return found;
+}

src/ts_catalog/compression_chunk_size.h

Lines changed: 6 additions & 0 deletions
@@ -8,4 +8,10 @@
 #include <compat/compat.h>
 #include <postgres.h>
 
+#include <ts_catalog/catalog.h>
+
 extern TSDLLEXPORT int ts_compression_chunk_size_delete(int32 uncompressed_chunk_id);
+extern TSDLLEXPORT bool ts_compression_chunk_size_get(int32 chunk_id,
+													  Form_compression_chunk_size form);
+extern TSDLLEXPORT bool ts_compression_chunk_size_update(int32 chunk_id,
+														 Form_compression_chunk_size form);

tsl/src/chunk.c

Lines changed: 50 additions & 0 deletions
@@ -62,6 +62,8 @@
 #include "hypercube.h"
 #include "hypertable.h"
 #include "hypertable_cache.h"
+#include "ts_catalog/catalog.h"
+#include "ts_catalog/compression_chunk_size.h"
 #include "utils.h"
 
 /* Data in a frozen chunk cannot be modified. So any operation
@@ -212,6 +214,7 @@ typedef struct RelationMergeInfo
 {
 	Oid relid;
 	struct VacuumCutoffs cutoffs;
+	FormData_compression_chunk_size ccs;
 	Chunk *chunk;
 	Relation rel;
 	char relpersistence;
@@ -823,6 +826,9 @@ merge_relinfos(RelationMergeInfo *relinfos, int nrelids, int mergeindex)
 							   ExclusiveLock);
 	Relation new_rel = table_open(new_relid, AccessExclusiveLock);
 	double total_num_tuples = 0.0;
+	FormData_compression_chunk_size merged_ccs;
+
+	memset(&merged_ccs, 0, sizeof(FormData_compression_chunk_size));
 
 	pg17_workaround_init(new_rel, relinfos, nrelids);
 
@@ -839,6 +845,26 @@ merge_relinfos(RelationMergeInfo *relinfos, int nrelids, int mergeindex)
 			total_num_tuples += num_tuples;
 			relinfo->rel = NULL;
 		}
+
+		/*
+		 * Merge compression chunk size stats.
+		 *
+		 * Simply sum up the stats for all compressed relations that are
+		 * merged. Note that we don't add anything for non-compressed
+		 * relations that are merged because they don't have stats. This is a
+		 * bit weird because the data from uncompressed relations will not be
+		 * reflected in the stats of the merged chunk although the data is
+		 * part of the chunk.
+		 */
+		merged_ccs.compressed_heap_size += relinfo->ccs.compressed_heap_size;
+		merged_ccs.compressed_toast_size += relinfo->ccs.compressed_toast_size;
+		merged_ccs.compressed_index_size += relinfo->ccs.compressed_index_size;
+		merged_ccs.uncompressed_heap_size += relinfo->ccs.uncompressed_heap_size;
+		merged_ccs.uncompressed_toast_size += relinfo->ccs.uncompressed_toast_size;
+		merged_ccs.uncompressed_index_size += relinfo->ccs.uncompressed_index_size;
+		merged_ccs.numrows_post_compression += relinfo->ccs.numrows_post_compression;
+		merged_ccs.numrows_pre_compression += relinfo->ccs.numrows_pre_compression;
+		merged_ccs.numrows_frozen_immediately += relinfo->ccs.numrows_frozen_immediately;
 	}
 
 	pg17_workaround_cleanup(new_rel);
@@ -849,6 +875,22 @@ merge_relinfos(RelationMergeInfo *relinfos, int nrelids, int mergeindex)
 	table_close(new_rel, NoLock);
 	table_close(relRelation, RowExclusiveLock);
 
+	/*
+	 * Update compression chunk size stats, but only if at least one of the
+	 * merged chunks was compressed. In that case the merged metadata should
+	 * be non-zero.
+	 */
+	if (merged_ccs.compressed_heap_size > 0)
+	{
+		/*
+		 * The result relation should always be compressed because we pick the
+		 * first compressed one, if one exists.
+		 */
+
+		Assert(result_minfo->ccs.compressed_heap_size > 0);
+		ts_compression_chunk_size_update(result_minfo->chunk->fd.id, &merged_ccs);
+	}
+
 	return new_relid;
 }
 
@@ -1044,6 +1086,14 @@ chunk_merge_chunks(PG_FUNCTION_ARGS)
 
 			if (mergeindex == -1)
 				mergeindex = i;
+
+			/* Read compression chunk size stats */
+			bool found = ts_compression_chunk_size_get(chunk->fd.id, &relinfo->ccs);
+
+			if (!found)
+				elog(NOTICE,
+					 "missing compression chunk size stats for compressed chunk \"%s\"",
+					 NameStr(chunk->fd.table_name));
 		}
 
 		if (ts_chunk_is_frozen(chunk))
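
The summation that merge_relinfos() performs over the per-relation ccs structs above corresponds, in SQL terms, to summing the catalog rows of the chunks being merged. A hedged sketch of what the merged chunk's stats row should contain afterwards (chunk IDs 1 and 3 are illustrative placeholders taken from the test below):

-- Illustrative only: sum the stats rows of the chunks being merged to get
-- the values expected in the merged chunk's single remaining row.
select sum(uncompressed_heap_size)     as uncompressed_heap_size,
       sum(uncompressed_toast_size)    as uncompressed_toast_size,
       sum(uncompressed_index_size)    as uncompressed_index_size,
       sum(compressed_heap_size)       as compressed_heap_size,
       sum(compressed_toast_size)      as compressed_toast_size,
       sum(compressed_index_size)      as compressed_index_size,
       sum(numrows_pre_compression)    as numrows_pre_compression,
       sum(numrows_post_compression)   as numrows_post_compression,
       sum(numrows_frozen_immediately) as numrows_frozen_immediately
  from _timescaledb_catalog.compression_chunk_size
 where chunk_id in (1, 3);  -- placeholder IDs of the chunks being merged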

tsl/test/expected/merge_chunks.out

Lines changed: 60 additions & 1 deletion
@@ -283,7 +283,20 @@ select * from chunk_info;
  _hyper_1_5_chunk | heap | (("time" >= 'Wed Jan 03 16:00:00 2024 PST'::timestamp with time zone) AND ("time" < 'Thu Jan 04 16:00:00 2024 PST'::timestamp with time zone))
 (10 rows)
 
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 6 | 8192 | 0 | 32768 | 16384 | 8192 | 16384 | 1 | 1 | 1
+ 3 | 7 | 8192 | 0 | 32768 | 16384 | 8192 | 16384 | 1 | 1 | 1
+(2 rows)
+
 call merge_chunks('{_timescaledb_internal._hyper_1_1_chunk, _timescaledb_internal._hyper_1_2_chunk, _timescaledb_internal._hyper_1_3_chunk}');
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 6 | 16384 | 0 | 65536 | 32768 | 16384 | 32768 | 2 | 2 | 2
+(1 row)
+
 select * from chunk_info;
  chunk | tam | checkconstraint
 ------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------
@@ -627,14 +640,40 @@ select * from partitions;
  _hyper_1_14_chunk | device | 1431655764 | 9223372036854775807
 (24 rows)
 
--- Merge all chunks until only 1 remains
+-- Show which chunks are compressed. Their compression_chunk_size
+-- metadata should be merged.
+select chunk_name from timescaledb_information.chunks where is_compressed=true;
+    chunk_name
+------------------
+ _hyper_1_1_chunk
+ _hyper_1_2_chunk
+(2 rows)
+
+--
+-- Merge all chunks until only 1 remains. Also check that metadata is
+-- merged.
+---
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
   count | sum | round
 --------+---------+---------------
  518406 | 2854401 | 10373952.7510
 (1 row)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk','_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_12_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
   count | sum | round
 --------+---------+---------------
@@ -665,6 +704,13 @@ select * from partitions;
 (18 rows)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_10_chunk','_timescaledb_internal._hyper_1_13_chunk', '_timescaledb_internal._hyper_1_15_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
   count | sum | round
 --------+---------+---------------
@@ -689,6 +735,13 @@ select * from partitions;
 (12 rows)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_11_chunk','_timescaledb_internal._hyper_1_14_chunk', '_timescaledb_internal._hyper_1_16_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 2441216 | 0 | 4489216 | 16384 | 688128 | 16384 | 46245 | 48 | 48
+ 2 | 18 | 2416640 | 0 | 4456448 | 16384 | 671744 | 16384 | 45832 | 48 | 48
+(2 rows)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
   count | sum | round
 --------+---------+---------------
@@ -707,6 +760,12 @@ select * from partitions;
 (6 rows)
 
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk','_timescaledb_internal._hyper_1_2_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
+ chunk_id | compressed_chunk_id | uncompressed_heap_size | uncompressed_toast_size | uncompressed_index_size | compressed_heap_size | compressed_toast_size | compressed_index_size | numrows_pre_compression | numrows_post_compression | numrows_frozen_immediately
+----------+---------------------+------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+----------------------------
+ 1 | 17 | 4857856 | 0 | 8945664 | 32768 | 1359872 | 32768 | 92077 | 96 | 96
+(1 row)
+
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
   count | sum | round
 --------+---------+---------------

tsl/test/sql/merge_chunks.sql

Lines changed: 16 additions & 1 deletion
@@ -167,7 +167,11 @@ select compress_chunk('_timescaledb_internal._hyper_1_3_chunk');
 -- Test merging compressed chunks
 begin;
 select * from chunk_info;
+
+select * from _timescaledb_catalog.compression_chunk_size;
+
 call merge_chunks('{_timescaledb_internal._hyper_1_1_chunk, _timescaledb_internal._hyper_1_2_chunk, _timescaledb_internal._hyper_1_3_chunk}');
+select * from _timescaledb_catalog.compression_chunk_size;
 select * from chunk_info;
 select count(*) as num_orphaned_slices from orphaned_slices;
 select * from mergeme;
@@ -291,18 +295,29 @@ alter table _timescaledb_internal._hyper_1_2_chunk set access method hypercore;
 -- Show partitions before merge
 select * from partitions;
 
--- Merge all chunks until only 1 remains
+-- Show which chunks are compressed. Their compression_chunk_size
+-- metadata should be merged.
+select chunk_name from timescaledb_information.chunks where is_compressed=true;
+--
+-- Merge all chunks until only 1 remains. Also check that metadata is
+-- merged.
+---
+select * from _timescaledb_catalog.compression_chunk_size;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk','_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_12_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_10_chunk','_timescaledb_internal._hyper_1_13_chunk', '_timescaledb_internal._hyper_1_15_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_11_chunk','_timescaledb_internal._hyper_1_14_chunk', '_timescaledb_internal._hyper_1_16_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk','_timescaledb_internal._hyper_1_2_chunk']);
+select * from _timescaledb_catalog.compression_chunk_size;
 select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme;
 select * from partitions;
 select * from chunk_info;
