flink-siddhi、事件时间和时序规则

tammypi tammypi
2019-03-25 12:54
352
0

在很多情况下,我们需要使用时序规则来发现威胁事件,例如永恒之蓝的一个时序关系如下。对于一对sip,dip:

SMB尝试登录->ExternalBlue相关漏洞利用-->SMB登录成功

则我们认为触发了永恒之蓝事件。

 

为了实现这样一个功能,我们可以使用flink这样一个流引擎来实时的处理数据(不再像Spark那样硬性的切分批窗口)。而实现时序,我们既可以使用flink自己的CEP算子,也可以集成比较成熟的规则引擎,例如Siddhi。Siddhi是一个非常强大的CEP引擎,支持用类SQL的语法来定义时序规则,对于用户非常友好。例如下面就是使用CEP SQL对于永恒之蓝规则的一个描述(假设SMB尝试登录的规则id是1,ExternalBlue漏洞利用的规则id是2,SMB登录成功的规则id是3):

from s1=inputstream[rule_id=1] -> s2=inputstream[rule_id=2 and sip=s1.sip and dip=s1.dip] -> s3=inputstream[rule_id=3 and sip=s2.sip and dip=s2.dip]

within 5 minute

select s1.timestamp as stime,s3.timestamp as etime,sip,dip

Insert into outputstream

注:

->是用来表示具有follow by 关系

[]内的条件用于做关联和过滤

within后跟的时间用来表示时序关系必须在5分钟内

 

可以看到,CEP SQL还是非常的浅显易懂的,只要是具有SQL知识的人就可以很快的上手写相关的规则。如果要将flink和siddhi结合起来用,还需要依赖一个叫做flink-siddhi的组件,flink-siddhi可以认为是flink和siddhi的一个桥梁,负责将flink datastream的数据发送给siddhi,再将siddhi的输出转换成flink的datastream。

 

1.flink-siddhi对于事件时间和水印的处理

我们都知道,flink里面有两个重要的概念:

  • 事件时间:该时间来自于数据内部,和处理时间、接入时间作为区分
  • 水印:水印用来处理一定程度上的乱序问题(水印用于表示只要时间小于水印的数据都已经到达了)。所以,可以让水印=当前收到的最大日志时间-容忍乱序时间。

那么,对于flink中的这两个关键概念,flink-siddhi是如何进行处理的呢?

 

首先,flink-siddhi会判断用户是否设置了使用事件时间。如果是使用的事件时间的话,那么取出的时间戳字段就是数据内的时间戳字段:

其次,在取出数据之后,会将数据放入到一个优先级队列里(该优先级队列使用时间进行排序)。

最后,在每一次收到水印更新时,取出队列里时间小于水印的所有数据(数据已经有序了),然后再将它发送给siddhi:

 

2.siddhi对于时序规则的实现

时序规则的满足,其实可以理解为是一个状态的转移过程:

在Siddhi里,使用StateEvent这样一个数据结构来表示,即每一个StateEvent实例即是一个真实的命中规则的实例(不论处于哪个状态阶段)

里面用一个StreamEvent数组来保存命中规则某个状态的数据(例如,收到1条1.1对于1.2尝试SMB登录的告警日志,那么就会把它放入到这个数组里)。同时有一个timestamp字段用于记录当前状态阶段的数据的时间戳。(例如,收到1条1.1对于1.2尝试SMB登录的告警日志,又收到一条1.1对于1.2进行externalblue漏洞利用的告警日志,那么streamEvent数组里就有两条记录,而timestamp的值是externalblue漏洞利用告警日志的时间戳)。

一堆没有到结束状态的StateEvent实例,Siddhi把它们都放在pendingStateEventList里。

所以,当每条数据过来时,Siddhi都要做两个判断:

  • 是否命中初始条件:是否命中规则初始条件,如果命中,则初始化一个StateEvent实例,并将这个实例放入到pendingStateEventList里
  • 是否更新已存在实例状态:与已经存在的pendingStateEventList进行比对,判断这条数据是否可以更新某个StateEvent的状态

是否更新已存在实例状态的过程又可以细化成:

  • 前置条件超时:使用新来的这条数据与pendingStateEventList里的数据计算时间差(取时间差的绝对值),如果时间差已经大于within里规定的时间,那么直接判断前置条件超时。直接将该StateEvent实例从pendingStateEventList里移除。
  • 规则匹配:使用在SQL里定义好的逻辑关系进行比对,命中的话就将数据加入StateEvent的StreamEvent数组里,并更新StateEvent的状态;如果命中的已经是最后一个状态,那么就可以输出。

3.一点小定制

从第2小节描述里可以看出,前置条件超时是用新到的数据的时间戳与StateEvent的时间戳。那么在实际情况中可能存在这样的场景,即有一个平台同时接入了两种类型的数据源,一个是实时数据源,一个是ftp数据源每天定时使用文件将前一天的日志在十几分钟内全部接入。

假设它们同时都需要过引擎,那么可能就会存在这样的数据:

  • a,时间戳:2019-03-21 18:21:00
  • c,时间戳:2019-03-20 01:00:00
  • b,时间戳:2019-03-21 18:21:01

 

规则为:a->b within 10 second

 

那么当a经过引擎时,由于命中初始条件,那么就会新建一个StateEvent实例放入到pendingStateEventList里。但是,当c过来时,检测到c和a之间的时间差已经是几十个小时远超过10 second,那么直接就会把这个前置条件实例给超时掉了。所以,目前的实现是无法适应与现实场景的。

 

所以,StreamEvent和StateEvent都应该同时记录事件时间和系统时间,其中StateEvent的系统时间等于它当前状态阶段的StreamEvent的时间戳。

且:

  • CEP时间窗口还是使用事件时间进行比对,如果发现事件时间差超过CEP规定,那么就不进行输出,同时将StateEvent实例移除。这样就可以正确应对十几分钟内接入1天的日志情况。
  • 前置条件超时使用系统时间比对,如果新来的数据的系统时间减去前置条件的系统时间已经大于within规定,那么就判定前置条件超时,并将StateEvent实例移除。这样就可以正确应对实时数据源和延迟数据源(ftp)数据混杂接入的情况。

发表评论

验证码: