Skip to content

Commit e111c11

Browse files
committed
Save queries to table
1 parent d28a422 commit e111c11

File tree

5 files changed

+226
-4
lines changed

5 files changed

+226
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mvn clean install -DskipTests
99
unzip folder and copy to trino plugin folder
1010

1111
```
12-
cp -R trino-simple-db-logger-370 /path/to/trino/folder/plugin
12+
cp -R trino-simple-db-logger-376 /path/to/trino/folder/plugin
1313
```
1414

1515
create schema

pom.xml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>io.trino</groupId>
99
<artifactId>trino-root</artifactId>
10-
<version>370</version>
10+
<version>376</version>
1111
</parent>
1212

1313
<groupId>io.trino</groupId>
@@ -27,6 +27,11 @@
2727
<artifactId>configuration</artifactId>
2828
</dependency>
2929

30+
<dependency>
31+
<groupId>io.airlift</groupId>
32+
<artifactId>json</artifactId>
33+
</dependency>
34+
3035
<dependency>
3136
<groupId>io.airlift</groupId>
3237
<artifactId>log</artifactId>
@@ -47,6 +52,11 @@
4752
<artifactId>flyway-core</artifactId>
4853
</dependency>
4954

55+
<dependency>
56+
<groupId>org.jdbi</groupId>
57+
<artifactId>jdbi3-core</artifactId>
58+
</dependency>
59+
5060
<dependency>
5161
<groupId>mysql</groupId>
5262
<artifactId>mysql-connector-java</artifactId>

src/main/java/io/trino/dblistener/DbListener.java

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,188 @@
1313
*/
1414
package io.trino.dblistener;
1515

16+
import com.google.inject.Inject;
17+
import io.airlift.json.JsonCodec;
18+
import io.airlift.json.JsonCodecFactory;
1619
import io.airlift.log.Logger;
1720
import io.trino.spi.eventlistener.EventListener;
1821
import io.trino.spi.eventlistener.QueryCompletedEvent;
22+
import io.trino.spi.eventlistener.QueryStatistics;
23+
import io.trino.spi.eventlistener.StageGcStatistics;
24+
import org.jdbi.v3.core.Handle;
25+
import org.jdbi.v3.core.Jdbi;
26+
27+
import java.util.List;
1928

2029
public class DbListener
2130
implements EventListener
2231
{
2332
private static final Logger LOG = Logger.get(DbListener.class);
33+
private static final JsonCodec<List<StageGcStatistics>> STAGE_GC_STATS_CODEC = new JsonCodecFactory().listJsonCodec(StageGcStatistics.class);
34+
private final Jdbi jdbi;
35+
36+
@Inject
37+
public DbListener(Jdbi jdbi)
38+
{
39+
this.jdbi = jdbi;
40+
}
2441

2542
@Override
2643
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
2744
{
28-
LOG.info("query ID is: %s", queryCompletedEvent.getMetadata().getQueryId());
45+
jdbi.useTransaction(handle -> {
46+
saveQuery(queryCompletedEvent, handle);
47+
});
48+
}
49+
50+
private void saveQuery(QueryCompletedEvent queryCompletedEvent, Handle handle)
51+
{
52+
String sql = "" +
53+
"INSERT INTO queries (" +
54+
" query_id," +
55+
" catalog," +
56+
" `schema`," +
57+
" environment," +
58+
" query_text," +
59+
" query_plan," +
60+
" created," +
61+
" finished," +
62+
" query_state," +
63+
" error_info," +
64+
" cpu_time," +
65+
" failed_cpu_time," +
66+
" wall_time," +
67+
" queued_time," +
68+
" scheduled_time," +
69+
" failed_scheduled_time," +
70+
" waiting_time," +
71+
" analysis_time," +
72+
" planning_time," +
73+
" execution_time," +
74+
" input_blocked_time," +
75+
" failed_input_blocked_time," +
76+
" output_blocked_time," +
77+
" failed_output_blocked_time," +
78+
" peak_user_memory_bytes," +
79+
" peak_task_total_memory," +
80+
" physical_input_bytes," +
81+
" physical_input_rows," +
82+
" processed_input_bytes," +
83+
" processed_input_rows," +
84+
" internal_network_bytes," +
85+
" internal_network_rows," +
86+
" total_bytes," +
87+
" total_rows," +
88+
" output_bytes," +
89+
" output_rows," +
90+
" written_bytes," +
91+
" written_rows," +
92+
" cumulative_memory," +
93+
" failed_cumulative_memory," +
94+
" completed_splits," +
95+
" stage_gc_statistics" +
96+
")" +
97+
"VALUES (" +
98+
" :query_id," +
99+
" :catalog," +
100+
" :schema," +
101+
" :environment," +
102+
" :query_text," +
103+
" :query_plan," +
104+
" :created," +
105+
" :finished," +
106+
" :query_state," +
107+
" :error_info," +
108+
" :cpu_time," +
109+
" :failed_cpu_time," +
110+
" :wall_time," +
111+
" :queued_time," +
112+
" :scheduled_time," +
113+
" :failed_scheduled_time," +
114+
" :waiting_time," +
115+
" :analysis_time," +
116+
" :planning_time," +
117+
" :execution_time," +
118+
" :input_blocked_time," +
119+
" :failed_input_blocked_time," +
120+
" :output_blocked_time," +
121+
" :failed_output_blocked_time," +
122+
" :peak_user_memory_bytes," +
123+
" :peak_task_total_memory," +
124+
" :physical_input_bytes," +
125+
" :physical_input_rows," +
126+
" :processed_input_bytes," +
127+
" :processed_input_rows," +
128+
" :internal_network_bytes," +
129+
" :internal_network_rows," +
130+
" :total_bytes," +
131+
" :total_rows," +
132+
" :output_bytes," +
133+
" :output_rows," +
134+
" :written_bytes," +
135+
" :written_rows," +
136+
" :cumulative_memory," +
137+
" :failed_cumulative_memory," +
138+
" :completed_splits," +
139+
" :stage_gc_statistics" +
140+
")";
141+
142+
QueryStatistics queryStatistics = queryCompletedEvent.getStatistics();
143+
String failureInfo = queryCompletedEvent.getFailureInfo().isPresent() ? queryCompletedEvent.getFailureInfo().get().getFailureMessage().get() : "";
144+
long scheduledTime = queryStatistics.getScheduledTime().isPresent() ? queryStatistics.getScheduledTime().get().toMillis() : 0;
145+
long failedScheduledTime = queryStatistics.getFailedScheduledTime().isPresent() ? queryStatistics.getFailedScheduledTime().get().toMillis() : 0;
146+
long waitingTime = queryStatistics.getResourceWaitingTime().isPresent() ? queryStatistics.getResourceWaitingTime().get().toMillis() : 0;
147+
long analysisTime = queryStatistics.getAnalysisTime().isPresent() ? queryStatistics.getAnalysisTime().get().toMillis() : 0;
148+
long planningTime = queryStatistics.getPlanningTime().isPresent() ? queryStatistics.getPlanningTime().get().toMillis() : 0;
149+
long executionTime = queryStatistics.getExecutionTime().isPresent() ? queryStatistics.getExecutionTime().get().toMillis() : 0;
150+
long inputBlockedTime = queryStatistics.getInputBlockedTime().isPresent() ? queryStatistics.getInputBlockedTime().get().toMillis() : 0;
151+
long failedInputBlockedTime = queryStatistics.getFailedInputBlockedTime().isPresent() ? queryStatistics.getFailedInputBlockedTime().get().toMillis() : 0;
152+
long outputBlockedTime = queryStatistics.getOutputBlockedTime().isPresent() ? queryStatistics.getOutputBlockedTime().get().toMillis() : 0;
153+
long failedOutputBlockedTime = queryStatistics.getFailedOutputBlockedTime().isPresent() ? queryStatistics.getFailedOutputBlockedTime().get().toMillis() : 0;
154+
155+
handle.createUpdate(sql)
156+
.bind("query_id", queryCompletedEvent.getMetadata().getQueryId())
157+
.bind("catalog", queryCompletedEvent.getContext().getCatalog().orElse(""))
158+
.bind("schema", queryCompletedEvent.getContext().getSchema().orElse(""))
159+
.bind("environment", queryCompletedEvent.getContext().getEnvironment())
160+
.bind("query_text", queryCompletedEvent.getMetadata().getQuery())
161+
.bind("query_plan", queryCompletedEvent.getMetadata().getPlan().orElse(""))
162+
.bind("created", queryCompletedEvent.getCreateTime())
163+
.bind("finished", queryCompletedEvent.getEndTime())
164+
.bind("query_state", queryCompletedEvent.getMetadata().getQueryState())
165+
.bind("error_info", failureInfo)
166+
.bind("cpu_time", queryStatistics.getCpuTime().toMillis())
167+
.bind("failed_cpu_time", queryStatistics.getFailedCpuTime().toMillis())
168+
.bind("wall_time", queryStatistics.getWallTime().toMillis())
169+
.bind("queued_time", queryStatistics.getQueuedTime().toMillis())
170+
.bind("scheduled_time", scheduledTime)
171+
.bind("failed_scheduled_time", failedScheduledTime)
172+
.bind("waiting_time", waitingTime)
173+
.bind("analysis_time", analysisTime)
174+
.bind("planning_time", planningTime)
175+
.bind("execution_time", executionTime)
176+
.bind("input_blocked_time", inputBlockedTime)
177+
.bind("failed_input_blocked_time", failedInputBlockedTime)
178+
.bind("output_blocked_time", outputBlockedTime)
179+
.bind("failed_output_blocked_time", failedOutputBlockedTime)
180+
.bind("peak_user_memory_bytes", queryStatistics.getPeakUserMemoryBytes())
181+
.bind("peak_task_total_memory", queryStatistics.getPeakTaskTotalMemory())
182+
.bind("physical_input_bytes", queryStatistics.getPhysicalInputBytes())
183+
.bind("physical_input_rows", queryStatistics.getPhysicalInputRows())
184+
.bind("processed_input_bytes", queryStatistics.getProcessedInputBytes())
185+
.bind("processed_input_rows", queryStatistics.getProcessedInputRows())
186+
.bind("internal_network_bytes", queryStatistics.getInternalNetworkBytes())
187+
.bind("internal_network_rows", queryStatistics.getInternalNetworkRows())
188+
.bind("total_bytes", queryStatistics.getTotalBytes())
189+
.bind("total_rows", queryStatistics.getTotalRows())
190+
.bind("output_bytes", queryStatistics.getOutputBytes())
191+
.bind("output_rows", queryStatistics.getOutputRows())
192+
.bind("written_bytes", queryStatistics.getWrittenBytes())
193+
.bind("written_rows", queryStatistics.getWrittenRows())
194+
.bind("cumulative_memory", queryStatistics.getCumulativeMemory())
195+
.bind("failed_cumulative_memory", queryStatistics.getFailedCumulativeMemory())
196+
.bind("completed_splits", queryStatistics.getCompletedSplits())
197+
.bind("stage_gc_statistics", STAGE_GC_STATS_CODEC.toJson(queryStatistics.getStageGcStatistics()))
198+
.execute();
29199
}
30200
}

src/main/java/io/trino/dblistener/DbModule.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515

1616
import com.google.inject.Binder;
1717
import com.google.inject.Module;
18+
import com.google.inject.Provides;
1819
import com.google.inject.Scopes;
20+
import com.google.inject.Singleton;
21+
import org.jdbi.v3.core.Jdbi;
1922

2023
import static io.airlift.configuration.ConfigBinder.configBinder;
2124

@@ -28,4 +31,11 @@ public void configure(Binder binder)
2831
configBinder(binder).bindConfig(DblistenerConfig.class);
2932
binder.bind(DbListener.class).in(Scopes.SINGLETON);
3033
}
34+
35+
@Provides
36+
@Singleton
37+
public static Jdbi create(DblistenerConfig config)
38+
{
39+
return Jdbi.create(config.getUrl(), config.getUser(), config.getPassword());
40+
}
3141
}

src/main/resources/db/mysql/V1__create_schema.sql

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,43 @@ CREATE TABLE queries (
66
environment VARCHAR(256),
77
query_text LONGTEXT NOT NULL,
88
query_plan LONGTEXT,
9-
query_json LONGTEXT,
109
created TIMESTAMP(6) NOT NULL,
1110
finished TIMESTAMP(6) NOT NULL,
1211
query_state VARCHAR(32) NOT NULL,
1312
error_info LONGTEXT,
13+
cpu_time BIGINT,
14+
failed_cpu_time BIGINT,
15+
wall_time BIGINT,
16+
queued_time BIGINT,
17+
scheduled_time BIGINT,
18+
failed_scheduled_time BIGINT,
19+
waiting_time BIGINT,
20+
analysis_time BIGINT,
21+
planning_time BIGINT,
22+
execution_time BIGINT,
23+
input_blocked_time BIGINT,
24+
failed_input_blocked_time BIGINT,
25+
output_blocked_time BIGINT,
26+
failed_output_blocked_time BIGINT,
27+
peak_user_memory_bytes BIGINT,
28+
peak_task_user_memory BIGINT,
29+
peak_task_total_memory BIGINT,
30+
physical_input_bytes BIGINT,
31+
physical_input_rows BIGINT,
32+
processed_input_bytes BIGINT,
33+
processed_input_rows BIGINT,
34+
internal_network_bytes BIGINT,
35+
internal_network_rows BIGINT,
36+
total_bytes BIGINT,
37+
total_rows BIGINT,
38+
output_bytes BIGINT,
39+
output_rows BIGINT,
40+
written_bytes BIGINT,
41+
written_rows BIGINT,
42+
cumulative_memory DOUBLE,
43+
failed_cumulative_memory DOUBLE,
44+
completed_splits BIGINT,
45+
stage_gc_statistics LONGTEXT,
1446
PRIMARY KEY(id),
1547
UNIQUE INDEX(query_id),
1648
INDEX(created, finished)

0 commit comments

Comments
 (0)