FlinkSQL
C01 Flink SQL基本介绍
SQL API
标准SQL分类
- DML(Data Manipulation Language):数据操作语言,用来定义数据库中的记录
- DCL(Data Control Language):数据控制语言,用来定义访问权限和安全级别
- DQL(Data Query Language):数据查询语言,用来查询记录
- DDL(Data Definition Language):数据定义语言,用来定义数据库中的对象
Flink Table API实现了DQL数据查询,Flink SQL实现了DML、DDL、DQL
Flink SQL的优势
- FlnkSQL比DataStreamAPI、DataSetAPI实现简单、方便
- TableAPI和SQL是流批通用的,代码可以完全复用
- TableAPI和SQL可以使用Calcite的SQL优化器,可以实现自动程序优化更容易写出执行效率高的应用
Flink1.9版本引入了阿里巴巴的Blink实现流批一体
Apache Calcite
Apache Calcite是一款使用Java编程语言编写的开源动态数据管理框架,它具备很多常用的数据库管理需要的功能,比如:SQL解析、SQL校验、SQL查询优化、SQL生成以及数据连接查询,目前使用Calcite作为SQL解析与优化引擎的有Hive、Drill、Flink、Phoenix和Storm。Spark中的SQL解析与优化引擎是自带的。
Calcite中提供了RBO(Rule-Based Optimization:基于规则)和CBO(Cost-Based Optimization:基于代价)两种优化器,在保证语义的基础上,生成执行成本最低的SQL逻辑树。
SQL Client
进入sql-client:
1 | 目前仅支持 embedded,模式默认值embedded |
设置输出模式:
1 | 表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用: |
执行 SQL 查询:
1 | SELECT 'Hello World'; |
为了保持 CLI 界面及时响应,变更日志模式仅显示最近的 1000 个更改。表格模式支持浏览更大的结果,这些结果仅受可用主内存和配置的最大行数(sql-client.execution.max-table-result.rows)的限制。
SQL 上下文
TableEnvironment API
TableEnvironment:Table API & SQL 的都集成在一个统一上下文(即 TableEnvironment)中,其地位等同于 DataStream API 中的 StreamExecutionEnvironment 的地位
1 | TableEnvironment::executeSql:用于 SQL API 中,可以执行一段完整 DDL,DML SQL。举例,方法入参可以是 CREATE TABLE xxx,INSERT INTO xxx SELECT xxx FROM xxx。 |
TableEnvironment 的功能
- Catalog 管理:Catalog 可以理解为 Flink 的 MetaStore,类似 Hive MetaStore 对在 Hive 中的地位,关于 Flink Catalog 的详细内容后续进行介绍
表管理:在 Catalog 中注册表- SQL 查询:(这 TMD 还用说,最基本的功能啊),就像 DataStream 中提供了 addSource、map、flatmap 等接口
- UDF 管理:注册用户定义(标量函数:一进一出、表函数:一进多出、聚合函数:多进一出)函数
- UDF 扩展:加载可插拔 Module(Module 可以理解为 Flink 管理 UDF 的模块,是可插拔的,可以自定义 Module,去支持奇奇怪怪的 UDF 功能)
- DataStream 和 Table(Table API & SQL 的查询结果)之间进行转换:1.13 版本的只有流任务支持,批任务不支持。1.14 支持流批
创建方式
方式一:通过 EnvironmentSettings 创建 TableEnvironment
1 | // 1. 就是设置一些环境信息 |
方式二:通过已有的 StreamExecutionEnvironment 创建 TableEnvironment
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |

SQL中的表
外部表与视图
一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称。
如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default。
- 外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。
- 视图 VIEW:从已经存在的表中创建,视图一般是一个 SQL 逻辑的查询结果。对比到离线的 Hive SQL 中,在离线的场景(Hive 表)中 VIEW 也都是从已有的表中去创建的。
注意:对于视图不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个 Table 的结果,可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个 Table 的逻辑执行多次。类似于 with tmp as (DML) 的语法
临时表与永久表
- 临时表:通常保存于内存中并且仅在创建它们的 Flink session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。
- 永久表:需要外部 Catalog(例如 Hive Metastore)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink session 可见且持续存在,直至从 Catalog 中被明确删除。
如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink session 中,你的任务访问到这个表时,访问到的永远是临时表(即相同名称的表,临时表会屏蔽永久表)。
SQL 查询案例
案例场景:计算每一种商品(sku_id 唯一标识)的售出个数count、总销售额sum、平均销售额avg、最低价min、最高价
数据准备:数据源为商品的销售流水(sku_id:商品,price:销售价格),然后写入到 Kafka 的指定 topic(sku_id:商品,count_result:售出个数、sum_result:总销售额、avg_result:平均销售额、min_result:最低价、max_result:最高价)当中
SQL Client中演示:
1 | //1.创建一个数据源(输入)表,这里的数据源是 flink 自带的一个随机 mock 数据的数据源。 |
C02 Flink SQL数据类型
原子数据类型
| 字符串类型 | |
|---|---|
| CHAR CHAR(n) |
定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长 取值范围 [1-2,147,483,647]。如果不指定 n,则默认为 1。 |
| VARCHAR VARCHAR(n) STRING |
可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度 取值范围 [1-2,147,483,647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。 |
| 二进制类型 | |
|---|---|
| BINARY BINARY(n) |
定长二进制字符串,n 代表定长 取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。 |
| VARBINARY VARBINARY(n) BYTES |
可变长二进制字符串,n 代表字符的最大长度 取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。 |
| 数值类型 | |
|---|---|
| DECIMAL DECIMAL(p) DECIMAL(p, s) DEC、DEC(p) DEC(p, s) NUMERIC NUMERIC(p) NUMERIC(p, s) |
固定长度和精度的数值类型,就和 Java 中的 BigDecimal 一样 p 代表数值位数(长度),取值范围 [1, 38] s 代表小数点后的位数(精度),取值范围 [0, p] 如果不指定,p 默认为 10,s 默认为 0。 |
| TINYINT | -128 to 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。 |
| SMALLINT | -32,768 to 32,767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。 |
| INT INTEGER |
-2,147,483,648 to 2,147,483,647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。 |
| BIGINT | -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。 |
| FLOAT | 4 字节大小的单精度浮点数值,就和 Java 中的 float 一样。 |
| DOUBLE DOUBLE PRECISION |
8 字节大小的双精度浮点数值,就和 Java 中的 double 一样。 |
| 特殊类型 | |
|---|---|
| NULL类型 | NULL |
| Raw类型 | RAW(‘class’, ‘snapshot’) 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器 |
| 布尔类型 | BOOLEAN |
| 时间类型 | |
|---|---|
| DATE | 由年-月-日组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31] |
| TIME TIME(p) |
由小时:分钟:秒[.小数秒]组成的 不带时区含义 的的时间的数据类型,精度高达纳秒取值范围 [00:00:00.000000000到23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 0。 |
| TIMESTAMP TIMESTAMP(p) TIMESTAMP WITHOUT TIME ZONE TIMESTAMP(p) WITHOUT TIME ZONE |
由 年-月-日 小时:分钟:秒[.小数秒] 组成的不带时区含义的时间类型取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。 |
| TIMESTAMP WITH TIME ZONE TIMESTAMP(p) WITH TIME ZONE |
由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。 |
| TIMESTAMP_LTZ TIMESTAMP_LTZ(p) |
由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。 |
| INTERVAL YEAR TO MONTH INTERVAL DAY TO SECOND |
interval 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。 |
TIMESTAMP_LTZ与TIMESTAMP WITH TIME ZONE的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由table.local-time-zone参数来设置时区。
INTERVAL演示:
1 | Flink SQL> CREATE TABLE sink_table2 ( |
复合数据类型
| 数组类型 | ARRAY、t ARRAY数组最大长度为 2,147,483,647。t 代表数组内的数据类型。举例 ARRAY、ARRAY,其等同于 INT ARRAY、STRING ARRAY |
|---|---|
| Map类型 | |
|---|---|
| MAP<kt, vt> | Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 Map<STRING, INT>、Map<BIGINT, STRING> |
| 集合类型 | |
|---|---|
| MULTISET t MULTISET |
就和 Java 中的 List 类型一样,允许重复的数据。举例 MULTISET,其等同于 INT MULTISET |
| 对象类型 | |
|---|---|
| ROW<n0 t0, n1 t1, …> ROW<n0 t0 ‘d0’, n1 t1 ‘d1’, …> ROW(n0 t0, n1 t1, …) ROW(n0 t0 ‘d0’, n1 t1 ‘d1’, …) |
就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW<myField INT, myOtherField BOOLEAN> |
案例演示:
- 样例数据
1 | { |
- 开启netcat
1 | {"id":1238123899121,"name":"itcast","date":"1990-10-14","obj":{"time1":"12:12:43","str":"sfasfafs","lg":2324342345},"arr":[{"f1":"f1str11","f2":134},{"f1":"f1str22","f2":555}],"time":"12:12:43","timestamp":"1990-10-14 12:12:43","map":{"flink":123},"mapinmap":{"inner_map":{"key":234}}} |
- 创建映射表
1 | CREATE TABLE json_source ( |
- 查询结果
1 | select id, name,`date`,obj.str,arr[1].f1,`map`['flink'],mapinmap['inner_map']['key'] from json_source; |
C03 Flink SQL应用于流
流批处理的异同
| 输入表 | 处理逻辑 | 结果表 | |
|---|---|---|---|
| 批处理 | 静态表:输入数据有限、是有界集合 | 批式计算:每次执行查询能够访问到完整的输入数据,然后计算,输出完整的结果数据 | 静态表:数据有限 |
| 流处理 | 动态表:输入数据无限,数据实时增加,并且源源不断 | 流式计算:执行时不能够访问到完整的输入数据,每次计算的结果都是一个中间结果 | 动态表:数据无限 |
要将 SQL 应用于流式任务的三个要解决的核心点:
1.SQL 输入表:分析如何将一个实时的,源源不断的输入流数据表示为 SQL 中的输入表。
2.SQL 处理计算:分析将 SQL 查询逻辑翻译成什么样的底层处理技术才能够实时的处理流式输入数据,然后产出流式输出数据。
3.SQL 输出表:分析如何将 SQL 查询输出的源源不断的流数据表示为一个 SQL 中的输出表。
两种技术方案:
1.动态表:源源不断的输入、输出流数据映射到 动态表
2.连续查询:实时处理输入数据,产出输出数据的实时处理技术
动态表
如果把有界数据集当作表,那么无界数据集(流)就是一个随着时间变化持续写入数据的表,Flink中使用动态表表示流,使用静态表表示传统的批处理中的数据集。
流是Flink DataStream中的概念,动态表是Flink SQL中的概念,两者都是无界数据集。动态表在Flink中抽象为Table API
连续查询
将SQL查询应用于动态表,会持续执行而不会终止,因为数据会持续的产生,所以连续查询不会给出一个最终结果,而是持续不断地更新结果,实际上给出的总是中间结果。
流上的SQL查询运算与批处理中的SQL查询运算在语义上完全相同,对于相同的数据集计算结果也是相同的。
执行过程

从概念上来说:
1.流转换为动态表
2.在动态表上执行连续查询,生成新的动态表
3.生成的动态表转换回流
从开发上来说:
1.将DataStream注册为Table
2.在Table上应用SQL查询语句,结果为一个新的Table
3.将Table转换为DataStream
第一步-流转换为表
第二步-更新和追加查询
1.向结果表中插入新记录、更新旧的记录
2.只会向结果表中插入新记录
同时包含插入(Insert)、更新(Update)的查询必须维护更多的State,消耗更多的CPU、内存资源。
流上使用SQL的限制:
1.需要维护的状态太大
2.计算更新的成本太高
第三步-表转换为流
动态表分为三种类型:
1.只有更新行为,表中的结果被持续更新
2.只有插入行为,没有UPDATA和DELETE行为的结果表
3.既有更新行为又有插入行为的结果表
不同的表类型会转换为不同的流对外输出。
Append流
只支持写入行为,输出的结果只有 INSERT 操作的数据。
Retract流
Retract 流包含两种类型的 message: add messages 和 retract messages 。
- 将 INSERT 操作编码为 add message
- 将 DELETE 操作编码为 retract message
- 将 UPDATE 操作编码为更新先前行的 retract message 和更新(新)行的 add message,从而将动态表转换为 retract 流。
Retract 流写入到输出结果表的数据有 -,+ 两种,分别 - 代表撤回旧数据,+ 代表输出最新的数据。这两种数据最终都会写入到输出的数据引擎中。
如果下游还有任务去消费这条流的话,要注意需要正确处理 -,+ 两种数据,防止数据计算重复或者错误。
Upsert流
Upsert 流包含两种类型的 message: upsert messages 和 delete messages。转换为 upsert 流的动态表需要唯一键(唯一键可以由多个字段组合而成)。
- 将 INSERT 和 UPDATE 操作编码为 upsert message
- 将 DELETE 操作编码为 delete message
Upsert流写入到输出结果表的数据每次输出的结果都是当前根据唯一键的最新结果数据,不会有 Retract流中的 - 回撤数据。
如果下游还有一个任务去消费这条流的话,消费流的算子需要知道唯一键(即 user),以便正确地根据唯一键(user)去拿到每一个 user 当前最新的状态。
其与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。

D04 Flink 中的时间属性
三种时间属性
- 事件时间:必须是由数据本身携带的数据,这个时间标志的是事件产生的时间。
- 处理时间:指的是算子计算这个数据的时候产生的时间。
- 到达时间:指的是数据从数据源进入到计算引擎的时间。
指定时间属性
- CREATE TABLE DDL 创建表的时候指定
- 可以在 DataStream 中指定,在后续的 DataStream 转为 Table 中使用
SQL时间案例
处理时间案例
处理时间语义下,使用当前机器的系统时间作为处理时间。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成watermark。
- 语法:CREATE TABLE DDL 指定时间戳的方式
1 | CREATE TABLE user_actions ( |
- 读取’order.csv’文件的数据,在原本的Schema上添加一个虚拟的时间戳列,时间戳列由
PROCTIME()函数计算产生。
1 | -- 创建映射表 |
事件时间案例
Event Time时间语义使用一条数据实际发生的时间作为时间属性,在Table API & SQL中这个字段通常被称为rowtime。这种模式下多次重复计算时,计算结果是确定的。
Event Time时间语义可以保证流处理和批处理的统一。
Event Time时间语义下,我们需要设置每条数据发生时的时间戳,并提供一个Watermark。
- 语法:CREATE TABLE DDL 指定时间戳的方式
1 | CREATE TABLE user_actions ( |
- 实际应用中时间戳一般都是秒或者是毫秒(BIGINT 类型)需要转换类型
1 | CREATE TABLE user_actions ( |
- 读取order.csv’文件的数据,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。
1 | -- 创建映射表 |
D05 Flink 中的窗口操作
窗口概述
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。
时间窗口和计数窗口
一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达
Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。
在Table API & SQL中,主要有两种窗口:Group Windows 和 Over Windows。
- Group Windows 根据时间或行计数间隔将组行聚合成有限的组,并对每个组计算一次聚合函数
- Over Windows 窗口内聚合为每个输入行在其相邻行范围内计算一个聚合
Group Windows
滚动窗口
滚动窗口定义:滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中:
应用场景:常见的按照一分钟对数据进行聚合,计算一分钟内 PV,UV 数据。
基于DataStream编程
input().keyby().window()分组后的窗口计算,input().windowAll()全局窗口
1 | DataStream<T> input = ... |
注意:时间窗口使用的是timeWindow()也可以使用window(),比如,input.keyBy(…).timeWindow(Time.seconds(1))。timeWindow()是一种简写,传入一个参数则滚动窗口,为窗口大小,若传入两个参数为滑动窗口,第一个参数为窗口大小,第二个参数为滑动时间。该方法在新版本中已过期。
基于SQL编程
TUMBLE函数基于时间属性字段将每个元素分配到一个指定大小的窗口中。
在流模式下,时间属性字段必须是事件或处理时间属性。
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。TUMBLE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”、“window_end”、“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
TUMBLE函数接受三个必需参数,一个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
- data: 是一个表参数,可以是与时间属性列的任何关系。
- timecol: 是一个列描述符,指示数据的哪些时间属性列应映射到翻转窗口。
- size: 是指定滚动窗口宽度的持续时间。
- offset: 是一个可选参数,用于指定窗口起始位置的偏移量。
两种 Flink SQL 实现方式:
- Group Window Aggregation(1.14之前只有此类方案,此方案在 1.14及之后版本已经标记为废弃,不推荐使用)
1 | -- 数据源表 |
可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval ‘1’ minute)
第一个参数为事件时间的时间戳
第二个参数为滚动窗口大小
- Windowing TVF(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务)
1 | SELECT |
可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘60’ SECOND)),包含三部分参数。
第一个参数 TABLE source_table 声明数据源表;
第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;
第三个参数 INTERVAL ‘60’ SECOND 声明滚动窗口大小为 1 min。
滑动窗口
滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动窗口之间每个窗口是可以重叠。在这种情况下,一条数据就会分配到多个窗口当中。举例,有 10 分钟大小的窗口,滑动步长为 5 分钟。这样,每 5 分钟会划分一次窗口,这个窗口包含的数据是过去 10 分钟内的数据,如下图所示:

在流模式下,时间属性字段必须是事件或处理时间属性。
应用场景:比如计算同时在线的数据,要求结果的输出频率是 1 分钟一次,每次计算的数据是过去 5 分钟的数据(有的场景下用户可能在线,但是可能会 2 分钟不活跃,但是这也要算在同时在线数据中,所以取最近 5 分钟的数据就能计算进去了)
基于DataStream编程
我们使用Time类中的时间单位来定义Slide和Size,也可以设置offset。同样,timeWindow是一种缩写,根据执行环境中设置的时间语义来选择相应的方法初始化窗口。
1 | val input: DataStream[T] = ... |
基于SQL编程
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。HOP的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”、“window_end”、“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
HOP接受四个必需参数,一个可选参数:HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data: 是一个表参数,可以是与时间属性列的任何关系
- timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
- slide: 是一个持续时间,指定顺序跳跃窗口开始之间的持续
- size: 是指定跳跃窗口宽度的持续时间。
- offset: 是一个可选参数,用于指定窗口起始位置的偏移量。
两种Flink SQL实现方式:
- Group Window Aggregation 方案(支持 Batch\Streaming 任务):
1 | -- 数据源表 |
可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval ‘1’ minute, interval ‘5’ minute)。其中:
第一个参数为事件时间的时间戳;
第二个参数为滑动窗口的滑动步长;
第三个参数为滑动窗口大小。
- Windowing TVF 方案(1.14只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):
1 | -- 数据处理逻辑 |
在该模式下,窗口大小必须是滑动距离的整数倍
可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘1’ MINUTES, INTERVAL ‘5’ MINUTES)),包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;
第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;
第三个参数 INTERVAL ‘1’ MINUTES 声明滚动窗口滑动步长大小为 1 min。
第四个参数 INTERVAL ‘5’ MINUTES 声明滚动窗口大小为 5 min。
Session 窗口
Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。

基于DataStream编程
Session window的窗口大小,则是由数据本身决定
1 | DataStream input = … |
基于SQL编程
session(row_time, interval '5' minute)
其中:
第一个参数为事件时间的时间戳;
第二个参数为 Session gap 间隔。
使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。
Flink SQL 不支持 Session 窗口的 Window TVF:
1 | -- 数据源表,用户购买行为记录表 |
其中:
第一个参数为事件时间的时间戳;
第二个参数为 Session gap 间隔。
SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。
Session gap 间隔是5s,实际上是不包含5s,大于5s才会触发计算
渐进式窗口
渐进式窗口定义:渐进式窗口在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。

在流模式下,时间属性字段必须是事件或处理时间属性。
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。CUMULATE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”、“window_end”、“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
CUMULATE接受四个必需参数,一个可选参数:CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data: 是一个表参数,可以是与时间属性列的任何关系
- timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
- step: 是指定连续累积窗口结束之间增加的窗口大小的持续时间
- size: 是指定累积窗口的最大宽度的持续时间。size必须是 的整数倍step。
- offset: 是一个可选参数,用于指定窗口起始位置的偏移量。
渐进式窗口目前只有 Windowing TVF 方案(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):
1 | -- 数据源表 |
其中包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;
第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;
第三个参数 INTERVAL ‘60’ SECOND 声明渐进式窗口触发的渐进步长为 1 min。
第四个参数 INTERVAL ‘1’ DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。
Over Windows
Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。
- 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
- Over 聚合:能够保留原始字段
Over 聚合的语法:
1 | SELECT |
ORDER BY:必须是时间戳列(事件时间、处理时间)
PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。
E06 Flink 中的水印操作
流处理中的乱序
当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。

watermark解决乱序
不以事件时间作为触发计算的条件,而是根据Watermark判断是否触发。
当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:
基于SQL的水印
实现场景:
使用Socket模拟接收数据
设置WaterMark,设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 -- 创建映射表
CREATE TABLE MyTable (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);
-- 设置滚动窗口进行聚合计算
SELECT
TUMBLE_START(ts, INTERVAL '5' SECOND) AS window_start,
TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,
TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND) as window_rowtime,
item,count(item) as total_item
FROM MyTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
数据有序的场景
测试数据:
1 | hello,2022-03-25 16:39:45 |
数据无序的场景
测试数据:
1 | hello,2022-03-25 16:39:45 |
设置迟到时间:
1 | drop table MyTable; |
基于DataStream的水印
水印策略设置
WatermarkStrategy 可以在 Flink 应用程序中的两处使用:
- 第一种是直接在数据源上使用
- 第二种是直接在非数据源的操作之后使用
第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy意味着你必须使用特定数据源接口,例如与kafka链接,使用kafka Connerctor。
仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy)
- 直接在数据源中使用(比如kafka)
1 | final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
- 直接在非数据源的操作之后使用
1 | final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
使用去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。WatermarkStrategy。
水印策略案例
单调递增生成水印
周期性 watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。
1 | WatermarkStrategy.forMonotonousTimestamps(); |
这个也就是相当于上述的延迟策略去掉了延迟时间,以event中的时间戳充当了水印。
在程序中可以这样使用:
1 | DataStream dataStream = ...... ; |
案例演示:
对有序的数据流添加水印,底层调用的是固定延迟生成水印,只是传递的水印等待时间是0,意味着不考虑乱序问题
使用单点递增水印,解决的是数据有序的场景需求:从socket接受数据,进行转换,然后应用窗口,每隔5s生成一个窗口(非系统时间驱动窗口计算,数据中携带的事件时间),使用水印时间触发窗口计算
eventTime一定是一个毫秒值的时间戳,否则无法参与计算
数据样本:
sensor_1,1641783600000,35 -> 2022-01-10 11:00:00
sensor_2,1641783601000,10 -> 2022-01-10 11:00:01
sensor_2,1641783602000,20 -> 2022-01-10 11:00:02
sensor_2,1641783603000,30 -> 2022-01-10 11:00:03
sensor_2,1641783610000,40 -> 2022-01-10 11:00:10
代码示例:
1 |

单调递增的水印策略实际上调用的是固定延迟的水印,传参数为0
固定延迟生成水印
通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间。使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。
1 | WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness) |
我们实现一个延迟3秒的固定延迟水印:
1 | DataStream dataStream = ...... ; |
案例演示:
使用固定延迟生成水印,解决数据乱序问题
从socket接受数据,进行转换,设置2s的延迟时间,然后应用10s滚动窗口,添加水印触发窗口计算
数据样本:
sensor_1,1641783600000,35 -> 2022-01-10 11:00:00
sensor_2,1641783601000,10 -> 2022-01-10 11:00:01
sensor_2,1641783602000,20 -> 2022-01-10 11:00:02
sensor_2,1641783603000,30 -> 2022-01-10 11:00:03
sensor_2,1641783610000,40 -> 2022-01-10 11:00:10 –> 不会触发窗口计算,因为水印乱序2s,事件时间是10s,水印时间是8s,不能满足窗口的endTimesensor_2,1641783612000,40 -> 2022-01-10 11:00:12 –> 满足了窗口的结束时间,触发了窗口计算
一旦窗口被触发完成计算,属于这个窗口的数据再次到达数据就默认丢失掉!
代码示例:
1 |
他的底层是使用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。重写实现了两个方法:
onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游。
onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:
1 env.getConfig().setAutoWatermarkInterval(5000L);


多并行度设置水印
两个基本原则:
- 一对多:进行广播,所有的下游并行度watermark一致
- 多对一:取最小值,下游watermark取上游所有并行度watermark的最小值
处理空闲数据案例
在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。
所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。
当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。
通过下面的代码来实现对于空闲数据流的处理
1 | WatermarkStrategy |
长期延迟数据处理
水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。
水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口。这个等待不能一直等,因为会一直缓着数据不计算。一般水印也就是几秒钟最多几分钟而已。
这个场景的解决方式就是:
延迟数据处理机制(allowedLateness方法)
- 水印: 乱序数据处理(时间很短的延迟)
- 延迟处理:长期延迟数据的处理机制
主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据:
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
给窗口设置一个延迟关闭的时间=EventTime-水印策略的固定延迟-允许延迟的时间
可以减少计算的压力降低延迟
保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存
获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取
数据乱序的场景
Kafka中的数据是无序(多分区的情况下),如果使用flink连接Kafka读出来数据肯定是乱序的
如果想让Kafka中的数据有序,可以设置一个分区
watermark是用来解决乱序的问题(一般延迟时间在秒级)
可以设置延迟时间(延迟窗口关闭时间,一般也就最多分钟级)
长期迟到的数据可以保存后续处理(一般分钟级)
E07 Flink 中的容错机制
Checkpoint 检查点
为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

Checkpoint实现过程
Flink 的数据可以粗略分为以下三类:
第一种是元信息,相当于一个 Flink 作业运行起来所需要的最小信息集合,包括比如 Checkpoint 地址、Job Manager、Dispatcher、Resource Manager 等等,这些信息的容错是由 Kubernetes/Zookeeper 等系统的高可用性来保障的,不在我们讨论的容错范围内。
Flink 作业运行起来以后,会从数据源读取数据写到 Sink 里,中间流过的数据称为处理的中间数据 Inflight Data (第二类)。
对于有状态的算子比如聚合算子,处理完输入数据会产生算子状态数据 (第三类)。
Flink 会周期性地对所有算子的状态数据做快照,上传到持久稳定的海量存储中 (Durable Bulk Store),这个过程就是做 Checkpoint。Flink 作业发生错误时,会回滚到过去的一个快照检查点 Checkpoint 恢复。
Checkpointing 的流程分为以下几步:
第一步:

第二步:

第三步:

第四步:

Checkpoint参数配置
在flink-conf.yaml中配置:
1 | #开启checkpoint 每5000ms 一次 |
State 状态后端
Flink 提供了不同的状态后端,用于指定状态的存储方式和位置。
默认情况下,flink的状态会保存在taskmanager的内存中,⽽checkpoint会保存在jobManager的内存中。
Flink 提供了三种可用的状态后端用于在不同情况下进行状态的保存:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
MemoryStateBackend
MemoryStateBackend内部将状态(state)数据作为对象保存在java堆内存中(taskManager),通过checkpoint机制,MemoryStateBackend将状态(state)进⾏快照并保存Jobmanager(master)的堆内存中。

使用 MemoryStateBackend 时的注意点:
默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。
状态大小受到 akka 帧大小的限制,所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。
状态的总大小不能超过 JobManager 的内存。
何时使用 MemoryStateBackend:
本地开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。
MemoryStateBackend 最适合小状态的应用场景。例如 Kafka Consumer,或者一次仅一记录的函数 (Map, FlatMap,或 Filter)。
全局配置 flink-conf.yaml
1 | state.backend: hashmap |
FsStateBackend
该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。

FsStateBackend适用的场景:
具有大状态,长窗口,大键 / 值状态的作业。
所有高可用性设置。
分布式文件持久化,每次读写都会产生网络IO,整体性能不佳
全局配置 flink-conf.yaml:
1 | state.backend: hashmap |
RocksDBStateBackend
RocksDB 是一种嵌入式的本地数据库
RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将****增量****的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。

何时使用 RocksDBStateBackend:
RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
RocksDBStateBackend 非常适合用于高可用方案。
RocksDBStateBackend 是目前
唯一支持增量 checkpoint的后端。增量 checkpoint 非常适用于超大状态的场景。
全局配置 flink-conf.yaml:
1 | state.backend: rocksdb |
任务重启策略
重启策略类型
Flink支持的重启策略类型如下:
- none, off, disable:无重启策略,作业遇到问题直接失败,不会重启。
- fixeddelay, fixed-delay:固定延迟重启策略,作业失败后,延迟一定时间重启。但是有最大重启次数限制,超过这个限制后作业失败,不再重启。
- failurerate, failure-rate:失败率重启策略,作业失败后,延迟一定时间重启。但是有最大失败率限制。如果一定时间内作业失败次数超过配置值,则标记为真的失败,不再重启。
- exponentialdelay, exponential-delay:作业失败后重启延迟时间随着失败次数指数递增。没有最大重启次数限制,无限尝试重启作业。
注意:如果启用了checkpoint并且没有显式配置重启策略,会默认使用fixeddelay策略,最大重试次数为Integer.MAX_VALUE。
全局配置
全局配置影响Flink提交的所有作业的。修改全局配置需要编辑flink-conf.yaml文件。
1 | no restart |
端到端一致性
kafka->source->transformation->sink->kafka
端到端的一致性语义有三部分(读取数据,处理数据,输出数据),取三部分最小值为端到端的一致性
- 最多一次:保证数据不重复
- 至少一次:保证数据不丢失
- 精确一次:保证数据不丢不重
flink怎么实现端到端的精确一致性语义?
端到端的保障指的是在整个数据处理管道上结果都是正确的。在每个组件都提供自身的保障情况下,整个处理管道上端到端的保障会受制于保障最弱的那个组件。
- 内部:Checkpoints机制,在发生故障的时候能够恢复各个环节的数据。
- Source:数据读取之后还是存在,可设置数据读取的偏移量,当发生故障的时候重置偏移量到故障之前的位置。
- Sink:从故障恢复时,数据不会重复写入外部系统,需要支持幂等写或事务写。
两阶段提交实现Sink一致性
F08 Flink SQL常用语法
DDL: Create 子句
DML: With 子句
DML: WHERE 子句
DML: DISTINCT 子句
DML: 窗口聚合
DML: Group 聚合
DML: Joins 语法
Flink 也支持了非常多的数据 Join 方式,主要包括以下三种:
动态表(流)与动态表(流)的 Join
动态表(流)与外部维表(比如 Redis)的 Join
动态表字段的列转行(一种特殊的 Join)
细分 Flink SQL 支持的 Join:
- Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join-数据会一直保存在内存是个大state计算
- Interval Join:流与流的 Join,两条流一段时间区间内的 Join-数据不会一直保存在state中,随着watermark推进,数据会fpad
- Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join
- Lookup Join:流与外部维表的 Join
- Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行
- Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join
DML: 集合操作
DML: TopN
F09 Flink SQL 自定义函数
SQL 函数的归类
Flink 中的函数有两个维度的归类标准。
一个归类标准是:系统(内置)函数和 Catalog 函数。系统函数没有命名空间,只能通过其名称来进行引用。Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库的命名空间。用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数来对 Catalog 函数进行引用。
另一个归类标准是:临时函数和持久化函数。临时函数由用户创建,它仅在会话的生命周期(也就是一个 Flink 任务的一次运行生命周期内)内有效。持久化函数不是由系统提供的,是存储在 Catalog 中,它在不同会话的生命周期内都有效。
这两个维度归类标准组合下,Flink SQL 总共提供了 4 种函数:
临时性系统内置函数
系统内置函数
临时性 Catalog 函数(例如:Create Temporary Function)
Catalog 函数(例如:Create Function)
请注意,在用户使用函数时,系统函数始终优先于 Catalog 函数解析,临时函数始终优先于持久化函数解析。
SQL 自定义函数
当前 Flink 提供了一下几种 UDF 能力:
标量函数(Scalar functions 或 UDAF):输入一条输出一条,将标量值转换成一个新标量值,对标 Hive 中的 UDF;
表值函数(Table functions 或 UDTF):输入一条条输出多条,对标 Hive 中的 UDTF;
聚合函数(Aggregate functions 或 UDAF):输入多条输出一条,对标 Hive 中的 UDAF;
表值聚合函数(Table aggregate functions 或 UDTAF):仅仅支持 Table API,不支持 SQL API,其可以将多行转为多行;
异步表值函数(Async table functions):这是一种特殊的 UDF,支持异步查询外部数据系统,用在前文介绍到的 lookup join 中作为查询外部系统的函数。




