test: add flink template test cases (#111)
* feat: rebuild flink sql parser * test: add sqls templates in stream app test cases
This commit is contained in:
parent
89b22af5ce
commit
3509964afc
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -161,6 +161,7 @@ import { SearchedCaseContext } from "./FlinkSqlParser";
|
|||||||
import { PositionContext } from "./FlinkSqlParser";
|
import { PositionContext } from "./FlinkSqlParser";
|
||||||
import { FirstContext } from "./FlinkSqlParser";
|
import { FirstContext } from "./FlinkSqlParser";
|
||||||
import { FunctionNameContext } from "./FlinkSqlParser";
|
import { FunctionNameContext } from "./FlinkSqlParser";
|
||||||
|
import { FunctionParamContext } from "./FlinkSqlParser";
|
||||||
import { DereferenceDefinitionContext } from "./FlinkSqlParser";
|
import { DereferenceDefinitionContext } from "./FlinkSqlParser";
|
||||||
import { CorrelationNameContext } from "./FlinkSqlParser";
|
import { CorrelationNameContext } from "./FlinkSqlParser";
|
||||||
import { QualifiedNameContext } from "./FlinkSqlParser";
|
import { QualifiedNameContext } from "./FlinkSqlParser";
|
||||||
@ -170,7 +171,6 @@ import { MultiUnitsIntervalContext } from "./FlinkSqlParser";
|
|||||||
import { ErrorCapturingUnitToUnitIntervalContext } from "./FlinkSqlParser";
|
import { ErrorCapturingUnitToUnitIntervalContext } from "./FlinkSqlParser";
|
||||||
import { UnitToUnitIntervalContext } from "./FlinkSqlParser";
|
import { UnitToUnitIntervalContext } from "./FlinkSqlParser";
|
||||||
import { IntervalValueContext } from "./FlinkSqlParser";
|
import { IntervalValueContext } from "./FlinkSqlParser";
|
||||||
import { IntervalTimeUnitContext } from "./FlinkSqlParser";
|
|
||||||
import { ColumnAliasContext } from "./FlinkSqlParser";
|
import { ColumnAliasContext } from "./FlinkSqlParser";
|
||||||
import { TableAliasContext } from "./FlinkSqlParser";
|
import { TableAliasContext } from "./FlinkSqlParser";
|
||||||
import { ErrorCapturingIdentifierContext } from "./FlinkSqlParser";
|
import { ErrorCapturingIdentifierContext } from "./FlinkSqlParser";
|
||||||
@ -198,11 +198,15 @@ import { BitOperatorContext } from "./FlinkSqlParser";
|
|||||||
import { MathOperatorContext } from "./FlinkSqlParser";
|
import { MathOperatorContext } from "./FlinkSqlParser";
|
||||||
import { UnaryOperatorContext } from "./FlinkSqlParser";
|
import { UnaryOperatorContext } from "./FlinkSqlParser";
|
||||||
import { ConstantContext } from "./FlinkSqlParser";
|
import { ConstantContext } from "./FlinkSqlParser";
|
||||||
|
import { TimePointLiteralContext } from "./FlinkSqlParser";
|
||||||
import { StringLiteralContext } from "./FlinkSqlParser";
|
import { StringLiteralContext } from "./FlinkSqlParser";
|
||||||
import { DecimalLiteralContext } from "./FlinkSqlParser";
|
import { DecimalLiteralContext } from "./FlinkSqlParser";
|
||||||
import { BooleanLiteralContext } from "./FlinkSqlParser";
|
import { BooleanLiteralContext } from "./FlinkSqlParser";
|
||||||
import { SetQuantifierContext } from "./FlinkSqlParser";
|
import { SetQuantifierContext } from "./FlinkSqlParser";
|
||||||
import { ReservedKeywordsUsedAsFunctionNameContext } from "./FlinkSqlParser";
|
import { TimePointUnitContext } from "./FlinkSqlParser";
|
||||||
|
import { TimeIntervalUnitContext } from "./FlinkSqlParser";
|
||||||
|
import { ReservedKeywordsUsedAsFuncParamContext } from "./FlinkSqlParser";
|
||||||
|
import { ReservedKeywordsUsedAsFuncNameContext } from "./FlinkSqlParser";
|
||||||
import { ReservedKeywordsContext } from "./FlinkSqlParser";
|
import { ReservedKeywordsContext } from "./FlinkSqlParser";
|
||||||
import { NonReservedKeywordsContext } from "./FlinkSqlParser";
|
import { NonReservedKeywordsContext } from "./FlinkSqlParser";
|
||||||
|
|
||||||
@ -1842,6 +1846,16 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
|
|||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
*/
|
*/
|
||||||
exitFunctionName?: (ctx: FunctionNameContext) => void;
|
exitFunctionName?: (ctx: FunctionNameContext) => void;
|
||||||
|
/**
|
||||||
|
* Enter a parse tree produced by `FlinkSqlParser.functionParam`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
enterFunctionParam?: (ctx: FunctionParamContext) => void;
|
||||||
|
/**
|
||||||
|
* Exit a parse tree produced by `FlinkSqlParser.functionParam`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
exitFunctionParam?: (ctx: FunctionParamContext) => void;
|
||||||
/**
|
/**
|
||||||
* Enter a parse tree produced by `FlinkSqlParser.dereferenceDefinition`.
|
* Enter a parse tree produced by `FlinkSqlParser.dereferenceDefinition`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
@ -1932,16 +1946,6 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
|
|||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
*/
|
*/
|
||||||
exitIntervalValue?: (ctx: IntervalValueContext) => void;
|
exitIntervalValue?: (ctx: IntervalValueContext) => void;
|
||||||
/**
|
|
||||||
* Enter a parse tree produced by `FlinkSqlParser.intervalTimeUnit`.
|
|
||||||
* @param ctx the parse tree
|
|
||||||
*/
|
|
||||||
enterIntervalTimeUnit?: (ctx: IntervalTimeUnitContext) => void;
|
|
||||||
/**
|
|
||||||
* Exit a parse tree produced by `FlinkSqlParser.intervalTimeUnit`.
|
|
||||||
* @param ctx the parse tree
|
|
||||||
*/
|
|
||||||
exitIntervalTimeUnit?: (ctx: IntervalTimeUnitContext) => void;
|
|
||||||
/**
|
/**
|
||||||
* Enter a parse tree produced by `FlinkSqlParser.columnAlias`.
|
* Enter a parse tree produced by `FlinkSqlParser.columnAlias`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
@ -2222,6 +2226,16 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
|
|||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
*/
|
*/
|
||||||
exitConstant?: (ctx: ConstantContext) => void;
|
exitConstant?: (ctx: ConstantContext) => void;
|
||||||
|
/**
|
||||||
|
* Enter a parse tree produced by `FlinkSqlParser.timePointLiteral`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
enterTimePointLiteral?: (ctx: TimePointLiteralContext) => void;
|
||||||
|
/**
|
||||||
|
* Exit a parse tree produced by `FlinkSqlParser.timePointLiteral`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
exitTimePointLiteral?: (ctx: TimePointLiteralContext) => void;
|
||||||
/**
|
/**
|
||||||
* Enter a parse tree produced by `FlinkSqlParser.stringLiteral`.
|
* Enter a parse tree produced by `FlinkSqlParser.stringLiteral`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
@ -2263,15 +2277,45 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
|
|||||||
*/
|
*/
|
||||||
exitSetQuantifier?: (ctx: SetQuantifierContext) => void;
|
exitSetQuantifier?: (ctx: SetQuantifierContext) => void;
|
||||||
/**
|
/**
|
||||||
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFunctionName`.
|
* Enter a parse tree produced by `FlinkSqlParser.timePointUnit`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
*/
|
*/
|
||||||
enterReservedKeywordsUsedAsFunctionName?: (ctx: ReservedKeywordsUsedAsFunctionNameContext) => void;
|
enterTimePointUnit?: (ctx: TimePointUnitContext) => void;
|
||||||
/**
|
/**
|
||||||
* Exit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFunctionName`.
|
* Exit a parse tree produced by `FlinkSqlParser.timePointUnit`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
*/
|
*/
|
||||||
exitReservedKeywordsUsedAsFunctionName?: (ctx: ReservedKeywordsUsedAsFunctionNameContext) => void;
|
exitTimePointUnit?: (ctx: TimePointUnitContext) => void;
|
||||||
|
/**
|
||||||
|
* Enter a parse tree produced by `FlinkSqlParser.timeIntervalUnit`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
enterTimeIntervalUnit?: (ctx: TimeIntervalUnitContext) => void;
|
||||||
|
/**
|
||||||
|
* Exit a parse tree produced by `FlinkSqlParser.timeIntervalUnit`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
exitTimeIntervalUnit?: (ctx: TimeIntervalUnitContext) => void;
|
||||||
|
/**
|
||||||
|
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncParam`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
enterReservedKeywordsUsedAsFuncParam?: (ctx: ReservedKeywordsUsedAsFuncParamContext) => void;
|
||||||
|
/**
|
||||||
|
* Exit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncParam`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
exitReservedKeywordsUsedAsFuncParam?: (ctx: ReservedKeywordsUsedAsFuncParamContext) => void;
|
||||||
|
/**
|
||||||
|
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncName`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
enterReservedKeywordsUsedAsFuncName?: (ctx: ReservedKeywordsUsedAsFuncNameContext) => void;
|
||||||
|
/**
|
||||||
|
* Exit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncName`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
*/
|
||||||
|
exitReservedKeywordsUsedAsFuncName?: (ctx: ReservedKeywordsUsedAsFuncNameContext) => void;
|
||||||
/**
|
/**
|
||||||
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywords`.
|
* Enter a parse tree produced by `FlinkSqlParser.reservedKeywords`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
|
@ -161,6 +161,7 @@ import { SearchedCaseContext } from "./FlinkSqlParser";
|
|||||||
import { PositionContext } from "./FlinkSqlParser";
|
import { PositionContext } from "./FlinkSqlParser";
|
||||||
import { FirstContext } from "./FlinkSqlParser";
|
import { FirstContext } from "./FlinkSqlParser";
|
||||||
import { FunctionNameContext } from "./FlinkSqlParser";
|
import { FunctionNameContext } from "./FlinkSqlParser";
|
||||||
|
import { FunctionParamContext } from "./FlinkSqlParser";
|
||||||
import { DereferenceDefinitionContext } from "./FlinkSqlParser";
|
import { DereferenceDefinitionContext } from "./FlinkSqlParser";
|
||||||
import { CorrelationNameContext } from "./FlinkSqlParser";
|
import { CorrelationNameContext } from "./FlinkSqlParser";
|
||||||
import { QualifiedNameContext } from "./FlinkSqlParser";
|
import { QualifiedNameContext } from "./FlinkSqlParser";
|
||||||
@ -170,7 +171,6 @@ import { MultiUnitsIntervalContext } from "./FlinkSqlParser";
|
|||||||
import { ErrorCapturingUnitToUnitIntervalContext } from "./FlinkSqlParser";
|
import { ErrorCapturingUnitToUnitIntervalContext } from "./FlinkSqlParser";
|
||||||
import { UnitToUnitIntervalContext } from "./FlinkSqlParser";
|
import { UnitToUnitIntervalContext } from "./FlinkSqlParser";
|
||||||
import { IntervalValueContext } from "./FlinkSqlParser";
|
import { IntervalValueContext } from "./FlinkSqlParser";
|
||||||
import { IntervalTimeUnitContext } from "./FlinkSqlParser";
|
|
||||||
import { ColumnAliasContext } from "./FlinkSqlParser";
|
import { ColumnAliasContext } from "./FlinkSqlParser";
|
||||||
import { TableAliasContext } from "./FlinkSqlParser";
|
import { TableAliasContext } from "./FlinkSqlParser";
|
||||||
import { ErrorCapturingIdentifierContext } from "./FlinkSqlParser";
|
import { ErrorCapturingIdentifierContext } from "./FlinkSqlParser";
|
||||||
@ -198,11 +198,15 @@ import { BitOperatorContext } from "./FlinkSqlParser";
|
|||||||
import { MathOperatorContext } from "./FlinkSqlParser";
|
import { MathOperatorContext } from "./FlinkSqlParser";
|
||||||
import { UnaryOperatorContext } from "./FlinkSqlParser";
|
import { UnaryOperatorContext } from "./FlinkSqlParser";
|
||||||
import { ConstantContext } from "./FlinkSqlParser";
|
import { ConstantContext } from "./FlinkSqlParser";
|
||||||
|
import { TimePointLiteralContext } from "./FlinkSqlParser";
|
||||||
import { StringLiteralContext } from "./FlinkSqlParser";
|
import { StringLiteralContext } from "./FlinkSqlParser";
|
||||||
import { DecimalLiteralContext } from "./FlinkSqlParser";
|
import { DecimalLiteralContext } from "./FlinkSqlParser";
|
||||||
import { BooleanLiteralContext } from "./FlinkSqlParser";
|
import { BooleanLiteralContext } from "./FlinkSqlParser";
|
||||||
import { SetQuantifierContext } from "./FlinkSqlParser";
|
import { SetQuantifierContext } from "./FlinkSqlParser";
|
||||||
import { ReservedKeywordsUsedAsFunctionNameContext } from "./FlinkSqlParser";
|
import { TimePointUnitContext } from "./FlinkSqlParser";
|
||||||
|
import { TimeIntervalUnitContext } from "./FlinkSqlParser";
|
||||||
|
import { ReservedKeywordsUsedAsFuncParamContext } from "./FlinkSqlParser";
|
||||||
|
import { ReservedKeywordsUsedAsFuncNameContext } from "./FlinkSqlParser";
|
||||||
import { ReservedKeywordsContext } from "./FlinkSqlParser";
|
import { ReservedKeywordsContext } from "./FlinkSqlParser";
|
||||||
import { NonReservedKeywordsContext } from "./FlinkSqlParser";
|
import { NonReservedKeywordsContext } from "./FlinkSqlParser";
|
||||||
|
|
||||||
@ -1188,6 +1192,12 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
|
|||||||
* @return the visitor result
|
* @return the visitor result
|
||||||
*/
|
*/
|
||||||
visitFunctionName?: (ctx: FunctionNameContext) => Result;
|
visitFunctionName?: (ctx: FunctionNameContext) => Result;
|
||||||
|
/**
|
||||||
|
* Visit a parse tree produced by `FlinkSqlParser.functionParam`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
* @return the visitor result
|
||||||
|
*/
|
||||||
|
visitFunctionParam?: (ctx: FunctionParamContext) => Result;
|
||||||
/**
|
/**
|
||||||
* Visit a parse tree produced by `FlinkSqlParser.dereferenceDefinition`.
|
* Visit a parse tree produced by `FlinkSqlParser.dereferenceDefinition`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
@ -1242,12 +1252,6 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
|
|||||||
* @return the visitor result
|
* @return the visitor result
|
||||||
*/
|
*/
|
||||||
visitIntervalValue?: (ctx: IntervalValueContext) => Result;
|
visitIntervalValue?: (ctx: IntervalValueContext) => Result;
|
||||||
/**
|
|
||||||
* Visit a parse tree produced by `FlinkSqlParser.intervalTimeUnit`.
|
|
||||||
* @param ctx the parse tree
|
|
||||||
* @return the visitor result
|
|
||||||
*/
|
|
||||||
visitIntervalTimeUnit?: (ctx: IntervalTimeUnitContext) => Result;
|
|
||||||
/**
|
/**
|
||||||
* Visit a parse tree produced by `FlinkSqlParser.columnAlias`.
|
* Visit a parse tree produced by `FlinkSqlParser.columnAlias`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
@ -1415,6 +1419,12 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
|
|||||||
* @return the visitor result
|
* @return the visitor result
|
||||||
*/
|
*/
|
||||||
visitConstant?: (ctx: ConstantContext) => Result;
|
visitConstant?: (ctx: ConstantContext) => Result;
|
||||||
|
/**
|
||||||
|
* Visit a parse tree produced by `FlinkSqlParser.timePointLiteral`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
* @return the visitor result
|
||||||
|
*/
|
||||||
|
visitTimePointLiteral?: (ctx: TimePointLiteralContext) => Result;
|
||||||
/**
|
/**
|
||||||
* Visit a parse tree produced by `FlinkSqlParser.stringLiteral`.
|
* Visit a parse tree produced by `FlinkSqlParser.stringLiteral`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
@ -1440,11 +1450,29 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
|
|||||||
*/
|
*/
|
||||||
visitSetQuantifier?: (ctx: SetQuantifierContext) => Result;
|
visitSetQuantifier?: (ctx: SetQuantifierContext) => Result;
|
||||||
/**
|
/**
|
||||||
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFunctionName`.
|
* Visit a parse tree produced by `FlinkSqlParser.timePointUnit`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
* @return the visitor result
|
* @return the visitor result
|
||||||
*/
|
*/
|
||||||
visitReservedKeywordsUsedAsFunctionName?: (ctx: ReservedKeywordsUsedAsFunctionNameContext) => Result;
|
visitTimePointUnit?: (ctx: TimePointUnitContext) => Result;
|
||||||
|
/**
|
||||||
|
* Visit a parse tree produced by `FlinkSqlParser.timeIntervalUnit`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
* @return the visitor result
|
||||||
|
*/
|
||||||
|
visitTimeIntervalUnit?: (ctx: TimeIntervalUnitContext) => Result;
|
||||||
|
/**
|
||||||
|
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncParam`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
* @return the visitor result
|
||||||
|
*/
|
||||||
|
visitReservedKeywordsUsedAsFuncParam?: (ctx: ReservedKeywordsUsedAsFuncParamContext) => Result;
|
||||||
|
/**
|
||||||
|
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywordsUsedAsFuncName`.
|
||||||
|
* @param ctx the parse tree
|
||||||
|
* @return the visitor result
|
||||||
|
*/
|
||||||
|
visitReservedKeywordsUsedAsFuncName?: (ctx: ReservedKeywordsUsedAsFuncNameContext) => Result;
|
||||||
/**
|
/**
|
||||||
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywords`.
|
* Visit a parse tree produced by `FlinkSqlParser.reservedKeywords`.
|
||||||
* @param ctx the parse tree
|
* @param ctx the parse tree
|
||||||
|
16
test/parser/flinksql/syntax/comprehensive.test.ts
Normal file
16
test/parser/flinksql/syntax/comprehensive.test.ts
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
import FlinkSQL from "../../../../src/parser/flinksql";
|
||||||
|
|
||||||
|
// 综合测试的 sql 不做切割
|
||||||
|
const features = {
|
||||||
|
templates: fs.readFileSync(path.join(__dirname, 'fixtures', 'templates.sql'), 'utf-8')
|
||||||
|
};
|
||||||
|
|
||||||
|
describe('FlinkSQL Comprehensive Tests', () => {
|
||||||
|
const parser = new FlinkSQL();
|
||||||
|
test('Stream SQL templates', () => {
|
||||||
|
expect(parser.validate(features.templates).length).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
884
test/parser/flinksql/syntax/fixtures/templates.sql
Normal file
884
test/parser/flinksql/syntax/fixtures/templates.sql
Normal file
@ -0,0 +1,884 @@
|
|||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 创建表
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
-- 执行创建临时表 DDL,不需要指定catalog.database
|
||||||
|
CREATE TABLE orders (
|
||||||
|
order_uid BIGINT,
|
||||||
|
product_id BIGINT,
|
||||||
|
price DECIMAL(32, 2),
|
||||||
|
order_time TIMESTAMP(3)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'datagen'
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 INSERT INTO
|
||||||
|
--********************************************************************--
|
||||||
|
-- 定义数据源表
|
||||||
|
CREATE TABLE server_logs (
|
||||||
|
client_ip STRING,
|
||||||
|
client_identity STRING,
|
||||||
|
userid STRING,
|
||||||
|
user_agent STRING,
|
||||||
|
log_time TIMESTAMP(3),
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
size INT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
|
||||||
|
'fields.client_identity.expression' = '-',
|
||||||
|
'fields.userid.expression' = '-',
|
||||||
|
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
|
||||||
|
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
|
||||||
|
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
|
||||||
|
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
|
||||||
|
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义结果表,实际应用中会选择 Kafka、JDBC 等作为结果表
|
||||||
|
CREATE TABLE client_errors (
|
||||||
|
log_time TIMESTAMP(3),
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
size INT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'stream-x'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 写入数据到结果表
|
||||||
|
INSERT INTO client_errors
|
||||||
|
SELECT
|
||||||
|
log_time,
|
||||||
|
request_line,
|
||||||
|
status_code,
|
||||||
|
size
|
||||||
|
FROM server_logs
|
||||||
|
WHERE
|
||||||
|
status_code SIMILAR TO '4[0-9][0-9]';
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 Statement Set
|
||||||
|
--********************************************************************--
|
||||||
|
-- 定义数据源表
|
||||||
|
CREATE TABLE server_logs (
|
||||||
|
client_ip STRING,
|
||||||
|
client_identity STRING,
|
||||||
|
userid STRING,
|
||||||
|
user_agent STRING,
|
||||||
|
log_time TIMESTAMP(3),
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
size INT,
|
||||||
|
WATERMARK FOR log_time AS log_time - INTERVAL '30' SECONDS
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- Faker 连接器仅在 VVR-4.0.12 及以上支持
|
||||||
|
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
|
||||||
|
'fields.client_identity.expression' = '-',
|
||||||
|
'fields.userid.expression' = '-',
|
||||||
|
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
|
||||||
|
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
|
||||||
|
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
|
||||||
|
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
|
||||||
|
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义结果表1
|
||||||
|
CREATE TABLE aggregations1 (
|
||||||
|
`browser` STRING,
|
||||||
|
`status_code` STRING,
|
||||||
|
`end_time` TIMESTAMP(3),
|
||||||
|
`requests` BIGINT NOT NULL
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'blackhole'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义结果表2
|
||||||
|
CREATE TABLE aggregations2 (
|
||||||
|
`browser` STRING,
|
||||||
|
`status_code` STRING,
|
||||||
|
`requests` BIGINT NOT NULL
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'stream-x'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- This is a shared view that will be used by both insert into statements
|
||||||
|
CREATE VIEW browsers AS
|
||||||
|
SELECT
|
||||||
|
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
|
||||||
|
status_code,
|
||||||
|
log_time
|
||||||
|
FROM server_logs;
|
||||||
|
|
||||||
|
-- 封装多个INSERT INTO语句到一个STATEMENT SET语句中
|
||||||
|
BEGIN STATEMENT SET;
|
||||||
|
-- 5min窗口粒度聚合
|
||||||
|
INSERT INTO aggregations1
|
||||||
|
SELECT
|
||||||
|
browser,
|
||||||
|
status_code,
|
||||||
|
TUMBLE_ROWTIME(log_time, INTERVAL '5' MINUTE) AS end_time,
|
||||||
|
COUNT(*) requests
|
||||||
|
FROM browsers
|
||||||
|
GROUP BY
|
||||||
|
browser,
|
||||||
|
status_code,
|
||||||
|
TUMBLE(log_time, INTERVAL '5' MINUTE);
|
||||||
|
-- 1h窗口粒度聚合
|
||||||
|
INSERT INTO aggregations2
|
||||||
|
SELECT
|
||||||
|
browser,
|
||||||
|
status_code,
|
||||||
|
COUNT(*) requests
|
||||||
|
FROM browsers
|
||||||
|
GROUP BY
|
||||||
|
browser,
|
||||||
|
status_code,
|
||||||
|
TUMBLE(log_time, INTERVAL '1' HOUR);
|
||||||
|
END;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 Watermark
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.doctor_sightings (
|
||||||
|
doctor STRING,
|
||||||
|
sighting_time TIMESTAMP(3),
|
||||||
|
-- 通过watermark把sighting_time标识为事件时间,定义最大的乱序时间:期望所有的记录在目击发生后的15秒内都到达。
|
||||||
|
WATERMARK FOR sighting_time AS sighting_time - INTERVAL '15' SECONDS
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.doctor.expression' = '#{dr_who.the_doctors}',
|
||||||
|
'fields.sighting_time.expression' = '#{date.past ''15'',''SECONDS''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
doctor,
|
||||||
|
-- 在滚动窗口中使用sighting_time字段
|
||||||
|
TUMBLE_ROWTIME(sighting_time, INTERVAL '1' MINUTE) AS sighting_time,
|
||||||
|
COUNT(*) AS sightings
|
||||||
|
FROM dt_catalog.dt_db.doctor_sightings
|
||||||
|
GROUP BY
|
||||||
|
TUMBLE(sighting_time, INTERVAL '1' MINUTE),
|
||||||
|
doctor;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 GROUP BY
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
-- 定义数据源表
|
||||||
|
CREATE TABLE dt_catalog.dt_db.server_logs (
|
||||||
|
client_ip STRING,
|
||||||
|
client_identity STRING,
|
||||||
|
userid STRING,
|
||||||
|
user_agent STRING,
|
||||||
|
log_time TIMESTAMP(3),
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
size INT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
|
||||||
|
'fields.client_identity.expression' = '-',
|
||||||
|
'fields.userid.expression' = '-',
|
||||||
|
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
|
||||||
|
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
|
||||||
|
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
|
||||||
|
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
|
||||||
|
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 采样user_agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A
|
||||||
|
-- 正则表达式: '[^\/]+' (匹配 '/' 之前的所有字符)
|
||||||
|
SELECT
|
||||||
|
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
|
||||||
|
status_code,
|
||||||
|
COUNT(*) AS cnt_status
|
||||||
|
FROM dt_catalog.dt_db.server_logs
|
||||||
|
-- 按浏览器和状态码两个维度统计日志数量
|
||||||
|
GROUP BY
|
||||||
|
REGEXP_EXTRACT(user_agent,'[^\/]+'),
|
||||||
|
status_code;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 滚动窗口聚合
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.server_logs (
|
||||||
|
client_ip STRING,
|
||||||
|
client_identity STRING,
|
||||||
|
userid STRING,
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
log_time AS PROCTIME() -- 使用当前系统处理时间作为表的时间字段
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
|
||||||
|
'fields.client_identity.expression' = '-',
|
||||||
|
'fields.userid.expression' = '-',
|
||||||
|
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
|
||||||
|
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
|
||||||
|
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 按 window_start, window_end 维度计算每分钟窗口上不同的 ip 数量
|
||||||
|
SELECT window_start, window_end, COUNT(DISTINCT client_ip) AS ip_addresses
|
||||||
|
FROM TABLE(
|
||||||
|
-- 定义1min滑动窗口
|
||||||
|
TUMBLE(TABLE dt_catalog.dt_db.server_logs, DESCRIPTOR(log_time), INTERVAL '1' MINUTE))
|
||||||
|
GROUP BY window_start, window_end;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 滑动窗口聚合
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.bids (
|
||||||
|
bid_id STRING,
|
||||||
|
currency_code STRING,
|
||||||
|
bid_price DOUBLE,
|
||||||
|
transaction_time TIMESTAMP(3),
|
||||||
|
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECONDS -- 定义事件时间,允许的最大窗口延迟为5s
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.bid_id.expression' = '#{Internet.UUID}',
|
||||||
|
'fields.currency_code.expression' = '#{regexify ''(EUR|USD|CNY)''}',
|
||||||
|
'fields.bid_price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
|
||||||
|
'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
|
||||||
|
'rows-per-second' = '100'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义1min 的滑动窗口,每隔 30s 滚动一次
|
||||||
|
SELECT window_start, window_end, currency_code, ROUND(AVG(bid_price),2) AS MovingAverageBidPrice
|
||||||
|
FROM TABLE(
|
||||||
|
HOP(TABLE dt_catalog.dt_db.bids, DESCRIPTOR(transaction_time), INTERVAL '30' SECONDS, INTERVAL '1' MINUTE))
|
||||||
|
GROUP BY window_start, window_end, currency_code;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 累计窗口聚合
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
-- 商品销售订单表
|
||||||
|
CREATE TABLE dt_catalog.dt_db.orders (
|
||||||
|
order_id BIGINT, -- 订单ID
|
||||||
|
goods_id BIGINT, -- 商品ID
|
||||||
|
goods_sales DOUBLE, -- 商品销售额
|
||||||
|
order_time TIMESTAMP(3), -- 下单时间
|
||||||
|
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS -- 定义事件时间,允许的最大窗口延迟为5s
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.order_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
|
||||||
|
'fields.goods_id.expression' = '#{number.numberBetween ''0'',''1000000000''}',
|
||||||
|
'fields.goods_sales.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
|
||||||
|
'fields.order_time.expression' = '#{date.past ''30'',''SECONDS''}',
|
||||||
|
'rows-per-second' = '100'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 每分钟更新一次从零点开始截止到当前时刻的累计销售额
|
||||||
|
SELECT
|
||||||
|
window_start,
|
||||||
|
window_end,
|
||||||
|
SUM(goods_sales) AS cumulate_gmv -- 当天累计销售额
|
||||||
|
FROM TABLE(
|
||||||
|
-- 定义窗口最大长度为一天的累计窗口,窗口滚动步长为1分钟
|
||||||
|
CUMULATE(
|
||||||
|
TABLE dt_catalog.dt_db.orders,
|
||||||
|
DESCRIPTOR(order_time),
|
||||||
|
INTERVAL '1' MINUTES,
|
||||||
|
INTERVAL '1' DAY))
|
||||||
|
GROUP BY window_start, window_end;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 会话窗口聚合
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.server_logs (
|
||||||
|
client_ip STRING,
|
||||||
|
client_identity STRING,
|
||||||
|
userid STRING,
|
||||||
|
log_time TIMESTAMP(3),
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS -- 定义 watermark
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'rows-per-second' = '5',
|
||||||
|
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
|
||||||
|
'fields.client_identity.expression' = '-',
|
||||||
|
'fields.userid.expression' = '#{regexify ''(morsapaes|knauf|sjwiesman){1}''}',
|
||||||
|
'fields.log_time.expression' = '#{date.past ''5'',''SECONDS''}',
|
||||||
|
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
|
||||||
|
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
userid,
|
||||||
|
SESSION_START(log_time, INTERVAL '10' SECOND) AS session_beg,
|
||||||
|
SESSION_ROWTIME(log_time, INTERVAL '10' SECOND) AS session_end,
|
||||||
|
COUNT(request_line) AS request_cnt
|
||||||
|
FROM dt_catalog.dt_db.server_logs
|
||||||
|
WHERE status_code = '403'
|
||||||
|
GROUP BY
|
||||||
|
userid,
|
||||||
|
-- 会话窗口的最大空闲间隔为10s,当10s内该窗口没有接收到新的请求,会关闭当前窗口
|
||||||
|
SESSION(log_time, INTERVAL '10' SECOND);
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 OVER窗口聚合
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.temperature_measurements (
|
||||||
|
measurement_time TIMESTAMP(3),
|
||||||
|
city STRING,
|
||||||
|
temperature FLOAT,
|
||||||
|
WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECONDS -- 定义时间属性字段,OVER窗口排序时使用
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- Faker 连接器仅在 VVR-4.0.12 及以上支持
|
||||||
|
'fields.measurement_time.expression' = '#{date.past ''15'',''SECONDS''}',
|
||||||
|
'fields.temperature.expression' = '#{number.numberBetween ''0'',''50''}',
|
||||||
|
'fields.city.expression' = '#{regexify ''(Chicago|Munich|Berlin|Portland|Hangzhou|Seatle|Beijing|New York){1}''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
measurement_time,
|
||||||
|
city,
|
||||||
|
temperature,
|
||||||
|
AVG(CAST(temperature AS FLOAT)) OVER last_minute AS avg_temperature_minute, -- 计算平均值
|
||||||
|
MAX(temperature) OVER last_minute AS min_temperature_minute, -- 计算最大值
|
||||||
|
MIN(temperature) OVER last_minute AS max_temperature_minute, -- 计算最小值
|
||||||
|
STDDEV(CAST(temperature AS FLOAT)) OVER last_minute AS stdev_temperature_minute -- 计算标准差
|
||||||
|
FROM dt_catalog.dt_db.temperature_measurements
|
||||||
|
WINDOW last_minute AS ( -- 定义1min时间间隔的OVER窗口,按城市粒度分区,温度测量值排序,每个元素都会触发一次计算
|
||||||
|
PARTITION BY city
|
||||||
|
ORDER BY measurement_time
|
||||||
|
RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
|
||||||
|
);
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 级联窗口聚合
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TEMPORARY TABLE server_logs (
|
||||||
|
log_time TIMESTAMP(3),
|
||||||
|
client_ip STRING,
|
||||||
|
client_identity STRING,
|
||||||
|
userid STRING,
|
||||||
|
request_line STRING,
|
||||||
|
status_code STRING,
|
||||||
|
size INT,
|
||||||
|
WATERMARK FOR log_time AS log_time - INTERVAL '15' SECONDS -- 定义watermark
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
|
||||||
|
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
|
||||||
|
'fields.client_identity.expression' = '-',
|
||||||
|
'fields.userid.expression' = '-',
|
||||||
|
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
|
||||||
|
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
|
||||||
|
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 1min聚合结果表
|
||||||
|
CREATE TEMPORARY TABLE avg_request_size_1m (
|
||||||
|
window_start TIMESTAMP(3),
|
||||||
|
window_end TIMESTAMP(3),
|
||||||
|
avg_size BIGINT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'blackhole'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 5min聚合结果表
|
||||||
|
CREATE TEMPORARY TABLE avg_request_size_5m (
|
||||||
|
window_start TIMESTAMP(3),
|
||||||
|
window_end TIMESTAMP(3),
|
||||||
|
avg_size BIGINT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'blackhole'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 1min窗口查询结果
|
||||||
|
CREATE VIEW server_logs_window_1m AS
|
||||||
|
SELECT
|
||||||
|
TUMBLE_START(log_time, INTERVAL '1' MINUTE) AS window_start,
|
||||||
|
TUMBLE_ROWTIME(log_time, INTERVAL '1' MINUTE) AS window_end,
|
||||||
|
SUM(size) AS total_size,
|
||||||
|
COUNT(*) AS num_requests
|
||||||
|
FROM server_logs
|
||||||
|
GROUP BY
|
||||||
|
TUMBLE(log_time, INTERVAL '1' MINUTE);
|
||||||
|
|
||||||
|
-- 基于1min窗口查询结果,进行5min粒度窗口聚合
|
||||||
|
CREATE VIEW server_logs_window_5m AS
|
||||||
|
SELECT
|
||||||
|
TUMBLE_START(window_end, INTERVAL '5' MINUTE) AS window_start,
|
||||||
|
TUMBLE_ROWTIME(window_end, INTERVAL '5' MINUTE) AS window_end,
|
||||||
|
SUM(total_size) AS total_size,
|
||||||
|
SUM(num_requests) AS num_requests
|
||||||
|
FROM server_logs_window_1m
|
||||||
|
GROUP BY
|
||||||
|
TUMBLE(window_end, INTERVAL '5' MINUTE);
|
||||||
|
|
||||||
|
BEGIN STATEMENT SET;
|
||||||
|
-- 写入结果到1min窗口粒度结果表
|
||||||
|
INSERT INTO avg_request_size_1m SELECT
|
||||||
|
window_start,
|
||||||
|
window_end,
|
||||||
|
total_size / num_requests AS avg_size
|
||||||
|
FROM server_logs_window_1m;
|
||||||
|
|
||||||
|
-- 写入结果到5min窗口粒度结果表
|
||||||
|
INSERT INTO avg_request_size_5m SELECT
|
||||||
|
window_start,
|
||||||
|
window_end,
|
||||||
|
total_size / num_requests AS avg_size
|
||||||
|
FROM server_logs_window_5m;
|
||||||
|
END;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 去重
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.orders (
|
||||||
|
id INT,
|
||||||
|
order_time AS CURRENT_TIMESTAMP,
|
||||||
|
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
|
||||||
|
)
|
||||||
|
WITH (
|
||||||
|
'connector' = 'datagen',
|
||||||
|
'rows-per-second'='10',
|
||||||
|
'fields.id.kind'='random',
|
||||||
|
'fields.id.min'='1',
|
||||||
|
'fields.id.max'='100'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 对于每个order_id,按事件时间去重,只保留最新时间的记录即可实现去重
|
||||||
|
SELECT
|
||||||
|
order_id,
|
||||||
|
order_time
|
||||||
|
FROM (
|
||||||
|
SELECT id AS order_id,
|
||||||
|
order_time,
|
||||||
|
-- 按事件时间升序排序
|
||||||
|
ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
|
||||||
|
FROM dt_catalog.dt_db.orders)
|
||||||
|
WHERE rownum = 1; -- 只取排名第一的记录,去重是Top-N的一种特例
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 Top-N
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.spells_cast (
|
||||||
|
wizard STRING,
|
||||||
|
spell STRING
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.wizard.expression' = '#{harry_potter.characters}',
|
||||||
|
'fields.spell.expression' = '#{harry_potter.spells}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 找出每个巫师最喜欢的两个法术
|
||||||
|
SELECT wizard, spell, times_cast
|
||||||
|
FROM (
|
||||||
|
SELECT *,
|
||||||
|
ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num -- 按法术次数降序排序
|
||||||
|
FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM dt_catalog.dt_db.spells_cast GROUP BY wizard, spell) -- 计算每个巫师施展的各种法术的次数
|
||||||
|
)
|
||||||
|
WHERE row_num <= 2;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 窗口Top-N
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.orders (
|
||||||
|
bidtime TIMESTAMP(3),
|
||||||
|
price DOUBLE,
|
||||||
|
item STRING,
|
||||||
|
supplier STRING,
|
||||||
|
WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS -- 定义watermark
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
|
||||||
|
'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
|
||||||
|
'fields.item.expression' = '#{Commerce.productName}',
|
||||||
|
'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
|
||||||
|
'rows-per-second' = '100'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 取出销售排名前三的供应商
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
-- 按窗口时间分区,按价格降序排序
|
||||||
|
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
|
||||||
|
FROM (
|
||||||
|
-- 计算每个窗口内各个供应商的销售额
|
||||||
|
SELECT window_start, window_end, supplier, SUM(price) AS price, COUNT(*) AS cnt
|
||||||
|
FROM TABLE(
|
||||||
|
TUMBLE(TABLE dt_catalog.dt_db.orders, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
|
||||||
|
GROUP BY window_start, window_end, supplier
|
||||||
|
)
|
||||||
|
)
|
||||||
|
WHERE rownum <= 3;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 模式检测CEP
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.subscriptions (
|
||||||
|
id STRING,
|
||||||
|
user_id INT,
|
||||||
|
type STRING,
|
||||||
|
start_date TIMESTAMP(3),
|
||||||
|
end_date TIMESTAMP(3),
|
||||||
|
payment_expiration TIMESTAMP(3),
|
||||||
|
proc_time AS PROCTIME()
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.id.expression' = '#{Internet.uuid}',
|
||||||
|
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
|
||||||
|
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
|
||||||
|
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
|
||||||
|
'fields.end_date.expression' = '#{date.future ''15'',''DAYS''}',
|
||||||
|
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT *
|
||||||
|
FROM dt_catalog.dt_db.subscriptions
|
||||||
|
MATCH_RECOGNIZE ( -- 按user_id分区,按处理时间proc_time升序排序
|
||||||
|
PARTITION BY user_id
|
||||||
|
ORDER BY proc_time
|
||||||
|
MEASURES
|
||||||
|
LAST(PREMIUM.type) AS premium_type,
|
||||||
|
AVG(TIMESTAMPDIFF(DAY,PREMIUM.start_date,PREMIUM.end_date)) AS premium_avg_duration,
|
||||||
|
BASIC.start_date AS downgrade_date
|
||||||
|
AFTER MATCH SKIP PAST LAST ROW
|
||||||
|
--模式: 一个或多个‘premium‘或’platinum‘订阅事件
|
||||||
|
--对于相同的'user_id',后面跟着一个'basic'订阅事件
|
||||||
|
PATTERN (PREMIUM+ BASIC)
|
||||||
|
DEFINE PREMIUM AS PREMIUM.type IN ('premium','platinum'),
|
||||||
|
BASIC AS BASIC.type = 'basic');
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 Regular Join
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.NOC (
|
||||||
|
agent_id STRING,
|
||||||
|
codename STRING
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
|
||||||
|
'fields.codename.expression' = '#{superhero.name}',
|
||||||
|
'number-of-rows' = '10'
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE dt_catalog.dt_db.RealNames (
|
||||||
|
agent_id STRING,
|
||||||
|
name STRING
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
|
||||||
|
'fields.name.expression' = '#{Name.full_name}',
|
||||||
|
'number-of-rows' = '10'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 使用agent_id作为两张表关联的条件,左右两边任何一张表来了新数据都会触发join动作
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
codename
|
||||||
|
FROM dt_catalog.dt_db.NOC t1
|
||||||
|
INNER JOIN dt_catalog.dt_db.RealNames t2 ON t1.agent_id = t2.agent_id;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 Interval Join
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.orders (
|
||||||
|
id INT,
|
||||||
|
order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'datagen',
|
||||||
|
'rows-per-second'='10',
|
||||||
|
'fields.id.kind'='sequence',
|
||||||
|
'fields.id.start'='1',
|
||||||
|
'fields.id.end'='1000'
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE dt_catalog.dt_db.shipments (
|
||||||
|
id INT,
|
||||||
|
order_id INT,
|
||||||
|
shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'datagen',
|
||||||
|
'rows-per-second'='5',
|
||||||
|
'fields.id.kind'='random',
|
||||||
|
'fields.id.min'='0',
|
||||||
|
'fields.order_id.kind'='sequence',
|
||||||
|
'fields.order_id.start'='1',
|
||||||
|
'fields.order_id.end'='1000'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- order表的每条数据会与shipments表过去三天至当前时刻时间范围内的数据进行join
|
||||||
|
SELECT
|
||||||
|
o.id AS order_id,
|
||||||
|
o.order_time,
|
||||||
|
s.shipment_time,
|
||||||
|
TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
|
||||||
|
FROM dt_catalog.dt_db.orders o
|
||||||
|
JOIN dt_catalog.dt_db.shipments s ON o.id = s.order_id
|
||||||
|
WHERE
|
||||||
|
-- 时间 join 条件:shipments.shipment_time - INTERVAL '3' DAY <= orders.order_time < shipments.shipment_time
|
||||||
|
o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 时态表Join
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
-- 使用主键约束和watermark来定义一张版本表,这张表可以是一个cdc表、upsert类型的kafka topic等
|
||||||
|
CREATE TABLE dt_catalog.dt_db.currency_rates (
|
||||||
|
`currency_code` STRING,
|
||||||
|
`eur_rate` DECIMAL(6,4),
|
||||||
|
`rate_time` TIMESTAMP(3),
|
||||||
|
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS, -- 定义事件时间
|
||||||
|
PRIMARY KEY (currency_code) NOT ENFORCED -- 定义主键
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.currency_code.expression' = '#{Currency.code}',
|
||||||
|
'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
|
||||||
|
'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
|
||||||
|
'rows-per-second' = '100'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 这是一个append-only类型的动态表,需要定义watermk
|
||||||
|
CREATE TABLE dt_catalog.dt_db.transactions (
|
||||||
|
`id` STRING,
|
||||||
|
`currency_code` STRING,
|
||||||
|
`total` DECIMAL(10,2),
|
||||||
|
`transaction_time` TIMESTAMP(3),
|
||||||
|
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS --定义watermark
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.id.expression' = '#{Internet.UUID}',
|
||||||
|
'fields.currency_code.expression' = '#{Currency.code}',
|
||||||
|
'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
|
||||||
|
'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
|
||||||
|
'rows-per-second' = '100'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 当左右两张表的watermark对齐时,才会触发join动作,左右两张表都需要定义watermark
|
||||||
|
SELECT
|
||||||
|
t.id,
|
||||||
|
t.total * c.eur_rate AS total_eur,
|
||||||
|
t.total,
|
||||||
|
c.currency_code,
|
||||||
|
t.transaction_time
|
||||||
|
FROM dt_catalog.dt_db.transactions t
|
||||||
|
-- transactions表每条记录都与currency_rates表transaction_time时刻的汇率进行join
|
||||||
|
JOIN dt_catalog.dt_db.currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
|
||||||
|
-- 指定join key
|
||||||
|
ON t.currency_code = c.currency_code;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL 快速入门示例 维表Join
|
||||||
|
-- 该模版仅支持使用"执行"功能。如需"提交"运行,需要您增加 INSERT 相关逻辑
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE dt_catalog.dt_db.subscriptions (
|
||||||
|
id STRING,
|
||||||
|
user_id INT,
|
||||||
|
type STRING,
|
||||||
|
start_date TIMESTAMP(3),
|
||||||
|
end_date TIMESTAMP(3),
|
||||||
|
payment_expiration TIMESTAMP(3),
|
||||||
|
proc_time AS PROCTIME() -- 这里需要定义处理时间属性
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.id.expression' = '#{Internet.uuid}',
|
||||||
|
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
|
||||||
|
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
|
||||||
|
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
|
||||||
|
'fields.end_date.expression' = '#{date.future ''365'',''DAYS''}',
|
||||||
|
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义维表,为了示例能直接运行,这里使用faker 作为维表,实际应用中一般会使用JDBC、Redis、Hbase等作为维表
|
||||||
|
CREATE TABLE dt_catalog.dt_db.users (
|
||||||
|
user_id INT PRIMARY KEY, -- 定义主键
|
||||||
|
user_name VARCHAR(255) NOT NULL,
|
||||||
|
age INT NOT NULL
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker',
|
||||||
|
'fields.user_id.expression' = '#{number.numberBetween ''1'',''10''}',
|
||||||
|
'fields.user_name.expression' = '#{regexify ''(ron|jark|danny){1}''}',
|
||||||
|
'fields.age.expression' = '#{number.numberBetween ''1'',''30''}'
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
id AS subscription_id,
|
||||||
|
type AS subscription_type,
|
||||||
|
age AS user_age,
|
||||||
|
CASE
|
||||||
|
WHEN age < 18 THEN 1
|
||||||
|
ELSE 0
|
||||||
|
END AS is_minor
|
||||||
|
FROM dt_catalog.dt_db.subscriptions usub
|
||||||
|
-- subscriptions每条记录都使用当前系统时间与维表users中的最新数据进行join
|
||||||
|
JOIN dt_catalog.dt_db.users FOR SYSTEM_TIME AS OF usub.proc_time AS u
|
||||||
|
-- 指定join key
|
||||||
|
ON usub.user_id = u.user_id;
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL ODS 层
|
||||||
|
-- 实际应用中,该任务应该是一个采集任务,源表为RDB
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE source_table (
|
||||||
|
account_number STRING,
|
||||||
|
channel_id INT,
|
||||||
|
account_open_datetime TIMESTAMP(3)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- 使用 Flink Faker Connector
|
||||||
|
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
|
||||||
|
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
|
||||||
|
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期,每5秒一条数据
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义结果表,实际应用中应选择 Kafka等作为结果表
|
||||||
|
CREATE TABLE sink_table (
|
||||||
|
account_number STRING,
|
||||||
|
channel_id INT,
|
||||||
|
account_open_datetime TIMESTAMP(3)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'stream-x'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 写入数据到结果表
|
||||||
|
INSERT INTO sink_table SELECT * FROM source_table
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL DWD 层
|
||||||
|
-- 实际应用中,源表为 ODS TOPIC
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE source_table (
|
||||||
|
account_number STRING,
|
||||||
|
channel_id INT,
|
||||||
|
account_open_datetime TIMESTAMP(3)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- 使用 Flink Faker Connector
|
||||||
|
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
|
||||||
|
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
|
||||||
|
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期,每5秒一条数据
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义维表,实际应用中应选择RDB作为维表
|
||||||
|
CREATE TABLE dim_table (
|
||||||
|
channel_id INT,
|
||||||
|
channel_name STRING,
|
||||||
|
PRIMARY KEY (channel_id,channel_name) NOT ENFORCED
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- 使用 Flink Faker Connector
|
||||||
|
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
|
||||||
|
'fields.channel_name.expression' = '#{app.name}' -- 渠道名称
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 定义结果表,实际应用中应选择 Kafka等作为结果表
|
||||||
|
CREATE TABLE sink_table (
|
||||||
|
account_number STRING,
|
||||||
|
channel_id INT,
|
||||||
|
channel_name STRING,
|
||||||
|
account_open_datetime TIMESTAMP(3)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'stream-x'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 写入数据到结果表
|
||||||
|
INSERT INTO sink_table
|
||||||
|
SELECT
|
||||||
|
s1.account_number,
|
||||||
|
s1. channel_id ,
|
||||||
|
d1.channel_name,
|
||||||
|
s1.account_open_datetime
|
||||||
|
FROM source_table s1
|
||||||
|
left JOIN dim_table d1 ON s1.channel_id=d1.channel_id
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL DWS 层
|
||||||
|
-- 实际应用中,源表为DWD TOPIC
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE source_table (
|
||||||
|
account_number STRING,
|
||||||
|
channel_id INT,
|
||||||
|
channel_name STRING,
|
||||||
|
account_open_datetime TIMESTAMP(3)
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- 使用 Flink Faker Connector
|
||||||
|
'fields.account_number.expression' = '#{Finance.iban}', -- 随机生成银行账号
|
||||||
|
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
|
||||||
|
'fields.channel_name.expression' = '#{app.name}' ,-- 渠道名称
|
||||||
|
'fields.account_open_datetime.expression' = '#{date.past ''15'',''5'',''SECONDS''}' -- 过去15天的日期,每5秒一条数据
|
||||||
|
);
|
||||||
|
DROP TABLE source_table
|
||||||
|
SELECT * FROM source_table
|
||||||
|
|
||||||
|
-- 定义结果表,实际应用中应选择Kafka作为结果表
|
||||||
|
CREATE TABLE sink_table (
|
||||||
|
channel_id STRING,
|
||||||
|
open_date STRING,
|
||||||
|
cnt INT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'stream-x'
|
||||||
|
);
|
||||||
|
DROP TABLE sink_table
|
||||||
|
SELECT * FROM sink_table
|
||||||
|
|
||||||
|
-- 写入数据到结果表
|
||||||
|
INSERT INTO sink_table
|
||||||
|
SELECT
|
||||||
|
channel_id,
|
||||||
|
DATE_FORMAT(account_open_datetime,'yyyy-MM-dd'),
|
||||||
|
count(account_number)
|
||||||
|
FROM source_table
|
||||||
|
GROUP BY channel_id,DATE_FORMAT(account_open_datetime,'yyyy-MM-dd')
|
||||||
|
|
||||||
|
|
||||||
|
--********************************************************************--
|
||||||
|
-- Flink SQL ADS 层
|
||||||
|
-- 实际应用中,源表为DWS TOPIC
|
||||||
|
--********************************************************************--
|
||||||
|
CREATE TABLE source_table (
|
||||||
|
channel_id STRING,
|
||||||
|
open_time TIMESTAMP(3),
|
||||||
|
cnt INT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'faker', -- 使用 Flink Faker Connector
|
||||||
|
'fields.channel_id.expression' = '#{number.numberBetween ''1'',''4''}', -- 渠道ID随机生成1到3之间的数字
|
||||||
|
'fields.open_time.expression' = '#{Date.future ''15'',''5'',''SECONDS''}' ,-- 日期
|
||||||
|
'fields.cnt.expression' = '#{number.numberBetween ''1'',''100000000''}'-- 数量
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 实际应用中,结果表为RDB
|
||||||
|
CREATE TABLE sink_table (
|
||||||
|
open_date STRING,
|
||||||
|
cnt INT
|
||||||
|
) WITH (
|
||||||
|
'connector' = 'stream-x'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 写入数据到结果表
|
||||||
|
INSERT INTO sink_table
|
||||||
|
SELECT DATE_FORMAT(open_time,'yyyy-MM-dd'),count(1);
|
Loading…
Reference in New Issue
Block a user