feat: improve flinksql createStatement (#91)

* feat: improve flinksql createStatement

* feat: complete CREATE syntax unit tests

* feat: complete CREATA TABLE syntax tests

* feat: develop flinkSQL grammar

* feat: improve tableConstraint

* fix: convert TIMESTAMP_LTZ

* test: improve tests

* feat: build new flinksql parser and lexer

* test: add CREATE TEMPLATE TABLE test
This commit is contained in:
野迂迂 2023-05-11 17:41:34 +08:00 committed by GitHub
parent c1c72def30
commit 370cccf8d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 5547 additions and 5409 deletions

View File

@ -79,6 +79,7 @@ FIRST: 'FIRST';
AFTER: 'AFTER'; AFTER: 'AFTER';
LAST: 'LAST'; LAST: 'LAST';
WITH: 'WITH'; WITH: 'WITH';
WITHOUT: 'WITHOUT';
VALUES: 'VALUES'; VALUES: 'VALUES';
CREATE: 'CREATE'; CREATE: 'CREATE';
TABLE: 'TABLE'; TABLE: 'TABLE';
@ -270,6 +271,7 @@ SYSTEM_TIME: 'SYSTEM_TIME';
ENFORCED: 'ENFORCED'; ENFORCED: 'ENFORCED';
METADATA: 'METADATA'; METADATA: 'METADATA';
VIRTUAL: 'VIRTUAL'; VIRTUAL: 'VIRTUAL';
ZONE: 'ZONE';
// DATA TYPE Keywords // DATA TYPE Keywords

View File

@ -138,15 +138,16 @@ columnName
; ;
columnNameList columnNameList
: columnName (',' columnName)* : LR_BRACKET columnName (',' columnName)* RR_BRACKET
; ;
columnType columnType
: typeName=(DATE | BOOLEAN | NULL) : typeName=(DATE | BOOLEAN | NULL)
| typeName=(CHAR | VARCHAR | STRING | BINARY | VARBINARY | BYTES | typeName=(CHAR | VARCHAR | STRING | BINARY | VARBINARY | BYTES
| TINYINT | SMALLINT | INT | INTEGER | BIGINT | TINYINT | SMALLINT | INT | INTEGER | BIGINT
| TIME | TIMESTAMP | TIMESTAMP_LTZ | DATETIME | TIME | TIMESTAMP_LTZ | DATETIME
) lengthOneDimension? ) lengthOneDimension?
| typeName=TIMESTAMP lengthOneDimension? ((WITHOUT | WITH) LOCAL? TIME ZONE)?
| typeName=(DECIMAL | DEC | NUMERIC | FLOAT | DOUBLE) lengthTwoOptionalDimension? | typeName=(DECIMAL | DEC | NUMERIC | FLOAT | DOUBLE) lengthTwoOptionalDimension?
| type=(ARRAY | MULTISET) lengthOneTypeDimension? | type=(ARRAY | MULTISET) lengthOneTypeDimension?
| type=MAP mapTypeDimension? | type=MAP mapTypeDimension?
@ -179,7 +180,7 @@ rowTypeDimension
; ;
columnConstraint columnConstraint
:(CONSTRAINT constraintName)? PRIMARY KEY (NOT ENFORCED)? :(CONSTRAINT constraintName)? PRIMARY KEY NOT ENFORCED
; ;
commentSpec commentSpec
@ -208,7 +209,7 @@ watermarkDefinition
; ;
tableConstraint tableConstraint
: (CONSTRAINT constraintName)? PRIMARY KEY '(' columnNameList ')' (NOT ENFORCED)? : (CONSTRAINT constraintName)? PRIMARY KEY columnNameList NOT ENFORCED
; ;
constraintName constraintName
@ -247,8 +248,8 @@ sourceTable
; ;
likeOption likeOption
: (INCLUDING | EXCLUDING) (ALL | CONSTRAINTS | PARTITIONS) : ((INCLUDING | EXCLUDING) (ALL | CONSTRAINTS | PARTITIONS))
| (INCLUDING | EXCLUDING | OVERWRITING) (GENERATED | OPTIONS | WATERMARKS) | ((INCLUDING | EXCLUDING | OVERWRITING) (GENERATED | OPTIONS | WATERMARKS))
; ;
createCatalog createCatalog
@ -334,7 +335,7 @@ insertStatement
insertSimpleStatement insertSimpleStatement
: INSERT (INTO | OVERWRITE) uid : INSERT (INTO | OVERWRITE) uid
( (
insertPartitionDefinition? insertColumnListDefinition? queryStatement insertPartitionDefinition? columnNameList? queryStatement
| valuesDefinition | valuesDefinition
) )
; ;
@ -343,10 +344,6 @@ insertPartitionDefinition
: PARTITION tablePropertyList : PARTITION tablePropertyList
; ;
insertColumnListDefinition
: LR_BRACKET columnNameList RR_BRACKET
;
valuesDefinition valuesDefinition
: VALUES valuesRowDefinition (COMMA valuesRowDefinition)* : VALUES valuesRowDefinition (COMMA valuesRowDefinition)*
; ;

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
// dt-sql-parser/src/grammar/flinksql/FlinkSqlParser.g4 by ANTLR 4.12.0 // Generated from /Users/mortalYoung/Projects/dt-sql-parser/src/grammar/flinksql/FlinkSqlParser.g4 by ANTLR 4.12.0
import {ParseTreeListener} from "antlr4"; import {ParseTreeListener} from "antlr4";
@ -75,7 +75,6 @@ import { DropFunctionContext } from "./FlinkSqlParser";
import { InsertStatementContext } from "./FlinkSqlParser"; import { InsertStatementContext } from "./FlinkSqlParser";
import { InsertSimpleStatementContext } from "./FlinkSqlParser"; import { InsertSimpleStatementContext } from "./FlinkSqlParser";
import { InsertPartitionDefinitionContext } from "./FlinkSqlParser"; import { InsertPartitionDefinitionContext } from "./FlinkSqlParser";
import { InsertColumnListDefinitionContext } from "./FlinkSqlParser";
import { ValuesDefinitionContext } from "./FlinkSqlParser"; import { ValuesDefinitionContext } from "./FlinkSqlParser";
import { ValuesRowDefinitionContext } from "./FlinkSqlParser"; import { ValuesRowDefinitionContext } from "./FlinkSqlParser";
import { InsertMulStatementCompatibilityContext } from "./FlinkSqlParser"; import { InsertMulStatementCompatibilityContext } from "./FlinkSqlParser";
@ -913,16 +912,6 @@ export default class FlinkSqlParserListener extends ParseTreeListener {
* @param ctx the parse tree * @param ctx the parse tree
*/ */
exitInsertPartitionDefinition?: (ctx: InsertPartitionDefinitionContext) => void; exitInsertPartitionDefinition?: (ctx: InsertPartitionDefinitionContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.insertColumnListDefinition`.
* @param ctx the parse tree
*/
enterInsertColumnListDefinition?: (ctx: InsertColumnListDefinitionContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.insertColumnListDefinition`.
* @param ctx the parse tree
*/
exitInsertColumnListDefinition?: (ctx: InsertColumnListDefinitionContext) => void;
/** /**
* Enter a parse tree produced by `FlinkSqlParser.valuesDefinition`. * Enter a parse tree produced by `FlinkSqlParser.valuesDefinition`.
* @param ctx the parse tree * @param ctx the parse tree

View File

@ -1,4 +1,4 @@
// dt-sql-parser/src/grammar/flinksql/FlinkSqlParser.g4 by ANTLR 4.12.0 // Generated from /Users/mortalYoung/Projects/dt-sql-parser/src/grammar/flinksql/FlinkSqlParser.g4 by ANTLR 4.12.0
import {ParseTreeVisitor} from 'antlr4'; import {ParseTreeVisitor} from 'antlr4';
@ -75,7 +75,6 @@ import { DropFunctionContext } from "./FlinkSqlParser";
import { InsertStatementContext } from "./FlinkSqlParser"; import { InsertStatementContext } from "./FlinkSqlParser";
import { InsertSimpleStatementContext } from "./FlinkSqlParser"; import { InsertSimpleStatementContext } from "./FlinkSqlParser";
import { InsertPartitionDefinitionContext } from "./FlinkSqlParser"; import { InsertPartitionDefinitionContext } from "./FlinkSqlParser";
import { InsertColumnListDefinitionContext } from "./FlinkSqlParser";
import { ValuesDefinitionContext } from "./FlinkSqlParser"; import { ValuesDefinitionContext } from "./FlinkSqlParser";
import { ValuesRowDefinitionContext } from "./FlinkSqlParser"; import { ValuesRowDefinitionContext } from "./FlinkSqlParser";
import { InsertMulStatementCompatibilityContext } from "./FlinkSqlParser"; import { InsertMulStatementCompatibilityContext } from "./FlinkSqlParser";
@ -626,12 +625,6 @@ export default class FlinkSqlParserVisitor<Result> extends ParseTreeVisitor<Resu
* @return the visitor result * @return the visitor result
*/ */
visitInsertPartitionDefinition?: (ctx: InsertPartitionDefinitionContext) => Result; visitInsertPartitionDefinition?: (ctx: InsertPartitionDefinitionContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.insertColumnListDefinition`.
* @param ctx the parse tree
* @return the visitor result
*/
visitInsertColumnListDefinition?: (ctx: InsertColumnListDefinitionContext) => Result;
/** /**
* Visit a parse tree produced by `FlinkSqlParser.valuesDefinition`. * Visit a parse tree produced by `FlinkSqlParser.valuesDefinition`.
* @param ctx the parse tree * @param ctx the parse tree

View File

@ -1,43 +1,58 @@
import FlinkSQL from "../../../../src/parser/flinksql"; import FlinkSQL from '../../../../src/parser/flinksql';
import fs from 'fs';
import path from 'path';
describe('FlinkSQL Create Table Syntax Tests', () => { const parser = new FlinkSQL();
const parser = new FlinkSQL();
// Create statements const readSQL = (fileName: string) =>
test('Test Create Catalog Statement', () => { fs
const sql = ` .readFileSync(path.join(__dirname, 'fixtures', fileName), 'utf-8')
CREATE CATALOG c1 .split(';')
WITH ( .filter(Boolean)
'key1'='value1', .map((i) => i.trim());
'key2'='value2'
) const features = {
`; table: readSQL('createTable.sql'),
const result = parser.validate(sql); catalog: readSQL('createCatalog.sql'),
expect(result.length).toBe(0); database: readSQL('createDatabase.sql'),
view: readSQL('createView.sql'),
function: readSQL('createFunction.sql'),
};
describe('FlinkSQL Create Syntax Tests', () => {
describe('CREATE TABLE', () => {
features.table.forEach((table) => {
it(table, () => {
expect(parser.validate(table).length).toBe(0);
}); });
test('Test simple Create Database Statement', () => {
const sql = `
CREATE DATABASE IF NOT EXISTS dataApi
WITH (
"owner" = "admin"
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
}); });
test('Test simple Create View Statement', () => {
const sql = `
CREATE TEMPORARY VIEW IF NOT EXISTS tempView
AS SELECT product, amount FROM Orders;
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
}); });
test('Test simple Create Function Statement', () => { describe('CREATE CATALOG', () => {
const sql = ` features.catalog.forEach((catalog) => {
CREATE FUNCTION IF NOT EXISTS tempFunction AS 'SimpleUdf'; it(catalog, () => {
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'; expect(parser.validate(catalog).length).toBe(0);
`; });
const result = parser.validate(sql); });
expect(result.length).toBe(0); });
describe('CREATE DATABASE', () => {
features.database.forEach((database) => {
it(database, () => {
expect(parser.validate(database).length).toBe(0);
});
});
});
describe('CREATE VIEW', () => {
features.view.forEach((view) => {
it(view, () => {
expect(parser.validate(view).length).toBe(0);
});
});
});
describe('CREATE FUNCTION', () => {
features.function.forEach((func) => {
it(func, () => {
expect(parser.validate(func).length).toBe(0);
});
});
}); });
}); });

View File

@ -1,184 +0,0 @@
import FlinkSQL from '../../../../src/parser/flinksql';
describe('FlinkSQL Create Table Syntax Tests', () => {
const parser = new FlinkSQL();
// simple create table statement
test('Test simple Create Table Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user_id' BIGINT,
'name' STRING
) WITH (
'connector'='oracle-x'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// create temporary table statement
test('Test Temporary Create Table Statement', () => {
const sql = `
CREATE TEMPORARY TABLE MyTable (
'user_id' BIGINT,
'name' STRING
) WITH (
'connector'='oracle-x'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// Metadata Columns statement
test('Test Metadata Columns Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user_id' BIGINT,
'name' STRING,
'timestamp' BIGINT METADATA, -- part of the query-to-sink schema
'offset' BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
'record_time' TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// Computed Columns statement
test('Test Computed Columns Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user_id' BIGINT,
'price' DOUBLE,
'quantity' DOUBLE,
'cost' AS price * quanitity -- evaluate expression and supply the result to queries
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// WATERMARK statement
test('Test WATERMARK Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user' BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// primary key statement
test('Test Primary Key Statement', () => {
const sql = `
CREATE TABLE MyTable (
id int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// PARTITIONED BY statement
test('Test PARTITIONED BY Statement', () => {
const sql = `
CREATE TABLE Orders_in_file (
'user' BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
)
PARTITIONED BY ('user')
WITH (
'connector' = 'filesystem',
'path' = '...'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// like statement
test('Test Like Statement', () => {
const sql = `
CREATE TABLE Orders_with_watermark (
id int,
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders_in_file (
-- Exclude everything besides the computed columns which we need to generate the watermark for.
-- We do not want to have the partitions or filesystem options as those do not apply to kafka.
EXCLUDING ALL
INCLUDING GENERATED
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// AS select_statement
test('Test As Select Statement', () => {
const sql = `
CREATE TABLE my_ctas_table
WITH (
'connector' = 'kafka'
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// create catalog table
test('Test Create Catalog Table Statement', () => {
const sql = `
CREATE TABLE catalog1.db1.table1 (
id int
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// data type
test('Test Data Type Statement', () => {
const sql = `
CREATE TABLE catalog1.db1.table1 (
attr0 string,
attr1 boolean,
attr3 decimal(38,18),
attr4 TINYINT,
attr5 smallint,
attr6 int,
attr7 bigint,
attr8 float,
attr9 double,
attr10 date,
attr11 time,
attr12 timestamp(3),
attr13 array<string>,
attr14 row<attr15 float, attr16 timestamp(3)>,
attr17 map<int, bigint>,
name1 VARCHAR(64),
message ROW<data ROW<UPO_TIMESTAMP VARCHAR(20)>>,
raw RAW('class', 'snapshot')
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
});

View File

@ -0,0 +1 @@
CREATE CATALOG c1 WITH ('key1' = 'value1', 'key2' = 'value2');

View File

@ -0,0 +1,9 @@
CREATE DATABASE db1 WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE IF NOT EXISTS db1 WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE catalog1.db1 WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE db1 COMMENT 'test create database' WITH ('key1' = 'value1', 'key2.a' = 'value2.a');
CREATE DATABASE IF NOT EXISTS dataApi COMMENT 'test create database' WITH ('key1' = 'value1', 'key2.a' = 'value2.a');

View File

@ -0,0 +1,28 @@
CREATE FUNCTION IF NOT EXISTS tempFunction AS 'SimpleUdf';
CREATE FUNCTION catalog1.db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION catalog1.db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION IF NOT EXISTS catalog1.db1.function1 AS 'org.apache.flink.function.function1';
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE JAVA;
CREATE TEMPORARY SYSTEM FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE SCALA;
CREATE TEMPORARY SYSTEM FUNCTION IF NOT EXISTS function1 AS 'org.apache.flink.function.function1' LANGUAGE PYTHON;
-- test create function USING jar
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar',
JAR 'hdfs:///path/to/test2.jar';
CREATE TEMPORARY FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR '/path/to/test.jar';
CREATE TEMPORARY SYSTEM FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR '/path/to/test.jar';
CREATE FUNCTION function1 AS 'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar',
jar 'hdfs:///path/to/test2.jar';

View File

@ -0,0 +1,234 @@
CREATE TABLE MyTable ('user_id' BIGINT, 'name' STRING) WITH ('connector' = 'oracle-x');
CREATE TABLE MyTable WITH ('connector' = 'oracle-x');
-- 尽管官方文档的 BNF 里没有支持创建临时表,但实际上是支持的
CREATE TEMPORARY TABLE MyTable ('user_id' BIGINT, 'name' STRING) WITH ('connector' = 'oracle-x');
CREATE TABLE MyTable (
'user_id' BIGINT,
'name' STRING,
'timestamp' BIGINT METADATA, -- part of the query-to-sink schema
'offset' BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
'record_time' TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH ('connector' = 'kafka');
CREATE TABLE MyTable (
'user_id' BIGINT,
'price' DOUBLE,
'quantity' DOUBLE,
'cost' AS price * quanitity -- evaluate expression and supply the result to queries
) WITH ('connector' = 'kafka');
CREATE TABLE MyTable (
'user' BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka');
CREATE TABLE MyTable (id int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'kafka');
CREATE TABLE tbl1 (
a bigint,
h varchar,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b varchar,
proc AS PROCTIME(),
meta STRING METADATA,
my_meta STRING METADATA FROM 'meta',
my_meta STRING METADATA FROM 'meta' VIRTUAL,
meta STRING METADATA VIRTUAL,
PRIMARY KEY (a, b) NOT ENFORCED
) PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE Orders_in_file (
'user' BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
) PARTITIONED BY ('user') WITH (
'connector' = 'filesystem',
'path' = '...'
);
CREATE TABLE Orders_with_watermark (
id int,
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
) LIKE Orders_in_file (
-- Exclude everything besides the computed columns which we need to generate the watermark for.
-- We do not want to have the partitions or filesystem options as those do not apply to kafka.
EXCLUDING ALL
INCLUDING GENERATED
);
CREATE TABLE my_ctas_table WITH ('connector' = 'kafka')
AS SELECT
id,
name,
age
FROM
source_table
WHERE
mod(id, 10) = 0;
CREATE TABLE catalog1.db1.table1 (id int) WITH ('connector' = 'kafka');
CREATE TABLE catalog1.db1.table1 (
attr0 STRING,
attr1 boolean,
attr3 decimal(38, 18),
attr4 TINYINT,
attr5 smallint,
attr6 int,
attr7 bigint,
attr8 float,
attr9 double,
attr10 date,
attr11 time,
attr12 timestamp(3),
attr13 array<STRING>,
attr14 ROW<attr15 float, attr16 timestamp(3)>,
attr17 MAP<int, bigint>,
name1 VARCHAR(64),
message ROW<data ROW<UPO_TIMESTAMP VARCHAR(20)>>,
raw RAW('class', 'snapshot')
) WITH ('connector' = 'kafka');
CREATE TABLE IF NOT EXISTS tbl1 (
a bigint,
h varchar,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b varchar,
proc AS PROCTIME(),
PRIMARY KEY (a, b) NOT ENFORCED
) PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a bigint COMMENT 'test column comment AAA.',
h varchar,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b varchar,
proc AS PROCTIME(),
meta STRING METADATA,
my_meta STRING METADATA FROM 'meta',
my_meta STRING METADATA FROM 'meta' VIRTUAL,
PRIMARY KEY (a, b) NOT ENFORCED
) COMMENT 'test table comment ABC.' PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a bigint COMMENT 'test column comment AAA.',
h varchar,
g AS 2 * (a + 1) COMMENT 'test computed column.',
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b varchar,
proc AS PROCTIME(),
PRIMARY KEY (a, b) NOT ENFORCED
) COMMENT 'test table comment ABC.' PARTITIONED BY (a, h) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a bigint,
h varchar,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
b varchar,
proc AS PROCTIME(),
PRIMARY KEY (a, b) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a bigint PRIMARY KEY NOT ENFORCED COMMENT 'test column comment AAA.',
h varchar CONSTRAINT ct1 PRIMARY KEY NOT ENFORCED,
g AS 2 * (a + 1),
ts AS toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'),
proc AS PROCTIME()
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
ts timestamp(3),
id varchar,
watermark FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
log_ts varchar,
ts AS to_timestamp(log_ts),
WATERMARK FOR ts AS ts + INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
f1 ROW<q1 bigint, q2 ROW<t1 timestamp, t2 varchar>, q3 boolean>,
WATERMARK FOR f1.q2.t1 AS NOW()
) WITH (
'connector' = 'kafka',
'kafka.topic' = 'log.test'
);
CREATE TABLE tbl1 (
a ARRAY<bigint>,
b MAP<int, varchar>,
c ROW<cc0 int, cc1 float, cc2 varchar>,
d MULTISET<varchar>,
PRIMARY KEY (a, b) NOT ENFORCED
) with (
'x' = 'y',
'asd' = 'data'
);
CREATE TABLE tbl1 (
a ARRAY<ARRAY<bigint>>,
b MAP<MAP<int, varchar>, ARRAY<varchar>>,
c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,
d MULTISET<ARRAY<int>>,
f TIMESTAMP(9),
PRIMARY KEY (a, b) NOT ENFORCED
) with (
'x' = 'y',
'asd' = 'data'
);
CREATE TABLE tbl1 (
a ARRAY<ARRAY<bigint>>,
b MAP<MAP<int, varchar>, ARRAY<varchar>>,
c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,
d MULTISET<ARRAY<int>>,
f TIMESTAMP(9),
PRIMARY KEY (a, b) NOT ENFORCED
) with (
'x' = 'y',
'asd' = 'data'
) LIKE Orders (
OVERWRITING OPTIONS
EXCLUDING CONSTRAINTS
);

View File

@ -0,0 +1,37 @@
CREATE VIEW v AS
SELECT
col1
FROM
tbl;
CREATE TEMPORARY VIEW v AS
SELECT
col1
FROM
tbl;
CREATE VIEW v COMMENT 'this is a view' AS
SELECT
col1
FROM
tbl;
CREATE VIEW v(col1, col2) AS
SELECT
col3,
col4
FROM
tbl;
CREATE VIEW v(col1, col2) COMMENT 'this is a view' AS
SELECT
col3,
col4
FROM
tbl;
CREATE TEMPORARY VIEW IF NOT EXISTS v AS
SELECT
col1
FROM
tbl;