Commit f53478a

refactor pool and benchmarks
1 parent 13f5b56 commit f53478a

4 files changed (+683, -106 lines)

benchmarks/bench.exs

Lines changed: 39 additions & 22 deletions
@@ -1,52 +1,69 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: 2025 DBVisor
+
 import SQL
 import Ecto.Query
 defmodule SQL.Repo do
   use Ecto.Repo, otp_app: :sql, adapter: Ecto.Adapters.Postgres
   use SQL, adapter: SQL.Adapters.Postgres, repo: __MODULE__

   def sql() do
-    Enum.to_list ~SQL[select 1]
+    case SQL.Pool.checkout(%{id: 1}, :default) do
+      {idx, _socket} ->
+        Process.sleep(5)
+        SQL.Pool.checkin(idx, :default)
+      :none ->
+        Process.sleep(5)
+    end
   end

   def ecto() do
-    {sql, params} = to_sql(:all, select(from("users"), 1))
-    {:ok, %{columns: columns, rows: rows}} = query(sql, params)
-    Enum.map(rows, &Map.new(Enum.zip(columns, &1)))
+    try do
+      checkout(fn ->
+        Process.sleep(5)
+      end)
+    rescue
+      _ -> :error
+      Process.sleep(5)
+    end
   end
 end
 Application.put_env(:sql, :ecto_repos, [SQL.Repo])
 Application.put_env(:sql, SQL.Repo, log: false, username: "postgres", password: "postgres", hostname: "localhost", database: "sql_test#{System.get_env("MIX_TEST_PARTITION")}", pool_size: 10)
 SQL.Repo.__adapter__().storage_up(SQL.Repo.config())
 SQL.Repo.start_link()
+SQL.Pool.start_link(%{name: :default, size: 10, protocol: :tcp})
 query = "temp" |> recursive_ctes(true) |> with_cte("temp", as: ^union_all(select("temp", [t], %{n: 0, fact: 1}), ^where(select("temp", [t], [t.n+1, t.n+1*t.fact]), [t], t.n < 9))) |> select([t], [t.n])
 sql = ~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]
 result = Tuple.to_list(SQL.Lexer.lex("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)"))
 tokens = Enum.at(result, -1)
 context = Map.merge(Enum.at(result, 1), %{sql_lock: nil, module: SQL.Adapters.ANSI})
 {:ok, pcontext, ptokens} = SQL.Parser.parse(tokens, context)
+
 Benchee.run(
   %{
-    "comptime to_string" => fn _ -> to_string(sql) end,
-    "comptime to_sql" => fn _ -> SQL.to_sql(sql) end,
-    "comptime inspect" => fn _ -> inspect(sql) end,
-    "lex" => fn _ -> SQL.Lexer.lex("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)") end,
-    "parse" => fn _ -> SQL.Parser.parse(tokens, context) end,
-    "iodata" => fn _ -> pcontext.module.to_iodata(ptokens, pcontext) end,
-    "format" => fn _ -> SQL.Format.to_iodata(ptokens, pcontext, 0, false) end,
-    "lex+parse+iodata" => fn _ ->
-      {:ok, _, tokens} = SQL.Lexer.lex("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)")
-      {:ok, pcontext, tokens} = SQL.Parser.parse(tokens, context)
-      pcontext.module.to_iodata(tokens, pcontext)
-    end,
-    "parse/3" => fn _ -> SQL.parse("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)") end,
+    # "comptime to_string" => fn _ -> to_string(sql) end,
+    # "comptime to_sql" => fn _ -> SQL.to_sql(sql) end,
+    # "comptime inspect" => fn _ -> inspect(sql) end,
+    # "lex" => fn _ -> SQL.Lexer.lex("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)") end,
+    # "parse" => fn _ -> SQL.Parser.parse(tokens, context) end,
+    # "iodata" => fn _ -> pcontext.module.to_iodata(ptokens, pcontext) end,
+    # "format" => fn _ -> SQL.Format.to_iodata(ptokens, pcontext, 0, false) end,
+    # "lex+parse+iodata" => fn _ ->
+    #   {:ok, _, tokens} = SQL.Lexer.lex("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)")
+    #   {:ok, pcontext, tokens} = SQL.Parser.parse(tokens, context)
+    #   pcontext.module.to_iodata(tokens, pcontext)
+    # end,
+    # "parse/3" => fn _ -> SQL.parse("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)") end,
     "sql" => fn _ -> SQL.Repo.sql() end,
     "ecto" => fn _ -> SQL.Repo.ecto() end,
-    "runtime to_string" => fn _ -> to_string(~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]) end,
-    "runtime to_sql" => fn _ -> SQL.to_sql(~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]) end,
-    "runtime inspect" => fn _ -> inspect(~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]) end,
-    "runtime ecto" => fn _ -> SQL.Repo.to_sql(:all, "temp" |> recursive_ctes(true) |> with_cte("temp", as: ^union_all(select("temp", [t], %{n: 0, fact: 1}), ^where(select("temp", [t], [t.n+1, t.n+1*t.fact]), [t], t.n < 9))) |> select([t], [t.n])) end,
-    "comptime ecto" => fn _ -> SQL.Repo.to_sql(:all, query) end
+    # "runtime to_string" => fn _ -> to_string(~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]) end,
+    # "runtime to_sql" => fn _ -> SQL.to_sql(~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]) end,
+    # "runtime inspect" => fn _ -> inspect(~SQL[with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)]) end,
+    # "runtime ecto" => fn _ -> SQL.Repo.to_sql(:all, "temp" |> recursive_ctes(true) |> with_cte("temp", as: ^union_all(select("temp", [t], %{n: 0, fact: 1}), ^where(select("temp", [t], [t.n+1, t.n+1*t.fact]), [t], t.n < 9))) |> select([t], [t.n])) end,
+    # "comptime ecto" => fn _ -> SQL.Repo.to_sql(:all, query) end
   },
+  parallel: 10, time: 1,
   inputs: %{"1..100_000" => Enum.to_list(1..100_000)},
   memory_time: 2,
   reduction_time: 2,
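
Editor's note: the new `sql/0` above checks a connection out and back in without guarding the work in between, which is fine for a fixed `Process.sleep/1` benchmark but would leak the slot if the work raised. A minimal sketch of a guarded wrapper, assuming only the contract visible in this diff (`SQL.Pool.checkout/2` returns `{idx, socket}` or `:none`, `SQL.Pool.checkin/2` returns the slot); the `BenchHelpers.with_conn/2` helper itself is hypothetical and not part of this commit:

# Hypothetical helper, not part of this commit: always check the slot back in,
# even if the caller's function raises.
defmodule BenchHelpers do
  def with_conn(pool, fun) when is_function(fun, 1) do
    case SQL.Pool.checkout(%{id: 1}, pool) do
      {idx, socket} ->
        try do
          fun.(socket)
        after
          # Runs on both success and exception, so the slot is never leaked.
          SQL.Pool.checkin(idx, pool)
        end

      :none ->
        {:error, :pool_exhausted}
    end
  end
end

# Usage: BenchHelpers.with_conn(:default, fn _socket -> Process.sleep(5) end)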

benchmarks/pool.exs

Lines changed: 256 additions & 21 deletions
@@ -1,33 +1,268 @@
-import SQL
-import Ecto.Query
-SQL.Pool.start_link(%{name: :mypool, protocol: :tcp, size: 10})
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: 2025 DBVisor
+
 defmodule SQL.Repo do
   use Ecto.Repo, otp_app: :sql, adapter: Ecto.Adapters.Postgres
-  use SQL, adapter: SQL.Adapters.Postgres, repo: __MODULE__

   def sql() do
-    {idx, _} = SQL.Pool.checkout(SQL.parse("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)"), :mypool)
+    {idx, _} = SQL.Pool.checkout(%{}, :mypool)
+    Process.sleep(5)
     SQL.Pool.checkin(idx, :mypool)
   end

   def ecto() do
-    checkout(fn -> SQL.parse("with recursive temp (n, fact) as (select 0, 1 union all select n+1, (n+1)*fact from temp where n < 9)") end)
+    checkout(fn -> Process.sleep(5) end)
   end
 end
+
 Application.put_env(:sql, :ecto_repos, [SQL.Repo])
-Application.put_env(:sql, SQL.Repo, log: false, username: "postgres", password: "postgres", hostname: "localhost", database: "sql_test#{System.get_env("MIX_TEST_PARTITION")}", pool_size: 10)
-SQL.Repo.__adapter__().storage_up(SQL.Repo.config())
-SQL.Repo.start_link()
-Benchee.run(
-  %{
-    "sql" => fn _ -> SQL.Repo.sql() end,
-    "ecto" => fn _ -> SQL.Repo.ecto() end,
-  },
-  parallel: 50, time: 1,
-  inputs: %{"1..100_000" => Enum.to_list(1..100_000)},
-  memory_time: 2,
-  reduction_time: 2,
-  unit_scaling: :smallest,
-  measure_function_call_overhead: true,
-  profile_after: :eprof
+Application.put_env(:sql, SQL.Repo,
+  log: false,
+  username: "postgres",
+  password: "postgres",
+  hostname: "localhost",
+  database: "sql_test#{System.get_env("MIX_TEST_PARTITION")}"
 )
+
+SQL.Repo.__adapter__().storage_up(SQL.Repo.config())
+
+defmodule Pool.Benchmark do
+  @moduledoc false
+
+  @duration_ms 30_000
+  @scale_interval 5_000
+  @report_interval 2_000
+
+  @deterministic_delays Enum.map(1..1_000, fn i ->
+    trunc(50 + :math.sin(i) * 100 + 100)
+  end)
+
+  :ets.new(:latency, [:named_table, :public, {:read_concurrency, true}])
+
+  def run_all(pool_size \\ 10, initial_clients \\ 10, max_clients \\ 50, flat \\ false, mode \\ :deterministic) do
+    if mode == :benchee do
+      SQL.Repo.start_link(pool_size: pool_size)
+      SQL.Pool.start_link(%{name: :mypool, protocol: :tcp, size: pool_size})
+      Benchee.run(
+        %{
+          "sql" => fn -> SQL.Repo.sql() end,
+          "ecto" => fn -> SQL.Repo.ecto() end,
+        },
+        parallel: 100,
+        time: 5,
+        memory_time: 2,
+        reduction_time: 2,
+        unit_scaling: :smallest,
+        measure_function_call_overhead: true,
+        profile_after: :eprof
+      )
+    else
+      IO.puts("Starting side-by-side benchmark: SQL vs Ecto")
+      IO.puts("Mode: #{mode}, Pool size: #{pool_size}, Initial clients: #{initial_clients}, Max clients: #{max_clients}, Flat: #{flat}\n")
+
+      results =
+        [:sql, :ecto]
+        |> Enum.map(fn adapter ->
+          {adapter, run_single(adapter, mode, pool_size, initial_clients, max_clients, flat)}
+        end)
+
+      print_comparison(results)
+    end
+  end
+
+  defp run_single(adapter, mode, pool_size, initial_clients, max_clients, flat) do
+    if mode == :deterministic, do: :rand.seed(:exsplus, {1234, 5678, 9012})
+    :ets.delete_all_objects(:latency)
+    {:ok, pid} =
+      case adapter do
+        :sql ->
+          IO.puts("Starting benchmark with SQL pool (mode: #{mode})")
+          SQL.Pool.start_link(%{size: pool_size, protocol: :tcp, name: :default})
+
+        :ecto ->
+          IO.puts("Starting benchmark with Ecto pool (mode: #{mode})")
+          SQL.Repo.start_link(pool_size: pool_size)
+      end
+
+    stop_time = System.monotonic_time(:millisecond) + @duration_ms
+    counter = :counters.new(1, [:write_concurrency])
+
+    state = %{
+      adapter: adapter,
+      pool: pid,
+      pool_size: pool_size,
+      counter: counter,
+      stop_time: stop_time,
+      flat: flat,
+      initial_clients: initial_clients,
+      max_clients: max_clients
+    }
+
+    start_clients(state, initial_clients, mode, 0)
+    spawn(fn -> dynamic_load_loop(state, initial_clients, mode, 0) end)
+    spawn(fn -> report_loop(state) end)
+
+    Process.sleep(@duration_ms + 100)
+
+    total = :counters.get(counter, 1)
+    avg_qps = total / (@duration_ms / 1000)
+
+    IO.puts("\n=== #{String.upcase(to_string(adapter))} Final Report ===")
+    IO.puts("Total requests: #{total}")
+    IO.puts("Average QPS: #{Float.round(avg_qps, 1)}\n")
+
+    {total, avg_qps}
+  end
+
+  defp start_clients(state, n, mode, start_idx) do
+    Enum.each(0..(n - 1), fn i ->
+      spawn(fn -> client_loop(state, mode, start_idx + i) end)
+    end)
+  end
+
+  defp client_loop(%{adapter: adapter, counter: counter, stop_time: stop_time}=state, mode, idx) do
+    if System.monotonic_time(:millisecond) < stop_time do
+      case adapter do
+        :sql ->
+          {pool_idx, _socket} = measured_checkout_sql(:default)
+          :counters.add(counter, 1, 1)
+          sleep_for(mode, idx)
+          SQL.Pool.checkin(pool_idx, :default)
+
+        :ecto ->
+          measured_checkout_ecto(fn ->
+            :counters.add(counter, 1, 1)
+            sleep_for(mode, idx)
+          end)
+      end
+
+      client_loop(state, mode, idx + 1)
+    end
+  end
+
+  defp sleep_for(:deterministic, _idx), do: Process.sleep(5)
+  defp sleep_for(:realistic, idx) do
+    delay = Enum.at(@deterministic_delays, rem(idx, length(@deterministic_delays)))
+    Process.sleep(delay)
+  end
+
+  defp measured_checkout_sql(pool) do
+    start = System.monotonic_time(:nanosecond)
+    result = SQL.Pool.checkout(%{id: 1}, pool)
+    elapsed = System.monotonic_time(:nanosecond) - start
+    :ets.insert(:latency, {:latency, elapsed})
+    result
+  end
+
+  defp measured_checkout_ecto(fun) when is_function(fun, 0) do
+    start = System.monotonic_time(:nanosecond)
+    result = SQL.Repo.checkout(fun)
+    elapsed = System.monotonic_time(:nanosecond) - start
+    :ets.insert(:latency, {:latency, elapsed})
+    result
+  end
+
+  defp dynamic_load_loop(%{flat: true, initial_clients: clients, adapter: adapter} = state, clients, _mode, _iteration) do
+    if System.monotonic_time(:millisecond) < state.stop_time do
+      IO.puts("[#{String.upcase(to_string(adapter))} Load] Active clients: #{clients} Metrics: #{inspect SQL.Pool.metrics(:default), limit: :infinity}")
+      Process.sleep(@scale_interval)
+      dynamic_load_loop(state, clients, :deterministic, 0)
+    end
+  end
+
+  defp dynamic_load_loop(%{initial_clients: initial_clients, max_clients: max_clients, adapter: adapter} = state, clients, mode, iteration) do
+    if System.monotonic_time(:millisecond) < state.stop_time do
+      next_clients = min(initial_clients + iteration * 10, max_clients)
+      actual_pool_size = current_pool_size(state)
+
+      string = "[#{String.upcase(to_string(adapter))} Load] Active clients: #{next_clients}, Pool size: #{actual_pool_size}"
+      string = if adapter == :sql, do: string <> " Metrics: #{inspect SQL.Pool.metrics(:default), limit: :infinity}", else: string
+      IO.puts(string)
+
+      if trunc(next_clients) > trunc(clients), do: start_clients(state, next_clients - clients, :deterministic, clients)
+
+      Process.sleep(@scale_interval)
+      dynamic_load_loop(state, next_clients, mode, iteration + 1)
+    end
+  end
+
+  defp current_pool_size(%{adapter: :sql}) do
+    {_, _, _, _, _, _, _, size, _, _} = :persistent_term.get({SQL.Pool, :default})
+    size
+  end
+  defp current_pool_size(%{pool_size: pool_size}), do: pool_size
+  defp current_pool_size(_), do: nil
+
+  defp report_loop(%{adapter: :sql, counter: counter, stop_time: stop_time}=state) do
+    prev_count = :counters.get(counter, 1)
+    prev_time = System.monotonic_time(:millisecond)
+    Process.sleep(@report_interval)
+
+    now_count = :counters.get(counter, 1)
+    now_time = System.monotonic_time(:millisecond)
+    qps = (now_count - prev_count) / ((now_time - prev_time) / 1000)
+
+    metrics = SQL.Pool.metrics(:default)
+    percentiles = compute_percentiles()
+
+    IO.puts("""
+    [SQL Stats] QPS: #{Float.round(qps,1)}, Pool: #{metrics.pool_size}, In-use: #{metrics.current_in_use}, Idle: #{metrics.idle_connections}
+    Cache H/M: #{metrics.cache_hits}/#{metrics.cache_misses}, Rotation retries: #{metrics.rotation_retries}, Free list top: #{metrics.free_list_top}
+    Checkout latency (ns) P50/P90/P95/P99: #{percentiles.p50}/#{percentiles.p90}/#{percentiles.p95}/#{percentiles.p99}
+    """)
+
+    if now_time < stop_time, do: report_loop(state)
+  end
+
+  defp report_loop(%{counter: counter, stop_time: stop_time}=state) do
+    prev = :counters.get(counter, 1)
+    prev_time = System.monotonic_time(:millisecond)
+    Process.sleep(@report_interval)
+
+    now = :counters.get(counter, 1)
+    now_time = System.monotonic_time(:millisecond)
+    qps = (now - prev) / ((now_time - prev_time) / 1000)
+
+    IO.puts("[#{String.upcase(to_string(state.adapter))} Stats] Current QPS: #{Float.round(qps, 1)}, Pool size: #{current_pool_size(state)}")
+
+    if now_time < stop_time do
+      report_loop(state)
+    end
+  end
+
+  defp compute_percentiles() do
+    latencies = :ets.tab2list(:latency) |> Enum.map(fn {:latency, ns} -> ns end)
+    sorted = Enum.sort(latencies)
+    count = length(sorted)
+
+    if count == 0 do
+      %{p50: 0, p90: 0, p95: 0, p99: 0}
+    else
+      %{
+        p50: Enum.at(sorted, trunc(count * 0.5)),
+        p90: Enum.at(sorted, trunc(count * 0.9)),
+        p95: Enum.at(sorted, trunc(count * 0.95)),
+        p99: Enum.at(sorted, trunc(count * 0.99))
+      }
+    end
+  end
+
+  defp print_comparison(results) do
+    [{:sql, {sql_total, sql_qps}}, {:ecto, {ecto_total, ecto_qps}}] = results
+
+    IO.puts("=== Side-by-side Comparison ===")
+    IO.puts("SQL -> Total: #{sql_total}, Avg QPS: #{Float.round(sql_qps, 1)}")
+    IO.puts("Ecto -> Total: #{ecto_total}, Avg QPS: #{Float.round(ecto_qps, 1)}\n")
+    IO.puts("Speedup: #{Float.round(sql_qps / ecto_qps, 2)}x")
+  end
+end
+
+[mode_arg, pool_size_arg, flat_arg] =
+  case System.argv() do
+    [m, size, flat] -> [String.to_atom(m), String.to_integer(size), flat == "true"]
+    [m, size] -> [String.to_atom(m), String.to_integer(size), false]
+    [m] -> [String.to_atom(m), 10, false]
+    _ -> [:deterministic, 10, false]
+  end
+
+Pool.Benchmark.run_all(pool_size_arg, 10, 50, flat_arg, mode_arg)
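
Editor's note: the trailing `System.argv()` clause accepts up to three positional arguments (mode, pool size, flat flag), while the initial and max client counts stay fixed at 10 and 50 in the final `run_all/5` call. Assuming the script is launched with `mix run` (the exact command is not part of this commit), invocation would look roughly like:

# Defaults: :deterministic mode, pool size 10, flat false
mix run benchmarks/pool.exs

# :realistic delay distribution, pool size 20
mix run benchmarks/pool.exs realistic 20

# Flat load: the client count is never scaled beyond the initial 10
mix run benchmarks/pool.exs deterministic 10 true

# Benchee-driven run instead of the custom load loop
mix run benchmarks/pool.exs benchee 10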
