Skip to content

Commit

Permalink
Use two-parameter absl::MutexLock constructor instead of
Browse files Browse the repository at this point in the history
`absl::Mutex::LockWhen()` to consistently rely on RAII for unlocking.

PiperOrigin-RevId: 629412943
  • Loading branch information
QrczakMK committed Apr 30, 2024
1 parent 7c54c39 commit 157b6b0
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions riegeli/records/record_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -661,11 +661,12 @@ absl::Status RecordWriterBase::ParallelWorker::AnnotateStatus(
absl::Status status) {
std::promise<absl::Status> done_promise;
std::future<absl::Status> done_future = done_promise.get_future();
mutex_.LockWhen(
absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
AnnotateStatusRequest{std::move(status), std::move(done_promise)});
mutex_.Unlock();
{
absl::MutexLock lock(
&mutex_, absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
AnnotateStatusRequest{std::move(status), std::move(done_promise)});
}
return done_future.get();
}

Expand All @@ -681,12 +682,13 @@ bool RecordWriterBase::ParallelWorker::WriteSignature() {
ChunkPromises chunk_promises;
chunk_promises.chunk_header.set_value(chunk.header);
chunk_promises.chunk.set_value(std::move(chunk));
mutex_.LockWhen(
absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
WriteChunkRequest{chunk_promises.chunk_header.get_future(),
chunk_promises.chunk.get_future()});
mutex_.Unlock();
{
absl::MutexLock lock(
&mutex_, absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
WriteChunkRequest{chunk_promises.chunk_header.get_future(),
chunk_promises.chunk.get_future()});
}
return true;
}

Expand All @@ -697,12 +699,13 @@ bool RecordWriterBase::ParallelWorker::WriteMetadata() {
return true;
}
ChunkPromises* const chunk_promises = new ChunkPromises();
mutex_.LockWhen(
absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
WriteChunkRequest{chunk_promises->chunk_header.get_future(),
chunk_promises->chunk.get_future()});
mutex_.Unlock();
{
absl::MutexLock lock(
&mutex_, absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
WriteChunkRequest{chunk_promises->chunk_header.get_future(),
chunk_promises->chunk.get_future()});
}
internal::ThreadPool::global().Schedule([this, chunk_promises] {
Chunk chunk;
EncodeMetadata(chunk);
Expand All @@ -717,12 +720,13 @@ bool RecordWriterBase::ParallelWorker::CloseChunk() {
if (ABSL_PREDICT_FALSE(!ok())) return false;
ChunkEncoder* const chunk_encoder = chunk_encoder_.release();
ChunkPromises* const chunk_promises = new ChunkPromises();
mutex_.LockWhen(
absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
WriteChunkRequest{chunk_promises->chunk_header.get_future(),
chunk_promises->chunk.get_future()});
mutex_.Unlock();
{
absl::MutexLock lock(
&mutex_, absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
WriteChunkRequest{chunk_promises->chunk_header.get_future(),
chunk_promises->chunk.get_future()});
}
internal::ThreadPool::global().Schedule(
[this, chunk_encoder, chunk_promises] {
Chunk chunk;
Expand All @@ -737,10 +741,11 @@ bool RecordWriterBase::ParallelWorker::CloseChunk() {

bool RecordWriterBase::ParallelWorker::PadToBlockBoundary() {
if (ABSL_PREDICT_FALSE(!ok())) return false;
mutex_.LockWhen(
absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(PadToBlockBoundaryRequest());
mutex_.Unlock();
{
absl::MutexLock lock(
&mutex_, absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(PadToBlockBoundaryRequest());
}
return true;
}

Expand All @@ -752,11 +757,12 @@ RecordWriterBase::FutureStatus RecordWriterBase::ParallelWorker::FutureFlush(
FlushType flush_type) {
std::promise<absl::Status> done_promise;
FutureStatus done_future = done_promise.get_future();
mutex_.LockWhen(
absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
FlushRequest{flush_type, std::move(done_promise)});
mutex_.Unlock();
{
absl::MutexLock lock(
&mutex_, absl::Condition(this, &ParallelWorker::HasCapacityForRequest));
chunk_writer_requests_.emplace_back(
FlushRequest{flush_type, std::move(done_promise)});
}
return done_future;
}

Expand Down

0 comments on commit 157b6b0

Please sign in to comment.