-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathstf_compressed_ifstream.hpp
196 lines (165 loc) · 7.9 KB
/
stf_compressed_ifstream.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#ifndef __STF_COMPRESSED_IFSTREAM_HPP__
#define __STF_COMPRESSED_IFSTREAM_HPP__
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <future>
#include <string_view>
#include "stf_compression_buffer.hpp"
#include "stf_compressed_ifstream_base.hpp"
namespace stf {
/**
* \class STFCompressedIFstream
*
* Provides transparent on-the-fly decompression of a compressed STF file
*/
template<typename Decompressor>
class STFCompressedIFstream final : public STFCompressedIFstreamBase<Decompressor, STFCompressedIFstream<Decompressor>> {
private:
using base_class = STFCompressedIFstreamBase<Decompressor, STFCompressedIFstream<Decompressor>>;
friend base_class;
// From STFCompressedIFstreamBase
using base_class::decompressor_;
using base_class::in_buf_;
using base_class::out_buf_;
using base_class::decompress_result_;
using base_class::next_chunk_index_it_;
using base_class::last_read_pos_;
using base_class::block_size_;
using base_class::readChunk_;
using base_class::endChunk_;
// From STFCompressedChunkedBase
using base_class::marker_record_chunk_size_;
using base_class::next_chunk_end_;
using base_class::chunk_indices_;
// From STFFstream
using base_class::stream_;
using base_class::pc_tracker_;
using base_class::num_marker_records_;
bool decompression_in_progress_ = false; /**< Flag indicating whether a chunk is currently being decompressed */
STFCompressionBuffer decompressed_buf_; /**< Decompressed data pointer */
/**
* Returns true if all data has been consumed from the file and decompression buffers
*/
inline bool allInputConsumedImpl_() const {
return base_class::allInputConsumedImpl_() && decompressed_buf_.consumed();
}
/**
* Asynchronously read and decompress the next chunk in the file
*/
void readNextChunk_() {
// Make sure there are still chunks left to read
if(STF_EXPECT_FALSE(next_chunk_index_it_ == chunk_indices_.end())) {
decompressed_buf_.consume();
return;
}
const size_t uncompressed_chunk_size = next_chunk_index_it_->getUncompressedChunkSize();
const auto chunk_it = ++next_chunk_index_it_;
// Decompress the chunk in a separate thread
decompression_in_progress_ = true;
decompress_result_ = std::move(std::async(std::launch::async,
[this, chunk_it, uncompressed_chunk_size]() {
this->readChunk_(chunk_it,
uncompressed_chunk_size,
this->decompressed_buf_);
}));
}
/**
* Closes the file
*/
int close_() override {
if(!stream_) {
return 0;
}
if(decompression_in_progress_) {
decompress_result_.get();
}
return STFIFstream::close_();
}
public:
STFCompressedIFstream() = default;
/**
* Constructs an STFCompressedIFstream
*
* \param filename Filename to open
*/
explicit STFCompressedIFstream(const std::string_view filename) : // cppcheck-suppress passedByValue
STFCompressedIFstream()
{
STFCompressedIFstream::open(filename);
}
// Have to override the base class destructor to ensure that *our* close method gets called before destruction
inline ~STFCompressedIFstream() override {
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFCompressedIFstream::close_();
}
/**
* Opens a file
* \param filename Filename to open
*/
void open(const std::string_view filename) final { // cppcheck-suppress passedByValue
base_class::open(filename);
decompressed_buf_.initSize(block_size_);
// Read the next chunk
readNextChunk_();
}
/**
* Seeks by the specified number of marker records
* \param num_markers Number of marker records to seek by
*/
inline void seek(size_t num_markers) final {
// If the seek point comes before the next chunk boundary, just seek normally within the chunk
if(num_marker_records_ + num_markers >= next_chunk_end_) {
// Throw away what's currently in the buffer since we're moving to a new chunk
out_buf_.consume();
// Otherwise, seek to the next chunk
if(STF_EXPECT_TRUE(decompression_in_progress_)) {
// Wait for decompressor to finish
decompress_result_.get();
decompression_in_progress_ = false;
std::swap(out_buf_, decompressed_buf_);
}
in_buf_.reset();
endChunk_();
const auto chunk_idx = (num_marker_records_ + num_markers) / marker_record_chunk_size_;
if(STF_EXPECT_FALSE(chunk_idx >= chunk_indices_.size())) {
stf_throw("Attempted to seek past the end of the trace");
}
num_markers = (num_marker_records_ + num_markers) % marker_record_chunk_size_;
next_chunk_index_it_ = std::next(chunk_indices_.begin(), static_cast<ssize_t>(chunk_idx));
num_marker_records_ = chunk_idx * marker_record_chunk_size_;
next_chunk_end_ = num_marker_records_ + marker_record_chunk_size_;
fseek(stream_, next_chunk_index_it_->getOffset(), SEEK_SET);
last_read_pos_ = next_chunk_index_it_->getOffset();
pc_tracker_.forcePC(next_chunk_index_it_->getStartPC());
const auto current_chunk = next_chunk_index_it_;
if(STF_EXPECT_TRUE(next_chunk_index_it_ != chunk_indices_.end())) {
++next_chunk_index_it_;
}
readChunk_(next_chunk_index_it_, current_chunk->getUncompressedChunkSize(), out_buf_);
readNextChunk_();
}
STFIFstream::seek(num_markers);
}
/**
* Callback for marker records
*/
inline void markerRecordCallback() final {
STFIFstream::markerRecordCallback();
// If we cross a chunk boundary, move on to the next chunk
if(STF_EXPECT_FALSE(num_marker_records_ >= next_chunk_end_)) {
next_chunk_end_ += marker_record_chunk_size_;
// There should be a decompressed chunk ready to go
if(STF_EXPECT_TRUE(decompression_in_progress_)) {
// Wait for decompressor to finish
decompress_result_.get();
decompression_in_progress_ = false;
std::swap(out_buf_, decompressed_buf_);
}
// Start decompressing the next chunk
readNextChunk_();
}
}
};
} // end namespace stf
#endif