refactor: standard naming (#278)

* refactor: rename flinksql to flink

* refactor: rename pgsql to postgresql

* refactor: rename trinosql to trino

* refactor: replace all default exports with named export

* refactor: rename basicParser to basicSQL

* refactor: rename basic-parser-types to types

* refactor: replace arrow func with plain func
This commit is contained in:
Hayden
2024-03-27 10:33:25 +08:00
committed by GitHub
parent a99721162b
commit bb0fad1dbe
325 changed files with 33161 additions and 33202 deletions

View File

@ -0,0 +1,3 @@
ALTER DATABASE tempDB SET ("key1"="value1");
ALTER DATABASE db1 SET ('key1' = 'value1','key2.a' = 'value2.a');

View File

@ -0,0 +1,15 @@
ALTER FUNCTION tempFunction AS 'SimpleUdf';
ALTER temporary FUNCTION function1 AS 'org.apache.flink.function.function1';
ALTER temporary FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE scala;
ALTER temporary SYSTEM FUNCTION function1 AS 'org.apache.flink.function.function1';
ALTER temporary SYSTEM FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE java;
ALTER TEMPORARY SYSTEM FUNCTION IF EXISTS tempFunction AS 'SimpleUdf';
ALTER TEMPORARY FUNCTION IF EXISTS tempFunction AS 'SimpleUdf';
ALTER FUNCTION myudf AS 'com.example.MyUdf' LANGUAGE PYTHON;

View File

@ -0,0 +1,65 @@
-- Refer: https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java#L2016
-- Just for simple alter table statements, it not include alter table columns statements
ALTER TABLE
t1 RENAME TO t2;
ALTER TABLE
IF EXISTS t1 RENAME TO t2;
ALTER TABLE
c1.d1.t1 RENAME TO t2;
ALTER TABLE
IF EXISTS c1.d1.t1 RENAME TO t2;
ALTER TABLE
t1 RENAME a TO b;
ALTER TABLE
IF EXISTS t1 RENAME a TO b;
ALTER TABLE
IF EXISTS t1 RENAME a.x TO a.y;
ALTER TABLE
t1
set
('key1' = 'value1');
ALTER TABLE
IF EXISTS t1
set
('key1' = 'value1');
ALTER TABLE
t1
ADD
CONSTRAINT ct1 PRIMARY KEY(a, b);
ALTER TABLE
t1
ADD
CONSTRAINT ct1 PRIMARY KEY(a, b) NOT ENFORCED;
ALTER TABLE
IF EXISTS t1
ADD
CONSTRAINT ct1 PRIMARY KEY(a, b) NOT ENFORCED;
ALTER TABLE
t1
ADD
UNIQUE(a, b);
ALTER TABLE
IF EXISTS t1
ADD
UNIQUE(a, b);
ALTER TABLE
t1 DROP CONSTRAINT ct1;
ALTER TABLE
IF EXISTS t1 DROP CONSTRAINT ct1;

View File

@ -0,0 +1,8 @@
ALTER VIEW v1 RENAME TO v2;
ALTER VIEW v1 AS
SELECT
c1,
c2
FROM
tbl;

View File

@ -0,0 +1,27 @@
-- LOAD/UNLOAD
LOAD MODULE CORE;
LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2');
UNLOAD MODULE CORE;
--SET/RESET
SET;
SET 'test-key' = 'test-value';
RESET;
RESET 'test-key';
-- ADD/REMOVE JAR
ADD JAR '<path_to_filename>.jar'
REMOVE JAR '<path_to_filename>.jar'
-- Complex Arithmetic Expression
INSERT INTO avg_request_size_5m SELECT
window_start,
window_end,
(server_logs_window_5m.a/server_logs_window_5m.b+c)/d*e%f-g AS avg_size
FROM server_logs_window_5m;

View File

@ -0,0 +1,7 @@
----test comment1
--test comment2
/*
* test comments 3
*/

View File

@ -0,0 +1 @@
CREATE CATALOG c1 WITH ('key1' = 'value1', 'key2' = 'value2');

View File

@ -0,0 +1,9 @@
CREATE DATABASE db1 WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE IF NOT EXISTS db1 WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE catalog1.db1 WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE db1 COMMENT 'test create database' WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE IF NOT EXISTS dataApi COMMENT 'test create database' WITH ('key1' = 'value1', 'key2.a' = 'value2.a');

View File

@ -0,0 +1,28 @@
CREATE FUNCTION IF NOT EXISTS tempFunction AS 'SimpleUdf';
CREATE FUNCTION catalog1.db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION catalog1.db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION IF NOT EXISTS catalog1.db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE JAVA;
CREATE TEMPORARY SYSTEM FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE SCALA;
CREATE TEMPORARY SYSTEM FUNCTION IF NOT EXISTS function1 AS 'org.apache.flink.function.function1' LANGUAGE PYTHON;
-- test create function USING jar
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar',
JAR 'hdfs:///path/to/test2.jar';
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR '/path/to/test.jar';
CREATE TEMPORARY SYSTEM FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR '/path/to/test.jar';
CREATE FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar',
JAR 'hdfs:///path/to/test2.jar';

View File

@ -0,0 +1,304 @@
CREATE TABLE MyTable ('user_id' BIGINT, 'name' STRING) WITH ('connector' = 'oracle-x');
CREATE TABLE MyTable WITH ('connector' = 'oracle-x');
CREATE TEMPORARY TABLE client_errors (
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'stream-x'
);
-- 尽管官方文档的 BNF 里没有支持创建临时表,但实际上是支持的
CREATE TEMPORARY TABLE MyTable ('user_id' BIGINT, 'name' STRING) WITH ('connector' = 'oracle-x');
CREATE TABLE MyTable (
'user_id' BIGINT,
'name' STRING,
'timestamp' BIGINT METADATA, -- part of the query-to-sink schema
'offset' BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
'record_time' TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH ('connector' = 'kafka');
CREATE TABLE MyTable (
'user_id' BIGINT,
'price' DOUBLE,
'quantity' DOUBLE,
'cost' AS price * quanitity -- evaluate expression and supply the result to queries
) WITH ('connector' = 'kafka');
CREATE TABLE MyTable (
'user' BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka');
CREATE TABLE MyTable (id INT, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'kafka');
CREATE TABLE tbl1 (
a BIGINT,
h VARCHAR,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b VARCHAR,
proc AS PROCTIME(),
meta STRING METADATA,
my_meta STRING METADATA FROM 'meta',
my_meta STRING METADATA FROM 'meta' VIRTUAL,
meta STRING METADATA VIRTUAL,
PRIMARY KEY (a, b) NOT ENFORCED
) PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE Orders_in_file (
'user' BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
) PARTITIONED BY ('user') WITH (
'connector' = 'filesystem',
'path' = '...'
);
CREATE TABLE Orders_with_watermark (
id INT,
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
) LIKE Orders_in_file (
-- Exclude everything besides the computed columns which we need to generate the watermark for.
-- We do not want to have the partitions or filesystem options as those do not apply to kafka.
EXCLUDING ALL
INCLUDING GENERATED
);
CREATE TABLE my_ctas_table WITH ('connector' = 'kafka')
AS SELECT
id,
name,
age
FROM
source_table
WHERE
mod(id, 10) = 0;
CREATE TABLE catalog1.db1.table1 (id INT) WITH ('connector' = 'kafka');
CREATE TABLE catalog1.db1.table1 (
attr0 STRING,
attr1 BOOLEAN,
attr3 DECIMAL(38, 18),
attr4 TINYINT,
attr5 SMALLINT,
attr6 INT,
attr7 BIGINT,
attr8 FLOAT,
attr9 DOUBLE,
attr10 DATE,
attr11 TIME,
attr12 TIMESTAMP(3),
attr13 ARRAY<STRING>,
attr14 ROW<attr15 FLOAT, attr16 TIMESTAMP(3)>,
attr17 MAP<INT, BIGINT>,
name1 VARCHAR(64),
message ROW<data ROW<UPO_TIMESTAMP VARCHAR(20)>>,
`raw` RAW('class', 'snapshot')
) WITH ('connector' = 'kafka');
CREATE TABLE IF NOT EXISTS tbl1 (
a BIGINT,
h VARCHAR,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b VARCHAR,
proc AS PROCTIME(),
PRIMARY KEY (a, b) NOT ENFORCED
) PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a BIGINT COMMENT 'test column comment AAA.',
h VARCHAR,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b VARCHAR,
proc AS PROCTIME(),
meta STRING METADATA,
my_meta STRING METADATA FROM 'meta',
my_meta STRING METADATA FROM 'meta' VIRTUAL,
PRIMARY KEY (a, b) NOT ENFORCED
) COMMENT 'test table comment ABC.' PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a BIGINT COMMENT 'test column comment AAA.',
h VARCHAR,
g AS 2 * (a + 1) COMMENT 'test computed column.',
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b VARCHAR,
proc AS PROCTIME(),
PRIMARY KEY (a, b) NOT ENFORCED
) COMMENT 'test table comment ABC.' PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a BIGINT,
h VARCHAR,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b VARCHAR,
proc AS PROCTIME(),
PRIMARY KEY (a, b) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a BIGINT PRIMARY KEY NOT ENFORCED COMMENT 'test column comment AAA.',
h VARCHAR CONSTRAINT ct1 PRIMARY KEY NOT ENFORCED,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
proc AS PROCTIME()
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
ts TIMESTAMP(3),
id VARCHAR,
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
log_ts VARCHAR,
ts AS to_timestamp(log_ts),
WATERMARK FOR ts AS ts + INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
f1 ROW<q1 BIGINT, q2 ROW<t1 TIMESTAMP, t2 VARCHAR>, q3 BOOLEAN>,
WATERMARK FOR f1.q2.t1 AS NOW()
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a ARRAY<BIGINT>,
b MAP<INT, VARCHAR>,
c ROW<cc0 INT, cc1 FLOAT, cc2 VARCHAR>,
d MULTISET<VARCHAR>,
PRIMARY KEY (a, b) NOT ENFORCED
) WITH (
'x' = 'y',
'asd' = 'data'
);
CREATE TABLE tbl1 (
a ARRAY<ARRAY<BIGINT>>,
b MAP<MAP<INT, VARCHAR>, ARRAY<VARCHAR>>,
c ROW<cc0 ARRAY<INT>, cc1 FLOAT, cc2 VARCHAR>,
d MULTISET<ARRAY<INT>>,
f TIMESTAMP(9),
PRIMARY KEY (a, b) NOT ENFORCED
) WITH (
'x' = 'y',
'asd' = 'data'
);
CREATE TABLE tbl1 (
a ARRAY<ARRAY<BIGINT>>,
b MAP<MAP<INT, VARCHAR>, ARRAY<VARCHAR>>,
c ROW<cc0 ARRAY<INT>, cc1 FLOAT, cc2 VARCHAR>,
d MULTISET<ARRAY<INT>>,
f TIMESTAMP(9),
PRIMARY KEY (a, b) NOT ENFORCED
) WITH (
'x' = 'y',
'asd' = 'data'
) LIKE Orders (
OVERWRITING OPTIONS
EXCLUDING CONSTRAINTS
);
CREATE TEMPORARY TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'faker'
);
CREATE TEMPORARY TABLE aggregations1 (
`browser` STRING,
`status_code` STRING,
`end_time` TIMESTAMP(3),
`requests` BIGINT NOT NULL
) WITH (
'connector' = 'blackhole'
);
CREATE TABLE dt_catalog.dt_db.doctor_sightings (
doctor STRING,
sighting_time TIMESTAMP(3),
WATERMARK FOR sighting_time AS sighting_time - INTERVAL '15' SECONDS
) WITH (
'connector' = 'faker'
);
CREATE TABLE dt_catalog.dt_db.bids (
bid_id STRING,
currency_code STRING,
bid_price DOUBLE,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECONDS -- 定义事件时间允许的最大窗口延迟为5s
) WITH (
'connector' = 'faker'
);
CREATE TABLE dt_catalog.dt_db.currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS, -- 定义事件时间
PRIMARY KEY (currency_code) NOT ENFORCED -- 定义主键
) WITH (
'connector' = 'faker'
);
CREATE TABLE dt_catalog.dt_db.users (
-- That was weird, NOT ENFORCED should have been necessary but we got a demo like the following and it could work!
user_id INT PRIMARY KEY,
user_name VARCHAR(255) NOT NULL,
age INT NULL
) WITH (
'connector' = 'faker'
);

View File

@ -0,0 +1,56 @@
CREATE VIEW v AS
SELECT
col1
FROM
tbl;
CREATE TEMPORARY VIEW v AS
SELECT
col1
FROM
tbl;
CREATE VIEW v COMMENT 'this is a view' AS
SELECT
col1
FROM
tbl;
CREATE VIEW v(col1, col2) AS
SELECT
col3,
col4
FROM
tbl;
CREATE VIEW v(col1, col2) COMMENT 'this is a view' AS
SELECT
col3,
col4
FROM
tbl;
CREATE TEMPORARY VIEW IF NOT EXISTS v AS
SELECT
col1
FROM
tbl;
CREATE TEMPORARY VIEW browsers AS
SELECT
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
status_code,
log_time
FROM
server_logs;
CREATE VIEW server_logs_window_1m AS
SELECT
TUMBLE_START(log_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_ROWTIME(log_time, INTERVAL '1' MINUTE) AS window_end,
SUM(size) AS total_size,
COUNT(*) AS num_requests
FROM
server_logs
GROUP BY
TUMBLE(log_time, INTERVAL '1' MINUTE);

View File

@ -0,0 +1,7 @@
DESCRIBE Orders;
DESC Orders;
DESCRIBE catalog1.db1.table1;
DESC catalog1.db1.table1;

View File

@ -0,0 +1,3 @@
DROP CATALOG catalog1;
DROP CATALOG IF EXISTS catalog2;

View File

@ -0,0 +1,3 @@
DROP DATABASE Orders;
DROP DATABASE IF EXISTS Orders RESTRICT;
DROP DATABASE IF EXISTS Orders CASCADE;

View File

@ -0,0 +1,5 @@
DROP FUNCTION Orders;
DROP TEMPORARY FUNCTION IF EXISTS Orders;
DROP TEMPORARY SYSTEM FUNCTION IF EXISTS Orders;

View File

@ -0,0 +1,5 @@
DROP TABLE Orders;
DROP TABLE IF EXISTS Orders;
DROP TEMPORARY TABLE IF EXISTS Orders;

View File

@ -0,0 +1,3 @@
DROP VIEW Orders;
DROP TEMPORARY VIEW IF EXISTS Orders;

View File

@ -0,0 +1,25 @@
ADD JAR WITH /home/test.jar;
ADD JAR WITH /home/test.jar AS test.jar;
ADD FILE WITH /home/admin/sftp/dttest_cn/DsCenter_1687/krb5.conf;
ADD FILE WITH /home/admin/sftp/dttest_cn/DsCenter_1687/krb5.conf AS krb5.conf;
ADD FILE WITH /home/admin/sftp/dttest_cn/DsCenter_1687/krb5.conf RENAME test.conf;
ADD PYTHON_FILES WITH /test.py RENAME test.py;
ADD PYTHON_REQUIREMENTS WITH /requirements.txt RENAME requirements.txt;
ADD PYTHON_DEPENDENCIES WITH /dependencies.txt RENAME dependencies.txt;
ADD PYTHON_JAR WITH /python_jar.jar RENAME py_jar.jar;
ADD PYTHON_PARAMETER my_parameter.py;
ADD PYTHON_ARCHIVES WITH /archives.py RENAME archive.tx;
ADD ENGINE FILE WITH /filePath RENAME theName KEY theKey;
ADD CONFIG FILE WITH /config.ini FOR hiveConf as hadoop.cong.dir;

View File

@ -0,0 +1,15 @@
EXPLAIN SELECT * FROM emps;
EXPLAIN PLAN FOR SELECT * FROM emps;
EXPLAIN PLAN FOR insert into emps1 SELECT * FROM emps2;
EXPLAIN CHANGELOG_MODE SELECT * FROM emps;
EXPLAIN ESTIMATED_COST SELECT * FROM emps;
EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM emps;
EXPLAIN CHANGELOG_MODE, JSON_EXECUTION_PLAN, ESTIMATED_COST SELECT * FROM emps;
EXPLAIN INSERT INTO EMPS1 SELECT * FROM EMPS2;

View File

@ -0,0 +1,8 @@
SELECT FROM_UNIXTIME(t1.create_time / 1000, 'yyyyMMddHHmmss') AS create_time FROM t1;
SELECT FROM_UNIXTIME(t1.create_time/1000, 'yyyyMMddHHmmss') AS create_time FROM t1;
SELECT FROM_UNIXTIME(t1.create_time/1000 * 1, 'yyyyMMddHHmmss') AS create_time FROM t1;
SELECT FROM_UNIXTIME(t1.create_time/1000 + 1, 'yyyyMMddHHmmss') AS create_time FROM t1;
SELECT FROM_UNIXTIME(t1.create_time/1000 - 1, 'yyyyMMddHHmmss') AS create_time FROM t1;
SELECT FROM_UNIXTIME(t1.create_time/1000 % 2, 'yyyyMMddHHmmss') AS create_time FROM t1;
SELECT FROM_UNIXTIME(t1.create_time/1000 / 1, 'yyyyMMddHHmmss') AS create_time FROM t1;

View File

@ -0,0 +1,54 @@
INSERT INTO country_page_view
SELECT `user`,
cnt
FROM page_view_source;
INSERT INTO catalog1.db1.country_page_view
SELECT `user`,
cnt
FROM page_view_source;
--- Execute InsertStatement
EXECUTE
INSERT INTO country_page_view PARTITION (`date` = '2019-8-30', country = 'China')
SELECT `user`,
cnt
FROM page_view_source;
--- Partition Clause: Static Partition
INSERT INTO country_page_view PARTITION (`date` = '2019-8-30', country = 'China')
SELECT `user`,
cnt
FROM page_view_source;
--- Partition Clause: Dynamic Partition
INSERT INTO country_page_view PARTITION (`date` = '2019-8-30')
SELECT `user`,
cnt,
country
FROM page_view_source;
--- Column List Statement
INSERT INTO country_page_view PARTITION (`date` = '2019-8-30', country = 'China') (`date`, country)
SELECT `user`,
cnt
FROM page_view_source;
--- Insert Method: OverWrite
INSERT OVERWRITE country_page_view PARTITION (`date` = '2019-8-30')
SELECT `user`,
cnt,
country
FROM page_view_source;
--- Insert with function
INSERT INTO hbase_table
SELECT
st.id as rowKey,
ROW(id, name, age) as baseInfo
FROM sourceTable st;

View File

@ -0,0 +1,22 @@
-- FlinkSQL 1.16insert multiple table statement
EXECUTE STATEMENT SET
BEGIN
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
END;
-- FlinkSQL 1.15insert multiple table statement
BEGIN STATEMENT SET;
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
END;

View File

@ -0,0 +1,22 @@
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
EXECUTE
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
EXECUTE
INSERT OVERWRITE country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
EXECUTE
INSERT INTO country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);
INSERT INTO catalog1.db1.country_page_view
VALUES ('Chinese', 'mumiao', 18),
('Amercian', 'georage', 22);

View File

@ -0,0 +1,19 @@
SELECT * FROM Orders;
SELECT order_id, price + tax FROM Orders;
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
SELECT price + tax FROM Orders WHERE id = 10;
SELECT * FROM person WHERE id = 200 OR id = 300;
SELECT id, sum(quantity) FROM dealer GROUP BY id;
SELECT PRETTY_PRINT(order_id) FROM Orders;
SELECT * FROM Orders ORDER BY order_time, order_id;
SELECT * FROM Orders ORDER BY orderTime LIMIT 3;
SELECT * FROM catalog1.db1.table1;

View File

@ -0,0 +1,122 @@
-- Window TVF Aggregation
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);
SELECT
window_start,
window_end,
item, supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (supplier_id, item);
-- GROUPING SETS
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY
window_start,
window_end,
GROUPING SETS ((supplier_id), ());
SELECT
window_start,
window_end,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY
window_start,
window_end,
ROLLUP (supplier_id);
SELECT
window_start,
window_end,
item,
supplier_id,
SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY
window_start,
window_end,
CUBE (supplier_id, item);
-- Group Window Aggregation
SELECT
`user`,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
SUM(amount) FROM Orders
GROUP BY
TUMBLE(order_time, INTERVAL '1' DAY),
`user`;
SELECT
`user`,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
SUM(amount) FROM Orders
GROUP BY
HOP(order_time, INTERVAL '1' DAY),
`user`;
SELECT
`user`,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
SUM(amount) FROM Orders
GROUP BY
SESSION(order_time, INTERVAL '1' DAY),
`user`;
-- Having
SELECT SUM(amount)
FROM Orders
GROUP BY `users`
HAVING SUM(amount) > 50;
-- Over Aggregation
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders;
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table;
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);

View File

@ -0,0 +1,7 @@
SELECT DISTINCT order_id, price + tax FROM Orders;
SELECT DISTINCT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
SELECT DISTINCT price + tax FROM Orders WHERE id = 10;
SELECT DISTINCT PRETTY_PRINT(order_id) FROM Orders;

View File

@ -0,0 +1,128 @@
-- INNER Equi-JOIN
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id;
-- OUTER Equi-JOIN
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id;
SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id;
SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id;
-- Interval Joins
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time;
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time = s.ship_time; -- ltime = rtime
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time >= s.ship_time -- ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
AND o.order_time < s.ship_time + INTERVAL '10' MINUTE;
-- Temporal Joins
SELECT
order_id,
price,
orders.currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency;
-- Lookup Join
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
-- Table Function INNER JOIN
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res);
-- Table Function LEFT OUTER JOIN
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE;
-- Cross Join
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
-- FULL OUTER Window Join
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
COALESCE(L.window_start, R.window_start) as window_start,
COALESCE(L.window_end, R.window_end) as window_end
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
-- Semi Window Joins
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE L.num IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE L.num IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
-- Anti Window Joins
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE L.num NOT IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L WHERE NOT EXISTS (
SELECT * FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end
);

View File

@ -0,0 +1,165 @@
-- basic pattern recognition
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
PARTITION BY userid
ORDER BY proctime
MEASURES
A.id AS aid,
B.id AS bid,
C.id AS cid
PATTERN (A B C)
DEFINE
A AS name = 'a',
B AS name = 'b',
C AS name = 'c'
) AS T;
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
START_ROW.rowtime AS start_tstamp,
_LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
_LAST(PRICE_UP.rowtime) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST PRICE_UP
PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
DEFINE
PRICE_DOWN AS
(_LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
PRICE_DOWN.price < _LAST(PRICE_DOWN.price, 1),
PRICE_UP AS
PRICE_UP.price > _LAST(PRICE_DOWN.price, 1)
) MR;
-- Measures Aggregations
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp,
_LAST(A.rowtime) AS end_tstamp,
AVG(A.price) AS avgPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
-- Define a Pattern
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
C.price AS lastPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A+ B* C? D{1,} E{,5} F{1,5})
DEFINE
A AS A.price > 10,
B AS B.price < 15,
C AS C.price > 12
);
-- Time constraint
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
C.rowtime AS dropTime,
A.price - C.price AS dropDiff
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
DEFINE
B AS B.price > A.price - 10,
C AS C.price < A.price - 10
);
-- Output Mode
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.price) AS startPrice,
_LAST(A.price) AS topPrice,
B.price AS lastPrice
ONE ROW PER MATCH
PATTERN (A+ B)
DEFINE
A AS _LAST(A.price, 1) IS NULL OR A.price > _LAST(A.price, 1),
B AS B.price < _LAST(A.price)
);
SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.price) AS startPrice,
_LAST(A.price) AS topPrice,
B.price AS lastPrice
ALL ROWS PER MATCH
PATTERN (A+ B)
DEFINE
A AS _LAST(A.price, 1) IS NULL OR A.price > _LAST(A.price, 1),
B AS B.price < _LAST(A.price)
);
-- After Match Strategy
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST A
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
FIRST(A.rowtime) AS start_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO FIRST A
PATTERN (A+ B)
DEFINE
A AS AVG(A.price) < 15
) MR;

View File

@ -0,0 +1,28 @@
-- UNION
(SELECT s FROM t1) UNION (SELECT s FROM t2);
(SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
-- INTERSECT
(SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
(SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
-- EXPECT
(SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
(SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
-- IN
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
-- EXISTS
SELECT user, amount
FROM Orders
WHERE product EXISTS (
SELECT product FROM NewProducts
)

View File

@ -0,0 +1,27 @@
SELECT id, age FROM table1 WHERE age IS NOT NULL;
SELECT id, age FROM table1 WHERE age IS NOT DISTINCT FROM 12;
SELECT id, age FROM table1 WHERE age BETWEEN SYMMETRIC 25 AND 18;
SELECT id, age FROM table1 WHERE age NOT LIKE "%aa_d%" ESCAPE "a";
SELECT addr FROM table1 WHERE addr NOT SIMILAR TO '%(123|yz)%' ESCAPE "y";
SELECT id, age FROM table1 WHERE age NOT IN (18,19,20);
SELECT id FROM table1 WHERE id NOT IN ( SELECT * FROM table2 );
SELECT S,SNAME
FROM S
WHERE NOT EXISTS
(SELECT *
FROM C
WHERE NOT EXISTS
(SELECT *
FROM SC
WHERE SC.S=S.S AND SC.C=C.C));
SELECT id, age FROM table1 WHERE age > 18 OR id = 1;
SELECT id, age FROM table1 WHERE age > 18 AND id > 10;

View File

@ -0,0 +1,52 @@
SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES
)
);
SELECT window_start,
window_end,
SUM(price)
FROM TABLE(
TUMBLE(
TABLE Bid,
DESCRIPTOR(bidtime),
INTERVAL '10' MINUTES
)
)
GROUP BY window_start, window_end;
SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
SELECT * FROM TABLE(
HOP(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SLIDE => INTERVAL '5' MINUTES,
SIZE => INTERVAL '10' MINUTES
)
);
SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES
)
);
SELECT window_start,
window_end,
SUM(price)
FROM TABLE(
CUMULATE(
TABLE Bid,
DESCRIPTOR(bidtime),
INTERVAL '2' MINUTES,
INTERVAL '10' MINUTES
))
GROUP BY window_start, window_end;

View File

@ -0,0 +1,7 @@
WITH orders_with_total AS (
SELECT order_id, price + tax AS total
FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

View File

@ -0,0 +1,29 @@
SHOW CATALOGS;
SHOW CURRENT CATALOG;
SHOW DATABASES;
SHOW CURRENT DATABASE;
SHOW TABLES;
SHOW TABLES FROM catalog1.db1 NOT LIKE '%';
SHOW CREATE TABLE my_table;
SHOW COLUMNS FROM my_table LIKE '%f%';
SHOW VIEWS;
SHOW CREATE VIEW my_view;
SHOW FUNCTIONS;
SHOW USER FUNCTIONS;
SHOW MODULES;
SHOW FULL MODULES;
SHOW JARS;

View File

@ -0,0 +1,884 @@
--********************************************************************--
-- Flink SQL 快速入门示例 创建表
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 执行创建临时表 DDL不需要指定catalog.database
CREATE TABLE orders (
order_uid BIGINT,
product_id BIGINT,
price DECIMAL(32, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen'
);
--********************************************************************--
-- Flink SQL 快速入门示例 INSERT INTO
--********************************************************************--
-- 定义数据源表
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 定义结果表,实际应用中会选择 Kafka、JDBC 等作为结果表
CREATE TABLE client_errors (
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM server_logs
WHERE
status_code SIMILAR TO '4[0-9][0-9]';
--********************************************************************--
-- Flink SQL 快速入门示例 Statement Set
--********************************************************************--
-- 定义数据源表
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT,
WATERMARK FOR log_time AS log_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'faker', -- Faker 连接器仅在 VVR-4.0.12 及以上支持
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 定义结果表1
CREATE TABLE aggregations1 (
`browser` STRING,
`status_code` STRING,
`end_time` TIMESTAMP(3),
`requests` BIGINT NOT NULL
) WITH (
'connector' = 'blackhole'
);
-- 定义结果表2
CREATE TABLE aggregations2 (
`browser` STRING,
`status_code` STRING,
`requests` BIGINT NOT NULL
) WITH (
'connector' = 'stream-x'
);
-- This is a shared view that will be used by both insert into statements
CREATE VIEW browsers AS
SELECT
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
status_code,
log_time
FROM server_logs;
-- 封装多个INSERT INTO语句到一个STATEMENT SET语句中
BEGIN STATEMENT SET;
-- 5min窗口粒度聚合
INSERT INTO aggregations1
SELECT
browser,
status_code,
TUMBLE_ROWTIME(log_time, INTERVAL '5' MINUTE) AS end_time,
COUNT(*) requests
FROM browsers
GROUP BY
browser,
status_code,
TUMBLE(log_time, INTERVAL '5' MINUTE);
-- 1h窗口粒度聚合
INSERT INTO aggregations2
SELECT
browser,
status_code,
COUNT(*) requests
FROM browsers
GROUP BY
browser,
status_code,
TUMBLE(log_time, INTERVAL '1' HOUR);
END;
--********************************************************************--
-- Flink SQL 快速入门示例 Watermark
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.doctor_sightings (
doctor STRING,
sighting_time TIMESTAMP(3),
-- 通过watermark把sighting_time标识为事件时间定义最大的乱序时间期望所有的记录在目击发生后的15秒内都到达。
WATERMARK FOR sighting_time AS sighting_time - INTERVAL '15' SECONDS
) WITH (
'connector' = 'faker',
'fields.doctor.expression' = '#{dr_who.the_doctors}',
'fields.sighting_time.expression' = '#{date.past ''15'',''SECONDS''}'
);
SELECT
doctor,
-- 在滚动窗口中使用sighting_time字段
TUMBLE_ROWTIME(sighting_time, INTERVAL '1' MINUTE) AS sighting_time,
COUNT(*) AS sightings
FROM dt_catalog.dt_db.doctor_sightings
GROUP BY
TUMBLE(sighting_time, INTERVAL '1' MINUTE),
doctor;
--********************************************************************--
-- Flink SQL 快速入门示例 GROUP BY
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 定义数据源表
CREATE TABLE dt_catalog.dt_db.server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 采样user_agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A
-- 正则表达式: '[^\/]+' (匹配 '/' 之前的所有字符)
SELECT
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
status_code,
COUNT(*) AS cnt_status
FROM dt_catalog.dt_db.server_logs
-- 按浏览器和状态码两个维度统计日志数量
GROUP BY
REGEXP_EXTRACT(user_agent,'[^\/]+'),
status_code;
--********************************************************************--
-- Flink SQL 快速入门示例 滚动窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
request_line STRING,
status_code STRING,
log_time AS PROCTIME() -- 使用当前系统处理时间作为表的时间字段
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
);
-- 按 window_start, window_end 维度计算每分钟窗口上不同的 ip 数量
SELECT window_start, window_end, COUNT(DISTINCT client_ip) AS ip_addresses
FROM TABLE(
-- 定义1min滑动窗口
TUMBLE(TABLE dt_catalog.dt_db.server_logs, DESCRIPTOR(log_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
--********************************************************************--
-- Flink SQL 快速入门示例 滑动窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.bids (
bid_id STRING,
currency_code STRING,
bid_price DOUBLE,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECONDS -- 定义事件时间允许的最大窗口延迟为5s
) WITH (
'connector' = 'faker',
'fields.bid_id.expression' = '#{Internet.UUID}',
'fields.currency_code.expression' = '#{regexify ''(EUR|USD|CNY)''}',
'fields.bid_price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 定义1min 的滑动窗口,每隔 30s 滚动一次
SELECT window_start, window_end, currency_code, ROUND(AVG(bid_price),2) AS MovingAverageBidPrice
FROM TABLE(
HOP(TABLE dt_catalog.dt_db.bids, DESCRIPTOR(transaction_time), INTERVAL '30' SECONDS, INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, currency_code;
--********************************************************************--
-- Flink SQL 快速入门示例 累计窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 商品销售订单表
CREATE TABLE dt_catalog.dt_db.orders (
order_id BIGINT, -- 订单ID
goods_id BIGINT, -- 商品ID
goods_sales DOUBLE, -- 商品销售额
order_time TIMESTAMP(3), -- 下单时间
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS -- 定义事件时间允许的最大窗口延迟为5s
) WITH (
'connector' = 'faker',
'fields.order_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
'fields.goods_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
'fields.goods_sales.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.order_time.expression' = '#{date.past ''30'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 每分钟更新一次从零点开始截止到当前时刻的累计销售额
SELECT
window_start,
window_end,
SUM(goods_sales) AS cumulate_gmv -- 当天累计销售额
FROM TABLE(
-- 定义窗口最大长度为一天的累计窗口窗口滚动步长为1分钟
CUMULATE(
TABLE dt_catalog.dt_db.orders,
DESCRIPTOR(order_time),
INTERVAL '1' MINUTES,
INTERVAL '1' DAY))
GROUP BY window_start, window_end;
--********************************************************************--
-- Flink SQL 快速入门示例 会话窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS -- 定义 watermark
) WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '#{regexify ''(morsapaes|knauf|sjwiesman){1}''}',
'fields.log_time.expression' = '#{date.past ''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
);
SELECT
userid,
SESSION_START(log_time, INTERVAL '10' SECOND) AS session_beg,
SESSION_ROWTIME(log_time, INTERVAL '10' SECOND) AS session_end,
COUNT(request_line) AS request_cnt
FROM dt_catalog.dt_db.server_logs
WHERE status_code = '403'
GROUP BY
userid,
-- 会话窗口的最大空闲间隔为10s当10s内该窗口没有接收到新的请求会关闭当前窗口
SESSION(log_time, INTERVAL '10' SECOND);
--********************************************************************--
-- Flink SQL 快速入门示例 OVER窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.temperature_measurements (
measurement_time TIMESTAMP(3),
city STRING,
temperature FLOAT,
WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECONDS -- 定义时间属性字段OVER窗口排序时使用
) WITH (
'connector' = 'faker', -- Faker 连接器仅在 VVR-4.0.12 及以上支持
'fields.measurement_time.expression' = '#{date.past ''15'',''SECONDS''}',
'fields.temperature.expression' = '#{number.numberBetween ''0'',''50''}',
'fields.city.expression' = '#{regexify ''(Chicago|Munich|Berlin|Portland|Hangzhou|Seatle|Beijing|New York){1}''}'
);
SELECT
measurement_time,
city,
temperature,
AVG(CAST(temperature AS FLOAT)) OVER last_minute AS avg_temperature_minute, -- 计算平均值
MAX(temperature) OVER last_minute AS min_temperature_minute, -- 计算最大值
MIN(temperature) OVER last_minute AS max_temperature_minute, -- 计算最小值
STDDEV(CAST(temperature AS FLOAT)) OVER last_minute AS stdev_temperature_minute -- 计算标准差
FROM dt_catalog.dt_db.temperature_measurements
WINDOW last_minute AS ( -- 定义1min时间间隔的OVER窗口按城市粒度分区温度测量值排序每个元素都会触发一次计算
PARTITION BY city
ORDER BY measurement_time
RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
);
--********************************************************************--
-- Flink SQL 快速入门示例 级联窗口聚合
--********************************************************************--
CREATE TEMPORARY TABLE server_logs (
log_time TIMESTAMP(3),
client_ip STRING,
client_identity STRING,
userid STRING,
request_line STRING,
status_code STRING,
size INT,
WATERMARK FOR log_time AS log_time - INTERVAL '15' SECONDS -- 定义watermark
) WITH (
'connector' = 'faker',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 1min聚合结果表
CREATE TEMPORARY TABLE avg_request_size_1m (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
avg_size BIGINT
) WITH (
'connector' = 'blackhole'
);
-- 5min聚合结果表
CREATE TEMPORARY TABLE avg_request_size_5m (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
avg_size BIGINT
) WITH (
'connector' = 'blackhole'
);
-- 1min窗口查询结果
CREATE VIEW server_logs_window_1m AS
SELECT
TUMBLE_START(log_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_ROWTIME(log_time, INTERVAL '1' MINUTE) AS window_end,
SUM(size) AS total_size,
COUNT(*) AS num_requests
FROM server_logs
GROUP BY
TUMBLE(log_time, INTERVAL '1' MINUTE);
-- 基于1min窗口查询结果进行5min粒度窗口聚合
CREATE VIEW server_logs_window_5m AS
SELECT
TUMBLE_START(window_end, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_ROWTIME(window_end, INTERVAL '5' MINUTE) AS window_end,
SUM(total_size) AS total_size,
SUM(num_requests) AS num_requests
FROM server_logs_window_1m
GROUP BY
TUMBLE(window_end, INTERVAL '5' MINUTE);
BEGIN STATEMENT SET;
-- 写入结果到1min窗口粒度结果表
INSERT INTO avg_request_size_1m SELECT
window_start,
window_end,
total_size/num_requests AS avg_size
FROM server_logs_window_1m;
-- 写入结果到5min窗口粒度结果表
INSERT INTO avg_request_size_5m SELECT
window_start,
window_end,
total_size/num_requests AS avg_size
FROM server_logs_window_5m;
END;
--********************************************************************--
-- Flink SQL 快速入门示例 去重
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
id INT,
order_time AS CURRENT_TIMESTAMP,
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.id.kind'='random',
'fields.id.min'='1',
'fields.id.max'='100'
);
-- 对于每个order_id按事件时间去重只保留最新时间的记录即可实现去重
SELECT
order_id,
order_time
FROM (
SELECT id AS order_id,
order_time,
-- 按事件时间升序排序
ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
FROM dt_catalog.dt_db.orders)
WHERE rownum = 1; -- 只取排名第一的记录去重是Top-N的一种特例
--********************************************************************--
-- Flink SQL 快速入门示例 Top-N
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.spells_cast (
wizard STRING,
spell STRING
) WITH (
'connector' = 'faker',
'fields.wizard.expression' = '#{harry_potter.characters}',
'fields.spell.expression' = '#{harry_potter.spells}'
);
-- 找出每个巫师最喜欢的两个法术
SELECT wizard, spell, times_cast
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num -- 按法术次数降序排序
FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM dt_catalog.dt_db.spells_cast GROUP BY wizard, spell) -- 计算每个巫师施展的各种法术的次数
)
WHERE row_num <= 2;
--********************************************************************--
-- Flink SQL 快速入门示例 窗口Top-N
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
bidtime TIMESTAMP(3),
price DOUBLE,
item STRING,
supplier STRING,
WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS -- 定义watermark
) WITH (
'connector' = 'faker',
'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.item.expression' = '#{Commerce.productName}',
'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
'rows-per-second' = '100'
);
-- 取出销售排名前三的供应商
SELECT *
FROM (
-- 按窗口时间分区,按价格降序排序
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
FROM (
-- 计算每个窗口内各个供应商的销售额
SELECT window_start, window_end, supplier, SUM(price) AS price, COUNT(*) AS cnt
FROM TABLE(
TUMBLE(TABLE dt_catalog.dt_db.orders, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier
)
)
WHERE rownum <= 3;
--********************************************************************--
-- Flink SQL 快速入门示例 模式检测CEP
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.subscriptions (
id STRING,
user_id INT,
type STRING,
start_date TIMESTAMP(3),
end_date TIMESTAMP(3),
payment_expiration TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.uuid}',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
'fields.end_date.expression' = '#{date.future ''15'',''DAYS''}',
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);
SELECT *
FROM dt_catalog.dt_db.subscriptions
MATCH_RECOGNIZE ( -- 按user_id分区按处理时间proc_time升序排序
PARTITION BY user_id
ORDER BY proc_time
MEASURES
LAST(PREMIUM.type) AS premium_type,
AVG(TIMESTAMPDIFF(DAY,PREMIUM.start_date,PREMIUM.end_date)) AS premium_avg_duration,
BASIC.start_date AS downgrade_date
AFTER MATCH SKIP PAST LAST ROW
--: premiumplatinum
--'user_id''basic'
PATTERN (PREMIUM+ BASIC)
DEFINE PREMIUM AS PREMIUM.type IN ('premium','platinum'),
BASIC AS BASIC.type = 'basic');
--********************************************************************--
-- Flink SQL 快速入门示例 Regular Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.NOC (
agent_id STRING,
codename STRING
) WITH (
'connector' = 'faker',
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
'fields.codename.expression' = '#{superhero.name}',
'number-of-rows' = '10'
);
CREATE TABLE dt_catalog.dt_db.RealNames (
agent_id STRING,
name STRING
) WITH (
'connector' = 'faker',
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
'fields.name.expression' = '#{Name.full_name}',
'number-of-rows' = '10'
);
-- 使用agent_id作为两张表关联的条件左右两边任何一张表来了新数据都会触发join动作
SELECT
name,
codename
FROM dt_catalog.dt_db.NOC t1
INNER JOIN dt_catalog.dt_db.RealNames t2 ON t1.agent_id = t2.agent_id;
--********************************************************************--
-- Flink SQL 快速入门示例 Interval Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
id INT,
order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='1000'
);
CREATE TABLE dt_catalog.dt_db.shipments (
id INT,
order_id INT,
shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.id.kind'='random',
'fields.id.min'='0',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='1',
'fields.order_id.end'='1000'
);
-- order表的每条数据会与shipments表过去三天至当前时刻时间范围内的数据进行join
SELECT
o.id AS order_id,
o.order_time,
s.shipment_time,
TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
FROM dt_catalog.dt_db.orders o
JOIN dt_catalog.dt_db.shipments s ON o.id = s.order_id
WHERE
-- 时间 join 条件shipments.shipment_time - INTERVAL '3' DAY <= orders.order_time < shipments.shipment_time
o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
--********************************************************************--
-- Flink SQL 快速入门示例 时态表Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 使用主键约束和watermark来定义一张版本表这张表可以是一个cdc表、upsert类型的kafka topic等
CREATE TABLE dt_catalog.dt_db.currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS, -- 定义事件时间
PRIMARY KEY (currency_code) NOT ENFORCED -- 定义主键
) WITH (
'connector' = 'faker',
'fields.currency_code.expression' = '#{Currency.code}',
'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 这是一个append-only类型的动态表需要定义watermk
CREATE TABLE dt_catalog.dt_db.transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS --watermark
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.UUID}',
'fields.currency_code.expression' = '#{Currency.code}',
'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 当左右两张表的watermark对齐时才会触发join动作左右两张表都需要定义watermark
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
t.total,
c.currency_code,
t.transaction_time
FROM dt_catalog.dt_db.transactions t
-- transactions表每条记录都与currency_rates表transaction_time时刻的汇率进行join
JOIN dt_catalog.dt_db.currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
-- 指定join key
ON t.currency_code = c.currency_code;
--********************************************************************--
-- Flink SQL 快速入门示例 维表Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.subscriptions (
id STRING,
user_id INT,
type STRING,
start_date TIMESTAMP(3),
end_date TIMESTAMP(3),
payment_expiration TIMESTAMP(3),
proc_time AS PROCTIME() -- 这里需要定义处理时间属性
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.uuid}',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
'fields.end_date.expression' = '#{date.future ''365'',''DAYS''}',
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);
-- 定义维表为了示例能直接运行这里使用faker 作为维表实际应用中一般会使用JDBC、Redis、Hbase等作为维表
CREATE TABLE dt_catalog.dt_db.users (
user_id INT PRIMARY KEY, -- 定义主键
user_name VARCHAR(255) NOT NULL,
age INT NOT NULL
) WITH (
'connector' = 'faker',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''10''}',
'fields.user_name.expression' = '#{regexify ''(ron|jark|danny){1}''}',
'fields.age.expression' = '#{number.numberBetween ''1'',''30''}'
);
SELECT
id AS subscription_id,
type AS subscription_type,
age AS user_age,
CASE
WHEN age < 18 THEN 1
ELSE 0
END AS is_minor
FROM dt_catalog.dt_db.subscriptions usub
-- subscriptions每条记录都使用当前系统时间与维表users中的最新数据进行join
JOIN dt_catalog.dt_db.users FOR SYSTEM_TIME AS OF usub.proc_time AS u
-- 指定join key
ON usub.user_id = u.user_id;
--********************************************************************--
-- Flink SQL ODS 层
-- 实际应用中该任务应该是一个采集任务源表为RDB
--********************************************************************--
CREATE TABLE source_table (
account_number STRING,
channel_id INT,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期每5秒一条数据
);
-- 定义结果表,实际应用中应选择 Kafka等作为结果表
CREATE TABLE sink_table (
account_number STRING,
channel_id INT,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO sink_table SELECT * FROM source_table
--********************************************************************--
-- Flink SQL DWD 层
-- 实际应用中,源表为 ODS TOPIC
--********************************************************************--
CREATE TABLE source_table (
account_number STRING,
channel_id INT,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期每5秒一条数据
);
-- 定义维表实际应用中应选择RDB作为维表
CREATE TABLE dim_table (
channel_id INT,
channel_name STRING,
PRIMARY KEY (channel_id,channel_name) NOT ENFORCED
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.channel_name.expression' = '#{app.name}' -- 渠道名称
);
-- 定义结果表,实际应用中应选择 Kafka等作为结果表
CREATE TABLE sink_table (
account_number STRING,
channel_id INT,
channel_name STRING,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO sink_table
SELECT
s1.account_number,
s1. channel_id ,
d1.channel_name,
s1.account_open_datetime
FROM source_table s1
left JOIN dim_table d1 ON s1.channel_id=d1.channel_id
--********************************************************************--
-- Flink SQL DWS 层
-- 实际应用中源表为DWD TOPIC
--********************************************************************--
CREATE TABLE source_table (
account_number STRING,
channel_id INT,
channel_name STRING,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.channel_name.expression' = '#{app.name}' ,-- 渠道名称
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期每5秒一条数据
);
DROP TABLE source_table
SELECT * FROM source_table
-- 定义结果表实际应用中应选择Kafka作为结果表
CREATE TABLE sink_table (
channel_id STRING,
open_date STRING,
cnt INT
) WITH (
'connector' = 'stream-x'
);
DROP TABLE sink_table
SELECT * FROM sink_table
-- 写入数据到结果表
INSERT INTO sink_table
SELECT
channel_id,
DATE_FORMAT(account_open_datetime,'yyyy-MM-dd'),
count(account_number)
FROM source_table
GROUP BY channel_id,DATE_FORMAT(account_open_datetime,'yyyy-MM-dd')
--********************************************************************--
-- Flink SQL ADS 层
-- 实际应用中源表为DWS TOPIC
--********************************************************************--
CREATE TABLE source_table (
channel_id STRING,
open_time TIMESTAMP(3),
cnt INT
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.open_time.expression' = '#{Date.future ''15'',''5'',''SECONDS''}' ,-- 日期
'fields.cnt.expression' = '#{number.numberBetween ''1'',''100000000''}'-- 数量
);
-- 实际应用中结果表为RDB
CREATE TABLE sink_table (
open_date STRING,
cnt INT
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO sink_table
SELECT DATE_FORMAT(open_time,'yyyy-MM-dd'),count(1);

View File

@ -0,0 +1,5 @@
USE CATALOG cat1;
USE db1;
USE MODULES hive;