buffer: add feature to evacuate chunk files when the retry limit is reached #4986


Merged
14 changes: 14 additions & 0 deletions lib/fluent/plugin/buf_file.rb
@@ -229,6 +229,20 @@ def handle_broken_files(path, mode, e)
        File.unlink(path, path + '.meta') rescue nil
      end

      def evacuate_chunk(chunk)
        unless chunk.is_a?(Fluent::Plugin::Buffer::FileChunk)
          raise ArgumentError, "The chunk must be FileChunk, but it was #{chunk.class}."
        end

        backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
        FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)

        FileUtils.copy([chunk.path, chunk.meta_path], backup_dir)
        log.warn "chunk files are evacuated to #{backup_dir}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
      rescue => e
        log.error "unexpected error while evacuating chunk files.", error: e
      end

      private

      def escaped_patterns(patterns)
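For orientation, here is a minimal, self-contained sketch of the copy this method performs and the destination layout it produces. Every path and the plugin id below are hypothetical stand-ins; the real values come from system_config.root_dir (or DEFAULT_BACKUP_DIR) and the owner plugin's @id after safe_owner_id sanitizes it:

  require 'fileutils'
  require 'tmpdir'

  Dir.mktmpdir do |dir|
    backup_base_dir = dir          # stands in for system_config.root_dir || DEFAULT_BACKUP_DIR
    safe_owner_id   = 'my_output'  # owner.plugin_id with unsafe characters replaced by '_'

    # Fake chunk files standing in for chunk.path and chunk.meta_path.
    chunk_path = File.join(dir, 'buffer.b61aa8dd.log')
    File.write(chunk_path, 'payload')
    File.write(chunk_path + '.meta', 'metadata')

    backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
    FileUtils.mkdir_p(backup_dir)

    # FileUtils.copy accepts an array of sources when the destination is a
    # directory, so the payload and its '.meta' companion travel together.
    FileUtils.copy([chunk_path, chunk_path + '.meta'], backup_dir)

    puts Dir.children(backup_dir).sort # prints the two copied files
  end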
14 changes: 14 additions & 0 deletions lib/fluent/plugin/buf_file_single.rb
@@ -241,6 +241,20 @@ def handle_broken_files(path, mode, e)
        File.unlink(path) rescue nil
      end

      def evacuate_chunk(chunk)
        unless chunk.is_a?(Fluent::Plugin::Buffer::FileSingleChunk)
          raise ArgumentError, "The chunk must be FileSingleChunk, but it was #{chunk.class}."
        end

        backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
        FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)

        FileUtils.copy(chunk.path, backup_dir)
        log.warn "chunk files are evacuated to #{backup_dir}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
      rescue => e
        log.error "unexpected error while evacuating chunk files.", error: e
      end

      private

      def escaped_patterns(patterns)
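The single-file variant differs only in that FileSingleChunk keeps no separate metadata file (its metadata lives in the chunk's file name), so only chunk.path is copied. A hypothetical before/after, assuming the usual buf_file_single naming:

  # Hypothetical paths for illustration only.
  chunk_path = '/var/lib/fluent/buffer/fsb.test.b61aa8dd.buf' # chunk.path; tag etc. encoded in the name
  backup_dir = '/var/log/fluent/buffer/my_output'

  # Only one file to copy; no '.meta' companion exists:
  #   FileUtils.copy(chunk_path, backup_dir)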
30 changes: 28 additions & 2 deletions lib/fluent/plugin/buffer.rb
@@ -625,6 +625,7 @@ def clear_queue!
          until @queue.empty?
            begin
              q = @queue.shift
              evacuate_chunk(q)
              log.trace("purging a chunk in queue"){ {id: dump_unique_id_hex(q.unique_id), bytesize: q.bytesize, size: q.size} }
              q.purge
            rescue => e
@@ -636,6 +637,25 @@
        end
      end

      def evacuate_chunk(chunk)
        # Override this in subclasses on demand.
        #
        # Note: difference from the `backup` feature.
        # The `backup` feature is for unrecoverable errors, mainly for bad chunks.
        # This feature, on the other hand, is for normal chunks.
        # Its main motivation is to enable recovery by evacuating buffer files when
        # the retry limit is reached due to external factors such as network issues.
        #
        # Note: difference from the `secondary` feature.
        # The `secondary` feature is not suitable for recovery.
        # Files made by `out_secondary_file` can be difficult to recover because
        # their metadata is lost.
        # For file buffers, the easiest way to recover is to evacuate the chunk
        # files as is.
        # Once the issue is resolved, we can put the chunk files back and restart
        # Fluentd to load them.
        # This feature enables that workflow.
      end

      def chunk_size_over?(chunk)
        chunk.bytesize > @chunk_limit_size || (@chunk_limit_records && chunk.size > @chunk_limit_records)
      end
@@ -925,8 +945,6 @@ def backup(chunk_unique_id)
          return
        end

        safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
        backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
        backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log")
        backup_dir = File.dirname(backup_file)
@@ -945,6 +963,14 @@ def optimistic_queued?(metadata = nil)
          !@queue.empty?
        end
      end

      def safe_owner_id
        owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
      end

      def backup_base_dir
        system_config.root_dir || DEFAULT_BACKUP_DIR
      end
    end
  end
end
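The comment in evacuate_chunk above describes the intended round trip: when the retry limit is hit, chunk files are copied out to <root_dir>/buffer/<plugin_id>; once the external issue is resolved, an operator moves them back into the plugin's configured buffer path and restarts Fluentd, which then loads them like any other staged chunks. A minimal sketch of that operator-side step, with hypothetical paths and assuming both directories exist:

  require 'fileutils'

  # Hypothetical paths for illustration only.
  evacuate_dir = '/var/log/fluent/buffer/my_output'  # <root_dir>/buffer/<plugin_id>
  buffer_dir   = '/var/lib/fluent/my_output/buffer'  # the plugin's configured buffer path

  # Move each evacuated chunk (and, for buf_file, its '.meta' companion) back,
  # then restart Fluentd so the file buffer resumes the chunks at startup.
  Dir.children(evacuate_dir).each do |name|
    FileUtils.move(File.join(evacuate_dir, name), buffer_dir)
  end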
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
@@ -37,7 +37,7 @@ class FileChunkError < StandardError; end
        # path_prefix: path prefix string, ended with '.'
        # path_suffix: path suffix string, like '.log' (or any other user specified)

        attr_reader :path, :permission
        attr_reader :path, :meta_path, :permission

        def initialize(metadata, path, mode, perm: nil, compress: :text)
          super(metadata, compress: compress)
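Exposing meta_path is what lets buf_file.rb above copy both halves of a chunk in one call. Assuming the usual file-buffer naming, where the metadata file simply appends '.meta' to the chunk path, the pair looks like this (hypothetical path):

  # Hypothetical chunk path for illustration only.
  path      = '/var/lib/fluent/buffer/buffer.b61aa8dd.log'
  meta_path = "#{path}.meta" # => /var/lib/fluent/buffer/buffer.b61aa8dd.log.meta

  # evacuate_chunk copies both files together:
  #   FileUtils.copy([chunk.path, chunk.meta_path], backup_dir)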
167 changes: 167 additions & 0 deletions test/plugin/test_buf_file.rb
@@ -20,6 +20,34 @@ def write(chunk)
      # drop
    end

  class DummyErrorOutputPlugin < DummyOutputPlugin
    def register_write(&block)
      instance_variable_set("@write", block)
    end

    def initialize
      super
      @should_fail_writing = true
      @write = nil
    end

    def recover
      @should_fail_writing = false
    end

    def write(chunk)
      if @should_fail_writing
        raise "failed writing chunk"
      else
        @write ? @write.call(chunk) : nil
      end
    end

    def format(tag, time, record)
      [tag, time.to_i, record].to_json + "\n"
    end
  end
end

class FileBufferTest < Test::Unit::TestCase
@@ -1311,4 +1339,143 @@ def compare_log(plugin, msg)
      assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
    end
  end

  sub_test_case 'evacuate_chunk' do
    def setup
      Fluent::Test.setup

      @now = Time.local(2025, 5, 30, 17, 0, 0)
      @base_dir = File.expand_path("../../tmp/evacuate_chunk", __FILE__)
      @buf_dir = File.join(@base_dir, "buffer")
      @root_dir = File.join(@base_dir, "root")
      FileUtils.mkdir_p(@root_dir)

      Fluent::SystemConfig.overwrite_system_config("root_dir" => @root_dir) do
        Timecop.freeze(@now)
        yield
      end
    ensure
      Timecop.return
      FileUtils.rm_rf(@base_dir)
    end

    def start_plugin(plugin)
      plugin.start
      plugin.after_start
    end

    def stop_plugin(plugin)
      plugin.stop unless plugin.stopped?
      plugin.before_shutdown unless plugin.before_shutdown?
      plugin.shutdown unless plugin.shutdown?
      plugin.after_shutdown unless plugin.after_shutdown?
      plugin.close unless plugin.closed?
      plugin.terminate unless plugin.terminated?
    end

    def configure_output(id, chunk_key, buffer_conf)
      output = FluentPluginFileBufferTest::DummyErrorOutputPlugin.new
      output.configure(
        config_element('ROOT', '', {'@id' => id}, [config_element('buffer', chunk_key, buffer_conf)])
      )
      yield output
    ensure
      stop_plugin(output)
    end

    def wait(sec: 4)
      waiting(sec) do
        Thread.pass until yield
      end
    end

    def emit_events(output, tag, es)
      output.interrupt_flushes
      output.emit_events(tag, es)
      @now += 1
      Timecop.freeze(@now)
      output.enqueue_thread_wait
      output.flush_thread_wakeup
    end

    def proceed_to_next_retry(output)
      @now += 1
      Timecop.freeze(@now)
      output.flush_thread_wakeup
    end

    def dummy_event_stream
      Fluent::ArrayEventStream.new([
        [ event_time("2025-05-30 10:00:00"), {"message" => "data1"} ],
        [ event_time("2025-05-30 10:10:00"), {"message" => "data2"} ],
        [ event_time("2025-05-30 10:20:00"), {"message" => "data3"} ],
      ])
    end

    def evacuate_dir(plugin_id)
      File.join(@root_dir, "buffer", plugin_id)
    end

    test 'can recover by putting back evacuated chunk files' do
      plugin_id = "test_output"
      tag = "test.1"
      buffer_conf = {
        "path" => @buf_dir,
        "flush_mode" => "interval",
        "flush_interval" => "1s",
        "retry_type" => "periodic",
        "retry_max_times" => 1,
        "retry_randomize" => false,
      }

      # Fail flushing and reach retry limit
      configure_output(plugin_id, "tag", buffer_conf) do |output|
        start_plugin(output)

        emit_events(output, tag, dummy_event_stream)
        wait { output.write_count == 1 and output.num_errors == 1 }

        proceed_to_next_retry(output)
        wait { output.write_count == 2 and output.num_errors == 2 }
        wait { Dir.empty?(@buf_dir) }

        # Assert evacuated files
        evacuated_files = Dir.children(evacuate_dir(plugin_id)).map do |child_name|
          File.join(evacuate_dir(plugin_id), child_name)
        end
        assert { evacuated_files.size == 2 } # .log and .log.meta

        # Put back evacuated chunk files for recovery
        FileUtils.move(evacuated_files, @buf_dir)
      end

      # Restart plugin to load the chunk files that were put back
      written_data = []
      configure_output(plugin_id, "tag", buffer_conf) do |output|
        output.recover
        output.register_write do |chunk|
          written_data << chunk.read
        end
        start_plugin(output)

        wait { not written_data.empty? }
      end

      # Assert the recovery success
      assert { written_data.length == 1 }

      expected_records = []
      dummy_event_stream.each do |(time, record)|
        expected_records << [tag, time.to_i, record]
      end

      actual_records = StringIO.open(written_data.first) do |io|
        io.each_line.map do |line|
          JSON.parse(line)
        end
      end

      assert_equal(expected_records, actual_records)
    end
  end
end