-- 2023-05-25 09:24:24 +08:00
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: CREATE TABLE
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
-- Create a temporary table via DDL; no catalog.database qualifier is required.
CREATE TABLE orders (
    order_uid  BIGINT,
    product_id BIGINT,
    price      DECIMAL(32, 2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen'
);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: INSERT INTO
--********************************************************************--
-- Source table
CREATE TABLE server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    user_agent STRING,
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    size INT
) WITH (
    'connector' = 'faker',
    'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
    'fields.client_identity.expression' = '-',
    'fields.userid.expression' = '-',
    'fields.user_agent.expression' = '#{Internet.userAgentAny}',
    'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
    -- fixed: '/cart\.html' was missing its leading slash, and 'HTTP/1.0' had a stray one
    'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
    'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
    'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);

-- Sink table; a real job would use Kafka, JDBC, etc. as the sink
CREATE TABLE client_errors (
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    size INT
) WITH (
    'connector' = 'stream-x'
);

-- Write client-error (4xx) records into the sink table
INSERT INTO client_errors
SELECT
    log_time,
    request_line,
    status_code,
    size
FROM server_logs
WHERE
    status_code SIMILAR TO '4[0-9][0-9]';
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: Statement Set
--********************************************************************--
-- Source table
CREATE TABLE server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    user_agent STRING,
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    size INT,
    WATERMARK FOR log_time AS log_time - INTERVAL '30' SECONDS
) WITH (
    'connector' = 'faker', -- the Faker connector requires VVR-4.0.12 or later
    'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
    'fields.client_identity.expression' = '-',
    'fields.userid.expression' = '-',
    'fields.user_agent.expression' = '#{Internet.userAgentAny}',
    'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
    -- fixed: '/cart\.html' was missing its leading slash, and 'HTTP/1.0' had a stray one
    'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
    'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
    'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);

-- Sink table 1
CREATE TABLE aggregations1 (
    `browser` STRING,
    `status_code` STRING,
    `end_time` TIMESTAMP(3),
    `requests` BIGINT NOT NULL
) WITH (
    'connector' = 'blackhole'
);

-- Sink table 2
CREATE TABLE aggregations2 (
    `browser` STRING,
    `status_code` STRING,
    `requests` BIGINT NOT NULL
) WITH (
    'connector' = 'stream-x'
);

-- This is a shared view that will be used by both insert into statements
CREATE VIEW browsers AS
SELECT
    REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
    status_code,
    log_time
FROM server_logs;

-- Wrap multiple INSERT INTO statements in a single STATEMENT SET
BEGIN STATEMENT SET;

-- 5-minute tumbling-window aggregation
INSERT INTO aggregations1
SELECT
    browser,
    status_code,
    TUMBLE_ROWTIME(log_time, INTERVAL '5' MINUTE) AS end_time,
    COUNT(*) AS requests
FROM browsers
GROUP BY
    browser,
    status_code,
    TUMBLE(log_time, INTERVAL '5' MINUTE);

-- 1-hour tumbling-window aggregation
INSERT INTO aggregations2
SELECT
    browser,
    status_code,
    COUNT(*) AS requests
FROM browsers
GROUP BY
    browser,
    status_code,
    TUMBLE(log_time, INTERVAL '1' HOUR);

END;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: Watermark
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.doctor_sightings (
    doctor STRING,
    sighting_time TIMESTAMP(3),
    -- The watermark marks sighting_time as the event time and bounds the
    -- out-of-orderness: every record is expected within 15s of the sighting.
    WATERMARK FOR sighting_time AS sighting_time - INTERVAL '15' SECONDS
) WITH (
    'connector' = 'faker',
    'fields.doctor.expression' = '#{dr_who.the_doctors}',
    'fields.sighting_time.expression' = '#{date.past ''15'',''SECONDS''}'
);

SELECT
    doctor,
    -- use the sighting_time field inside a tumbling window
    TUMBLE_ROWTIME(sighting_time, INTERVAL '1' MINUTE) AS sighting_time,
    COUNT(*) AS sightings
FROM dt_catalog.dt_db.doctor_sightings
GROUP BY
    TUMBLE(sighting_time, INTERVAL '1' MINUTE),
    doctor;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: GROUP BY
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
-- Source table
CREATE TABLE dt_catalog.dt_db.server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    user_agent STRING,
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    size INT
) WITH (
    'connector' = 'faker',
    'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
    'fields.client_identity.expression' = '-',
    'fields.userid.expression' = '-',
    'fields.user_agent.expression' = '#{Internet.userAgentAny}',
    'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
    -- fixed: '/cart\.html' was missing its leading slash, and 'HTTP/1.0' had a stray one
    'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
    'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
    'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);

-- sample user_agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A
-- regex '[^\/]+' captures every character before the first '/'
SELECT
    REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
    status_code,
    COUNT(*) AS cnt_status
FROM dt_catalog.dt_db.server_logs
-- count log records along the two dimensions browser and status code
GROUP BY
    REGEXP_EXTRACT(user_agent,'[^\/]+'),
    status_code;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: tumbling window aggregation
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    request_line STRING,
    status_code STRING,
    log_time AS PROCTIME() -- use the current processing time as the table's time attribute
) WITH (
    'connector' = 'faker',
    'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
    'fields.client_identity.expression' = '-',
    'fields.userid.expression' = '-',
    -- fixed: removed 'fields.log_time.expression' — log_time is a computed
    -- column (PROCTIME()), not a physical field, so a faker expression for it
    -- is invalid. Also fixed the request_line regexes: '/cart\.html' was
    -- missing its leading slash and 'HTTP/1.0' had a stray one.
    'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
    'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
);

-- count distinct client ips per minute, keyed by window_start / window_end
SELECT window_start, window_end, COUNT(DISTINCT client_ip) AS ip_addresses
FROM TABLE(
    -- define a 1-minute tumbling window (the original comment said "sliding";
    -- TUMBLE is a tumbling window)
    TUMBLE(TABLE dt_catalog.dt_db.server_logs, DESCRIPTOR(log_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: sliding (HOP) window aggregation
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.bids (
    bid_id STRING,
    currency_code STRING,
    bid_price DOUBLE,
    transaction_time TIMESTAMP(3),
    -- event-time attribute; up to 5s of out-of-orderness is tolerated
    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'faker',
    'fields.bid_id.expression' = '#{Internet.UUID}',
    'fields.currency_code.expression' = '#{regexify ''(EUR|USD|CNY)''}',
    'fields.bid_price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
    'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
    'rows-per-second' = '100'
);

-- 1-minute sliding window that advances every 30 seconds
SELECT window_start, window_end, currency_code, ROUND(AVG(bid_price),2) AS MovingAverageBidPrice
FROM TABLE(
    HOP(TABLE dt_catalog.dt_db.bids, DESCRIPTOR(transaction_time), INTERVAL '30' SECONDS, INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, currency_code;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: cumulate window aggregation
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
-- goods sales order table
CREATE TABLE dt_catalog.dt_db.orders (
    order_id BIGINT,         -- order id
    goods_id BIGINT,         -- goods id
    goods_sales DOUBLE,      -- sales amount of the goods
    order_time TIMESTAMP(3), -- time the order was placed
    -- event-time attribute; up to 5s of out-of-orderness is tolerated
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'faker',
    'fields.order_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
    'fields.goods_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
    'fields.goods_sales.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
    'fields.order_time.expression' = '#{date.past ''30'',''SECONDS''}',
    'rows-per-second' = '100'
);

-- every minute, emit the cumulative sales from midnight up to now
SELECT
    window_start,
    window_end,
    SUM(goods_sales) AS cumulate_gmv -- cumulative sales of the day so far
FROM TABLE(
    -- cumulate window with a 1-day maximum size and a 1-minute step
    CUMULATE(
        TABLE dt_catalog.dt_db.orders,
        DESCRIPTOR(order_time),
        INTERVAL '1' MINUTES,
        INTERVAL '1' DAY))
GROUP BY window_start, window_end;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: session window aggregation
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS -- event-time watermark
) WITH (
    'connector' = 'faker',
    'rows-per-second' = '5',
    'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
    'fields.client_identity.expression' = '-',
    'fields.userid.expression' = '#{regexify ''(morsapaes|knauf|sjwiesman){1}''}',
    'fields.log_time.expression' = '#{date.past ''5'',''SECONDS''}',
    -- fixed: '/cart\.html' was missing its leading slash, and 'HTTP/1.0' had a stray one
    'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
    'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
);

SELECT
    userid,
    SESSION_START(log_time, INTERVAL '10' SECOND) AS session_beg,
    SESSION_ROWTIME(log_time, INTERVAL '10' SECOND) AS session_end,
    COUNT(request_line) AS request_cnt
FROM dt_catalog.dt_db.server_logs
WHERE status_code = '403'
GROUP BY
    userid,
    -- session gap is 10s: the window closes when no new request arrives within 10s
    SESSION(log_time, INTERVAL '10' SECOND);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: OVER window aggregation
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.temperature_measurements (
    measurement_time TIMESTAMP(3),
    city STRING,
    temperature FLOAT,
    -- time attribute used for ordering inside the OVER window
    WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECONDS
) WITH (
    'connector' = 'faker', -- the Faker connector requires VVR-4.0.12 or later
    'fields.measurement_time.expression' = '#{date.past ''15'',''SECONDS''}',
    'fields.temperature.expression' = '#{number.numberBetween ''0'',''50''}',
    'fields.city.expression' = '#{regexify ''(Chicago|Munich|Berlin|Portland|Hangzhou|Seatle|Beijing|New York){1}''}'
);

SELECT
    measurement_time,
    city,
    temperature,
    AVG(CAST(temperature AS FLOAT)) OVER last_minute AS avg_temperature_minute,     -- average
    -- fixed: the original swapped the aliases, labelling MAX as the minimum
    -- and MIN as the maximum
    MAX(temperature) OVER last_minute AS max_temperature_minute,                    -- maximum
    MIN(temperature) OVER last_minute AS min_temperature_minute,                    -- minimum
    STDDEV(CAST(temperature AS FLOAT)) OVER last_minute AS stdev_temperature_minute -- standard deviation
FROM dt_catalog.dt_db.temperature_measurements
WINDOW last_minute AS ( -- 1-minute OVER window, partitioned by city, ordered by time; fires per row
    PARTITION BY city
    ORDER BY measurement_time
    RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
);
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: cascading window aggregation
--********************************************************************--
CREATE TEMPORARY TABLE server_logs (
    log_time TIMESTAMP(3),
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    request_line STRING,
    status_code STRING,
    size INT,
    WATERMARK FOR log_time AS log_time - INTERVAL '15' SECONDS -- event-time watermark
) WITH (
    'connector' = 'faker',
    'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
    'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
    'fields.client_identity.expression' = '-',
    'fields.userid.expression' = '-',
    -- fixed: '/cart\.html' was missing its leading slash, and 'HTTP/1.0' had a stray one
    'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
    'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
    'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);

-- sink for the 1-minute aggregation
CREATE TEMPORARY TABLE avg_request_size_1m (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    avg_size BIGINT
) WITH (
    'connector' = 'blackhole'
);

-- sink for the 5-minute aggregation
CREATE TEMPORARY TABLE avg_request_size_5m (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    avg_size BIGINT
) WITH (
    'connector' = 'blackhole'
);

-- 1-minute window aggregation; TUMBLE_ROWTIME keeps window_end a time
-- attribute so the result can be windowed again downstream
CREATE VIEW server_logs_window_1m AS
SELECT
    TUMBLE_START(log_time, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_ROWTIME(log_time, INTERVAL '1' MINUTE) AS window_end,
    SUM(size) AS total_size,
    COUNT(*) AS num_requests
FROM server_logs
GROUP BY
    TUMBLE(log_time, INTERVAL '1' MINUTE);

-- 5-minute window aggregation built on top of the 1-minute results
CREATE VIEW server_logs_window_5m AS
SELECT
    TUMBLE_START(window_end, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_ROWTIME(window_end, INTERVAL '5' MINUTE) AS window_end,
    SUM(total_size) AS total_size,
    SUM(num_requests) AS num_requests
FROM server_logs_window_1m
GROUP BY
    TUMBLE(window_end, INTERVAL '5' MINUTE);

BEGIN STATEMENT SET;

-- write the 1-minute averages to their sink
INSERT INTO avg_request_size_1m SELECT
    window_start,
    window_end,
    total_size/num_requests AS avg_size
FROM server_logs_window_1m;

-- write the 5-minute averages to their sink
INSERT INTO avg_request_size_5m SELECT
    window_start,
    window_end,
    total_size/num_requests AS avg_size
FROM server_logs_window_5m;

END;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: deduplication
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
    id INT,
    order_time AS CURRENT_TIMESTAMP,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
    'connector' = 'datagen',
    'rows-per-second'='10',
    'fields.id.kind'='random',
    'fields.id.min'='1',
    'fields.id.max'='100'
);

-- For every order_id, deduplicate on event time by keeping only the record
-- with the LATEST timestamp.
SELECT
    order_id,
    order_time
FROM (
    SELECT id AS order_id,
        order_time,
        -- fixed: sort by event time DESC so the newest record ranks first;
        -- the original sorted ascending, which kept the oldest record and
        -- contradicted the stated intent of keeping the latest one
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time DESC) AS rownum
    FROM dt_catalog.dt_db.orders)
WHERE rownum = 1; -- keep only the top-ranked record; dedup is a special case of Top-N
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: Top-N
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.spells_cast (
    wizard STRING,
    spell STRING
) WITH (
    'connector' = 'faker',
    'fields.wizard.expression' = '#{harry_potter.characters}',
    'fields.spell.expression' = '#{harry_potter.spells}'
);

-- find each wizard's two favourite spells
SELECT wizard, spell, times_cast
FROM (
    SELECT *,
        -- rank spells per wizard by cast count, descending
        ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num
    -- count how often each wizard cast each spell
    FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM dt_catalog.dt_db.spells_cast GROUP BY wizard, spell)
)
WHERE row_num <= 2;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: windowed Top-N
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
    bidtime TIMESTAMP(3),
    price DOUBLE,
    item STRING,
    supplier STRING,
    WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS -- event-time watermark
) WITH (
    'connector' = 'faker',
    'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
    'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
    'fields.item.expression' = '#{Commerce.productName}',
    'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
    'rows-per-second' = '100'
);

-- pick the three suppliers with the highest sales per window
SELECT *
FROM (
    -- rank suppliers within each window by sales, descending
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
    FROM (
        -- total sales per supplier per 10-minute tumbling window
        SELECT window_start, window_end, supplier, SUM(price) AS price, COUNT(*) AS cnt
        FROM TABLE(
            TUMBLE(TABLE dt_catalog.dt_db.orders, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
        GROUP BY window_start, window_end, supplier
    )
)
WHERE rownum <= 3;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: pattern detection (CEP)
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.subscriptions (
    id STRING,
    user_id INT,
    type STRING,
    start_date TIMESTAMP(3),
    end_date TIMESTAMP(3),
    payment_expiration TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'faker',
    'fields.id.expression' = '#{Internet.uuid}',
    'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
    'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
    'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
    'fields.end_date.expression' = '#{date.future ''15'',''DAYS''}',
    'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);

SELECT *
FROM dt_catalog.dt_db.subscriptions
MATCH_RECOGNIZE ( -- partition by user_id, order by processing time ascending
    PARTITION BY user_id
    ORDER BY proc_time
    MEASURES
        LAST(PREMIUM.type) AS premium_type,
        AVG(TIMESTAMPDIFF(DAY,PREMIUM.start_date,PREMIUM.end_date)) AS premium_avg_duration,
        BASIC.start_date AS downgrade_date
    AFTER MATCH SKIP PAST LAST ROW
    -- pattern: one or more 'premium' or 'platinum' subscription events,
    -- followed for the same user_id by a 'basic' subscription event
    PATTERN (PREMIUM+ BASIC)
    DEFINE PREMIUM AS PREMIUM.type IN ('premium','platinum'),
        BASIC AS BASIC.type = 'basic');
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: Regular Join
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.NOC (
    agent_id STRING,
    codename STRING
) WITH (
    'connector' = 'faker',
    'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
    'fields.codename.expression' = '#{superhero.name}',
    'number-of-rows' = '10'
);

CREATE TABLE dt_catalog.dt_db.RealNames (
    agent_id STRING,
    name STRING
) WITH (
    'connector' = 'faker',
    'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
    'fields.name.expression' = '#{Name.full_name}',
    'number-of-rows' = '10'
);

-- join the two tables on agent_id; a new row on either side triggers the join
SELECT
    name,
    codename
FROM dt_catalog.dt_db.NOC t1
INNER JOIN dt_catalog.dt_db.RealNames t2 ON t1.agent_id = t2.agent_id;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: Interval Join
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
    id INT,
    order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='10',
    'fields.id.kind'='sequence',
    'fields.id.start'='1',
    'fields.id.end'='1000'
);

CREATE TABLE dt_catalog.dt_db.shipments (
    id INT,
    order_id INT,
    shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5',
    'fields.id.kind'='random',
    'fields.id.min'='0',
    'fields.order_id.kind'='sequence',
    'fields.order_id.start'='1',
    'fields.order_id.end'='1000'
);

-- each order row joins shipment rows within the last three days up to now
SELECT
    o.id AS order_id,
    o.order_time,
    s.shipment_time,
    TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
FROM dt_catalog.dt_db.orders o
JOIN dt_catalog.dt_db.shipments s ON o.id = s.order_id
WHERE
    -- time join condition:
    -- shipments.shipment_time - INTERVAL '3' DAY <= orders.order_time <= shipments.shipment_time
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: temporal (versioned) table join
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
-- A primary key plus a watermark defines a versioned table; in practice this
-- could be a CDC table, an upsert Kafka topic, etc.
CREATE TABLE dt_catalog.dt_db.currency_rates (
    `currency_code` STRING,
    `eur_rate` DECIMAL(6,4),
    `rate_time` TIMESTAMP(3),
    WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS, -- event time
    PRIMARY KEY (currency_code) NOT ENFORCED -- primary key
) WITH (
    'connector' = 'faker',
    'fields.currency_code.expression' = '#{Currency.code}',
    'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
    'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
    'rows-per-second' = '100'
);

-- An append-only dynamic table; it also needs a watermark.
CREATE TABLE dt_catalog.dt_db.transactions (
    `id` STRING,
    `currency_code` STRING,
    `total` DECIMAL(10,2),
    `transaction_time` TIMESTAMP(3),
    WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS -- watermark
) WITH (
    'connector' = 'faker',
    'fields.id.expression' = '#{Internet.UUID}',
    'fields.currency_code.expression' = '#{Currency.code}',
    'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
    'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
    'rows-per-second' = '100'
);

-- The join fires when the watermarks of both sides align, so both tables
-- must declare a watermark.
SELECT
    t.id,
    t.total * c.eur_rate AS total_eur,
    t.total,
    c.currency_code,
    t.transaction_time
FROM dt_catalog.dt_db.transactions t
-- each transaction joins the exchange rate valid at its transaction_time
JOIN dt_catalog.dt_db.currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
-- join key
ON t.currency_code = c.currency_code;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL quick-start example: dimension-table (lookup) join
-- This template only supports "execute"; add INSERT logic before using "submit".
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.subscriptions (
    id STRING,
    user_id INT,
    type STRING,
    start_date TIMESTAMP(3),
    end_date TIMESTAMP(3),
    payment_expiration TIMESTAMP(3),
    proc_time AS PROCTIME() -- a processing-time attribute is required for lookup joins
) WITH (
    'connector' = 'faker',
    'fields.id.expression' = '#{Internet.uuid}',
    'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
    'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
    'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
    'fields.end_date.expression' = '#{date.future ''365'',''DAYS''}',
    'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);

-- Dimension table; faker is used so the example runs standalone. Real jobs
-- would typically use JDBC, Redis, HBase, etc.
CREATE TABLE dt_catalog.dt_db.users (
    user_id INT,
    user_name VARCHAR(255) NOT NULL,
    age INT NOT NULL,
    -- fixed: Flink only accepts NOT ENFORCED primary keys; a bare
    -- "PRIMARY KEY" on the column is rejected by the planner
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'faker',
    'fields.user_id.expression' = '#{number.numberBetween ''1'',''10''}',
    'fields.user_name.expression' = '#{regexify ''(ron|jark|danny){1}''}',
    'fields.age.expression' = '#{number.numberBetween ''1'',''30''}'
);

SELECT
    id AS subscription_id,
    type AS subscription_type,
    age AS user_age,
    CASE
        WHEN age < 18 THEN 1
        ELSE 0
    END AS is_minor
FROM dt_catalog.dt_db.subscriptions usub
-- each subscription record is joined, at its processing time, against the
-- latest version of the users dimension table
JOIN dt_catalog.dt_db.users FOR SYSTEM_TIME AS OF usub.proc_time AS u
-- join key
ON usub.user_id = u.user_id;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL ODS layer
-- In a real application this would be an ingestion job sourcing from an RDB.
--********************************************************************--
CREATE TABLE source_table (
    account_number        STRING,
    channel_id            INT,
    account_open_datetime TIMESTAMP(3)
) WITH (
    'connector' = 'faker',                                                  -- Flink Faker connector
    'fields.account_number.expression' = '#{Finance.iban}',                 -- random bank account number
    'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- random channel id between 1 and 3
    'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- recent-past timestamps
);
|
|
|
|
|
|
|
|
|
|
-- Result table; in a real application this would be Kafka or similar.
CREATE TABLE sink_table (
    account_number        STRING,
    channel_id            INT,
    account_open_datetime TIMESTAMP(3)
) WITH (
    'connector' = 'stream-x'
);
|
|
|
|
|
|
|
|
|
|
-- Write to the result table.
-- Fix: the original used SELECT * (breaks silently on schema change) and was
-- missing the terminating ';', which makes the following statement unparsable.
INSERT INTO sink_table
SELECT
    account_number,
    channel_id,
    account_open_datetime
FROM source_table;
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL DWD layer
-- In a real application the source would be the ODS topic.
--********************************************************************--
CREATE TABLE source_table (
    account_number        STRING,
    channel_id            INT,
    account_open_datetime TIMESTAMP(3)
) WITH (
    'connector' = 'faker',                                                  -- Flink Faker connector
    'fields.account_number.expression' = '#{Finance.iban}',                 -- random bank account number
    'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- random channel id between 1 and 3
    'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- recent-past timestamps
);
|
|
|
|
|
|
|
|
|
|
-- Dimension table; in a real application this would be an RDB.
CREATE TABLE dim_table (
    channel_id   INT,
    channel_name STRING,
    PRIMARY KEY (channel_id, channel_name) NOT ENFORCED
) WITH (
    'connector' = 'faker',                                                  -- Flink Faker connector
    'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- random channel id between 1 and 3
    'fields.channel_name.expression' = '#{app.name}'                        -- channel name
);
|
|
|
|
|
|
|
|
|
|
-- Result table; in a real application this would be Kafka or similar.
CREATE TABLE sink_table (
    account_number        STRING,
    channel_id            INT,
    channel_name          STRING,
    account_open_datetime TIMESTAMP(3)
) WITH (
    'connector' = 'stream-x'
);
|
|
|
|
|
|
|
|
|
|
-- Write to the result table. LEFT JOIN keeps source records even when no
-- matching channel exists in dim_table (channel_name is then NULL).
-- Fixes: lowercase `left JOIN` made inconsistent keyword casing, stray
-- whitespace in `s1. channel_id `, and the missing terminating ';'.
INSERT INTO sink_table
SELECT
    s1.account_number,
    s1.channel_id,
    d1.channel_name,
    s1.account_open_datetime
FROM source_table s1
LEFT JOIN dim_table d1
    ON s1.channel_id = d1.channel_id;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL DWS layer
-- In a real application the source would be the DWD topic.
--********************************************************************--
CREATE TABLE source_table (
    account_number        STRING,
    channel_id            INT,
    channel_name          STRING,
    account_open_datetime TIMESTAMP(3)
) WITH (
    'connector' = 'faker',                                                  -- Flink Faker connector
    'fields.account_number.expression' = '#{Finance.iban}',                 -- random bank account number
    'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- random channel id between 1 and 3
    'fields.channel_name.expression' = '#{app.name}',                       -- channel name
    'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- recent-past timestamps
);
|
|
|
|
|
-- NOTE(review): the two statements below look like leftover debug code. As
-- written they had no semicolons (a parse error), and dropping source_table
-- here would break the INSERT further down that reads from it. Disabled;
-- confirm they can be deleted entirely.
-- DROP TABLE source_table;
-- SELECT * FROM source_table;
|
|
|
|
|
|
|
|
|
|
-- Result table; in a real application this would be Kafka.
CREATE TABLE sink_table (
    channel_id STRING,
    open_date  STRING,
    cnt        INT
) WITH (
    'connector' = 'stream-x'
);
|
|
|
|
|
-- NOTE(review): the two statements below look like leftover debug code. As
-- written they had no semicolons (a parse error), and dropping sink_table
-- here would break the INSERT below that writes to it. Disabled; confirm
-- they can be deleted entirely.
-- DROP TABLE sink_table;
-- SELECT * FROM sink_table;
|
|
|
|
|
|
|
|
|
|
-- Write daily per-channel account-open counts to the result table.
-- Fixes: channel_id is INT in source_table but STRING in sink_table, and
-- COUNT(...) returns BIGINT while cnt is INT, so both are cast explicitly;
-- the statement was also missing its terminating ';'.
INSERT INTO sink_table
SELECT
    CAST(channel_id AS STRING),
    DATE_FORMAT(account_open_datetime, 'yyyy-MM-dd'),
    CAST(COUNT(account_number) AS INT)
FROM source_table
GROUP BY
    channel_id,
    DATE_FORMAT(account_open_datetime, 'yyyy-MM-dd');
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--********************************************************************--
-- Flink SQL ADS layer
-- In a real application the source would be the DWS topic.
--********************************************************************--
-- Fix: the open_time expression used `#{Date.future ...}` with a capital D,
-- inconsistent with every other `date.past`/`date.future` directive in this
-- file; flink-faker documents the lowercase `date` directive.
CREATE TABLE source_table (
    channel_id STRING,
    open_time  TIMESTAMP(3),
    cnt        INT
) WITH (
    'connector' = 'faker',                                                    -- Flink Faker connector
    'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}',   -- random channel id between 1 and 3
    'fields.open_time.expression' = '#{date.future ''15'',''5'',''SECONDS''}', -- near-future timestamps
    'fields.cnt.expression' = '#{number.numberBetween ''1'',''100000000''}'   -- count
);
|
|
|
|
|
|
|
|
|
|
-- In a real application the result table would be an RDB.
CREATE TABLE sink_table (
    open_date STRING,
    cnt       INT
) WITH (
    'connector' = 'stream-x'
);
|
|
|
|
|
|
|
|
|
|
-- Write daily totals to the result table.
-- Fixes: the original SELECT had no FROM clause and no GROUP BY for the
-- COUNT(1) aggregate, so it could not run; COUNT returns BIGINT, so it is
-- cast to INT to match the cnt column.
INSERT INTO sink_table
SELECT
    DATE_FORMAT(open_time, 'yyyy-MM-dd'),
    CAST(COUNT(1) AS INT)
FROM source_table
GROUP BY DATE_FORMAT(open_time, 'yyyy-MM-dd');
|