Skip to content

Commit 762737e

Browse files
committed
Add pool
1 parent bb1303b commit 762737e

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

lib/pool.ex

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
defmodule SQL.Pool do
2+
@moduledoc false
3+
use GenServer
4+
5+
def start_link(opts) do
6+
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
7+
end
8+
9+
@impl true
10+
def init(state) do
11+
{:ok, initialize(state)}
12+
end
13+
14+
def checkout(%{id: id}=sql, pool \\ :default) do
15+
start_time = System.monotonic_time()
16+
scheduler_id = :erlang.system_info(:scheduler_id)
17+
state = :persistent_term.get({__MODULE__, pool})
18+
{_size, connections, activations, recent_activations, sockets, schedulers, _indexes, _health} = state
19+
{n, workers} = Map.get(schedulers, scheduler_id)
20+
case checkout(state, n, workers) do
21+
:none=error ->
22+
:telemetry.execute([:sql, :checkout], %{pool: pool, duration: System.monotonic_time()-start_time}, %{id: id})
23+
error
24+
{idx, _load} ->
25+
case :atomics.compare_exchange(connections, idx, 0, 1) do
26+
:ok ->
27+
:counters.add(activations, idx, 1)
28+
:counters.add(recent_activations, idx, 1)
29+
result = {idx, elem(sockets, idx-1)}
30+
:telemetry.execute([:sql, :checkout], %{pool: pool, duration: System.monotonic_time()-start_time}, %{id: id})
31+
result
32+
_ ->
33+
checkout(sql, pool)
34+
end
35+
end
36+
end
37+
38+
defp checkout(_state, _n, []), do: :none
39+
defp checkout({size, _connections, activations, recent_activations, _sockets, _schedulers, indexes, health}=state, n, workers) do
40+
# Power-of-three-choices with load weighting
41+
workers
42+
|> Enum.take_random(min(3, n))
43+
|> Enum.map(fn id ->
44+
active = :counters.get(activations, id)
45+
recent = :counters.get(recent_activations, id)
46+
health = :atomics.get(health, id)
47+
{id, active + recent / 1000, health} # Weight recent activations
48+
end)
49+
|> Enum.filter(fn {_, _, health} -> health == 1 end)
50+
|> Enum.min_by(fn {_, load, _} -> load end, fn -> nil end)
51+
|> case do
52+
{id, load, _} -> {id, trunc(load)}
53+
_ -> checkout(state, size-n, indexes--workers)
54+
end
55+
end
56+
57+
def checkin(idx, pool) do
58+
{_size, connections, activations, recent_activations, _sockets, _schedulers, _indexes, _health} = :persistent_term.get({__MODULE__, pool})
59+
:ok = :atomics.put(connections, idx, 0)
60+
:ok = :counters.sub(activations, idx, 1)
61+
:ok = :counters.sub(recent_activations, idx, 1)
62+
end
63+
64+
defp initialize(state) do
65+
pool = build_pool(state)
66+
:persistent_term.put({__MODULE__, state.name}, pool)
67+
Map.put(state, :pool, pool)
68+
end
69+
70+
defp build_pool(state) do
71+
schedulers = :erlang.system_info(:schedulers_online)
72+
size = state.size
73+
connections = :atomics.new(size, signed: false)
74+
health = :atomics.new(size, signed: false)
75+
indexes = Enum.to_list(1..size)
76+
schedulers = indexes
77+
|> Enum.reduce(%{}, fn id, acc ->
78+
Map.update(acc, rem(id, schedulers)+1, [id], &([id | &1]))
79+
end)
80+
|> Map.new(fn {k, v} -> {k, {length(v), v}} end)
81+
{size, connections, :counters.new(size, [:write_concurrency]), :counters.new(size, [:write_concurrency]), init_sockets(size, connections, health, state.protocol), schedulers, indexes, health}
82+
end
83+
84+
defp init_sockets(size, connections, health, _protocol) do
85+
# queries = :persistent_term.get({__MODULE__, :queries})
86+
for idx <- 1..size do
87+
:atomics.put(connections, idx, 0)
88+
:atomics.put(health, idx, 1)
89+
# :socket.open(:inet, :stream, protocol)
90+
self()
91+
end
92+
|> List.to_tuple()
93+
end
94+
end

0 commit comments

Comments
 (0)