Skip to content

Commit 266e62d

Browse files
author
Benoit Hanotte
committed
[FLINK-15577] Add different windows tests to blink planner
The Blink planner doesn't seem to be subject to the bug described in FLINK-15577. For safety, we also add the tests to ensure no regression is possible that would introduce the issue in the Blink planner.
1 parent 4787f1b commit 266e62d

File tree

4 files changed

+249
-0
lines changed

4 files changed

+249
-0
lines changed

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,154 @@ Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0,
108108
+- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
109109
+- Calc(select=[ts, b, *(b, b) AS $f2])
110110
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
111+
]]>
112+
</Resource>
113+
</TestCase>
114+
<TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]">
115+
<Resource name="sql">
116+
<![CDATA[
117+
WITH window_1h AS (
118+
SELECT 1
119+
FROM MyTable2
120+
GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
121+
),
122+
123+
window_2h AS (
124+
SELECT 1
125+
FROM MyTable2
126+
GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
127+
)
128+
129+
(SELECT * FROM window_1h)
130+
UNION ALL
131+
(SELECT * FROM window_2h)
132+
]]>
133+
</Resource>
134+
<Resource name="planBefore">
135+
<![CDATA[
136+
LogicalUnion(all=[true])
137+
:- LogicalProject(EXPR$0=[1])
138+
: +- LogicalAggregate(group=[{0}])
139+
: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)])
140+
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
141+
+- LogicalProject(EXPR$0=[1])
142+
+- LogicalAggregate(group=[{0}])
143+
+- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)])
144+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
145+
]]>
146+
</Resource>
147+
<Resource name="planAfter">
148+
<![CDATA[
149+
Union(all=[true], union=[EXPR$0])
150+
:- Calc(select=[1 AS EXPR$0])
151+
: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[])
152+
: +- Exchange(distribution=[single])
153+
: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[])
154+
: +- Calc(select=[ts], reuse_id=[1])
155+
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
156+
+- Calc(select=[1 AS EXPR$0])
157+
+- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[])
158+
+- Exchange(distribution=[single])
159+
+- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[])
160+
+- Reused(reference_id=[1])
161+
]]>
162+
</Resource>
163+
</TestCase>
164+
<TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]">
165+
<Resource name="sql">
166+
<![CDATA[
167+
WITH window_1h AS (
168+
SELECT 1
169+
FROM MyTable2
170+
GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
171+
),
172+
173+
window_2h AS (
174+
SELECT 1
175+
FROM MyTable2
176+
GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
177+
)
178+
179+
(SELECT * FROM window_1h)
180+
UNION ALL
181+
(SELECT * FROM window_2h)
182+
]]>
183+
</Resource>
184+
<Resource name="planBefore">
185+
<![CDATA[
186+
LogicalUnion(all=[true])
187+
:- LogicalProject(EXPR$0=[1])
188+
: +- LogicalAggregate(group=[{0}])
189+
: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)])
190+
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
191+
+- LogicalProject(EXPR$0=[1])
192+
+- LogicalAggregate(group=[{0}])
193+
+- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)])
194+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
195+
]]>
196+
</Resource>
197+
<Resource name="planAfter">
198+
<![CDATA[
199+
Union(all=[true], union=[EXPR$0])
200+
:- Calc(select=[1 AS EXPR$0])
201+
: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[])
202+
: +- Exchange(distribution=[single], reuse_id=[1])
203+
: +- Calc(select=[ts])
204+
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
205+
+- Calc(select=[1 AS EXPR$0])
206+
+- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[])
207+
+- Sort(orderBy=[ts ASC])
208+
+- Reused(reference_id=[1])
209+
]]>
210+
</Resource>
211+
</TestCase>
212+
<TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]">
213+
<Resource name="sql">
214+
<![CDATA[
215+
WITH window_1h AS (
216+
SELECT 1
217+
FROM MyTable2
218+
GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
219+
),
220+
221+
window_2h AS (
222+
SELECT 1
223+
FROM MyTable2
224+
GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
225+
)
226+
227+
(SELECT * FROM window_1h)
228+
UNION ALL
229+
(SELECT * FROM window_2h)
230+
]]>
231+
</Resource>
232+
<Resource name="planBefore">
233+
<![CDATA[
234+
LogicalUnion(all=[true])
235+
:- LogicalProject(EXPR$0=[1])
236+
: +- LogicalAggregate(group=[{0}])
237+
: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)])
238+
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
239+
+- LogicalProject(EXPR$0=[1])
240+
+- LogicalAggregate(group=[{0}])
241+
+- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)])
242+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
243+
]]>
244+
</Resource>
245+
<Resource name="planAfter">
246+
<![CDATA[
247+
Union(all=[true], union=[EXPR$0])
248+
:- Calc(select=[1 AS EXPR$0])
249+
: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[])
250+
: +- Exchange(distribution=[single])
251+
: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[])
252+
: +- Calc(select=[ts], reuse_id=[1])
253+
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
254+
+- Calc(select=[1 AS EXPR$0])
255+
+- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[])
256+
+- Exchange(distribution=[single])
257+
+- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[])
258+
+- Reused(reference_id=[1])
111259
]]>
112260
</Resource>
113261
</TestCase>

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,53 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
520520
+- Exchange(distribution=[single])
521521
+- Calc(select=[rowtime, c, a])
522522
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
523+
]]>
524+
</Resource>
525+
</TestCase>
526+
<TestCase name="testWindowAggregateWithDifferentWindows">
527+
<Resource name="sql">
528+
<![CDATA[
529+
WITH window_1h AS (
530+
SELECT 1
531+
FROM MyTable
532+
GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
533+
),
534+
535+
window_2h AS (
536+
SELECT 1
537+
FROM MyTable
538+
GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
539+
)
540+
541+
(SELECT * FROM window_1h)
542+
UNION ALL
543+
(SELECT * FROM window_2h)
544+
]]>
545+
</Resource>
546+
<Resource name="planBefore">
547+
<![CDATA[
548+
LogicalUnion(all=[true])
549+
:- LogicalProject(EXPR$0=[1])
550+
: +- LogicalAggregate(group=[{0}])
551+
: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)])
552+
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
553+
+- LogicalProject(EXPR$0=[1])
554+
+- LogicalAggregate(group=[{0}])
555+
+- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)])
556+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
557+
]]>
558+
</Resource>
559+
<Resource name="planAfter">
560+
<![CDATA[
561+
Union(all=[true], union=[EXPR$0])
562+
:- Calc(select=[1 AS EXPR$0])
563+
: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 3600000)], select=[])
564+
: +- Exchange(distribution=[single], reuse_id=[1])
565+
: +- Calc(select=[rowtime])
566+
: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
567+
+- Calc(select=[1 AS EXPR$0])
568+
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[])
569+
+- Reused(reference_id=[1])
523570
]]>
524571
</Resource>
525572
</TestCase>

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,33 @@ class WindowAggregateTest(aggStrategy: AggregatePhaseStrategy) extends TableTest
322322

323323
util.verifyPlan(sql)
324324
}
325+
326+
@Test
327+
def testWindowAggregateWithDifferentWindows(): Unit = {
328+
// This test ensures that the LogicalWindowAggregate node's digest contains the window specs.
329+
// This allows the planner to make the distinction between similar aggregations using different
330+
// windows (see FLINK-15577).
331+
val sql =
332+
"""
333+
|WITH window_1h AS (
334+
| SELECT 1
335+
| FROM MyTable2
336+
| GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
337+
|),
338+
|
339+
|window_2h AS (
340+
| SELECT 1
341+
| FROM MyTable2
342+
| GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
343+
|)
344+
|
345+
|(SELECT * FROM window_1h)
346+
|UNION ALL
347+
|(SELECT * FROM window_2h)
348+
|""".stripMargin
349+
350+
util.verifyPlan(sql)
351+
}
325352
}
326353

327354
object WindowAggregateTest {

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,4 +349,31 @@ class WindowAggregateTest extends TableTestBase {
349349

350350
util.verifyPlan(sql)
351351
}
352+
353+
@Test
354+
def testWindowAggregateWithDifferentWindows(): Unit = {
355+
// This test ensures that the LogicalWindowAggregate node's digest contains the window specs.
356+
// This allows the planner to make the distinction between similar aggregations using different
357+
// windows (see FLINK-15577).
358+
val sql =
359+
"""
360+
|WITH window_1h AS (
361+
| SELECT 1
362+
| FROM MyTable
363+
| GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
364+
|),
365+
|
366+
|window_2h AS (
367+
| SELECT 1
368+
| FROM MyTable
369+
| GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
370+
|)
371+
|
372+
|(SELECT * FROM window_1h)
373+
|UNION ALL
374+
|(SELECT * FROM window_2h)
375+
|""".stripMargin
376+
377+
util.verifyPlan(sql)
378+
}
352379
}

0 commit comments

Comments
 (0)