Merge branch 'master' of github.com:DTStack/dt-sql-parser

This commit is contained in:
wewoor 2023-05-25 10:53:40 +08:00
commit 99ee6c0998
13 changed files with 7642 additions and 6560 deletions

View File

@ -16,6 +16,7 @@ dt-sql-parser 是一个基于 [ANTLR4](https://github.com/antlr/antlr4) 开发
- Spark SQL
- Hive SQL
- PL/SQL
- Trino SQL
> 提示:当前的 Parser 是 `Javascript` 语言版本,如果有必要,可以尝试编译 Grammar 文件到其他目标语言
@ -103,7 +104,6 @@ console.log(tokens)
tokenIndex: -1
type: 137
_text: null
text: "SELECT"
},
...
]

View File

@ -26,6 +26,7 @@ Supported SQL:
- Hive SQL
- PL/SQL
- PostgreSQL
- Trino SQL
>Tips: This project is the default for Javascript language, also you can try to compile it to other languages if you need.
@ -121,7 +122,6 @@ console.log(tokens)
tokenIndex: -1
type: 137
_text: null
text: "SELECT"
},
...
]

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -161,6 +161,7 @@ import { SearchedCaseContext } from "./FlinkSqlParser";
import { PositionContext } from "./FlinkSqlParser";
import { FirstContext } from "./FlinkSqlParser";
import { FunctionNameContext } from "./FlinkSqlParser";
import { FunctionParamContext } from "./FlinkSqlParser";
import { DereferenceDefinitionContext } from "./FlinkSqlParser";
import { CorrelationNameContext } from "./FlinkSqlParser";
import { QualifiedNameContext } from "./FlinkSqlParser";
@ -170,7 +171,6 @@ import { MultiUnitsIntervalContext } from "./FlinkSqlParser";
import { ErrorCapturingUnitToUnitIntervalContext } from "./FlinkSqlParser";
import { UnitToUnitIntervalContext } from "./FlinkSqlParser";
import { IntervalValueContext } from "./FlinkSqlParser";
import { IntervalTimeUnitContext } from "./FlinkSqlParser";
import { ColumnAliasContext } from "./FlinkSqlParser";
import { TableAliasContext } from "./FlinkSqlParser";
import { ErrorCapturingIdentifierContext } from "./FlinkSqlParser";
@ -198,11 +198,15 @@ import { BitOperatorContext } from "./FlinkSqlParser";
import { MathOperatorContext } from "./FlinkSqlParser";
import { UnaryOperatorContext } from "./FlinkSqlParser";
import { ConstantContext } from "./FlinkSqlParser";
import { TimePointLiteralContext } from "./FlinkSqlParser";
import { StringLiteralContext } from "./FlinkSqlParser";
import { DecimalLiteralContext } from "./FlinkSqlParser";
import { BooleanLiteralContext } from "./FlinkSqlParser";
import { SetQuantifierContext } from "./FlinkSqlParser";
import { ReservedKeywordsUsedAsFunctionNameContext } from "./FlinkSqlParser";
import { TimePointUnitContext } from "./FlinkSqlParser";
import { TimeIntervalUnitContext } from "./FlinkSqlParser";
import { ReservedKeywordsUsedAsFuncParamContext } from "./FlinkSqlParser";
import { ReservedKeywordsUsedAsFuncNameContext } from "./FlinkSqlParser";
import { ReservedKeywordsContext } from "./FlinkSqlParser";
import { NonReservedKeywordsContext } from "./FlinkSqlParser";
@ -1842,6 +1846,16 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
* @param ctx the parse tree
*/
exitFunctionName?: (ctx: FunctionNameContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.functionParam`.
* @param ctx the parse tree
*/
enterFunctionParam?: (ctx: FunctionParamContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.functionParam`.
* @param ctx the parse tree
*/
exitFunctionParam?: (ctx: FunctionParamContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.dereferenceDefinition`.
* @param ctx the parse tree
@ -1932,16 +1946,6 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
* @param ctx the parse tree
*/
exitIntervalValue?: (ctx: IntervalValueContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.intervalTimeUnit`.
* @param ctx the parse tree
*/
enterIntervalTimeUnit?: (ctx: IntervalTimeUnitContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.intervalTimeUnit`.
* @param ctx the parse tree
*/
exitIntervalTimeUnit?: (ctx: IntervalTimeUnitContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.columnAlias`.
* @param ctx the parse tree
@ -2222,6 +2226,16 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
* @param ctx the parse tree
*/
exitConstant?: (ctx: ConstantContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.timePointLiteral`.
* @param ctx the parse tree
*/
enterTimePointLiteral?: (ctx: TimePointLiteralContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.timePointLiteral`.
* @param ctx the parse tree
*/
exitTimePointLiteral?: (ctx: TimePointLiteralContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.stringLiteral`.
* @param ctx the parse tree
@ -2263,15 +2277,45 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
*/
exitSetQuantifier?: (ctx: SetQuantifierContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFunctionName`.
* Enter a parse tree produced by `FlinkSqlParser.timePointUnit`.
* @param ctx the parse tree
*/
enterReservedKeywordsUsedAsFunctionName?: (ctx: ReservedKeywordsUsedAsFunctionNameContext) => void;
enterTimePointUnit?: (ctx: TimePointUnitContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFunctionName`.
* Exit a parse tree produced by `FlinkSqlParser.timePointUnit`.
* @param ctx the parse tree
*/
exitReservedKeywordsUsedAsFunctionName?: (ctx: ReservedKeywordsUsedAsFunctionNameContext) => void;
exitTimePointUnit?: (ctx: TimePointUnitContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.timeIntervalUnit`.
* @param ctx the parse tree
*/
enterTimeIntervalUnit?: (ctx: TimeIntervalUnitContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.timeIntervalUnit`.
* @param ctx the parse tree
*/
exitTimeIntervalUnit?: (ctx: TimeIntervalUnitContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncParam`.
* @param ctx the parse tree
*/
enterReservedKeywordsUsedAsFuncParam?: (ctx: ReservedKeywordsUsedAsFuncParamContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncParam`.
* @param ctx the parse tree
*/
exitReservedKeywordsUsedAsFuncParam?: (ctx: ReservedKeywordsUsedAsFuncParamContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncName`.
* @param ctx the parse tree
*/
enterReservedKeywordsUsedAsFuncName?: (ctx: ReservedKeywordsUsedAsFuncNameContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncName`.
* @param ctx the parse tree
*/
exitReservedKeywordsUsedAsFuncName?: (ctx: ReservedKeywordsUsedAsFuncNameContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywords`.
* @param ctx the parse tree

View File

@ -161,6 +161,7 @@ import { SearchedCaseContext } from "./FlinkSqlParser";
import { PositionContext } from "./FlinkSqlParser";
import { FirstContext } from "./FlinkSqlParser";
import { FunctionNameContext } from "./FlinkSqlParser";
import { FunctionParamContext } from "./FlinkSqlParser";
import { DereferenceDefinitionContext } from "./FlinkSqlParser";
import { CorrelationNameContext } from "./FlinkSqlParser";
import { QualifiedNameContext } from "./FlinkSqlParser";
@ -170,7 +171,6 @@ import { MultiUnitsIntervalContext } from "./FlinkSqlParser";
import { ErrorCapturingUnitToUnitIntervalContext } from "./FlinkSqlParser";
import { UnitToUnitIntervalContext } from "./FlinkSqlParser";
import { IntervalValueContext } from "./FlinkSqlParser";
import { IntervalTimeUnitContext } from "./FlinkSqlParser";
import { ColumnAliasContext } from "./FlinkSqlParser";
import { TableAliasContext } from "./FlinkSqlParser";
import { ErrorCapturingIdentifierContext } from "./FlinkSqlParser";
@ -198,11 +198,15 @@ import { BitOperatorContext } from "./FlinkSqlParser";
import { MathOperatorContext } from "./FlinkSqlParser";
import { UnaryOperatorContext } from "./FlinkSqlParser";
import { ConstantContext } from "./FlinkSqlParser";
import { TimePointLiteralContext } from "./FlinkSqlParser";
import { StringLiteralContext } from "./FlinkSqlParser";
import { DecimalLiteralContext } from "./FlinkSqlParser";
import { BooleanLiteralContext } from "./FlinkSqlParser";
import { SetQuantifierContext } from "./FlinkSqlParser";
import { ReservedKeywordsUsedAsFunctionNameContext } from "./FlinkSqlParser";
import { TimePointUnitContext } from "./FlinkSqlParser";
import { TimeIntervalUnitContext } from "./FlinkSqlParser";
import { ReservedKeywordsUsedAsFuncParamContext } from "./FlinkSqlParser";
import { ReservedKeywordsUsedAsFuncNameContext } from "./FlinkSqlParser";
import { ReservedKeywordsContext } from "./FlinkSqlParser";
import { NonReservedKeywordsContext } from "./FlinkSqlParser";
@ -1188,6 +1192,12 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
* @return the visitor result
*/
visitFunctionName?: (ctx: FunctionNameContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.functionParam`.
* @param ctx the parse tree
* @return the visitor result
*/
visitFunctionParam?: (ctx: FunctionParamContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.dereferenceDefinition`.
* @param ctx the parse tree
@ -1242,12 +1252,6 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
* @return the visitor result
*/
visitIntervalValue?: (ctx: IntervalValueContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.intervalTimeUnit`.
* @param ctx the parse tree
* @return the visitor result
*/
visitIntervalTimeUnit?: (ctx: IntervalTimeUnitContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.columnAlias`.
* @param ctx the parse tree
@ -1415,6 +1419,12 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
* @return the visitor result
*/
visitConstant?: (ctx: ConstantContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.timePointLiteral`.
* @param ctx the parse tree
* @return the visitor result
*/
visitTimePointLiteral?: (ctx: TimePointLiteralContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.stringLiteral`.
* @param ctx the parse tree
@ -1440,11 +1450,29 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
*/
visitSetQuantifier?: (ctx: SetQuantifierContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFunctionName`.
* Visit a parse tree produced by `FlinkSqlParser.timePointUnit`.
* @param ctx the parse tree
* @return the visitor result
*/
visitReservedKeywordsUsedAsFunctionName?: (ctx: ReservedKeywordsUsedAsFunctionNameContext) => Result;
visitTimePointUnit?: (ctx: TimePointUnitContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.timeIntervalUnit`.
* @param ctx the parse tree
* @return the visitor result
*/
visitTimeIntervalUnit?: (ctx: TimeIntervalUnitContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncParam`.
* @param ctx the parse tree
* @return the visitor result
*/
visitReservedKeywordsUsedAsFuncParam?: (ctx: ReservedKeywordsUsedAsFuncParamContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncName`.
* @param ctx the parse tree
* @return the visitor result
*/
visitReservedKeywordsUsedAsFuncName?: (ctx: ReservedKeywordsUsedAsFuncNameContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywords`.
* @param ctx the parse tree

View File

@ -5,7 +5,6 @@ describe('FlinkSQL Lexer tests', () => {
const sql = 'SELECT * FROM table1';
const tokens = parser.getAllTokens(sql);
test('token counts', () => {
expect(tokens.length - 1).toBe(7);
});

View File

@ -0,0 +1,16 @@
import fs from 'fs';
import path from 'path';
import FlinkSQL from "../../../../src/parser/flinksql";
// 综合测试的 sql 不做切割
const features = {
templates: fs.readFileSync(path.join(__dirname, 'fixtures', 'templates.sql'), 'utf-8')
};
describe('FlinkSQL Comprehensive Tests', () => {
const parser = new FlinkSQL();
test('Stream SQL templates', () => {
expect(parser.validate(features.templates).length).toBe(0);
});
});

View File

@ -0,0 +1,884 @@
--********************************************************************--
-- Flink SQL 快速入门示例 创建表
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 执行创建临时表 DDL不需要指定catalog.database
CREATE TABLE orders (
order_uid BIGINT,
product_id BIGINT,
price DECIMAL(32, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen'
);
--********************************************************************--
-- Flink SQL 快速入门示例 INSERT INTO
--********************************************************************--
-- 定义数据源表
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 定义结果表,实际应用中会选择 Kafka、JDBC 等作为结果表
CREATE TABLE client_errors (
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM server_logs
WHERE
status_code SIMILAR TO '4[0-9][0-9]';
--********************************************************************--
-- Flink SQL 快速入门示例 Statement Set
--********************************************************************--
-- 定义数据源表
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT,
WATERMARK FOR log_time AS log_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'faker', -- Faker 连接器仅在 VVR-4.0.12 及以上支持
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 定义结果表1
CREATE TABLE aggregations1 (
`browser` STRING,
`status_code` STRING,
`end_time` TIMESTAMP(3),
`requests` BIGINT NOT NULL
) WITH (
'connector' = 'blackhole'
);
-- 定义结果表2
CREATE TABLE aggregations2 (
`browser` STRING,
`status_code` STRING,
`requests` BIGINT NOT NULL
) WITH (
'connector' = 'stream-x'
);
-- This is a shared view that will be used by both insert into statements
CREATE VIEW browsers AS
SELECT
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
status_code,
log_time
FROM server_logs;
-- 封装多个INSERT INTO语句到一个STATEMENT SET语句中
BEGIN STATEMENT SET;
-- 5min窗口粒度聚合
INSERT INTO aggregations1
SELECT
browser,
status_code,
TUMBLE_ROWTIME(log_time, INTERVAL '5' MINUTE) AS end_time,
COUNT(*) requests
FROM browsers
GROUP BY
browser,
status_code,
TUMBLE(log_time, INTERVAL '5' MINUTE);
-- 1h窗口粒度聚合
INSERT INTO aggregations2
SELECT
browser,
status_code,
COUNT(*) requests
FROM browsers
GROUP BY
browser,
status_code,
TUMBLE(log_time, INTERVAL '1' HOUR);
END;
--********************************************************************--
-- Flink SQL 快速入门示例 Watermark
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.doctor_sightings (
doctor STRING,
sighting_time TIMESTAMP(3),
-- 通过watermark把sighting_time标识为事件时间定义最大的乱序时间期望所有的记录在目击发生后的15秒内都到达。
WATERMARK FOR sighting_time AS sighting_time - INTERVAL '15' SECONDS
) WITH (
'connector' = 'faker',
'fields.doctor.expression' = '#{dr_who.the_doctors}',
'fields.sighting_time.expression' = '#{date.past ''15'',''SECONDS''}'
);
SELECT
doctor,
-- 在滚动窗口中使用sighting_time字段
TUMBLE_ROWTIME(sighting_time, INTERVAL '1' MINUTE) AS sighting_time,
COUNT(*) AS sightings
FROM dt_catalog.dt_db.doctor_sightings
GROUP BY
TUMBLE(sighting_time, INTERVAL '1' MINUTE),
doctor;
--********************************************************************--
-- Flink SQL 快速入门示例 GROUP BY
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 定义数据源表
CREATE TABLE dt_catalog.dt_db.server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 采样user_agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A
-- 正则表达式: '[^\/]+' (匹配 '/' 之前的所有字符)
SELECT
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
status_code,
COUNT(*) AS cnt_status
FROM dt_catalog.dt_db.server_logs
-- 按浏览器和状态码两个维度统计日志数量
GROUP BY
REGEXP_EXTRACT(user_agent,'[^\/]+'),
status_code;
--********************************************************************--
-- Flink SQL 快速入门示例 滚动窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
request_line STRING,
status_code STRING,
log_time AS PROCTIME() -- 使用当前系统处理时间作为表的时间字段
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
);
-- 按 window_start, window_end 维度计算每分钟窗口上不同的 ip 数量
SELECT window_start, window_end, COUNT(DISTINCT client_ip) AS ip_addresses
FROM TABLE(
-- 定义1min滑动窗口
TUMBLE(TABLE dt_catalog.dt_db.server_logs, DESCRIPTOR(log_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
--********************************************************************--
-- Flink SQL 快速入门示例 滑动窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.bids (
bid_id STRING,
currency_code STRING,
bid_price DOUBLE,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECONDS -- 定义事件时间允许的最大窗口延迟为5s
) WITH (
'connector' = 'faker',
'fields.bid_id.expression' = '#{Internet.UUID}',
'fields.currency_code.expression' = '#{regexify ''(EUR|USD|CNY)''}',
'fields.bid_price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 定义1min 的滑动窗口,每隔 30s 滚动一次
SELECT window_start, window_end, currency_code, ROUND(AVG(bid_price),2) AS MovingAverageBidPrice
FROM TABLE(
HOP(TABLE dt_catalog.dt_db.bids, DESCRIPTOR(transaction_time), INTERVAL '30' SECONDS, INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, currency_code;
--********************************************************************--
-- Flink SQL 快速入门示例 累计窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 商品销售订单表
CREATE TABLE dt_catalog.dt_db.orders (
order_id BIGINT, -- 订单ID
goods_id BIGINT, -- 商品ID
goods_sales DOUBLE, -- 商品销售额
order_time TIMESTAMP(3), -- 下单时间
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS -- 定义事件时间允许的最大窗口延迟为5s
) WITH (
'connector' = 'faker',
'fields.order_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
'fields.goods_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
'fields.goods_sales.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.order_time.expression' = '#{date.past ''30'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 每分钟更新一次从零点开始截止到当前时刻的累计销售额
SELECT
window_start,
window_end,
SUM(goods_sales) AS cumulate_gmv -- 当天累计销售额
FROM TABLE(
-- 定义窗口最大长度为一天的累计窗口窗口滚动步长为1分钟
CUMULATE(
TABLE dt_catalog.dt_db.orders,
DESCRIPTOR(order_time),
INTERVAL '1' MINUTES,
INTERVAL '1' DAY))
GROUP BY window_start, window_end;
--********************************************************************--
-- Flink SQL 快速入门示例 会话窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS -- 定义 watermark
) WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '#{regexify ''(morsapaes|knauf|sjwiesman){1}''}',
'fields.log_time.expression' = '#{date.past ''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
);
SELECT
userid,
SESSION_START(log_time, INTERVAL '10' SECOND) AS session_beg,
SESSION_ROWTIME(log_time, INTERVAL '10' SECOND) AS session_end,
COUNT(request_line) AS request_cnt
FROM dt_catalog.dt_db.server_logs
WHERE status_code = '403'
GROUP BY
userid,
-- 会话窗口的最大空闲间隔为10s当10s内该窗口没有接收到新的请求会关闭当前窗口
SESSION(log_time, INTERVAL '10' SECOND);
--********************************************************************--
-- Flink SQL 快速入门示例 OVER窗口聚合
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.temperature_measurements (
measurement_time TIMESTAMP(3),
city STRING,
temperature FLOAT,
WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECONDS -- 定义时间属性字段OVER窗口排序时使用
) WITH (
'connector' = 'faker', -- Faker 连接器仅在 VVR-4.0.12 及以上支持
'fields.measurement_time.expression' = '#{date.past ''15'',''SECONDS''}',
'fields.temperature.expression' = '#{number.numberBetween ''0'',''50''}',
'fields.city.expression' = '#{regexify ''(Chicago|Munich|Berlin|Portland|Hangzhou|Seatle|Beijing|New York){1}''}'
);
SELECT
measurement_time,
city,
temperature,
AVG(CAST(temperature AS FLOAT)) OVER last_minute AS avg_temperature_minute, -- 计算平均值
MAX(temperature) OVER last_minute AS min_temperature_minute, -- 计算最大值
MIN(temperature) OVER last_minute AS max_temperature_minute, -- 计算最小值
STDDEV(CAST(temperature AS FLOAT)) OVER last_minute AS stdev_temperature_minute -- 计算标准差
FROM dt_catalog.dt_db.temperature_measurements
WINDOW last_minute AS ( -- 定义1min时间间隔的OVER窗口按城市粒度分区温度测量值排序每个元素都会触发一次计算
PARTITION BY city
ORDER BY measurement_time
RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
);
--********************************************************************--
-- Flink SQL 快速入门示例 级联窗口聚合
--********************************************************************--
CREATE TEMPORARY TABLE server_logs (
log_time TIMESTAMP(3),
client_ip STRING,
client_identity STRING,
userid STRING,
request_line STRING,
status_code STRING,
size INT,
WATERMARK FOR log_time AS log_time - INTERVAL '15' SECONDS -- 定义watermark
) WITH (
'connector' = 'faker',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
-- 1min聚合结果表
CREATE TEMPORARY TABLE avg_request_size_1m (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
avg_size BIGINT
) WITH (
'connector' = 'blackhole'
);
-- 5min聚合结果表
CREATE TEMPORARY TABLE avg_request_size_5m (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
avg_size BIGINT
) WITH (
'connector' = 'blackhole'
);
-- 1min窗口查询结果
CREATE VIEW server_logs_window_1m AS
SELECT
TUMBLE_START(log_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_ROWTIME(log_time, INTERVAL '1' MINUTE) AS window_end,
SUM(size) AS total_size,
COUNT(*) AS num_requests
FROM server_logs
GROUP BY
TUMBLE(log_time, INTERVAL '1' MINUTE);
-- 基于1min窗口查询结果进行5min粒度窗口聚合
CREATE VIEW server_logs_window_5m AS
SELECT
TUMBLE_START(window_end, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_ROWTIME(window_end, INTERVAL '5' MINUTE) AS window_end,
SUM(total_size) AS total_size,
SUM(num_requests) AS num_requests
FROM server_logs_window_1m
GROUP BY
TUMBLE(window_end, INTERVAL '5' MINUTE);
BEGIN STATEMENT SET;
-- 写入结果到1min窗口粒度结果表
INSERT INTO avg_request_size_1m SELECT
window_start,
window_end,
total_size / num_requests AS avg_size
FROM server_logs_window_1m;
-- 写入结果到5min窗口粒度结果表
INSERT INTO avg_request_size_5m SELECT
window_start,
window_end,
total_size / num_requests AS avg_size
FROM server_logs_window_5m;
END;
--********************************************************************--
-- Flink SQL 快速入门示例 去重
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
id INT,
order_time AS CURRENT_TIMESTAMP,
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.id.kind'='random',
'fields.id.min'='1',
'fields.id.max'='100'
);
-- 对于每个order_id按事件时间去重只保留最新时间的记录即可实现去重
SELECT
order_id,
order_time
FROM (
SELECT id AS order_id,
order_time,
-- 按事件时间升序排序
ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
FROM dt_catalog.dt_db.orders)
WHERE rownum = 1; -- 只取排名第一的记录去重是Top-N的一种特例
--********************************************************************--
-- Flink SQL 快速入门示例 Top-N
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.spells_cast (
wizard STRING,
spell STRING
) WITH (
'connector' = 'faker',
'fields.wizard.expression' = '#{harry_potter.characters}',
'fields.spell.expression' = '#{harry_potter.spells}'
);
-- 找出每个巫师最喜欢的两个法术
SELECT wizard, spell, times_cast
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num -- 按法术次数降序排序
FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM dt_catalog.dt_db.spells_cast GROUP BY wizard, spell) -- 计算每个巫师施展的各种法术的次数
)
WHERE row_num <= 2;
--********************************************************************--
-- Flink SQL 快速入门示例 窗口Top-N
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
bidtime TIMESTAMP(3),
price DOUBLE,
item STRING,
supplier STRING,
WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS -- 定义watermark
) WITH (
'connector' = 'faker',
'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.item.expression' = '#{Commerce.productName}',
'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
'rows-per-second' = '100'
);
-- 取出销售排名前三的供应商
SELECT *
FROM (
-- 按窗口时间分区,按价格降序排序
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
FROM (
-- 计算每个窗口内各个供应商的销售额
SELECT window_start, window_end, supplier, SUM(price) AS price, COUNT(*) AS cnt
FROM TABLE(
TUMBLE(TABLE dt_catalog.dt_db.orders, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier
)
)
WHERE rownum <= 3;
--********************************************************************--
-- Flink SQL 快速入门示例 模式检测CEP
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.subscriptions (
id STRING,
user_id INT,
type STRING,
start_date TIMESTAMP(3),
end_date TIMESTAMP(3),
payment_expiration TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.uuid}',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
'fields.end_date.expression' = '#{date.future ''15'',''DAYS''}',
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);
SELECT *
FROM dt_catalog.dt_db.subscriptions
MATCH_RECOGNIZE ( -- 按user_id分区按处理时间proc_time升序排序
PARTITION BY user_id
ORDER BY proc_time
MEASURES
LAST(PREMIUM.type) AS premium_type,
AVG(TIMESTAMPDIFF(DAY,PREMIUM.start_date,PREMIUM.end_date)) AS premium_avg_duration,
BASIC.start_date AS downgrade_date
AFTER MATCH SKIP PAST LAST ROW
--: premiumplatinum
--'user_id''basic'
PATTERN (PREMIUM+ BASIC)
DEFINE PREMIUM AS PREMIUM.type IN ('premium','platinum'),
BASIC AS BASIC.type = 'basic');
--********************************************************************--
-- Flink SQL 快速入门示例 Regular Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.NOC (
agent_id STRING,
codename STRING
) WITH (
'connector' = 'faker',
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
'fields.codename.expression' = '#{superhero.name}',
'number-of-rows' = '10'
);
CREATE TABLE dt_catalog.dt_db.RealNames (
agent_id STRING,
name STRING
) WITH (
'connector' = 'faker',
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
'fields.name.expression' = '#{Name.full_name}',
'number-of-rows' = '10'
);
-- 使用agent_id作为两张表关联的条件左右两边任何一张表来了新数据都会触发join动作
SELECT
name,
codename
FROM dt_catalog.dt_db.NOC t1
INNER JOIN dt_catalog.dt_db.RealNames t2 ON t1.agent_id = t2.agent_id;
--********************************************************************--
-- Flink SQL 快速入门示例 Interval Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.orders (
id INT,
order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='1000'
);
CREATE TABLE dt_catalog.dt_db.shipments (
id INT,
order_id INT,
shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.id.kind'='random',
'fields.id.min'='0',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='1',
'fields.order_id.end'='1000'
);
-- order表的每条数据会与shipments表过去三天至当前时刻时间范围内的数据进行join
SELECT
o.id AS order_id,
o.order_time,
s.shipment_time,
TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
FROM dt_catalog.dt_db.orders o
JOIN dt_catalog.dt_db.shipments s ON o.id = s.order_id
WHERE
-- 时间 join 条件shipments.shipment_time - INTERVAL '3' DAY <= orders.order_time < shipments.shipment_time
o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
--********************************************************************--
-- Flink SQL 快速入门示例 时态表Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
-- 使用主键约束和watermark来定义一张版本表这张表可以是一个cdc表、upsert类型的kafka topic等
CREATE TABLE dt_catalog.dt_db.currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS, -- 定义事件时间
PRIMARY KEY (currency_code) NOT ENFORCED -- 定义主键
) WITH (
'connector' = 'faker',
'fields.currency_code.expression' = '#{Currency.code}',
'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 这是一个append-only类型的动态表需要定义watermk
CREATE TABLE dt_catalog.dt_db.transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS --watermark
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.UUID}',
'fields.currency_code.expression' = '#{Currency.code}',
'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
'rows-per-second' = '100'
);
-- 当左右两张表的watermark对齐时才会触发join动作左右两张表都需要定义watermark
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
t.total,
c.currency_code,
t.transaction_time
FROM dt_catalog.dt_db.transactions t
-- transactions表每条记录都与currency_rates表transaction_time时刻的汇率进行join
JOIN dt_catalog.dt_db.currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
-- 指定join key
ON t.currency_code = c.currency_code;
--********************************************************************--
-- Flink SQL 快速入门示例 维表Join
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
--********************************************************************--
CREATE TABLE dt_catalog.dt_db.subscriptions (
id STRING,
user_id INT,
type STRING,
start_date TIMESTAMP(3),
end_date TIMESTAMP(3),
payment_expiration TIMESTAMP(3),
proc_time AS PROCTIME() -- 这里需要定义处理时间属性
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.uuid}',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
'fields.end_date.expression' = '#{date.future ''365'',''DAYS''}',
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);
-- 定义维表为了示例能直接运行这里使用faker 作为维表实际应用中一般会使用JDBC、Redis、Hbase等作为维表
CREATE TABLE dt_catalog.dt_db.users (
user_id INT PRIMARY KEY, -- 定义主键
user_name VARCHAR(255) NOT NULL,
age INT NOT NULL
) WITH (
'connector' = 'faker',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''10''}',
'fields.user_name.expression' = '#{regexify ''(ron|jark|danny){1}''}',
'fields.age.expression' = '#{number.numberBetween ''1'',''30''}'
);
SELECT
id AS subscription_id,
type AS subscription_type,
age AS user_age,
CASE
WHEN age < 18 THEN 1
ELSE 0
END AS is_minor
FROM dt_catalog.dt_db.subscriptions usub
-- subscriptions每条记录都使用当前系统时间与维表users中的最新数据进行join
JOIN dt_catalog.dt_db.users FOR SYSTEM_TIME AS OF usub.proc_time AS u
-- 指定join key
ON usub.user_id = u.user_id;
--********************************************************************--
-- Flink SQL ODS 层
-- 实际应用中该任务应该是一个采集任务源表为RDB
--********************************************************************--
CREATE TABLE source_table (
account_number STRING,
channel_id INT,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期每5秒一条数据
);
-- 定义结果表,实际应用中应选择 Kafka等作为结果表
CREATE TABLE sink_table (
account_number STRING,
channel_id INT,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO sink_table SELECT * FROM source_table
--********************************************************************--
-- Flink SQL DWD 层
-- 实际应用中,源表为 ODS TOPIC
--********************************************************************--
CREATE TABLE source_table (
account_number STRING,
channel_id INT,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期每5秒一条数据
);
-- 定义维表实际应用中应选择RDB作为维表
CREATE TABLE dim_table (
channel_id INT,
channel_name STRING,
PRIMARY KEY (channel_id,channel_name) NOT ENFORCED
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.channel_name.expression' = '#{app.name}' -- 渠道名称
);
-- 定义结果表,实际应用中应选择 Kafka等作为结果表
CREATE TABLE sink_table (
account_number STRING,
channel_id INT,
channel_name STRING,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO sink_table
SELECT
s1.account_number,
s1. channel_id ,
d1.channel_name,
s1.account_open_datetime
FROM source_table s1
left JOIN dim_table d1 ON s1.channel_id=d1.channel_id
--********************************************************************--
-- Flink SQL DWS 层
-- 实际应用中源表为DWD TOPIC
--********************************************************************--
CREATE TABLE source_table (
account_number STRING,
channel_id INT,
channel_name STRING,
account_open_datetime TIMESTAMP(3)
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.channel_name.expression' = '#{app.name}' ,-- 渠道名称
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期每5秒一条数据
);
DROP TABLE source_table
SELECT * FROM source_table
-- 定义结果表实际应用中应选择Kafka作为结果表
CREATE TABLE sink_table (
channel_id STRING,
open_date STRING,
cnt INT
) WITH (
'connector' = 'stream-x'
);
DROP TABLE sink_table
SELECT * FROM sink_table
-- 写入数据到结果表
INSERT INTO sink_table
SELECT
channel_id,
DATE_FORMAT(account_open_datetime,'yyyy-MM-dd'),
count(account_number)
FROM source_table
GROUP BY channel_id,DATE_FORMAT(account_open_datetime,'yyyy-MM-dd')
--********************************************************************--
-- Flink SQL ADS 层
-- 实际应用中源表为DWS TOPIC
--********************************************************************--
CREATE TABLE source_table (
channel_id STRING,
open_time TIMESTAMP(3),
cnt INT
) WITH (
'connector' = 'faker', -- 使用 Flink Faker Connector
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
'fields.open_time.expression' = '#{Date.future ''15'',''5'',''SECONDS''}' ,-- 日期
'fields.cnt.expression' = '#{number.numberBetween ''1'',''100000000''}'-- 数量
);
-- 实际应用中结果表为RDB
CREATE TABLE sink_table (
open_date STRING,
cnt INT
) WITH (
'connector' = 'stream-x'
);
-- 写入数据到结果表
INSERT INTO sink_table
SELECT DATE_FORMAT(open_time,'yyyy-MM-dd'),count(1);