
Commit 19fb599

buffer: add feature to evacuate chunk files when retry limit (#4986)
**Which issue(s) this PR fixes**: None.

**What this PR does / why we need it**:

Add a feature to evacuate chunk files when the retry limit is reached.

When the retry limit is reached, `buf_file` and `buf_file_single` evacuate all the chunk files (and the meta files) in the queue to the following directory before purging:

* `(root_dir)/buffer/(plugin-id)/`

`root_dir` is `system_config.root_dir` if it is configured. Otherwise, `DEFAULT_BACKUP_DIR` (`/tmp/fluent`) is applied; this can be changed via the env var `FLUENT_BACKUP_DIR`.

There is no separate directory for each worker because the IDs of the chunks are entirely unique. This makes recovery easier: after the problem with the flush (such as a network issue) is resolved, we can put the files back and restart Fluentd to flush them again.

## Difference from the `backup` feature

The `backup` feature is for unrecoverable errors, mainly for bad chunks. This feature, on the other hand, is for normal chunks. Its main motivation is to enable recovery by evacuating buffer files when the retry limit is reached due to external factors such as network issues.

## Difference from the `secondary` feature

The `secondary` feature is not suitable for recovery: it can be difficult to recover files made by `out_secondary_file` because the metadata is lost. For file buffers, the easiest way to recover is to evacuate the chunk files as they are. Once the issue is resolved, we can put the chunk files back and restart Fluentd to load them. This feature enables that.

**Docs Changes**: fluent/fluentd-docs-gitbook#583

**Release Note**: Same as the title.

Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
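To make the recovery flow above concrete, here is a minimal sketch, assuming the default `/tmp/fluent` backup base dir, a hypothetical plugin `@id my_output`, and a hypothetical buffer `path` of `/fluentd/buffer`:

```ruby
require 'fileutils'

# Hypothetical paths: adjust to your root_dir / FLUENT_BACKUP_DIR and buffer <path>.
evacuated_dir = "/tmp/fluent/buffer/my_output"  # (backup_base_dir)/buffer/(plugin-id)
buffer_dir    = "/fluentd/buffer"               # the buffer plugin's configured path

# After the external issue (e.g. the network) is resolved, put the evacuated
# chunk and meta files back into the buffer directory...
files = Dir.children(evacuated_dir).map { |f| File.join(evacuated_dir, f) }
FileUtils.mv(files, buffer_dir)

# ...then restart Fluentd: buf_file loads the restored chunks and retries flushing them.
```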
1 parent 6571703 commit 19fb599

File tree

6 files changed, +391 -3 lines changed

lib/fluent/plugin/buf_file.rb

Lines changed: 14 additions & 0 deletions
```diff
@@ -229,6 +229,20 @@ def handle_broken_files(path, mode, e)
         File.unlink(path, path + '.meta') rescue nil
       end
 
+      def evacuate_chunk(chunk)
+        unless chunk.is_a?(Fluent::Plugin::Buffer::FileChunk)
+          raise ArgumentError, "The chunk must be FileChunk, but it was #{chunk.class}."
+        end
+
+        backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
+        FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
+
+        FileUtils.copy([chunk.path, chunk.meta_path], backup_dir)
+        log.warn "chunk files are evacuated to #{backup_dir}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
+      rescue => e
+        log.error "unexpected error while evacuating chunk files.", error: e
+      end
+
       private
 
       def escaped_patterns(patterns)
```
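For reference, with no `root_dir` configured and a hypothetical plugin `@id` of `my_output`, the target directory computed by `evacuate_chunk` above resolves as sketched below; the chunk file names are illustrative:

```ruby
backup_base_dir = "/tmp/fluent"  # DEFAULT_BACKUP_DIR (overridable via FLUENT_BACKUP_DIR)
backup_dir = File.join(backup_base_dir, 'buffer', 'my_output')
# => "/tmp/fluent/buffer/my_output"

# FileUtils.copy([chunk.path, chunk.meta_path], backup_dir) then leaves, e.g.:
#   /tmp/fluent/buffer/my_output/buffer.b<unique_id>.log        (chunk data)
#   /tmp/fluent/buffer/my_output/buffer.b<unique_id>.log.meta   (chunk metadata)
```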

lib/fluent/plugin/buf_file_single.rb

Lines changed: 14 additions & 0 deletions
```diff
@@ -241,6 +241,20 @@ def handle_broken_files(path, mode, e)
         File.unlink(path) rescue nil
       end
 
+      def evacuate_chunk(chunk)
+        unless chunk.is_a?(Fluent::Plugin::Buffer::FileSingleChunk)
+          raise ArgumentError, "The chunk must be FileSingleChunk, but it was #{chunk.class}."
+        end
+
+        backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
+        FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
+
+        FileUtils.copy(chunk.path, backup_dir)
+        log.warn "chunk files are evacuated to #{backup_dir}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
+      rescue => e
+        log.error "unexpected error while evacuating chunk files.", error: e
+      end
+
       private
 
       def escaped_patterns(patterns)
```

lib/fluent/plugin/buffer.rb

Lines changed: 28 additions & 2 deletions
```diff
@@ -625,6 +625,7 @@ def clear_queue!
           until @queue.empty?
             begin
               q = @queue.shift
+              evacuate_chunk(q)
               log.trace("purging a chunk in queue"){ {id: dump_unique_id_hex(chunk.unique_id), bytesize: chunk.bytesize, size: chunk.size} }
               q.purge
             rescue => e
@@ -636,6 +637,25 @@ def clear_queue!
         end
       end
 
+      def evacuate_chunk(chunk)
+        # Overwrite this on demand.
+        #
+        # Note: Difference from the `backup` feature.
+        # The `backup` feature is for unrecoverable errors, mainly for bad chunks.
+        # On the other hand, this feature is for normal chunks.
+        # The main motivation for this feature is to enable recovery by evacuating buffer files
+        # when the retry limit is reached due to external factors such as network issues.
+        #
+        # Note: Difference from the `secondary` feature.
+        # The `secondary` feature is not suitable for recovery.
+        # It can be difficult to recover files made by `out_secondary_file` because the metadata
+        # is lost.
+        # For file buffers, the easiest way for recovery is to evacuate the chunk files as is.
+        # Once the issue is recovered, we can put back the chunk files, and restart Fluentd to
+        # load them.
+        # This feature enables it.
+      end
+
       def chunk_size_over?(chunk)
         chunk.bytesize > @chunk_limit_size || (@chunk_limit_records && chunk.size > @chunk_limit_records)
       end
@@ -925,8 +945,6 @@ def backup(chunk_unique_id)
           return
         end
 
-        safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
-        backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
         backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log")
         backup_dir = File.dirname(backup_file)
 
@@ -945,6 +963,14 @@ def optimistic_queued?(metadata = nil)
           !@queue.empty?
         end
       end
+
+      def safe_owner_id
+        owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
+      end
+
+      def backup_base_dir
+        system_config.root_dir || DEFAULT_BACKUP_DIR
+      end
     end
   end
 end
```
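The extracted `safe_owner_id` helper reuses the sanitization that the `backup` path previously performed inline. A quick illustration with hypothetical plugin IDs:

```ruby
# Same character class as safe_owner_id; standalone here for illustration.
def sanitize(plugin_id)
  plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
end

sanitize("my output")         # => "my_output"
sanitize("out/forward:main")  # => "out_forward_main"
```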

lib/fluent/plugin/buffer/file_chunk.rb

Lines changed: 1 addition & 1 deletion
```diff
@@ -37,7 +37,7 @@ class FileChunkError < StandardError; end
       # path_prefix: path prefix string, ended with '.'
       # path_suffix: path suffix string, like '.log' (or any other user specified)
 
-      attr_reader :path, :permission
+      attr_reader :path, :meta_path, :permission
 
       def initialize(metadata, path, mode, perm: nil, compress: :text)
         super(metadata, compress: compress)
```
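Exposing `meta_path` alongside `path` is what allows `buf_file`'s `evacuate_chunk` to copy both files. As the `path + '.meta'` cleanup in `handle_broken_files` suggests, the meta file sits next to the chunk file (paths illustrative):

```ruby
chunk.path       # e.g. "/fluentd/buffer/buffer.b<unique_id>.log"
chunk.meta_path  # the same path with ".meta" appended, i.e. "...log.meta"
```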

test/plugin/test_buf_file.rb

Lines changed: 167 additions & 0 deletions
```diff
@@ -20,6 +20,34 @@ def write(chunk)
       # drop
     end
   end
+
+  class DummyErrorOutputPlugin < DummyOutputPlugin
+    def register_write(&block)
+      instance_variable_set("@write", block)
+    end
+
+    def initialize
+      super
+      @should_fail_writing = true
+      @write = nil
+    end
+
+    def recover
+      @should_fail_writing = false
+    end
+
+    def write(chunk)
+      if @should_fail_writing
+        raise "failed writing chunk"
+      else
+        @write ? @write.call(chunk) : nil
+      end
+    end
+
+    def format(tag, time, record)
+      [tag, time.to_i, record].to_json + "\n"
+    end
+  end
 end
 
 class FileBufferTest < Test::Unit::TestCase
@@ -1311,4 +1339,143 @@ def compare_log(plugin, msg)
       assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
     end
   end
+
+  sub_test_case 'evacuate_chunk' do
+    def setup
+      Fluent::Test.setup
+
+      @now = Time.local(2025, 5, 30, 17, 0, 0)
+      @base_dir = File.expand_path("../../tmp/evacuate_chunk", __FILE__)
+      @buf_dir = File.join(@base_dir, "buffer")
+      @root_dir = File.join(@base_dir, "root")
+      FileUtils.mkdir_p(@root_dir)
+
+      Fluent::SystemConfig.overwrite_system_config("root_dir" => @root_dir) do
+        Timecop.freeze(@now)
+        yield
+      end
+    ensure
+      Timecop.return
+      FileUtils.rm_rf(@base_dir)
+    end
+
+    def start_plugin(plugin)
+      plugin.start
+      plugin.after_start
+    end
+
+    def stop_plugin(plugin)
+      plugin.stop unless plugin.stopped?
+      plugin.before_shutdown unless plugin.before_shutdown?
+      plugin.shutdown unless plugin.shutdown?
+      plugin.after_shutdown unless plugin.after_shutdown?
+      plugin.close unless plugin.closed?
+      plugin.terminate unless plugin.terminated?
+    end
+
+    def configure_output(id, chunk_key, buffer_conf)
+      output = FluentPluginFileBufferTest::DummyErrorOutputPlugin.new
+      output.configure(
+        config_element('ROOT', '', {'@id' => id}, [config_element('buffer', chunk_key, buffer_conf)])
+      )
+      yield output
+    ensure
+      stop_plugin(output)
+    end
+
+    def wait(sec: 4)
+      waiting(sec) do
+        Thread.pass until yield
+      end
+    end
+
+    def emit_events(output, tag, es)
+      output.interrupt_flushes
+      output.emit_events("test.1", dummy_event_stream)
+      @now += 1
+      Timecop.freeze(@now)
+      output.enqueue_thread_wait
+      output.flush_thread_wakeup
+    end
+
+    def proceed_to_next_retry(output)
+      @now += 1
+      Timecop.freeze(@now)
+      output.flush_thread_wakeup
+    end
+
+    def dummy_event_stream
+      Fluent::ArrayEventStream.new([
+        [ event_time("2025-05-30 10:00:00"), {"message" => "data1"} ],
+        [ event_time("2025-05-30 10:10:00"), {"message" => "data2"} ],
+        [ event_time("2025-05-30 10:20:00"), {"message" => "data3"} ],
+      ])
+    end
+
+    def evacuate_dir(plugin_id)
+      File.join(@root_dir, "buffer", plugin_id)
+    end
+
+    test 'can recover by putting back evacuated chunk files' do
+      plugin_id = "test_output"
+      tag = "test.1"
+      buffer_conf = {
+        "path" => @buf_dir,
+        "flush_mode" => "interval",
+        "flush_interval" => "1s",
+        "retry_type" => "periodic",
+        "retry_max_times" => 1,
+        "retry_randomize" => false,
+      }
+
+      # Fail flushing and reach retry limit
+      configure_output(plugin_id, "tag", buffer_conf) do |output|
+        start_plugin(output)
+
+        emit_events(output, tag, dummy_event_stream)
+        wait { output.write_count == 1 and output.num_errors == 1 }
+
+        proceed_to_next_retry(output)
+        wait { output.write_count == 2 and output.num_errors == 2 }
+        wait { Dir.empty?(@buf_dir) }
+
+        # Assert evacuated files
+        evacuated_files = Dir.children(evacuate_dir(plugin_id)).map do |child_name|
+          File.join(evacuate_dir(plugin_id), child_name)
+        end
+        assert { evacuated_files.size == 2 } # .log and .log.meta
+
+        # Put back evacuated chunk files for recovery
+        FileUtils.move(evacuated_files, @buf_dir)
+      end
+
+      # Restart plugin to load the chunk files that were put back
+      written_data = []
+      configure_output(plugin_id, "tag", buffer_conf) do |output|
+        output.recover
+        output.register_write do |chunk|
+          written_data << chunk.read
+        end
+        start_plugin(output)
+
+        wait { not written_data.empty? }
+      end
+
+      # Assert the recovery success
+      assert { written_data.length == 1 }
+
+      expected_records = []
+      dummy_event_stream.each do |(time, record)|
+        expected_records << [tag, time.to_i, record]
+      end
+
+      actual_records = StringIO.open(written_data.first) do |io|
+        io.each_line.map do |line|
+          JSON.parse(line)
+        end
+      end
+
+      assert_equal(expected_records, actual_records)
+    end
+  end
 end
```
