-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathstf_compressed_ofstream.hpp
358 lines (307 loc) · 14.6 KB
/
stf_compressed_ofstream.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
#ifndef __STF_COMPRESSED_OFSTREAM_HPP__
#define __STF_COMPRESSED_OFSTREAM_HPP__
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <future>
#include <iostream>
#include <string_view>
#include <type_traits>
#include <utility>
#include <vector>
#include <signal.h>
#include "stf_compression_buffer.hpp"
#include "stf_compressed_chunked_base.hpp"
#include "stf_ofstream.hpp"
#include "stf_record.hpp"
namespace stf {
/**
* \class STFCompressedOFstream
*
* Provides transparent on-the-fly compression of an STF file
*/
template<typename Compressor>
class STFCompressedOFstream : public STFOFstream, public STFCompressedChunkedBase {
private:
Compressor compressor_; /**< Compressor object */
STFCompressionPointerWrapper<const uint8_t*> in_buf_; /**< Input data pointer */
STFExponentialCompressionBuffer cur_chunk_buf_;
STFExponentialCompressionBuffer compression_chunk_buf_;
STFCompressionBuffer out_buf_; /**< Output data buffer - holds compressed data that has not been written to the file */
std::future<void> compression_done_;
bool compression_in_progress_ = false;
bool pending_chunk_ = false; /**< True if there is pending data in the output buffer */
size_t bytes_written_ = 0; /**< Number of uncompressed bytes written so far */
bool incomplete_chunk_ = false; /**< If true, the current chunk in the buffer doesn't end with a marker record */
/**
* Writes directly to the file, bypassing the compressor
* \param data Buffer to write
* \param size Number of elements in the buffer
*/
template <typename T>
inline void direct_write_(const T* data, size_t size) {
STFOFstream::fwrite_(data, sizeof(T), size);
}
/**
* Writes an arithmetic type directly to the file, bypassing the compressor
* \param data Data to write
*/
template <typename T>
inline std::enable_if_t<std::is_arithmetic_v<T>>
direct_write_(const T data) {
direct_write_(&data, 1);
}
/**
* Writes a non-arithmetic type directly to the file, bypassing the compressor
* \param data Data to write
*/
template <typename T>
inline std::enable_if_t<std::negation_v<std::is_arithmetic<T>>>
direct_write_(const T& data) {
direct_write_(&data, 1);
}
/**
* Writes a ChunkOffset directly to the file, bypassing the compressor
* \param data Data to write
*/
inline void direct_write_(const ChunkOffset& data) {
direct_write_(data.getOffset());
direct_write_(data.getStartPC());
direct_write_(data.getUncompressedChunkSize());
}
/**
* Writes a vector directly to the file, bypassing the compressor
* \param data Data to write
* \param size Number of elements to write
*/
template <typename T>
inline std::enable_if_t<std::is_arithmetic_v<T>>
direct_write_(const std::vector<T>& data, size_t size) {
direct_write_(size);
direct_write_(data.data(), size);
}
/**
* Writes a vector directly to the file, bypassing the compressor
* \param data Data to write
* \param size Number of elements to write
*/
template <typename T>
inline std::enable_if_t<std::negation_v<std::is_arithmetic<T>>>
direct_write_(const std::vector<T>& data, size_t size) {
direct_write_(size);
for(size_t i = 0; i < size; ++i) {
direct_write_(data[i]);
}
}
/**
* Writes a vector directly to the file, bypassing the compressor
* \param data Data to write
*/
template <typename T>
inline void direct_write_(const std::vector<T>& data) {
direct_write_(data, data.size());
}
/**
* Compresses data into the output buffer
* \param data Buffer to write
* \param size Size of an element
* \param num Number of elements to read
*/
inline size_t fwrite_(const void* data, size_t size, size_t num) override {
const size_t num_bytes = size * num;
cur_chunk_buf_.fit(num_bytes);
const auto ptr = reinterpret_cast<const uint8_t*>(data);
std::copy(ptr, ptr + num_bytes, cur_chunk_buf_.get() + cur_chunk_buf_.end());
cur_chunk_buf_.advanceWritePtr(num_bytes);
pending_chunk_ = true;
incomplete_chunk_ = true; // Assume this isn't a marker record for now
return num;
}
inline void compressChunkAsync_(const uint64_t next_chunk_pc) {
const size_t num_bytes = compression_chunk_buf_.end();
compressor_.compress(out_buf_, compression_chunk_buf_);
bytes_written_ += num_bytes;
endChunk_(next_chunk_pc);
}
inline void compressChunk_() {
if(STF_EXPECT_TRUE(compression_in_progress_)) {
compression_done_.get();
compression_in_progress_ = false;
}
if(STF_EXPECT_TRUE(pending_chunk_)) {
// Every chunk needs to end with a marker record, otherwise any tools that try to read the trace will
// probably throw an exception when they reach the end
stf_assert(!incomplete_chunk_, "Attempted to write a chunk that doesn't end with a marker record");
pending_chunk_ = false;
compression_in_progress_ = true;
std::swap(cur_chunk_buf_, compression_chunk_buf_);
// Grab the next chunk's PC now since compression is happening asynchronously
const uint64_t next_chunk_pc = pc_tracker_.getNextPC();
compression_done_ = std::move(std::async(std::launch::async,
[this, next_chunk_pc](){ this->compressChunkAsync_(next_chunk_pc); }));
}
}
/**
* Writes compressed data out to the file
*/
inline void writeChunk_() {
// Only bother trying to write if there's some data in the buffer
if(out_buf_.end()) {
direct_write_(out_buf_.get(), out_buf_.end());
out_buf_.reset();
}
}
static inline sigset_t initSigSet_() {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
sigaddset(&set, SIGABRT);
sigaddset(&set, SIGSEGV);
return set;
}
/**
* Ends the current compressed chunk and flushes it to the file
*/
inline void endChunk_(const uint64_t next_chunk_pc) {
static const sigset_t set = initSigSet_();
// Mask signals until we're done writing to the file
int err = pthread_sigmask(SIG_BLOCK, &set, nullptr);
stf_assert(err == 0, "Failed to mask signals with error: " << strerror(err));
// Empty the buffer if it's full
if(out_buf_.full()) {
writeChunk_();
}
// Flush any data remaining in the internal compressor buffers
compressor_.flush(out_buf_);
// Write any remaining data that was flushed
writeChunk_();
chunk_indices_.back().setUncompressedChunkSize(bytes_written_);
bytes_written_ = 0;
// Get the current file offset so we can write it back at the beginning
const off_t end = ftell(stream_);
// Write the chunk offsets to the end of the file
direct_write_(chunk_indices_, chunk_indices_.size());
// Write the end of the last chunk into the spot we reserved when we opened the file
fseek(stream_, Compressor::getMagic().size() + sizeof(marker_record_chunk_size_), SEEK_SET);
direct_write_(end);
// Seek back to the end of the chunk we just wrote
fseek(stream_, end, SEEK_SET);
// Unmask signals
err = pthread_sigmask(SIG_UNBLOCK, &set, nullptr);
stf_assert(err == 0, "Failed to unmask signals with error: " << strerror(err));
// Start a new chunk
chunk_indices_.emplace_back(end, next_chunk_pc, 0);
}
/**
* Closes the file
*/
int close_() override {
if(!stream_) {
return 0;
}
// Finish any pending chunk
if(pending_chunk_) {
// Skip writing the chunk in the buffer if it would break any readers that try to use the trace
if(incomplete_chunk_) {
std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl;
}
else {
compressChunk_();
}
}
if(compression_in_progress_) {
compression_done_.get();
compression_in_progress_ = false;
}
return STFOFstream::close_();
}
public:
/**
* Constructs an STFCompressedOFstream
* \param args Optional arguments that will be passed to the underlying compressor
*/
template<typename ... CompressorArgs>
explicit STFCompressedOFstream(CompressorArgs... args) :
compressor_(args...)
{
}
/**
* Constructs an STFCompressedOFstream and opens it
* \param filename Filename to open
* \param chunk_size Chunk size to use. Overrides STFCompressedChunkedBase::DEFAULT_CHUNK_SIZE
* \param args Optional arguments that will be passed to the underlying compressor
*/
template<typename ... CompressorArgs>
STFCompressedOFstream(const std::string_view filename, const size_t chunk_size, CompressorArgs... args) : // cppcheck-suppress passedByValue
STFCompressedOFstream(args...)
{
setChunkSize(chunk_size);
STFCompressedOFstream::open(filename);
}
/**
* Constructs an STFCompressedOFstream and opens it
* \param filename Filename to open
* \param args Optional arguments that will be passed to the underlying compressor
*/
template<typename ... CompressorArgs>
explicit STFCompressedOFstream(const std::string_view filename, CompressorArgs... args) : // cppcheck-suppress passedByValue
STFCompressedOFstream(filename, DEFAULT_CHUNK_SIZE, args...)
{
}
// Have to override the base class destructor to ensure that *our* close method gets called before destruction
inline ~STFCompressedOFstream() override {
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFCompressedOFstream::close_();
}
/**
* Sets the chunk size
* \note Once the stream is open this cannot be changed
* \param chunk_size Number of marker records per chunk
*/
void setChunkSize(const size_t chunk_size) {
stf_assert(!stream_, "Must set chunk size before opening file.");
marker_record_chunk_size_ = chunk_size;
}
/**
* Opens a file using the specified chunk size
* \param filename Filename to open
* \param chunk_size Number of marker records per chunk
*/
void open(const std::string_view filename, const size_t chunk_size) { // cppcheck-suppress passedByValue
setChunkSize(chunk_size);
open(filename);
}
/**
* Opens a file
* \param filename Filename to open
*/
void open(const std::string_view filename) override { // cppcheck-suppress passedByValue
static constexpr off_t ZERO = 0;
// Open the file
STFFstream::open(filename, "wb");
// Write the magic string for the compressor
direct_write_(Compressor::getMagic().data(), Compressor::getMagic().size());
// Write the number of marker records per chunk
direct_write_(marker_record_chunk_size_);
// Write a placeholder value for the end of the last chunk (since we don't know how much data we're going to have)
direct_write_(ZERO);
// The first chunk starts here
chunk_indices_.emplace_back(ftell(stream_), 0, 0);
// Size the output buffer to match the filesystem block size
const size_t block_size = getFSBlockSize_();
out_buf_.initSize(block_size);
cur_chunk_buf_.initSize(block_size, true);
compression_chunk_buf_.initSize(block_size, true);
// ...but also make sure it can fit a reasonable amount of compressed data
out_buf_.fit(Compressor::getInitialBoundedSize());
next_chunk_end_ = marker_record_chunk_size_;
}
void markerRecordCallback() override {
STFOFstream::markerRecordCallback();
incomplete_chunk_ = false; // This chunk is safe to write now
// If we've crossed the chunk boundary, close the current chunk and start a new one
if(STF_EXPECT_FALSE(num_marker_records_ >= next_chunk_end_)) {
compressChunk_();
next_chunk_end_ += marker_record_chunk_size_;
}
}
};
} // end namespace stf
#endif