Skip to content

Commit d4f8d85

Browse files
wypbfacebook-github-bot
wypb
authored andcommitted
Fix the incorrect size calculation logic of FileSink. (facebookincubator#9429)
Summary: I was writing unit tests for the Textfile writer internally and found that data written to `MemorySink` may be overwritten. I investigated the reason and found that `FileSink::writeImpl` can accept multiple buffers. https://github.com/facebookincubator/velox/blob/main/velox/dwio/common/FileSink.cpp#L65-L80 Every time the data of a buffer is written, we should update `FileSink#size_` instead of updating after writing all buffers. Because `MemorySink::write` relies on `FileSink#size_` to write data to `MemorySink#data_`. https://github.com/facebookincubator/velox/blob/main/velox/dwio/common/FileSink.cpp#L184-L189 CC: mbasmanova xiaoxmeng Pull Request resolved: facebookincubator#9429 Reviewed By: amitkdutta, kewang1024 Differential Revision: D56004384 Pulled By: xiaoxmeng fbshipit-source-id: c824c5e481866abef12f99d570d41a852bb8b2e0
1 parent 8e9496b commit d4f8d85

File tree

3 files changed

+64
-4
lines changed

3 files changed

+64
-4
lines changed

velox/dwio/common/FileSink.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@ void FileSink::writeImpl(
6666
std::vector<DataBuffer<char>>& buffers,
6767
const std::function<uint64_t(const DataBuffer<char>&)>& callback) {
6868
DWIO_ENSURE(!isClosed(), "Cannot write to closed sink.");
69-
uint64_t size = 0;
69+
const uint64_t oldSize = size_;
7070
for (auto& buf : buffers) {
71-
size += callback(buf);
71+
// NOTE: we need to update 'size_' after each 'callback' invocation as some
72+
// file sink implementation like MemorySink depends on the updated 'size_'
73+
// for new write.
74+
size_ += callback(buf);
7275
}
73-
size_ += size;
7476
if (stats_ != nullptr) {
75-
stats_->incRawBytesWritten(size);
77+
stats_->incRawBytesWritten(size_ - oldSize);
7678
}
7779
// Writing buffer is treated as transferring ownership. So clearing the
7880
// buffers after all buffers are written.

velox/dwio/common/tests/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ add_executable(
2525
ExecutorBarrierTest.cpp
2626
OnDemandUnitLoaderTests.cpp
2727
LocalFileSinkTest.cpp
28+
MemorySinkTest.cpp
2829
LoggedExceptionTest.cpp
2930
ParallelForTest.cpp
3031
RangeTests.cpp
+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/common/FileSink.h"
18+
19+
#include <gtest/gtest.h>
20+
21+
namespace facebook::velox::dwio::common {
22+
23+
class MemorySinkTest : public testing::Test {
24+
protected:
25+
static void SetUpTestCase() {
26+
memory::MemoryManager::testingSetInstance({});
27+
}
28+
29+
std::shared_ptr<velox::memory::MemoryPool> pool_{
30+
memory::memoryManager()->addLeafPool()};
31+
};
32+
33+
TEST_F(MemorySinkTest, create) {
34+
std::string chars("abcdefghijklmnopqrst");
35+
std::vector<DataBuffer<char>> buffers;
36+
37+
// Add 'abcdefghij' to first buffer
38+
buffers.emplace_back(*pool_);
39+
buffers.back().append(0, chars.data(), 10);
40+
41+
// Add 'klmnopqrst' to second buffer
42+
buffers.emplace_back(*pool_);
43+
buffers.back().append(0, chars.data() + 10, 10);
44+
45+
ASSERT_EQ(buffers.size(), 2);
46+
47+
auto memorySink = std::make_unique<MemorySink>(
48+
1024, dwio::common::FileSink::Options{.pool = pool_.get()});
49+
50+
ASSERT_TRUE(memorySink->isBuffered());
51+
// Write data to MemorySink.
52+
memorySink->write(buffers);
53+
ASSERT_EQ(memorySink->size(), chars.length());
54+
ASSERT_EQ(memorySink->data(), chars);
55+
memorySink->close();
56+
}
57+
} // namespace facebook::velox::dwio::common

0 commit comments

Comments
 (0)