Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make open() and close() methods of STFFstream (and all subclasses) thread-safe #30

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions stf-inc/stf_compressed_ifstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ namespace stf {
}));
}

/**
* Closes the file
*/
int close_() override {
if(!stream_) {
return 0;
}

if(decompression_in_progress_) {
decompress_result_.get();
}

return STFIFstream::close_();
}

public:
STFCompressedIFstream() = default;

Expand All @@ -87,14 +102,13 @@ namespace stf {
explicit STFCompressedIFstream(const std::string_view filename) : // cppcheck-suppress passedByValue
STFCompressedIFstream()
{
open(filename);
STFCompressedIFstream::open(filename);
}

// Have to override the base class destructor to ensure that *our* close method gets called before destruction
inline ~STFCompressedIFstream() override {
if(stream_) {
STFCompressedIFstream::close();
}
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFCompressedIFstream::close_();
}

/**
Expand All @@ -108,21 +122,6 @@ namespace stf {
readNextChunk_();
}

/**
* Closes the file
*/
int close() override {
if(!stream_) {
return 0;
}

if(decompression_in_progress_) {
decompress_result_.get();
}

return STFIFstream::close();
}

/**
* Seeks by the specified number of marker records
* \param num_markers Number of marker records to seek by
Expand Down
56 changes: 28 additions & 28 deletions stf-inc/stf_compressed_ofstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,32 @@ namespace stf {
chunk_indices_.emplace_back(end, next_chunk_pc, 0);
}

/**
* Closes the file
*/
int close_() override {
if(!stream_) {
return 0;
}

// Finish any pending chunk
if(pending_chunk_) {
// Skip writing the chunk in the buffer if it would break any readers that try to use the trace
if(incomplete_chunk_) {
std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl;
}
else {
compressChunk_();
}
}
if(compression_in_progress_) {
compression_done_.get();
compression_in_progress_ = false;
}

return STFOFstream::close_();
}

public:
/**
* Constructs an STFCompressedOFstream
Expand Down Expand Up @@ -258,9 +284,8 @@ namespace stf {

// Have to override the base class destructor to ensure that *our* close method gets called before destruction
inline ~STFCompressedOFstream() override {
if(stream_) {
STFCompressedOFstream::close();
}
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFCompressedOFstream::close_();
}

/**
Expand Down Expand Up @@ -317,31 +342,6 @@ namespace stf {
next_chunk_end_ = marker_record_chunk_size_;
}

/**
* Closes the file
*/
int close() override {
if(!stream_) {
return 0;
}

// Finish any pending chunk
if(pending_chunk_) {
// Skip writing the chunk in the buffer if it would break any readers that try to use the trace
if(incomplete_chunk_) {
std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl;
}
else {
compressChunk_();
}
}
if(compression_in_progress_) {
compression_done_.get();
compression_in_progress_ = false;
}
return STFOFstream::close();
}

void markerRecordCallback() override {
STFOFstream::markerRecordCallback();
incomplete_chunk_ = false; // This chunk is safe to write now
Expand Down
81 changes: 52 additions & 29 deletions stf-inc/stf_fstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
#include "stf_protocol_id.hpp"
#include "stf_vlen.hpp"

/**
* \macro STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK
*
* This macro must be invoked before calling the close_() method in an STFFstream-derived class
*/
#define STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK() std::lock_guard<std::mutex> l(STFFstream::open_close_mutex_)

namespace stf {
/**
* \class STFFstream
Expand All @@ -38,6 +45,7 @@ namespace stf {
size_t num_records_read_ = 0; /**< Number of records seen so far */
size_t num_marker_records_ = 0; /**< Number of marker records seen so far */
bool has_32bit_events_ = false; /**< If true, EventRecord event values are packed into 32 bits */
std::mutex open_close_mutex_; /**< Ensures open() and close() are not called from more than 1 thread simultaneously */

STFFstream() = default;

Expand Down Expand Up @@ -106,15 +114,49 @@ namespace stf {
}
}

/**
* Virtual method that does the actual work of closing a file.
* WARNING: This function should not be called unless the open/close mutex has been locked with STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK
* This can be overridden by subclasses, but note that any subclass that overrides it must also override the destructor.
*/
inline virtual int close_() {
int retcode = 0;
if(stream_) {
if(stream_ == stdout) {
fflush(stream_); // need to manually flush stdout
}
else if(stream_ != stdin) { // don't close stdin/stdout
if (used_popen_) {
retcode = pclose (stream_);
}
else if (stream_ != stdout) {
retcode = fclose (stream_);
}
}
stream_ = nullptr;
}
num_records_read_ = 0;
num_marker_records_ = 0;

// If we aren't closing this from the atexit handler, go ahead and remove ourselves
// from open_streams_
if(!lock_open_streams_) {
std::lock_guard<std::mutex> l(open_streams_mutex_);
open_streams_.erase(this);
}
return retcode;
}

public:
// Prevents copying any STF I/O objects
STFFstream(const STFFstream&) = delete;
void operator=(const STFFstream&) = delete;

// Any class that overrides close_ must also override the destructor!
// Also note that the open/close mutex must be manually locked with STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK in any overridden destructor
virtual inline ~STFFstream() {
if (stream_) {
STFFstream::close();
}
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFFstream::close_();
}

/**
Expand Down Expand Up @@ -143,6 +185,10 @@ namespace stf {
* \param rw_mode R/W mode
*/
inline void open(const std::string_view filename, const std::string_view rw_mode) { // cppcheck-suppress passedByValue
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();

stf_assert(!stream_, "Stream is already open. Call close() first.");

// special handling for stdin/stdout
if(filename.compare("-") == 0) {
if(rw_mode.compare("rb") == 0) {
Expand All @@ -166,32 +212,9 @@ namespace stf {
/**
* \brief close the trace reader/writer
*/
virtual inline int close() {
int retcode = 0;
if (stream_) {
if(stream_ == stdout) {
fflush(stream_); // need to manually flush stdout
}
else if(stream_ != stdin) { // don't close stdin/stdout
if (used_popen_) {
retcode = pclose (stream_);
}
else if (stream_ != stdout) {
retcode = fclose (stream_);
}
}
stream_ = nullptr;
}
num_records_read_ = 0;
num_marker_records_ = 0;

// If we aren't closing this from the atexit handler, go ahead and remove ourselves
// from open_streams_
if(!lock_open_streams_) {
std::lock_guard<std::mutex> l(open_streams_mutex_);
open_streams_.erase(this);
}
return retcode;
inline int close() {
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
return close_();
}

/**
Expand Down
Loading