Skip to content

Commit d165e76

Browse files
committed
Refactor RoutingManager
1 parent cdc2ee2 commit d165e76

File tree

10 files changed

+398
-407
lines changed

10 files changed

+398
-407
lines changed

gateway-ha/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,6 @@
6262
<artifactId>jackson-dataformat-yaml</artifactId>
6363
</dependency>
6464

65-
<dependency>
66-
<groupId>com.google.errorprone</groupId>
67-
<artifactId>error_prone_annotations</artifactId>
68-
</dependency>
69-
7065
<dependency>
7166
<groupId>com.google.guava</groupId>
7267
<artifactId>guava</artifactId>

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ public HealthCheckObserver(RoutingManager routingManager)
2828
@Override
2929
public void observe(java.util.List<ClusterStats> clustersStats)
3030
{
31-
routingManager.updateBackEndStats(clustersStats);
31+
routingManager.updateClusterStats(clustersStats);
3232
}
3333
}
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.router;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.common.base.Function;
18+
import com.google.common.base.Strings;
19+
import com.google.common.cache.CacheBuilder;
20+
import com.google.common.cache.CacheLoader;
21+
import com.google.common.cache.LoadingCache;
22+
import io.airlift.log.Logger;
23+
import io.trino.gateway.ha.clustermonitor.ClusterStats;
24+
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
25+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
26+
import io.trino.gateway.ha.config.RoutingConfiguration;
27+
import jakarta.annotation.Nullable;
28+
import jakarta.ws.rs.HttpMethod;
29+
30+
import java.net.HttpURLConnection;
31+
import java.net.URL;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.concurrent.ExecutionException;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
40+
import java.util.concurrent.Future;
41+
import java.util.concurrent.TimeUnit;
42+
43+
/**
44+
* This class performs health check, stats counts for each backend and provides a backend given
45+
* request object. Default implementation comes here.
46+
*/
47+
public abstract class BaseRoutingManager
48+
implements RoutingManager
49+
{
50+
private static final Logger log = Logger.get(BaseRoutingManager.class);
51+
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
52+
private final GatewayBackendManager gatewayBackendManager;
53+
private final ConcurrentHashMap<String, TrinoStatus> backendToStatus;
54+
private final String defaultRoutingGroup;
55+
private final QueryHistoryManager queryHistoryManager;
56+
private final LoadingCache<String, String> queryIdBackendCache;
57+
private final LoadingCache<String, String> queryIdRoutingGroupCache;
58+
private final LoadingCache<String, String> queryIdExternalUrlCache;
59+
60+
public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration)
61+
{
62+
this.gatewayBackendManager = gatewayBackendManager;
63+
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
64+
this.queryHistoryManager = queryHistoryManager;
65+
this.queryIdBackendCache = buildCache(this::findBackendForUnknownQueryId);
66+
this.queryIdRoutingGroupCache = buildCache(this::findRoutingGroupForUnknownQueryId);
67+
this.queryIdExternalUrlCache = buildCache(this::findExternalUrlForUnknownQueryId);
68+
this.backendToStatus = new ConcurrentHashMap<>();
69+
}
70+
71+
/**
72+
* Provide a strategy to select a backend out of all available backends
73+
*/
74+
protected abstract Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user);
75+
76+
@Override
77+
public void setBackendForQueryId(String queryId, String backend)
78+
{
79+
queryIdBackendCache.put(queryId, backend);
80+
}
81+
82+
@Override
83+
public void setRoutingGroupForQueryId(String queryId, String routingGroup)
84+
{
85+
queryIdRoutingGroupCache.put(queryId, routingGroup);
86+
}
87+
88+
/**
89+
* Performs routing to a default backend.
90+
*/
91+
public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user)
92+
{
93+
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveDefaultBackends().stream()
94+
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
95+
.toList();
96+
return selectBackend(backends, user).orElseThrow(() -> new IllegalStateException("Number of active backends found zero"));
97+
}
98+
99+
/**
100+
* Performs routing to a given cluster group. This falls back to a default backend, if no scheduled
101+
* backend is found.
102+
*/
103+
@Override
104+
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
105+
{
106+
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveBackends(routingGroup).stream()
107+
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
108+
.toList();
109+
return selectBackend(backends, user).orElseGet(() -> provideDefaultBackendConfiguration(user));
110+
}
111+
112+
/**
113+
* Performs cache look up, if a backend not found, it checks with all backends and tries to find
114+
* out which backend has info about given query id.
115+
*/
116+
@Nullable
117+
@Override
118+
public String findBackendForQueryId(String queryId)
119+
{
120+
String backendAddress = null;
121+
try {
122+
backendAddress = queryIdBackendCache.get(queryId);
123+
}
124+
catch (ExecutionException e) {
125+
log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
126+
}
127+
return backendAddress;
128+
}
129+
130+
@Nullable
131+
@Override
132+
public String findExternalUrlForQueryId(String queryId)
133+
{
134+
String externalUrl = null;
135+
try {
136+
externalUrl = queryIdExternalUrlCache.get(queryId);
137+
}
138+
catch (ExecutionException e) {
139+
log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
140+
}
141+
return externalUrl;
142+
}
143+
144+
/**
145+
* Looks up the routing group associated with the queryId in the cache.
146+
* If it's not in the cache, look up in query history
147+
*/
148+
@Nullable
149+
@Override
150+
public String findRoutingGroupForQueryId(String queryId)
151+
{
152+
String routingGroup = null;
153+
try {
154+
routingGroup = queryIdRoutingGroupCache.get(queryId);
155+
}
156+
catch (ExecutionException e) {
157+
log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
158+
}
159+
return routingGroup;
160+
}
161+
162+
@Override
163+
public void updateBackEndHealth(String backendId, TrinoStatus value)
164+
{
165+
log.info("backend %s isHealthy %s", backendId, value);
166+
backendToStatus.put(backendId, value);
167+
}
168+
169+
@Override
170+
public void updateClusterStats(List<ClusterStats> stats)
171+
{
172+
for (ClusterStats clusterStats : stats) {
173+
updateBackEndHealth(clusterStats.clusterId(), clusterStats.trinoStatus());
174+
}
175+
}
176+
177+
@VisibleForTesting
178+
void setExternalUrlForQueryId(String queryId, String externalUrl)
179+
{
180+
queryIdExternalUrlCache.put(queryId, externalUrl);
181+
}
182+
183+
@VisibleForTesting
184+
String findBackendForUnknownQueryId(String queryId)
185+
{
186+
String backend;
187+
backend = queryHistoryManager.getBackendForQueryId(queryId);
188+
if (Strings.isNullOrEmpty(backend)) {
189+
log.debug("Unable to find backend mapping for [%s]. Searching for suitable backend", queryId);
190+
backend = searchAllBackendForQuery(queryId);
191+
}
192+
return backend;
193+
}
194+
195+
/**
196+
* This tries to find out which backend may have info about given query id. If not found returns
197+
* the first healthy backend.
198+
*/
199+
private String searchAllBackendForQuery(String queryId)
200+
{
201+
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getAllBackends();
202+
203+
Map<String, Future<Integer>> responseCodes = new HashMap<>();
204+
try {
205+
for (ProxyBackendConfiguration backend : backends) {
206+
String target = backend.getProxyTo() + "/v1/query/" + queryId;
207+
208+
Future<Integer> call =
209+
executorService.submit(
210+
() -> {
211+
URL url = new URL(target);
212+
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
213+
conn.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(5));
214+
conn.setReadTimeout((int) TimeUnit.SECONDS.toMillis(5));
215+
conn.setRequestMethod(HttpMethod.HEAD);
216+
return conn.getResponseCode();
217+
});
218+
responseCodes.put(backend.getProxyTo(), call);
219+
}
220+
for (Map.Entry<String, Future<Integer>> entry : responseCodes.entrySet()) {
221+
if (entry.getValue().isDone()) {
222+
int responseCode = entry.getValue().get();
223+
if (responseCode == 200) {
224+
log.info("Found query [%s] on backend [%s]", queryId, entry.getKey());
225+
setBackendForQueryId(queryId, entry.getKey());
226+
return entry.getKey();
227+
}
228+
}
229+
}
230+
}
231+
catch (Exception e) {
232+
log.warn("Query id [%s] not found", queryId);
233+
}
234+
// Fallback on first active backend if queryId mapping not found.
235+
return gatewayBackendManager.getActiveBackends(defaultRoutingGroup).get(0).getProxyTo();
236+
}
237+
238+
/**
239+
* Attempts to look up the routing group associated with the query id from query history table
240+
*/
241+
private String findRoutingGroupForUnknownQueryId(String queryId)
242+
{
243+
String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
244+
setRoutingGroupForQueryId(queryId, routingGroup);
245+
return routingGroup;
246+
}
247+
248+
/**
249+
* Attempts to look up the external url associated with the query id from query history table
250+
*/
251+
private String findExternalUrlForUnknownQueryId(String queryId)
252+
{
253+
String externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId);
254+
setExternalUrlForQueryId(queryId, externalUrl);
255+
return externalUrl;
256+
}
257+
258+
private static LoadingCache<String, String> buildCache(Function<String, String> loader)
259+
{
260+
return CacheBuilder.newBuilder()
261+
.maximumSize(10000)
262+
.expireAfterAccess(30, TimeUnit.MINUTES)
263+
.build(
264+
new CacheLoader<>()
265+
{
266+
@Override
267+
public String load(String queryId)
268+
{
269+
return loader.apply(queryId);
270+
}
271+
});
272+
}
273+
274+
private boolean isBackendHealthy(String backendId)
275+
{
276+
TrinoStatus status = backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN);
277+
return status == TrinoStatus.HEALTHY;
278+
}
279+
}

0 commit comments

Comments
 (0)