fix: fix flinksql create table grammar, add test (#65)

* fix: fix flinksql create table grammar, add test

* feat: add cross join, left outer join and time temporal join

* test: supplement flinksql join test

* fix: fix catalog table grammar, add test

* fix: fix flinksql data type, add test

* fix: delete console

* feat: add query with clause, add test
This commit is contained in:
nankaNULL
2022-12-28 14:20:33 +08:00
committed by GitHub
parent d4ac1ae940
commit 0c9a831585
13 changed files with 7358 additions and 4280 deletions

View File

@ -102,6 +102,19 @@ describe('FlinkSQL Syntax Tests', () => {
});
// query statements
test('Test With clause', () => {
const sql = `
WITH orders_with_total AS (
SELECT order_id, price + tax AS total
FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
test('Test simple Select Statement', () => {
const sql = `SELECT product, amount FROM Orders;`;
const result = parser.validate(sql);
@ -143,6 +156,42 @@ describe('FlinkSQL Syntax Tests', () => {
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// test left outer join
test('Test Select Statement with left outer join', () => {
const sql = `
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// test cross join
test('Test Select Statement with cross join', () => {
const sql = `
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// test for time temporal join
test('Test Select Statement with time temporal join', () => {
const sql = `SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// test for catalog table
test('Test Select Statement with catalog table', () => {
const sql = `SELECT * FROM catalog1.db1.table1;`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// describe statements
test('Test simple Describe Statement', () => {

View File

@ -0,0 +1,158 @@
import { FlinkSQL } from '../../../../src';
describe('FlinkSQL Create Table Syntax Tests', () => {
const parser = new FlinkSQL();
// simple create table statement
test('Test simple CreateTable Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user_id' BIGINT,
'name' STRING
) WITH (
'connector'='oracle-x'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// Metadata Columns statement
test('Test Metadata Columns Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user_id' BIGINT,
'name' STRING,
'timestamp' BIGINT METADATA, -- part of the query-to-sink schema
'offset' BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
'record_time' TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// Computed Columns statement
test('Test Computed Columns Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user_id' BIGINT,
'price' DOUBLE,
'quantity' DOUBLE,
'cost' AS price * quanitity -- evaluate expression and supply the result to queries
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// WATERMARK statement
test('Test WATERMARK Statement', () => {
const sql = `
CREATE TABLE MyTable (
'user' BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// primary key statement
test('Test Primary Key Statement', () => {
const sql = `
CREATE TABLE MyTable (
id int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// PARTITIONED BY statement
test('Test PARTITIONED BY Statement', () => {
const sql = `
CREATE TABLE Orders_in_file (
'user' BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
)
PARTITIONED BY ('user')
WITH (
'connector' = 'filesystem',
'path' = '...'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// like statement
test('Test Like Statement', () => {
const sql = `
CREATE TABLE Orders_with_watermark (
id int,
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders_in_file (
-- Exclude everything besides the computed columns which we need to generate the watermark for.
-- We do not want to have the partitions or filesystem options as those do not apply to kafka.
EXCLUDING ALL
INCLUDING GENERATED
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// create catalog table
test('Test Create Catalog Table Statement', () => {
const sql = `
CREATE TABLE catalog1.db1.table1 (
id int
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
// data type
test('Test Data Type Statement', () => {
const sql = `
CREATE TABLE catalog1.db1.table1 (
attr0 string,
attr1 boolean,
attr3 decimal(38,18),
attr4 TINYINT,
attr5 smallint,
attr6 int,
attr7 bigint,
attr8 float,
attr9 double,
attr10 date,
attr11 time,
attr12 timestamp(3),
attr13 array<string>,
attr14 row<attr15 float, attr16 timestamp(3)>,
attr17 map<int, bigint>,
name1 VARCHAR(64),
message ROW<data ROW<UPO_TIMESTAMP VARCHAR(20)>>,
raw RAW('class', 'snapshot')
) WITH (
'connector' = 'kafka'
);
`;
const result = parser.validate(sql);
expect(result.length).toBe(0);
});
});