飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

Flink 实践教程-进阶(5):排序(乱序调整)

时间:2021-12-29  作者:TXcloudbigdata  
配合使用 Windowing TVF 调整乱序数据

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 Windowing TVF 配合聚合函数,实时调整乱序数据,经过聚合分析后存入 MySQL 中。

操作视频

前置准备
创建流计算 Oceanus 集群
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。

创建 Topic:
进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备:
进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。

启动 Kafka 生产者命令

bash kafka-console-域名 --broker-list 10.域名:9092 --topic oceanus_advanced5_input --域名ig ../config/域名erties
1
2
// 按顺序插入如下数据,注意这里数据时间是乱序的
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:16"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:30"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:59"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:43"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:09"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:01"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:15"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:31:15"}
1
2
3
4
5
6
7
8
9
10
11
12
创建 MySQL 实例
进入 MySQL 控制台 [7],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [8]。

-- 建表语句
CREATE TABLE oceanus_advanced5_output (
window_start datetime NOT NULL,
window_end datetime NOT NULL,
num int(11) DEFAULT NULL,
PRIMARY KEY (window_start,window_end)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1
2
3
4
5
6
7
流计算 Oceanus 作业

  1. 创建 Source
    CREATE TABLE kafka_json_source_table (
    order_id VARCHAR,
    num INT,
    event_time TIMESTAMP(3),
    -- 根据事件时间 event_time 设置 10s 的延迟水印
    WATERMARK FOR event_time AS event_time - INTERVAL \'10\' SECOND
    ) WITH (
    \'connector\' = \'kafka\',
    \'topic\' = \'oceanus_advanced5_input\', -- 替换为您要消费的 Topic
    \'域名\' = \'latest-offset\', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    \'域名ers\' = \'10.域名:9092\', -- 替换为您的 Kafka 连接地址
    \'域名\' = \'testGroup\', -- 必选参数, 一定要指定 Group ID
    \'format\' = \'json\',
    \'域名-on-missing-field\' = \'false\', -- 如果设置为 false, 则遇到缺失字段不会报错。
    \'域名re-parse-errors\' = \'true\' -- 如果设置为 true,则忽略任何解析报错。
    );
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
  2. 创建 Sink
    CREATE TABLE jdbc_upsert_sink_table (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    num INT,
    PRIMARY KEY(window_start,window_end) NOT ENFORCED
    ) WITH (
    \'connector\' = \'jdbc\',
    \'url\' = \'jdbc:mysql://10.域名:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai\', -- 请替换为您的实际 MySQL 连接参数
    \'table-name\' = \'oceanus_advanced5_output\', -- 需要写入的数据表
    \'username\' = \'root\', -- 数据库访问的用户名(需要提供 INSERT 权限)
    \'password\' = \'Tencent123$\', -- 数据库访问的密码
    \'域名er-域名-rows\' = \'200\', -- 批量输出的条数
    \'域名er-域名rval\' = \'2s\' -- 批量输出的间隔
    );
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  3. 编写业务 SQL
    INSERT INTO jdbc_upsert_sink_table
    SELECT
    window_start,window_end,SUM(num) AS num
    FROM TABLE(
    -- Windowing TVF
    TUMBLE(TABLE kafka_json_source_table,DESCRIPTOR(event_time),INTERVAL \'1\' MINUTES)
    ) GROUP BY window_start,window_end;
    1
    2
    3
    4
    5
    6
    7
  4. 查询数据
    进入 MySQL 控制台 [7],单击右侧【登陆】快速登陆数据库,选择相应的库表查询数据。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JKvqsDZ5-1640712788701)(MySQL数据查询.png)]

笔者这里设置的 10s 的延迟水印,可以看到在 2930、3031时间段的数据统计是正确,并没有因为数据延时而出现漏统计的现象。31~32时间段的数据并没有统计出来,这是因为我们最后一条数据时间是2021-12-22 14:31:15,其水印时间为2021-12-22 14:31:05,小于窗口关闭时间,导致这段时间窗口还未关闭、未计算。

总结
WARTERMARK是跟随在每条数据上的一条特殊标签,而且只增不减(可以相等)。WARTERMARK并不能影响数据出现在哪个窗口(本例中由event_time决定),其主要决定窗口是否关闭(当水印时间大于窗口结束时间时,窗口关闭并计算)。
如果数据延时过大,例如小时级别,可以配合allowedLateness算子合理性使用WARTERMARK,当达到水印结束时间时,窗口并不关闭,只进行计算操作,当时间到达allowedLateness算子设置的时间后,窗口才真正关闭,并在原先的基础上再次进行计算。如在allowedLateness算子设置的时间后才达到的数据,我们可以使用sideOutputLateData算子将迟到的数据输出到侧输出流进行计算。这里需要注意allowedLateness和sideOutputLateData算子目前只能使用 Stream API 实现。
目前 flink 域名 的 Windowing TVF 函数并不能单独使用,需配合AGGREGATE、JOIN、TOPN使用。建议优先使用 Windowing TVF 实现窗口聚合等功能,因为 Windowing TVF 更符合 SQL 书写规范,底层优化逻辑也更好。
参考链接
[1] Oceanus 控制台:https://域名域名/oceanus/overview
[2] 创建独享集群:https://域名/document/product/849/48298
[3] CKafka 控制台:https://域名域名/ckafka/index?rid=1
[4] CKafka 创建实例:https://域名/document/product/597/54839
[5] Ckafka 创建 Topic:https://域名/document/product/597/54854
[6] 运行 Kafka 客户端:https://域名/document/product/597/56840
[7] MySQL 控制台:https://域名域名/cdb
[8] 创建 MySQL 实例:https://域名/document/product/236/46433

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。