SQL API

标准SQL分类

  • DML(Data Manipulation Language):数据操作语言,用来定义数据库中的记录
  • DCL(Data Control Language):数据控制语言,用来定义访问权限和安全级别
  • DQL(Data Query Language):数据查询语言,用来查询记录
  • DDL(Data Definition Language):数据定义语言,用来定义数据库中的对象

Flink Table API实现了DQL数据查询,Flink SQL实现了DML、DDL、DQL

  • FlnkSQL比DataStreamAPI、DataSetAPI实现简单、方便
  • TableAPI和SQL是流批通用的,代码可以完全复用
  • TableAPI和SQL可以使用Calcite的SQL优化器,可以实现自动程序优化更容易写出执行效率高的应用

Flink1.9版本引入了阿里巴巴的Blink实现流批一体

Apache Calcite

Apache Calcite是一款使用Java编程语言编写的开源动态数据管理框架,它具备很多常用的数据库管理需要的功能,比如:SQL解析、SQL校验、SQL查询优化、SQL生成以及数据连接查询,目前使用Calcite作为SQL解析与优化引擎的有Hive、Drill、Flink、Phoenix和Storm。Spark中的SQL解析与优化引擎是自带的。

Calcite中提供了RBO(Rule-Based Optimization:基于规则)和CBO(Cost-Based Optimization:基于代价)两种优化器,在保证语义的基础上,生成执行成本最低的SQL逻辑树。

SQL Client

进入sql-client:

1
2
# 目前仅支持 embedded,模式默认值embedded
./bin/sql-client.sh embedded

设置输出模式:

1
2
3
4
5
6
# 表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET sql-client.execution.result-mode=table;
# 变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流:
SET sql-client.execution.result-mode=changelog;
# Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上:
SET sql-client.execution.result-mode=tableau;

执行 SQL 查询:

1
2
3
SELECT 'Hello World';

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

为了保持 CLI 界面及时响应,变更日志模式仅显示最近的 1000 个更改。表格模式支持浏览更大的结果,这些结果仅受可用主内存和配置的最大行数(sql-client.execution.max-table-result.rows)的限制。

SQL 上下文

TableEnvironment API

TableEnvironment:Table API & SQL 的都集成在一个统一上下文(即 TableEnvironment)中,其地位等同于 DataStream API 中的 StreamExecutionEnvironment 的地位

1
2
3
4
TableEnvironment::executeSql:用于 SQL API 中,可以执行一段完整 DDL,DML SQL。举例,方法入参可以是 CREATE TABLE xxx,INSERT INTO xxx SELECT xxx FROM xxx。
TableEnvironment::from(xxx):用于 Table API 中,可以以强类型接口的方式运行。方法入参是一个表名称。
TableEnvironment::sqlQuery:用于 SQL API 中,可以执行一段查询 SQL,并把结果以 Table 的形式返回。举例,方法的入参是 SELECT xxx FROM xxx
Table::executeInsert:用于将 Table 的结果插入到结果表中。方法入参是写入的目标表。

TableEnvironment 的功能

  • Catalog 管理:Catalog 可以理解为 Flink 的 MetaStore,类似 Hive MetaStore 对在 Hive 中的地位,关于 Flink Catalog 的详细内容后续进行介绍
    表管理:在 Catalog 中注册表
  • SQL 查询:(这 TMD 还用说,最基本的功能啊),就像 DataStream 中提供了 addSource、map、flatmap 等接口
  • UDF 管理:注册用户定义(标量函数:一进一出、表函数:一进多出、聚合函数:多进一出)函数
  • UDF 扩展:加载可插拔 Module(Module 可以理解为 Flink 管理 UDF 的模块,是可插拔的,可以自定义 Module,去支持奇奇怪怪的 UDF 功能)
  • DataStream 和 Table(Table API & SQL 的查询结果)之间进行转换:1.13 版本的只有流任务支持,批任务不支持。1.14 支持流批

创建方式

方式一:通过 EnvironmentSettings 创建 TableEnvironment

1
2
3
4
5
6
7
// 1. 就是设置一些环境信息
final EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
// 2. 创建 TableEnvironment
final TableEnvironment tEnv = TableEnvironment.create(settings);
// 如果是 in_streaming_mode,则最终创建出来的 TableEnvironment 实例为 StreamTableEnvironmentImpl
// 如果是 in_batch_mode,则最终创建出来的 TableEnvironment 实例为 TableEnvironmentImpl
// 虽然两者都继承了 TableEnvironment 接口,但是 StreamTableEnvironmentImpl 支持的功能更多一些。可以直接去看看接口实验一下,这里就不进行详细介绍。

方式二:通过已有的 StreamExecutionEnvironment 创建 TableEnvironment

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

image-20230714091622587

SQL中的表

外部表与视图

一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称

如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default

  • 外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。
  • 视图 VIEW:从已经存在的表中创建,视图一般是一个 SQL 逻辑的查询结果。对比到离线的 Hive SQL 中,在离线的场景(Hive 表)中 VIEW 也都是从已有的表中去创建的。

注意:对于视图不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个 Table 的结果,可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个 Table 的逻辑执行多次。类似于 with tmp as (DML) 的语法

临时表与永久表

  • 临时表:通常保存于内存中并且仅在创建它们的 Flink session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。
  • 永久表:需要外部 Catalog(例如 Hive Metastore)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink session 可见且持续存在,直至从 Catalog 中被明确删除。

如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink session 中,你的任务访问到这个表时,访问到的永远是临时表(即相同名称的表,临时表会屏蔽永久表)。

SQL 查询案例

  • 案例场景:计算每一种商品(sku_id 唯一标识)的售出个数count、总销售额sum、平均销售额avg、最低价min、最高价

  • 数据准备:数据源为商品的销售流水(sku_id:商品,price:销售价格),然后写入到 Kafka 的指定 topic(sku_id:商品,count_result:售出个数、sum_result:总销售额、avg_result:平均销售额、min_result:最低价、max_result:最高价)当中

  • SQL Client中演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//1.创建一个数据源(输入)表,这里的数据源是 flink 自带的一个随机 mock 数据的数据源。
CREATE TABLE source_table (
sku_id STRING,
price BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.sku_id.length' = '1',
'fields.price.min' = '1',
'fields.price.max' = '1000000'
);

//2.创建一个数据汇(输出)表,输出到 kafka 中
CREATE TABLE sink_table (
sku_id STRING,
count_result BIGINT,
sum_result BIGINT,
avg_result DOUBLE,
min_result BIGINT,
max_result BIGINT,
PRIMARY KEY (`sku_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'node1.itcast.cn:9092',
'key.format' = 'json',
'value.format' = 'json'
);

//3.执行一段 group by 的聚合 SQL 查询
insert into sink_table
select sku_id,
count(*) as count_result,
sum(price) as sum_result,
avg(price) as avg_result,
min(price) as min_result,
max(price) as max_result
from source_table
group by sku_id ;

原子数据类型

字符串类型
CHAR
CHAR(n)
定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长
取值范围 [1-2,147,483,647]。如果不指定 n,则默认为 1。
VARCHAR
VARCHAR(n)
STRING
可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度
取值范围 [1-2,147,483,647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。
二进制类型
BINARY
BINARY(n)
定长二进制字符串,n 代表定长
取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。
VARBINARY
VARBINARY(n)
BYTES
可变长二进制字符串,n 代表字符的最大长度
取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。
数值类型
DECIMAL
DECIMAL(p)
DECIMAL(p, s)
DEC、DEC(p)
DEC(p, s)
NUMERIC
NUMERIC(p)
NUMERIC(p, s)
固定长度和精度的数值类型,就和 Java 中的 BigDecimal 一样
p 代表数值位数(长度),取值范围 [1, 38]
s 代表小数点后的位数(精度),取值范围 [0, p]
如果不指定,p 默认为 10,s 默认为 0。
TINYINT -128 to 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。
SMALLINT -32,768 to 32,767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。
INT
INTEGER
-2,147,483,648 to 2,147,483,647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。
BIGINT -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。
FLOAT 4 字节大小的单精度浮点数值,就和 Java 中的 float 一样。
DOUBLE
DOUBLE PRECISION
8 字节大小的双精度浮点数值,就和 Java 中的 double 一样。
特殊类型
NULL类型 NULL
Raw类型 RAW(‘class’, ‘snapshot’) 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器
布尔类型 BOOLEAN
时间类型
DATE 年-月-日组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]
TIME
TIME(p)
小时:分钟:秒[.小数秒]组成的 不带时区含义 的的时间的数据类型,精度高达纳秒
取值范围 [00:00:00.000000000到23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 0。
TIMESTAMP
TIMESTAMP(p)
TIMESTAMP WITHOUT TIME ZONE
TIMESTAMP(p) WITHOUT TIME ZONE
年-月-日 小时:分钟:秒[.小数秒] 组成的不带时区含义的时间类型
取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。
TIMESTAMP WITH TIME ZONE
TIMESTAMP(p) WITH TIME ZONE
年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型
取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。
TIMESTAMP_LTZ
TIMESTAMP_LTZ(p)
年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型
取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。
INTERVAL YEAR TO MONTH
INTERVAL DAY TO SECOND
interval 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。

TIMESTAMP_LTZTIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。

INTERVAL演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Flink SQL> CREATE TABLE sink_table2 (
result_interval_year TIMESTAMP(3),
result_interval_year_p TIMESTAMP(3),
result_interval_year_p_to_month TIMESTAMP(3),
result_interval_month TIMESTAMP(3),
result_interval_day TIMESTAMP(3),
result_interval_day_p1 TIMESTAMP(3),
result_interval_day_p1_to_hour TIMESTAMP(3),
result_interval_day_p1_to_minute TIMESTAMP(3),
result_interval_day_p1_to_second_p2 TIMESTAMP(3),
result_interval_hour TIMESTAMP(3),
result_interval_hour_to_minute TIMESTAMP(3),
result_interval_hour_to_second TIMESTAMP(3),
result_interval_minute TIMESTAMP(3),
result_interval_minute_to_second_p2 TIMESTAMP(3),
result_interval_second TIMESTAMP(3),
result_interval_second_p2 TIMESTAMP(3)
) WITH (
'connector' = 'print'
);

Flink SQL> INSERT INTO sink_table2
SELECT
-- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种

-- 1. 年-月。取值范围为 [-9999-11, +9999-11],其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。
-- INTERVAL YEAR
f1 + INTERVAL '10' YEAR as result_interval_year
-- INTERVAL YEAR(p)
, f1 + INTERVAL '100' YEAR(3) as result_interval_year_p
-- INTERVAL YEAR(p) TO MONTH
, f1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month
-- INTERVAL MONTH
, f1 + INTERVAL '13' MONTH as result_interval_month
-- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999],其中 p1\p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6
-- INTERVAL DAY
, f1 + INTERVAL '10' DAY as result_interval_day
-- INTERVAL DAY(p1)
, f1 + INTERVAL '100' DAY(3) as result_interval_day_p1
-- INTERVAL DAY(p1) TO HOUR
, f1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour
-- INTERVAL DAY(p1) TO MINUTE
, f1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute
-- INTERVAL DAY(p1) TO SECOND(p2)
, f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2
-- INTERVAL HOUR
, f1 + INTERVAL '10' HOUR as result_interval_hour
-- INTERVAL HOUR TO MINUTE
, f1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute
-- INTERVAL HOUR TO SECOND(p2)
, f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second
-- INTERVAL MINUTE
, f1 + INTERVAL '10' MINUTE as result_interval_minute
-- INTERVAL MINUTE TO SECOND(p2)
, f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2
-- INTERVAL SECOND
, f1 + INTERVAL '3' SECOND as result_interval_second
-- INTERVAL SECOND(p2)
, f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2
FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)

复合数据类型

数组类型 ARRAY、t ARRAY数组最大长度为 2,147,483,647。t 代表数组内的数据类型。举例 ARRAY、ARRAY,其等同于 INT ARRAY、STRING ARRAY
Map类型
MAP<kt, vt> Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 Map<STRING, INT>、Map<BIGINT, STRING>
集合类型
MULTISET
t MULTISET
就和 Java 中的 List 类型一样,允许重复的数据。举例 MULTISET,其等同于 INT MULTISET
对象类型
ROW<n0 t0, n1 t1, …>
ROW<n0 t0 ‘d0’, n1 t1 ‘d1’, …>
ROW(n0 t0, n1 t1, …)
ROW(n0 t0 ‘d0’, n1 t1 ‘d1’, …)
就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW<myField INT, myOtherField BOOLEAN>

案例演示:

  • 样例数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
"id":1238123899121,
"name":"itcast",
"date":"1990-10-14",
"obj":{
"time1":"12:12:43",
"str":"sfasfafs",
"lg":2324342345
},
"arr":[
{
"f1":"f1str11",
"f2":134
},
{
"f1":"f1str22",
"f2":555
}
],
"time":"12:12:43",
"timestamp":"1990-10-14 12:12:43",
"map":{
"flink":123
},
"mapinmap":{
"inner_map":{
"key":234
}
}
}
  • 开启netcat
1
{"id":1238123899121,"name":"itcast","date":"1990-10-14","obj":{"time1":"12:12:43","str":"sfasfafs","lg":2324342345},"arr":[{"f1":"f1str11","f2":134},{"f1":"f1str22","f2":555}],"time":"12:12:43","timestamp":"1990-10-14 12:12:43","map":{"flink":123},"mapinmap":{"inner_map":{"key":234}}}
  • 创建映射表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE json_source (
id BIGINT,
name STRING,
`date` DATE,
obj ROW<time1 TIME,str STRING,lg BIGINT>,
arr ARRAY<ROW<f1 STRING,f2 INT>>,
`time` TIME,
`timestamp` TIMESTAMP(3),
`map` MAP<STRING,BIGINT>,
mapinmap MAP<STRING,MAP<STRING,INT>>,
proctime as PROCTIME()
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'json'
);
  • 查询结果
1
select id, name,`date`,obj.str,arr[1].f1,`map`['flink'],mapinmap['inner_map']['key'] from json_source;

流批处理的异同

输入表 处理逻辑 结果表
批处理 静态表:输入数据有限、是有界集合 批式计算:每次执行查询能够访问到完整的输入数据,然后计算,输出完整的结果数据 静态表:数据有限
流处理 动态表:输入数据无限,数据实时增加,并且源源不断 流式计算:执行时不能够访问到完整的输入数据,每次计算的结果都是一个中间结果 动态表:数据无限

要将 SQL 应用于流式任务的三个要解决的核心点:

1.SQL 输入表:分析如何将一个实时的,源源不断的输入流数据表示为 SQL 中的输入表。

2.SQL 处理计算:分析将 SQL 查询逻辑翻译成什么样的底层处理技术才能够实时的处理流式输入数据,然后产出流式输出数据。

3.SQL 输出表:分析如何将 SQL 查询输出的源源不断的流数据表示为一个 SQL 中的输出表。

两种技术方案:

1.动态表:源源不断的输入、输出流数据映射到 动态表

2.连续查询:实时处理输入数据,产出输出数据的实时处理技术

动态表

如果把有界数据集当作表,那么无界数据集(流)就是一个随着时间变化持续写入数据的表,Flink中使用动态表表示流,使用静态表表示传统的批处理中的数据集。

流是Flink DataStream中的概念,动态表是Flink SQL中的概念,两者都是无界数据集。动态表在Flink中抽象为Table API

连续查询

将SQL查询应用于动态表,会持续执行而不会终止,因为数据会持续的产生,所以连续查询不会给出一个最终结果,而是持续不断地更新结果,实际上给出的总是中间结果。

流上的SQL查询运算与批处理中的SQL查询运算在语义上完全相同,对于相同的数据集计算结果也是相同的。

执行过程

image-20221218105938745

从概念上来说:

1.流转换为动态表

2.在动态表上执行连续查询,生成新的动态表

3.生成的动态表转换回流

从开发上来说:

1.将DataStream注册为Table

2.在Table上应用SQL查询语句,结果为一个新的Table

3.将Table转换为DataStream

第一步-流转换为表

第二步-更新和追加查询

1.向结果表中插入新记录、更新旧的记录

2.只会向结果表中插入新记录

同时包含插入(Insert)、更新(Update)的查询必须维护更多的State,消耗更多的CPU、内存资源。

流上使用SQL的限制:

1.需要维护的状态太大

2.计算更新的成本太高

第三步-表转换为流

动态表分为三种类型:

1.只有更新行为,表中的结果被持续更新

2.只有插入行为,没有UPDATA和DELETE行为的结果表

3.既有更新行为又有插入行为的结果表

不同的表类型会转换为不同的流对外输出。

Append流

只支持写入行为,输出的结果只有 INSERT 操作的数据。

Retract流

Retract 流包含两种类型的 message: add messages 和 retract messages 。

  • 将 INSERT 操作编码为 add message
  • 将 DELETE 操作编码为 retract message
  • 将 UPDATE 操作编码为更新先前行的 retract message 和更新(新)行的 add message,从而将动态表转换为 retract 流。

Retract 流写入到输出结果表的数据有 -,+ 两种,分别 - 代表撤回旧数据,+ 代表输出最新的数据。这两种数据最终都会写入到输出的数据引擎中。

image-20221218113941298

如果下游还有任务去消费这条流的话,要注意需要正确处理 -,+ 两种数据,防止数据计算重复或者错误。

Upsert流

Upsert 流包含两种类型的 message: upsert messages 和 delete messages。转换为 upsert 流的动态表需要唯一键(唯一键可以由多个字段组合而成)。

  • 将 INSERT 和 UPDATE 操作编码为 upsert message
  • 将 DELETE 操作编码为 delete message

Upsert流写入到输出结果表的数据每次输出的结果都是当前根据唯一键的最新结果数据,不会有 Retract流中的 - 回撤数据。

image-20221218114220260

如果下游还有一个任务去消费这条流的话,消费流的算子需要知道唯一键(即 user),以便正确地根据唯一键(user)去拿到每一个 user 当前最新的状态。其与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。

image-20230714102350539

三种时间属性

  • 事件时间:必须是由数据本身携带的数据,这个时间标志的是事件产生的时间。
  • 处理时间:指的是算子计算这个数据的时候产生的时间。
  • 到达时间:指的是数据从数据源进入到计算引擎的时间。

指定时间属性

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转为 Table 中使用

SQL时间案例

处理时间案例

处理时间语义下,使用当前机器的系统时间作为处理时间。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成watermark

  • 语法:CREATE TABLE DDL 指定时间戳的方式
1
2
3
4
5
6
7
8
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 使用下面这句来将 user_action_time 声明为处理时间
user_action_time AS PROCTIME()
) WITH (
...
);
  • 读取’order.csv’文件的数据,在原本的Schema上添加一个虚拟的时间戳列,时间戳列由PROCTIME()函数计算产生。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 创建映射表
create table InputTable (
`userid` varchar,
`timestamp` bigint,
`money` double,
`category` varchar,
`pt` AS PROCTIME()
) with (
'connector' = 'filesystem',
'path' = 'file:///export/data/input/order.csv',
'format' = 'csv'
);
-- 描述表
desc InputTable ;

事件时间案例

Event Time时间语义使用一条数据实际发生的时间作为时间属性,在Table API & SQL中这个字段通常被称为rowtime。这种模式下多次重复计算时,计算结果是确定的。

Event Time时间语义可以保证流处理和批处理的统一。

Event Time时间语义下,我们需要设置每条数据发生时的时间戳,并提供一个Watermark

  • 语法:CREATE TABLE DDL 指定时间戳的方式
1
2
3
4
5
6
7
8
9
10
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
  • 实际应用中时间戳一般都是秒或者是毫秒(BIGINT 类型)需要转换类型
1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 1. 这个 ts 就是常见的毫秒级别时间戳
ts BIGINT,
-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
  • 读取order.csv’文件的数据,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 创建映射表
-- 这种方式只是增加了一个 rt 的TIMESTAMP列
create table InputTable2 (
`userid` varchar,
`timestamp` bigint,
`money` double,
`category` varchar,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`))
) with (
'connector' = 'filesystem',
'path' = 'file:///export/data/input/order.csv',
'format' = 'csv'
);
-- 描述表
desc InputTable2;
-- 事件时间需要结合watermark(水位线)使用
create table InputTable3 (
`userid` varchar,
`timestamp` bigint,
`money` double,
`category` varchar,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for rt as rt - interval '1' second
) with (
'connector' = 'filesystem',
'path' = 'file:///export/data/input/order.csv',
'format' = 'csv'
);

窗口概述

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

时间窗口和计数窗口

  • 一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达

  • Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。

  • 在Table API & SQL中,主要有两种窗口:Group Windows 和 Over Windows。

    • Group Windows 根据时间或行计数间隔将组行聚合成有限的组,并对每个组计算一次聚合函数
    • Over Windows 窗口内聚合为每个输入行在其相邻行范围内计算一个聚合

Group Windows

滚动窗口

滚动窗口定义:滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中:

img

应用场景:常见的按照一分钟对数据进行聚合,计算一分钟内 PV,UV 数据。

基于DataStream编程

input().keyby().window()分组后的窗口计算,input().windowAll()全局窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DataStream<T> input = ...

// 基于Event Time的滚动窗口
input.keyBy(…)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)

// 基于Processing Time的滚动窗口
input.keyBy(…)1
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)

// 基于EventTime Time,在小时级滚动窗口上设置15分钟的Offset偏移
input.keyBy(…)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)

注意:时间窗口使用的是timeWindow()也可以使用window(),比如,input.keyBy(…).timeWindow(Time.seconds(1))。timeWindow()是一种简写,传入一个参数则滚动窗口,为窗口大小,若传入两个参数为滑动窗口,第一个参数为窗口大小,第二个参数为滑动时间。该方法在新版本中已过期。

基于SQL编程

TUMBLE函数基于时间属性字段将每个元素分配到一个指定大小的窗口中。
在流模式下,时间属性字段必须是事件或处理时间属性。
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。TUMBLE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”“window_end”“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。

TUMBLE函数接受三个必需参数,一个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

  • data: 是一个表参数,可以是与时间属性列的任何关系。
  • timecol: 是一个列描述符,指示数据的哪些时间属性列应映射到翻转窗口。
  • size: 是指定滚动窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

两种 Flink SQL 实现方式:

  • Group Window Aggregation(1.14之前只有此类方案,此方案在 1.14及之后版本已经标记为废弃,不推荐使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 数据源表
CREATE TABLE source_table (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);

-- 数据处理逻辑
select
user_id,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start
from source_table
group by
user_id,
tumble(row_time, interval '5' second);

可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval ‘1’ minute)

第一个参数为事件时间的时间戳

第二个参数为滚动窗口大小

  • Windowing TVF(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务)
1
2
3
4
5
6
7
8
9
10
11
12
SELECT 
user_id,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
sum(price) as sum_price
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND))
GROUP BY window_start,
window_end,
user_id;

可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘60’ SECOND)),包含三部分参数。

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL ‘60’ SECOND 声明滚动窗口大小为 1 min。

滑动窗口

滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动窗口之间每个窗口是可以重叠。在这种情况下,一条数据就会分配到多个窗口当中。举例,有 10 分钟大小的窗口,滑动步长为 5 分钟。这样,每 5 分钟会划分一次窗口,这个窗口包含的数据是过去 10 分钟内的数据,如下图所示:

image-20221224182559887

在流模式下,时间属性字段必须是事件或处理时间属性。

应用场景:比如计算同时在线的数据,要求结果的输出频率是 1 分钟一次,每次计算的数据是过去 5 分钟的数据(有的场景下用户可能在线,但是可能会 2 分钟不活跃,但是这也要算在同时在线数据中,所以取最近 5 分钟的数据就能计算进去了)

基于DataStream编程

我们使用Time类中的时间单位来定义Slide和Size,也可以设置offset。同样,timeWindow是一种缩写,根据执行环境中设置的时间语义来选择相应的方法初始化窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val input: DataStream[T] = ...

// sliding event-time windows
input.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)

// sliding processing-time windows
input.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)

// sliding processing-time windows offset by -8 hours
input.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)
基于SQL编程

在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMPTIMESTAMP _LTZ类型的属性。HOP的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”“window_end”“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
HOP接受四个必需参数,一个可选参数:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

  • data: 是一个表参数,可以是与时间属性列的任何关系
  • timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
  • slide: 是一个持续时间,指定顺序跳跃窗口开始之间的持续
  • size: 是指定跳跃窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

两种Flink SQL实现方式:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 数据源表
Flink SQL> CREATE TABLE source_table (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);

-- 数据处理逻辑
Flink SQL> SELECT user_id,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '5' SECOND, interval '10' SECOND) AS STRING)) * 1000 as window_start,
sum(price) as sum_price
FROM source_table
GROUP BY user_id
, hop(row_time, interval '5' SECOND, interval '10' SECOND);

可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval ‘1’ minute, interval ‘5’ minute)。其中:

第一个参数为事件时间的时间戳;

第二个参数为滑动窗口的滑动步长;

第三个参数为滑动窗口大小。

  • Windowing TVF 方案(1.14只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):
1
2
3
4
5
6
7
8
9
10
11
12
13
-- 数据处理逻辑
Flink SQL> SELECT
user_id,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
sum(price) as sum_price
FROM TABLE(HOP(
TABLE source_table
, DESCRIPTOR(row_time)
, interval '5' SECOND, interval '10' SECOND))
GROUP BY window_start,
window_end,
user_id;

在该模式下,窗口大小必须是滑动距离的整数倍

可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘1’ MINUTES, INTERVAL ‘5’ MINUTES)),包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL ‘1’ MINUTES 声明滚动窗口滑动步长大小为 1 min。

第四个参数 INTERVAL ‘5’ MINUTES 声明滚动窗口大小为 5 min。

Session 窗口

Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。

image-20221224184500079

基于DataStream编程

Session window的窗口大小,则是由数据本身决定

1
2
3
4
5
6
7
8
9
10
11
DataStream input =
DataStream result = input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.apply(<window function>) // or reduce() or fold()


DataStream result = input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.apply(<window function>) // or reduce() or fold()
基于SQL编程

session(row_time, interval '5' minute)

其中:

第一个参数为事件时间的时间戳;

第二个参数为 Session gap 间隔。

使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。


Flink SQL 不支持 Session 窗口的 Window TVF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 数据源表,用户购买行为记录表
Flink SQL> CREATE TABLE source_table (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
)

-- 数据处理逻辑
Flink SQL> SELECT
user_id,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_start,
sum(price) as sum_price
FROM source_table
GROUP BY user_id
, session(row_time, interval '5' SECOND)

其中:

第一个参数为事件时间的时间戳;

第二个参数为 Session gap 间隔。

SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。

Session gap 间隔是5s,实际上是不包含5s,大于5s才会触发计算

渐进式窗口

渐进式窗口定义:渐进式窗口在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。

image-20221224190117981

在流模式下,时间属性字段必须是事件或处理时间属性。
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMPTIMESTAMP _LTZ类型的属性。CUMULATE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”“window_end”“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
CUMULATE接受四个必需参数,一个可选参数:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

  • data: 是一个表参数,可以是与时间属性列的任何关系
  • timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
  • step: 是指定连续累积窗口结束之间增加的窗口大小的持续时间
  • size: 是指定累积窗口的最大宽度的持续时间。size必须是 的整数倍step。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

渐进式窗口目前只有 Windowing TVF 方案(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
-- 数据源表
Flink SQL> CREATE TABLE source_table (
-- 用户 id
user_id BIGINT,
-- 用户
money BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.money.min' = '1',
'fields.money.max' = '100000'
);

-- 数据汇表
Flink SQL> CREATE TABLE sink_table (
window_end bigint,
window_start bigint,
sum_money BIGINT,
count_distinct_id bigint
) WITH (
'connector' = 'print'
);

-- 数据处理逻辑
Flink SQL> insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_start,
sum(money) as sum_money,
count(distinct user_id) as count_distinct_id
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end

其中包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL ‘60’ SECOND 声明渐进式窗口触发的渐进步长为 1 min。

第四个参数 INTERVAL ‘1’ DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。

Over Windows

Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
  • Over 聚合:能够保留原始字段

Over 聚合的语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
-- 示例
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
  • ORDER BY:必须是时间戳列(事件时间、处理时间)

  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合

  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。

流处理中的乱序

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。

image-20221219183040634

watermark解决乱序

不以事件时间作为触发计算的条件,而是根据Watermark判断是否触发。

当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

image-20221219183133979

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:image-20221219183212429

基于SQL的水印

实现场景:

  • 使用Socket模拟接收数据

  • 设置WaterMark,设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 创建映射表
CREATE TABLE MyTable (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);

-- 设置滚动窗口进行聚合计算
SELECT
TUMBLE_START(ts, INTERVAL '5' SECOND) AS window_start,
TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,
TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND) as window_rowtime,
item,count(item) as total_item
FROM MyTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

数据有序的场景

测试数据:

1
2
3
4
5
6
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50

数据无序的场景

测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

设置迟到时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
drop table MyTable;
-- 允许Flink处理延迟以5秒内的迟到数据,修改最大乱序时间
CREATE TABLE MyTable (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);

SELECT
TUMBLE_START(ts, INTERVAL '5' SECOND) AS window_start,
TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,
TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND) as window_rowtime,
item,count(item) as total_item
FROM MyTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

基于DataStream的水印

水印策略设置

WatermarkStrategy 可以在 Flink 应用程序中的两处使用:

  • 第一种是直接在数据源上使用
  • 第二种是直接在非数据源的操作之后使用

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy意味着你必须使用特定数据源接口,例如与kafka链接,使用kafka Connerctor。

仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy)

  • 直接在数据源中使用(比如kafka)
1
2
3
4
5
6
7
8
9
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();

env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "Kafka Source");
  • 直接在非数据源的操作之后使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);

使用去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。WatermarkStrategy。

水印策略案例

单调递增生成水印

周期性 watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。

1
WatermarkStrategy.forMonotonousTimestamps();

这个也就是相当于上述的延迟策略去掉了延迟时间,以event中的时间戳充当了水印。

在程序中可以这样使用:

1
2
3
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
// 它的底层实现是AscendingTimestampsWatermarks,其实它就是BoundedOutOfOrdernessWatermarks类的一个子类,没有了延迟时间,可以通过具体的源码查看实现.

案例演示:

对有序的数据流添加水印,底层调用的是固定延迟生成水印,只是传递的水印等待时间是0,意味着不考虑乱序问题

使用单点递增水印,解决的是数据有序的场景

需求:从socket接受数据,进行转换,然后应用窗口,每隔5s生成一个窗口(非系统时间驱动窗口计算,数据中携带的事件时间),使用水印时间触发窗口计算

eventTime一定是一个毫秒值的时间戳,否则无法参与计算


数据样本:
sensor_1,1641783600000,35 -> 2022-01-10 11:00:00
sensor_2,1641783601000,10 -> 2022-01-10 11:00:01
sensor_2,1641783602000,20 -> 2022-01-10 11:00:02
sensor_2,1641783603000,30 -> 2022-01-10 11:00:03
sensor_2,1641783610000,40 -> 2022-01-10 11:00:10

代码示例:

1

image-20230716114043763

单调递增的水印策略实际上调用的是固定延迟的水印,传参数为0

固定延迟生成水印

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间。使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

1
WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness)

我们实现一个延迟3秒的固定延迟水印:

1
2
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

案例演示:

使用固定延迟生成水印,解决数据乱序问题
从socket接受数据,进行转换,设置2s的延迟时间,然后应用10s滚动窗口,添加水印触发窗口计算


数据样本:
sensor_1,1641783600000,35 -> 2022-01-10 11:00:00
sensor_2,1641783601000,10 -> 2022-01-10 11:00:01
sensor_2,1641783602000,20 -> 2022-01-10 11:00:02
sensor_2,1641783603000,30 -> 2022-01-10 11:00:03
sensor_2,1641783610000,40 -> 2022-01-10 11:00:10 –> 不会触发窗口计算,因为水印乱序2s,事件时间是10s,水印时间是8s,不能满足窗口的endTime

sensor_2,1641783612000,40 -> 2022-01-10 11:00:12 –> 满足了窗口的结束时间,触发了窗口计算

一旦窗口被触发完成计算,属于这个窗口的数据再次到达数据就默认丢失掉!

代码示例:

1

他的底层是使用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。重写实现了两个方法:

  • onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游。

  • onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:

1
env.getConfig().setAutoWatermarkInterval(5000L);

image-20230716114622600

image-20230716115214847

多并行度设置水印

两个基本原则:

  • 一对多:进行广播,所有的下游并行度watermark一致
  • 多对一:取最小值,下游watermark取上游所有并行度watermark的最小值
处理空闲数据案例

在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。

所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。

当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。

通过下面的代码来实现对于空闲数据流的处理

1
2
3
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
长期延迟数据处理

水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。

水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口。这个等待不能一直等,因为会一直缓着数据不计算。一般水印也就是几秒钟最多几分钟而已。

这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)

  • 水印: 乱序数据处理(时间很短的延迟)
  • 延迟处理:长期延迟数据的处理机制

主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据:

  • 设置允许延迟的时间是通过allowedLateness(lateness: Time)设置

    • 给窗口设置一个延迟关闭的时间=EventTime-水印策略的固定延迟-允许延迟的时间

    • 可以减少计算的压力降低延迟

  • 保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

  • 获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取

数据乱序的场景

Kafka中的数据是无序(多分区的情况下),如果使用flink连接Kafka读出来数据肯定是乱序的

如果想让Kafka中的数据有序,可以设置一个分区

watermark是用来解决乱序的问题(一般延迟时间在秒级)

可以设置延迟时间(延迟窗口关闭时间,一般也就最多分钟级)

长期迟到的数据可以保存后续处理(一般分钟级)

Checkpoint 检查点

为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

image-20221225200641646

Checkpoint实现过程

Flink 的数据可以粗略分为以下三类:

  • 第一种是元信息,相当于一个 Flink 作业运行起来所需要的最小信息集合,包括比如 Checkpoint 地址、Job Manager、Dispatcher、Resource Manager 等等,这些信息的容错是由 Kubernetes/Zookeeper 等系统的高可用性来保障的,不在我们讨论的容错范围内。

  • Flink 作业运行起来以后,会从数据源读取数据写到 Sink 里,中间流过的数据称为处理的中间数据 Inflight Data (第二类)。

  • 对于有状态的算子比如聚合算子,处理完输入数据会产生算子状态数据 (第三类)。

Flink 会周期性地对所有算子的状态数据做快照,上传到持久稳定的海量存储中 (Durable Bulk Store),这个过程就是做 Checkpoint。Flink 作业发生错误时,会回滚到过去的一个快照检查点 Checkpoint 恢复。

Checkpointing 的流程分为以下几步:

第一步:

image-20221225202111924

第二步:

image-20221225202139109

第三步:

image-20221225202209851

第四步:

image-20221225202251763

Checkpoint参数配置

在flink-conf.yaml中配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#开启checkpoint 每5000ms 一次
execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
execution.checkpointing.mode: EXACTLY_ONCE
#设置checkpoint的存储方式
state.backend: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 2500
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

State 状态后端

Flink 提供了不同的状态后端,用于指定状态的存储方式和位置。

默认情况下,flink的状态会保存在taskmanager的内存中,⽽checkpoint会保存在jobManager的内存中。

Flink 提供了三种可用的状态后端用于在不同情况下进行状态的保存:

  • MemoryStateBackend

  • FsStateBackend

  • RocksDBStateBackend

MemoryStateBackend

MemoryStateBackend内部将状态(state)数据作为对象保存在java堆内存中(taskManager),通过checkpoint机制,MemoryStateBackend将状态(state)进⾏快照并保存Jobmanager(master)的堆内存中。

image-20221225210024107

使用 MemoryStateBackend 时的注意点:

默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。

状态大小受到 akka 帧大小的限制,所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。

状态的总大小不能超过 JobManager 的内存。

何时使用 MemoryStateBackend:

本地开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。

MemoryStateBackend 最适合小状态的应用场景。例如 Kafka Consumer,或者一次仅一记录的函数 (Map, FlatMap,或 Filter)。

全局配置 flink-conf.yaml

1
2
3
state.backend: hashmap
# 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
state.checkpoint-storage: jobmanager

FsStateBackend

该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件

image-20221225210349994

FsStateBackend适用的场景:

  • 具有大状态,长窗口,大键 / 值状态的作业。

  • 所有高可用性设置。

分布式文件持久化,每次读写都会产生网络IO,整体性能不佳

全局配置 flink-conf.yaml:

1
2
3
4
state.backend: hashmap 
state.checkpoints.dir: file:///checkpoint-dir/
# 默认为FileSystemCheckpointStorage
state.checkpoint-storage: filesystem

RocksDBStateBackend

RocksDB 是一种嵌入式的本地数据库

RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将****增量****的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。

image-20221225210626161

何时使用 RocksDBStateBackend:

  • RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。

  • RocksDBStateBackend 非常适合用于高可用方案。

  • RocksDBStateBackend 是目前唯一支持增量 checkpoint 的后端。增量 checkpoint 非常适用于超大状态的场景。

全局配置 flink-conf.yaml:

1
2
3
4
5
6
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

任务重启策略

重启策略类型

Flink支持的重启策略类型如下:

  • none, off, disable:无重启策略,作业遇到问题直接失败,不会重启。
  • fixeddelay, fixed-delay:固定延迟重启策略,作业失败后,延迟一定时间重启。但是有最大重启次数限制,超过这个限制后作业失败,不再重启。
  • failurerate, failure-rate:失败率重启策略,作业失败后,延迟一定时间重启。但是有最大失败率限制。如果一定时间内作业失败次数超过配置值,则标记为真的失败,不再重启。
  • exponentialdelay, exponential-delay:作业失败后重启延迟时间随着失败次数指数递增。没有最大重启次数限制,无限尝试重启作业。

注意:如果启用了checkpoint并且没有显式配置重启策略,会默认使用fixeddelay策略,最大重试次数为Integer.MAX_VALUE。

全局配置

全局配置影响Flink提交的所有作业的。修改全局配置需要编辑flink-conf.yaml文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
no restart
restart-strategy: none

fixeddelay
restart-strategy: fixed-delay
# 尝试重启次数
restart-strategy.fixed-delay.attempts: 10
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 20 s

failurerate
restart-strategy: failure-rate
# 两次连续重启的间隔时间
restart-strategy.failure-rate.delay: 10 s
# 计算失败率的统计时间跨度
restart-strategy.failure-rate.failure-rate-interval: 2 min
# 计算失败率的统计时间内的最大失败次数
restart-strategy.failure-rate.max-failures-per-interval: 10

exponentialdelay
restart-strategy: exponential-delay
# 初次失败后重启时间间隔(初始值)
restart-strategy.exponential-delay.initial-backoff: 1 s
# 以后每次失败,重启时间间隔为上一次重启时间间隔乘以这个值
restart-strategy.exponential-delay.backoff-multiplier: 2
# 每次重启间隔时间的最大抖动值(加或减去该配置项范围内的一个随机数),防止大量作业在同一时刻重启
restart-strategy.exponential-delay.jitter-factor: 0.1
# 最大重启时间间隔,超过这个最大值后,重启时间间隔不再增大
restart-strategy.exponential-delay.max-backoff: 1 min
# 多长时间作业运行无失败后,重启间隔时间会重置为初始值(第一个配置项的值)
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h

端到端一致性

kafka->source->transformation->sink->kafka

端到端的一致性语义有三部分(读取数据,处理数据,输出数据),取三部分最小值为端到端的一致性

  • 最多一次:保证数据不重复
  • 至少一次:保证数据不丢失
  • 精确一次:保证数据不丢不重

flink怎么实现端到端的精确一致性语义?

端到端的保障指的是在整个数据处理管道上结果都是正确的。在每个组件都提供自身的保障情况下,整个处理管道上端到端的保障会受制于保障最弱的那个组件

  • 内部:Checkpoints机制,在发生故障的时候能够恢复各个环节的数据。
  • Source:数据读取之后还是存在,可设置数据读取的偏移量,当发生故障的时候重置偏移量到故障之前的位置。
  • Sink:从故障恢复时,数据不会重复写入外部系统,需要支持幂等写或事务写。

两阶段提交实现Sink一致性

DDL: Create 子句

DML: With 子句

DML: WHERE 子句

DML: DISTINCT 子句

DML: 窗口聚合

DML: Group 聚合

DML: Joins 语法

Flink 也支持了非常多的数据 Join 方式,主要包括以下三种:

  • 动态表(流)与动态表(流)的 Join

  • 动态表(流)与外部维表(比如 Redis)的 Join

  • 动态表字段的列转行(一种特殊的 Join)

细分 Flink SQL 支持的 Join:

  • Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join-数据会一直保存在内存是个大state计算
  • Interval Join:流与流的 Join,两条流一段时间区间内的 Join-数据不会一直保存在state中,随着watermark推进,数据会fpad
  • Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join
  • Lookup Join:流与外部维表的 Join
  • Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行
  • Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join

DML: 集合操作

DML: TopN

SQL 函数的归类

Flink 中的函数有两个维度的归类标准。

  • 一个归类标准是:系统(内置)函数和 Catalog 函数。系统函数没有命名空间,只能通过其名称来进行引用。Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库的命名空间。用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数来对 Catalog 函数进行引用。

  • 另一个归类标准是:临时函数和持久化函数。临时函数由用户创建,它仅在会话的生命周期(也就是一个 Flink 任务的一次运行生命周期内)内有效。持久化函数不是由系统提供的,是存储在 Catalog 中,它在不同会话的生命周期内都有效。

这两个维度归类标准组合下,Flink SQL 总共提供了 4 种函数:

  • 临时性系统内置函数

  • 系统内置函数

  • 临时性 Catalog 函数(例如:Create Temporary Function)

  • Catalog 函数(例如:Create Function)

请注意,在用户使用函数时,系统函数始终优先于 Catalog 函数解析,临时函数始终优先于持久化函数解析。

SQL 自定义函数

当前 Flink 提供了一下几种 UDF 能力:

  • 标量函数(Scalar functions 或 UDAF):输入一条输出一条,将标量值转换成一个新标量值,对标 Hive 中的 UDF;

  • 表值函数(Table functions 或 UDTF):输入一条条输出多条,对标 Hive 中的 UDTF;

  • 聚合函数(Aggregate functions 或 UDAF):输入多条输出一条,对标 Hive 中的 UDAF;

  • 表值聚合函数(Table aggregate functions 或 UDTAF):仅仅支持 Table API,不支持 SQL API,其可以将多行转为多行;

  • 异步表值函数(Async table functions):这是一种特殊的 UDF,支持异步查询外部数据系统,用在前文介绍到的 lookup join 中作为查询外部系统的函数。