feat: complete Query statements of FlinkSQL (#93)

* feat: add inlineDataValueClasue rule

* test: update tests of select statements

* feat: support flinksql window TVF grammar

* test: flink sql windown TVF statement test

* feat: support grouping sets grammar

* test: window TVF Aggregation and Group Window Aggregation tests

* test: supplemental selectAggregation with test cases

* test: add Having statement test case

* feat: support flinkSql over aggregation grammar

* test: add over aggregation grammar test cases

* test: flink sql join statement test cases

* test: flink sql set Operations grammar test cases

* test: flink sql limit clause test case

* feat: remove allPlusUid and replace with uid

* feat: support flink sql pattern recognition grammar

* test: flink sql pattern recognition tests

* feat: add flink sql with clause rule

* test: flink sql with clasue select tests

* feat: rebuild flink sql parser
This commit is contained in:
Hayden
2023-05-17 10:30:25 +08:00
committed by GitHub
parent fbee70cde5
commit a026ae0592
19 changed files with 8188 additions and 4486 deletions

View File

@ -0,0 +1,13 @@
SELECT * FROM Orders;
SELECT order_id, price + tax FROM Orders;
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
SELECT price + tax FROM Orders WHERE id = 10;
SELECT PRETTY_PRINT(order_id) FROM Orders;
SELECT * FROM Orders ORDER BY order_time, order_id;
SELECT * FROM Orders ORDER BY orderTime LIMIT 3;

View File

@ -0,0 +1,122 @@
-- Window TVF Aggregation
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);
SELECT
window_start,
window_end,
item, supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (supplier_id, item);
-- GROUPING SETS
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY
window_start,
window_end,
GROUPING SETS ((supplier_id), ());
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY
window_start,
window_end,
ROLLUP (supplier_id);
SELECT
window_start,
window_end,
item,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY
window_start,
window_end,
CUBE (supplier_id, item);
-- Group Window Aggregation
SELECT
user,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
SUM(amount) FROM Orders
GROUP BY
TUMBLE(order_time, INTERVAL '1' DAY),
user;
SELECT
user,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
SUM(amount) FROM Orders
GROUP BY
HOP(order_time, INTERVAL '1' DAY),
user;
SELECT
user,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
SUM(amount) FROM Orders
GROUP BY
SESSION(order_time, INTERVAL '1' DAY),
user;
-- Having
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50;
-- Over Aggregation
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders;
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table;
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);

View File

@ -0,0 +1,7 @@
SELECT DISTINCT order_id, price + tax FROM Orders;
SELECT DISTINCT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
SELECT DISTINCT price + tax FROM Orders WHERE id = 10;
SELECT DISTINCT PRETTY_PRINT(order_id) FROM Orders;

View File

@ -0,0 +1,128 @@
-- INNER Equi-JOIN
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id;
-- OUTER Equi-JOIN
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id;
SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id;
SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id;
-- Interval Joins
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time;
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time = s.ship_time; -- ltime = rtime
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time >= s.ship_time -- ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
AND o.order_time < s.ship_time + INTERVAL '10' MINUTE;
-- Temporal Joins
SELECT
order_id,
price,
orders.currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency;
-- Lookup Join
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
-- Table Function INNER JOIN
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res);
-- Table Function LEFT OUTER JOIN
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE;
-- Cross Join
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
-- FULL OUTER Window Join
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
COALESCE(L.window_start, R.window_start) as window_start,
COALESCE(L.window_end, R.window_end) as window_end
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
-- Semi Window Joins
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE L.num IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE L.num IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
-- Anti Window Joins
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE L.num NOT IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE NOT EXISTS (
SELECT * FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end
);

View File

@ -0,0 +1,165 @@
-- basic pattern recognition
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
PARTITION BY userid
ORDER BY proctime
MEASURES
A.id AS aid,
B.id AS bid,
C.id AS cid
PATTERN (A B C)
DEFINE
A AS name = 'a',
B AS name = 'b',
C AS name = 'c'
) AS T;
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
START_ROW.rowtime AS start_tstamp,
_LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
_LAST(PRICE_UP.rowtime) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST PRICE_UP
PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
DEFINE
PRICE_DOWN AS
(_LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
PRICE_DOWN.price < _LAST(PRICE_DOWN.price, 1),
PRICE_UP AS
PRICE_UP.price > _LAST(PRICE_DOWN.price, 1)
) MR;
-- Measures Aggregations
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp,
_LAST(A.rowtime) AS end_tstamp,
AVG(A.price) AS avgPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
-- Define a Pattern
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
C.price AS lastPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A+ B* C? D{1,} E{,5} F{1,5})
DEFINE
A AS A.price > 10,
B AS B.price < 15,
C AS C.price > 12
);
-- Time constraint
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
C.rowtime AS dropTime,
A.price - C.price AS dropDiff
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
DEFINE
B AS B.price > A.price - 10,
C AS C.price < A.price - 10
);
-- Output Mode
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.price) AS startPrice,
_LAST(A.price) AS topPrice,
B.price AS lastPrice
ONE ROW PER MATCH
PATTERN (A+ B)
DEFINE
A AS _LAST(A.price, 1) IS NULL OR A.price > _LAST(A.price, 1),
B AS B.price < _LAST(A.price)
);
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.price) AS startPrice,
_LAST(A.price) AS topPrice,
B.price AS lastPrice
ALL ROWS PER MATCH
PATTERN (A+ B)
DEFINE
A AS _LAST(A.price, 1) IS NULL OR A.price > _LAST(A.price, 1),
B AS B.price < _LAST(A.price)
);
-- After Match Strategy
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST A
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO FIRST A
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;

View File

@ -0,0 +1,28 @@
-- UNION
(SELECT s FROM t1) UNION (SELECT s FROM t2);
(SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
-- INTERSECT
(SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
(SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
-- EXPECT
(SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
(SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
-- IN
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
-- EXISTS
SELECT user, amount
FROM Orders
WHERE product EXISTS (
SELECT product FROM NewProducts
)

View File

@ -0,0 +1,52 @@
SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES
)
);
SELECT window_start,
window_end,
SUM(price)
FROM TABLE(
TUMBLE(
TABLE Bid,
DESCRIPTOR(bidtime),
INTERVAL '10' MINUTES
)
)
GROUP BY window_start, window_end;
SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
SELECT * FROM TABLE(
HOP(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SLIDE => INTERVAL '5' MINUTES,
SIZE => INTERVAL '10' MINUTES
)
);
SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES
)
);
SELECT window_start,
window_end,
SUM(price)
FROM TABLE(
CUMULATE(
TABLE Bid,
DESCRIPTOR(bidtime),
INTERVAL '2' MINUTES,
INTERVAL '10' MINUTES
))
GROUP BY window_start, window_end;

View File

@ -0,0 +1,7 @@
WITH orders_with_total AS (
SELECT order_id, price + tax AS total
FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

View File

@ -0,0 +1,82 @@
import FlinkSQL from "../../../../src/parser/flinksql";
import { readSQL } from "../../../helper";
const parser = new FlinkSQL();
const features = {
base: readSQL(__dirname, "select.sql"),
withClause: readSQL(__dirname, "selectWithClause.sql"),
distinct: readSQL(__dirname, "selectDistinct.sql"),
windowTVF: readSQL(__dirname, "selectWindowTVF.sql"),
aggregation: readSQL(__dirname, "selectAggregation.sql"),
join: readSQL(__dirname, "selectJoin.sql"),
setOperation: readSQL(__dirname, "selectSetOperations.sql"),
pattern: readSQL(__dirname, "selectPatternRecognition.sql")
};
describe("FlinkSQL Query Statement Tests", () => {
describe("Base Select", () => {
features.base.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
});
});
describe("With Clause Select", () => {
features.withClause.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
});
});
describe("Select DISTINCT", () => {
features.distinct.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
})
})
describe("Select Window TVF", () => {
features.windowTVF.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
})
})
describe("Select Aggregation", () => {
features.aggregation.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
})
})
describe("Select Join", () => {
features.join.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
})
})
describe("Select Set Operations", () => {
features.setOperation.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
})
})
describe("Select Pattern Recognition", () => {
features.pattern.forEach((sql) => {
it(sql, () => {
expect(parser.validate(sql).length).toBe(0);
});
})
})
});