Skip to content

Commit 8a31aea

Browse files
committed
Fix jdbc_fetch_size usage with postgresql
1 parent ae2523b commit 8a31aea

File tree

3 files changed

+14
-8
lines changed

3 files changed

+14
-8
lines changed

lib/logstash/plugin_mixins/jdbc/jdbc.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ def open_jdbc_connection
160160
require "java"
161161
require "sequel"
162162
require "sequel/adapters/jdbc"
163+
require "sequel/adapters/jdbc/transactions"
163164

164165
Sequel.application_timezone = @plugin_timezone.to_sym
165166
if @drivers_loaded.false?
@@ -183,6 +184,7 @@ def open_jdbc_connection
183184
end
184185
@database = jdbc_connect()
185186
@database.extension(:pagination)
187+
@database.extend(Sequel::JDBC::Transactions)
186188
if @jdbc_default_timezone
187189
@database.extension(:named_timezones)
188190
@database.timezone = @jdbc_default_timezone

lib/logstash/plugin_mixins/jdbc/statement_handler.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@ class NormalStatementHandler < StatementHandler
3131
# @yieldparam row [Hash{Symbol=>Object}]
3232
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
3333
query = build_query(db, sql_last_value)
34-
if jdbc_paging_enabled
35-
query.each_page(jdbc_page_size) do |paged_dataset|
36-
paged_dataset.each do |row|
34+
# Execute query in transaction cause PG driver require autocommit off for set fetch count
35+
# See: https://jdbc.postgresql.org/documentation/head/query.html
36+
db.transaction(rollback: :always) do
37+
if jdbc_paging_enabled
38+
query.each_page(jdbc_page_size) do |paged_dataset|
39+
paged_dataset.each do |row|
40+
yield row
41+
end
42+
end
43+
else
44+
query.each do |row|
3745
yield row
3846
end
3947
end
40-
else
41-
query.each do |row|
42-
yield row
43-
end
4448
end
4549
end
4650

spec/inputs/jdbc_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,7 @@
10261026
end
10271027

10281028
it "should report the statements to logging" do
1029-
expect(plugin.logger).to receive(:debug).once
1029+
expect(plugin.logger).to receive(:debug).twice
10301030
plugin.run(queue)
10311031
end
10321032
end

0 commit comments

Comments
 (0)