【Flink】watermark的基本原理和简单案例 原创

601-赵同学

发表文章数:191

首页 » 大数据 » 正文


好文分享

关于watermark的解读,有两篇文章对我来说都比较有用:
一、《Flink Event Time Processing and Watermarks》
二、《[白话解析] Flink的Watermark机制》

当然,官方文档永远都属于最有用的那一档:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time


watermark简单介绍

概述

关于watermark的简单定义,官网给出了这么一句话:The mechanism in Flink to measure progress in event time is watermarks (在Flink中用来衡量event time进程的机制就是watermark)。
watermark本质是一个时间戳,在实际工作环境中,消息并不能完全按照event time的顺序有序发送,有的时候会发生乱序或者延迟的情况,watermark就是用来处理这种情况的。
可以理解为,watermark就是告诉Flink一个消息延迟是多少,定义在什么时候就不再等待更早的数据。watermark翻译为 “水位线” 更准确,它是数据流的隐藏属性,作为数据流的一部分随数据流动。当Flink的运算符接收到watermark时,它就知道早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。
当Flink接收到每一条数据时,都会产生一条Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime – 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于Watermark是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。


简单案例

watermark && 滚动窗口 | 滑动窗口 | 会话窗口

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    /**
     * 1.获取执行环境
     * 2.指定数据按照event time进行处理数据
     * 3.获取数据源
     * 4.对数据进行水印处理
     */
    // 1.获取执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2.指定数据按照event time进行处理数据
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 3.获取数据源
    val socketStream: DataStream[String] = env.socketTextStream("node01", 9999)

    // 4.对数据进行水印处理
    // 既设置了水印,又指定了延迟时间,还抽取了event time
    val watermarkedStream: DataStream[String] = socketStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        // 抽取时间,在输入字符串中抽取event time,每20ms执行一次
        override def extractTimestamp(t: String): Long = {
          val eventTime: Long = t.split(" ")(0).toLong
          eventTime
        }
      }
    )
    // 5.统计单词出现的次数
    import org.apache.flink.api.scala._
    // 5.1 对每个出现的单词标1,组成一个对偶数组
    val wordAndOne: DataStream[(String, Int)] = watermarkedStream.map(x => (x.split(" ")(1), 1))

    // 5.2 根据单词进行分组
    val keyedStream: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)

    // 5.3 引入窗口
    // 5.3.1 滚动窗口
    val tumblingWindowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    // 5.3.2 滑动窗口
    val slidingWindowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
    // 5.3.3 会话窗口
    val sessionWindowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))

    // 5.4 进行聚合操作
    val reduceStream: DataStream[(String, Int)] = tumblingWindowedStream.reduce((x1, x2) => {
      (x1._1, x1._2 + x2._2)
    })

    // 6.打印数据
    reduceStream.print()

    env.execute("WatermarkDemo")
  }

}

未经允许不得转载:作者:601-赵同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《【Flink】watermark的基本原理和简单案例 原创》 发布于2020-11-06

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录