Flink事件时间和水印

tammypi tammypi
2019-01-14 10:50
787
0

 

1.概览

处理时间/系统时间/进入时间

Flink在Streaming程序中,提供了不同的时间概念。

处理时间:处理时间代表着在执行相应操作时的系统时间。

当一个Streaming程序以处理时间运行时,所有基于时间的操作将使用执行相应操作时的系统时间。一个小时级别处理时间的时间窗口,将会包含在系统时钟指示整小时时,到达该特定操作的所有记录。例如,一个程序上午9:15开始运行,第一个小时级处理时间窗口将会包含9:15am到10:00am的所有记录,下一个时间窗口将会包含10:00am到11:00am的所有记录,以此类推。

处理时间是最简单的时间概念,不需要流和主机之间的协调。它提供了最好的处理性能,以及最低的延迟。然而,在分布式和异步环境中,处理时间不能提供确定性。因为,它易受记录到达系统的速度影响(例如消息队列),并且易受记录流入流出系统时的操作速度以及中断(调度,或者其他)的影响。

事件时间:事件时间是每一条记录在其生产主机上生成的时间。这个时间在记录进入Flink前,就已经包含在该记录里了,并且该事件时间戳可以从记录里提取出来。在事件时间中,处理的进度来源于数据,而非来源于任何时钟。在事件时间程序中,需要明确指定如何生成事件时间水印,(事件时间水印)是一种在事件时间里表示处理进展的一种机制。水印机制将在后续的小节里进行描述。

在理想情况下,不论事件以什么时间、何种顺序到达,事件都将产生完全一致并且确定的结果。然而,除非事件是按顺序到达(或者按时间戳),否则事件时间将会在等待乱序事件时出现延迟。由于只能等待有限的时间,这就限制了事件时间程序的确定性。

假设所有数据已经到达了,即使其中有乱序的数据,或者被重新处理的历史数据,事件时间操作也会按预期执行,产生正确的且完全一致的结果。例如,一个小时级别的事件时间处理窗口,将会包含事件时间戳在这个窗口内的所有记录,不论它们以何种顺序到达,或者它们何时被处理。(看晚到事件的事件的小节以便了解更多信息)

注意一些事件时间程序在处理实时数据时,也会使用一些基于处理时间操作,以便确保它们处理数据的及时性。

进入时间:进入时间是一个事件进入Flink的时间。在源操作里(source),每一条记录都会得到源的当前时间作为一个时间戳,并且基于时间的操作(例如时间窗口)将会使用这个时间戳。

从概念上讲,进入时间位于处理时间和事件时间之间。与处理时间相比,它更昂贵一些(slightly more expensive)并且提供了更可预期的结果。由于进入时间使用不变的时间戳(在源中赋值一次),操作记录的不同时间窗口用的是相同的时间戳,不论不同的时间窗口赋给记录的操作时间是否不同(基于本地系统时间,或者有任何传输延迟)。

和事件时间相比,进入时间无法处理任何乱序的数据或者晚到的数据。不过使用进入时间,程序无需指定如何生成水印。

在内部,进入时间和事件时间十分相似,只是有自动的时间戳赋值以及有自动的水印生成机制。

设置一个时间特性

Flink DataStream程序的第一步通常是设置一个基本的时间特性。这个设置确定了Data Stream源的行为方式(例如,是否需要赋一个时间戳),以及窗口操作如KeyedStream.timeWindow(Time.seconds(30))将要使用哪种概念的时间。

下面的例子展示了如何在小时级别的时间窗口里聚合事件。窗口的行为与时间特性相适应。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream stream = env.addSource(new FlinkKafkaConsumer09(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

注意,如果要使用事件时间的话,一种方式是在源里直接指定事件时间并且生成水印,另一种方式是在程序里使用Timestamp Assigner & Watermark Generator。这些方法描述了如何访问事件时间戳,以及流的乱序程度。

下面的小节描述了时间戳和水印的通用机制。如果想要看如何在Flink DataStream API使用时间戳以及水印,可以参考“生成时间戳及水印”小节。

事件时间和水印

注意:Flink从数据流模型中实现了许多技术。为了更好的介绍事件时间和水印,可以看一下下面的文章:

Streaming 101: The world beyond batch

The Dataflow Model paper

一个支持事件时间的处理器需要一种评估事件时间进度的方式。例如,一个小时级别时间窗口的操作,需要在事件时间大于这个小时的结束时间时得到通知,这样这个操作就可以关闭正在进行的窗口。

事件时间可以独立于操作时间(由时钟评估)被处理。例如,在一个程序里在被相同的速度处理时,事件时间可以稍微落后于处理时间(由于接收数据的延迟)。另一方面,通过快速转发Kafka主题(或者其他消息队列)中已经缓存的历史数据,另一个流程序可能只需要几秒钟的处理,就可以处理数周的事件时间。

Flink使用水印来评估事件时间的处理进度。水印是数据流的一部分,并且带有一个时间戳t。一个水印t代表流中的事件时间已经到达t,代表不会再有<=t时刻的数据。

下图展示了基于事件时间和内联水印的流。在这个例子中事件是有序的,这就意味着水印只是流的周期性标记。

水印对于乱序的流是十分关键的,如下面解释的那样,事件并不如它们的时间那样有序。一般来说,水印是一种声明,在流中的那个点上,直到某个时间戳的所有事件都应该已经到达。一旦一个水印到达一个操作,这个操作将会使用水印更新它的内部事件时钟。

并行流的水印

水印是在源函数处或之后直接生成的。源函数的每个并行子任务通常独立生成其水印。这些水印定义了特定并行源处的事件时间。

当水印流经流程序时,它们会更新它们所到达的操作的事件时间。每当一个操作更新它的事件时间时,它会同时在下游为它的后续操作符生成一个新的水印。

一些操作使用多个输入流;例如,一个union,或keyBy(…)或partition(…)函数后面的操作。这样一个操作的当前事件时间是其输入流的事件时间的最小值。当输入流更新它们的事件时间时,操作也会更新(它的事件时间)。

下图显示了一个事件和水印流经并行流,以及跟踪事件时间的操作的示例。

晚到元素

某些元素可能会违反水印条件,这意味着即使在水印(t)发生之后,还会出现更多带有时间戳t ' <= t的元素。事实上,在许多实际的设置中,某些元素可以被任意延迟,因此指定某个事件时间戳的所有元素发生的时间是不可能的。此外,即使延迟是有界的,将水印延迟太多通常也是不可取的,因为它会在事件时间窗的评估中造成太多的延迟。

因此,流程序可能会显式地预期一些延迟元素。延迟元素是在系统的事件时间时钟(如水印所示)已经超过延迟元素的时间戳的时间之后到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参见“允许延迟”。

调试水印

关于如何在运行时调试水印,请参阅调试窗口和事件时间一节。

2.允许晚到

在用事件时间时,会出现元素晚到的情况,例如,Flink用于跟踪事件时间进度的水印已经超过了这个元素的事件时间所属的时间窗口。

默认的,如果一个元素晚到了--水印大于窗口结束时间时,该元素会被丢弃。但是,Flink允许给窗口操作指定允许的最大延迟。允许的最大延迟定义了,在元素被丢弃之前可以有多长时间的元素晚到,并且它的默认值是0。在水印通过窗口末尾之后到达的元素,加上允许的延迟,仍然被添加到窗口中。根据所使用的触发器,延迟但未删除的元素可能导致窗口再次触发。这是EventTimeTrigger的情况。

为了使其工作,Flink将保持窗口的状态,直到允许的延迟过期。一旦发生这种情况,Flink将删除窗口并删除其状态,正如窗口生命周期部分中所描述的那样。

你可以如下这样定义允许的最大延迟:

DataStream input = ...;

input
    .keyBy()
    .window()
    .allowedLateness()
    .();

关于晚到元素的一个例子,例如3条数据,事件时间分别为14,10,12,以乱序到达Flink。Flink使用3的间隔建立时间窗口,水印=事件时间-1。

窗口数据范围及结束时间如下:

[0,3) --结束时间3
[3,6) --结束时间6
[6,9) --结束时间9
[9,12) --结束时间12
[12,15) --结束时间15

14的到来,使得水印的值变为14-1=13。

当10到来时,由于水印已经比它所属的时间窗口[9,12)的结束时间大了,所以该数据会被丢弃。

当12到来时,由于它小于水印,且水印不大于它所属时间窗口的结束时间,所以会被处理。

如果需要10到达时,可以正确被处理,就需要设置allowLateness()来允许晚到元素到达时再次触发窗口。

 

3.使用事件时间和水印

这一小节是关于使用事件时间。为了使用事件时间,需要相应的指定时间特性。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

时间戳赋值

为了使用事件时间,Flink需要知道事件的时间戳,这就意味着流中的每一个元素需要事件时间戳赋值。通常通过访问元素的某个属性来获取事件时间。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进度。

有两种方法,来给事件时间戳赋值以及生成水印:

  1. 直接在源中
  2. 通过timestamp assigner / watermark generator,在Flink中timestamp assigners也定义水印的生成

有时间戳和水印的源方法

流的源通常在它们生产元素时给时间戳赋值,并且生成水印。一旦这个完成了,就不再需要使用timestamp assigner了。注意,如果一个timestamp assigner被使用的话,那么任何源提供的时间戳和水印都将会被覆盖。

为了在源中直接给一个源赋值时间戳,这个源必须在SourceContext中使用collectWithTimestamp(...)方法。为了生成水印,这个源必须请求emitWatermark(Watermark)方法。

下面是一个在源中赋值时间戳及生成水印的示例:

@Override
public void run(SourceContext ctx) throws Exception {
	while (/* condition */) {
		MyType next = getNext();
		ctx.collectWithTimestamp(next, next.getEventTimestamp());

		if (next.hasWatermarkTime()) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
		}
	}
}

Timestamp Assigners / Watermark Generators

Timestamp assigners操作一个流,并且生成含有时间戳和水印的流。如果原始流中已经有时间戳和水印了,那么timestamp assigner会覆盖它们。

Timestamp assigners通常在数据流之后被定义,但是并不严格要求这样做。例如,常见的模式是在时间戳分配程序之前进行parse(MapFunction)和filter(FilterFunction)。在任何情况下,都需要在事件时间上的第一个操作(例如第一个窗口操作)之前指定时间戳分配程序。作为一种特殊情况,当使用Kafka作为流作业的源时,Flink允许在源(或使用者)本身内部指定一个时间戳分配器/水印发射器。关于如何这样做的更多信息,可以在Kafka连接器文档中找到。

注意:本节的其余部分将介绍程序员必须实现的主要接口,以便创建自己的时间戳提取器/水印发射器。要查看附带Flink的预实现提取器,请参阅预定义的时间戳提取器/水印发射器页面。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

使用周期性水印

AssignerWithPeriodicWatermarks周期性的产生时间戳和水印(可能基于流元素,或者简单的基于处理时间)。

产生水印的间隔可以通过ExecutionConfig.setAutoWatermarkInterval(...)指定。这个生成器的getCurrentWatermark() 会被周期性请求,如果该方法返回的水印为非空且大于前一个水印,则会发出新的水印。

这里我们展示了两个使用周期性水印生成的时间戳分配器的简单示例。请注意,Flink附带了一个BoundedOutOfOrdernessTimestampExtractor,类似于下面所示的BoundedOutOfOrdernessGenerator,您可以在这里阅读相关内容。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks {

	private final long maxTimeLag = 5000; // 5 seconds

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark getCurrentWatermark() {
		// return the watermark as current time minus the maximum time lag
		return new Watermark(System.currentTimeMillis() - maxTimeLag);
	}
}

间断性的水印

若要在某个事件指示可能生成新水印时生成水印,使用AssignerWithPunctuatedWatermarks。对于这个类,Flink首先调用extractTimestamp(…)方法为元素分配一个时间戳,然后立即调用该元素上的checkAndGetNextWatermark(…)方法。

checkAndGetNextWatermark(…)方法将传递在extractTimestamp(…)方法中分配的时间戳,并可以决定是否要生成水印。每当checkAndGetNextWatermark(…)方法返回一个非空水印,并且该水印大于最新的前一个水印时,就会发出新的水印。

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks {

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
	}
}

每个Kafka分区的时间戳

在使用Apache Kafka作为数据源时,每个Kafka分区也许有一个时间模式(递增时间,或者有界无序时间)。然而,在消费Kafka的数据时,多个分区被并行消费,多个分区的数据交叉并且破坏了单个分区的模式。

在这种情况下,您可以使用Flink支持kafka分区的水印生成。使用该特性,每个Kafka分区在Kafka使用者内部生成水印,每个分区的水印合并的方式与流shuffle时合并水印的方式相同。

例如,如果事件时间戳严格按照Kafka分区升序排列,那么使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。

下面的插图展示了如何使用每个kafka分区生成水印,以及在这种情况下水印如何通过流数据流传播。

FlinkKafkaConsumer09 kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream stream = env.addSource(kafkaSource);

4.预定义时间提取和水印生成

正如时间戳和水印处理中所描述的,Flink提供了抽象,允许程序员分配自己的时间戳并发出自己的水印。更具体地说,可以通过实现AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口之一来实现,具体方法取决于用例。简而言之,第一个会周期性地发出水印,而第二种方法则基于传入记录的某些属性,例如每当流中遇到特殊元素时。

为了进一步简化此类任务的编程工作,Flink提供了一些预先实现的时间戳分配器。本节提供了它们的列表。除了它们的开箱即用功能之外,它们的实现还可以作为定制实现的示例。

递增时间戳分配器

对于周期性水印生成,最简单的特殊情况是给定源的时间戳按升序出现。在这种情况下,当前时间戳始终可以充当水印,因为不会有更早的时间戳到达。

注意,只需要将每个并行数据源任务的时间戳升序。例如,如果在特定的设置中,一个Kafka分区被一个并行数据源实例读取,那么只需要在每个Kafka分区中提升时间戳。Flink的水印合并机制将在并行流被打乱、联合、连接或合并时生成正确的水印。

DataStream stream = ...

DataStream withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

允许一定数量延迟的分配器

周期性水印生成的另一个例子是,当水印滞后于流中看到的最大(事件时间)时间戳一段固定时间时。这种情况涵盖了流中可能遇到的最大延迟是预先知道的场景,例如,当创建一个包含在固定时间内散布时间戳以进行测试的元素的自定义源时。对于这些情况,Flink提供了BoundedOutOfOrdernessTimestampExtractor,它以maxOutOfOrderness为参数,即在计算给定窗口的最终结果时,元素在被忽略之前允许迟到的最大时间。延迟对应于t- t_w(注意:总感觉这里写错了,应该是t_w-t)的结果,其中t是元素的(事件时间)时间戳,t_w是前一个水印的时间戳。如果延迟为> 0,则该元素被认为是延迟的,在为其对应的窗口计算作业的结果时,默认情况下将被忽略。有关使用延迟元素的更多信息,请参阅有关允许延迟的文档(第二节,允许晚到)。

DataStream stream = ...

DataStream withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

 

使用事件时间和水印需要注意的关键之处:

1.指定使用事件时间

在stream里指定使用事件时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

在自定义的事件时间生成器里指定采用数据的哪个字段作为事件时间:

public class TimestampExtractor implements AssignerWithPeriodicWatermarks,Serializable {
   ...
 
   @Override
   public long extractTimestamp(SuperLog superLog, long l) {
      long timestamp = superLog.getTimestamp()*1000L;
      if(superLog.getTimestamp().equals(-1)) {
         timestamp = superLog.getStart_time()*1000L;
      }
      ...
      return timestamp;
   }
}

如上的含义是,如果timestamp字段没有赋值的话,就用start_time字段作为事件时间;如果timestamp有赋值的话,就用timestamp作为事件时间。

乘以1000L的意思是,为了将秒级时间戳变为毫秒级时间戳。Flink要求时间是毫秒级时间戳。

 

2.水印使用的几个关键点

  • 水印用于标记位于水印时间之前的数据都可以直接输出了(水印的含义,时间小于或者等于水印的数据都已经到达了)
  • 水印处理乱序问题的原理:如果一个数据的时间大于水印,那么它会被缓存起来;假设有2条数据,过来的顺序为time2,time1,其中time2>time1,与它们本身过来的顺序正好相反,此时水印时间小于time2和time1,则这两个数据不会输出。当下一次水印更新到大于time2时,乱序的time2和time1可以正确顺序time1,time2被处理和输出。
  • 虽然说大量的文章推荐 水印=最大事件时间-可容忍的延迟时间,但是如果不断的发送同一时间的数据时,由于水印不会更新,则flink会以为没有数据要处理。所以为了让Flink可以正确处理多次同一时间数据:
  1. 周期性的更新水印,如1秒钟一次:
    //定时更新水印
    ExecutionConfig executionConfig = env.getConfig();
    executionConfig.setAutoWatermarkInterval(1000L);

     

  2. 在生成水印的代码里记录上一次的水印时间
  3. 水印时间=当前最大事件时间-可容忍的延迟时间。如果水印时间=上一次的水印时间(可能有同一时间的数据进入,或者没有数据进入),令最大事件时间=最大事件事件+1s,这样也保证了水印时间向后推1s(注意不要大于系统时间);如果水印时间!=上一次的水印时间,则不用做多余的处理。
    package com.ngengine.preprocess;
    import com.ngengine.models.SuperLog;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import javax.annotation.Nullable;
    import java.io.Serializable;
    /**
     * Created by dell on 2018/12/20.
     */
    public class TimestampExtractor implements AssignerWithPeriodicWatermarks,Serializable {
       private long currentMaxTimestamp;
       //最大容忍延迟
       private long maxOutOfOrderness = 10000L;
       //记录前一次水印时间
       private long preWaterMark = 0L;
     
       @Nullable
       @Override
       public Watermark getCurrentWatermark() {
          long currentWaterMark = 0L;
          currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
          if(preWaterMark > 0L && currentWaterMark == preWaterMark){
             //最大时间向后移动1s
             currentMaxTimestamp = currentMaxTimestamp + 1000L;
             //但最大时间不能大于系统时间
             currentMaxTimestamp = Math.min(System.currentTimeMillis(), currentMaxTimestamp);
             currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
          }
          preWaterMark = currentWaterMark;
          return new Watermark(currentWaterMark);
       }
     
       @Override
       public long extractTimestamp(SuperLog superLog, long l) {
          long timestamp = superLog.getTimestamp()*1000L;
          if(superLog.getTimestamp().equals(-1)) {
             timestamp = superLog.getStart_time()*1000L;
          }
          currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
          return timestamp;
       }
    }

     

发表评论

验证码: