专注为金融业提供基于人工智能的综合解决方案,金融物联网、

智能机器人、人工智能三大产品线服务30家总行、1000家分行

银行智能场景实践专家

查看详情
    • 客服电话

    • 400-8304-999
    • 服务时间

    • 周一至周五 9:00-18:00
    • 官方微信订阅号

和美大家说 | 基于Flink CEP的实时页面时长统计

首页    和美干货    和美大家说 | 基于Flink CEP的实时页面时长统计

Flink CEP 也叫做Flink复杂事件处理,可以在无穷无界的事件流中检测事件规则,通过模式规则匹配的方式对重要信息进行跟踪和分析,从而在实时数据中发掘出有价值的信息。

 

复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。

Flink 基于 DataStrem API 提供了 FlinkCEP 组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘出有价值的信息。

FlinkCEP 中提供了 Pattern API 用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。其中涉及三个部分:

◆Pattern模式的定义

◆将Pattern规则应用在事件流上进行匹配检测

◆获取匹配结果

一、Pattern模式定义

定义 Pattern 可以是单次执行模式或者多次执行模式。单词执行模式一次只接受一个事件,多次执行模式可以接收一个或者多个事件。可以通过设置循环将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过 where 方法进行叠加。每个 Pattern 都是通过 begin 方法定义的

val start = Pattern.begin[Event]("start_pattern")

下一步通过 Pattern.where()方法在 Pattern 上指定 Condition,只有当 Condition 满足之后,当前的 Pattern 才会接受事件。

start.where(_.getCallType == "success")

1.设置循环次数

对于已经创建好的 Pattern,可以指定循环次数,形成循环执行的 Pattern链。

u times:可以通过 times 指定固定的循环执行次数。

//指定循环触发4次

start.times(4);

//可以执行触发次数范围,让循环执行次数在该范围之内

start.times(2, 4);

u optional:也可以通过 optional 关键字指定要么不触发要么触发指定的次数。

start.times(4).optional();

start.times(2, 4).optional();

u greedy:可以通过 greedy 将 Pattern 标记为贪婪模式,在 Pattern 匹配成功的前提下,

会尽可能多地触发。

//触发2、3、4次,尽可能重复执行

start.times(2, 4).greedy();

//触发0、2、3、4次,尽可能重复执行

start.times(2, 4).optional().greedy();

u oneOrMore:可以通过 oneOrMore 方法指定触发一次或多次。

// 触发一次或者多次

start.oneOrMore();

//触发一次或者多次,尽可能重复执行

start.oneOrMore().greedy();

// 触发0次或者多次

start.oneOrMore().optional();

// 触发0次或者多次,尽可能重复执行

start.oneOrMore().optional().greedy();

u timesOrMor:通过 timesOrMore 方法可以指定触发固定次数以上,例如执行两次以上。

// 触发两次或者多次

start.timesOrMore(2);

// 触发两次或者多次,尽可能重复执行

start.timesOrMore(2).greedy();

// 不触发或者触发两次以上,尽可能重复执行

start.timesOrMore(2).optional().greedy();

2.定义条件

每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在 FlinkCFP 中通过 pattern.where()、 pattern.or()及 pattern.until()方法来为 Pattern 指定条件,且 Pattern 条件有 Simple Conditions 及 Combining Conditions 等类型。

u 简单条件:Simple Condition 继承于 Iterative Condition 类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。

// 把通话成功的事件挑选出来

start.where(_.getCallType == "success")

u 组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用 where 方法进行条件的组合,默认每个条件通过 AND 逻辑相连。如果需要使用 OR 逻辑,直接使用 or 方法连接条件即可。

// 把通话成功,或者通话时长大于10秒的事件挑选出来

val start = Pattern.begin[StationLog]("start_pattern")

.where(_.callType=="success")

.or(_.duration>10)

u 终止条件:如果程序中使用了 oneOrMore 或者 oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过 until()方法指定。

pattern.oneOrMore.until(_.callOut.startsWith("186"))

3.模式序列

将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。

u 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。

val strict: Pattern[Event] = start.next("middle").where(...)

u 宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为 OR 的逻辑关系。

val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

 

u 非确定宽松邻近:和宽松邻近条件相比,非确定宽松邻近条件指在模式匹配过程中可以忽略已经匹配的条件。

val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

u 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效

//指定模式在10秒内有效

pattern.within(Time.seconds(10));

 

  • Pattern应用在事件流上检测

调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream

//cep 做模式检测

val patternStream = CEP.pattern[EventLog](dataStream.keyBy(_.id),pattern)

 

  • 选取结果

得到 PatternStream 类型的数据集后,接下来数据获取都基于 PatternStream 进行。该数据集中包含了所有的匹配事件。目前在 FlinkCEP 中提供 select 和 flatSelect 两种方法从 PatternStream 提取事件结果事件。

 

  • 示例

使用Flink CEP在实时数据流中获取开始页面和结束页面的时间,用于计算某一业务的整体耗时。

其程序流程图如下:

部分代码如下:

//从kafka读取json数据

val messageStream = env.addSource(new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema, props))

 

//进行格式解析,并在最后注册水印

val waterStream = messageStream

  .flatMap(str => {

      var pageList = ListBuffer[PageEvent]()

      try {

 

          var first_json = JSONPath.read(str, "$.measurements.EVENT_BUSI_STM_COMpathLogs").toString

          var jsArgs = JSONPath.read(first_json, "$.data.jsArgs").toString

          var device_id = JSONPath.read(jsArgs, "$.commonData.device_id").toString

          var arrayStr = JSONPath.read(jsArgs, "$.data").toString

          val array = JSON.parseArray(arrayStr)

          for (i <- 0 to array.size() - 1) {

              val jSONObject = JSON.parseObject(JSONPath.read(array.get(i).toString, "$.pr").toString)

              var pageEvent = PageEvent(jSONObject.get("event_id").toString, jSONObject.get("session_id").toString, jSONObject.get("menu_id").toString, device_id, jSONObject.get("event_time").toString.toLong)

              pageList.+=(pageEvent)

              //                      println("获取数据 === " + pageEvent)

          }

          pageList

      } catch {

          case ex: Exception => {

              pageList

          }

      }

  })

  .filter(x => x.event_id == "enterMenu" || x.event_id == "goHome")

  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator(maxOutOfOrderness))

//定义pattern,宽松邻近匹配开始页面和结束页面

val pattern = Pattern.begin[PageEvent]("start", AfterMatchSkipStrategy.skipPastLastEvent).where(new SimpleCondition[PageEvent] {

    override def filter(p: PageEvent): Boolean = p.event_id.equals("enterMenu")

}).followedBy("end").where(new SimpleCondition[PageEvent] {

    override def filter(p: PageEvent): Boolean = p.event_id.equals("goHome")

})

 

//进行pattern对datastream进行匹配

val patternStream: PatternStream[PageEvent] = CEP.pattern(waterStream.keyBy(_.device_id), pattern.within(Time.hours(withinTime)))

 

//通过select获取匹配结果

val resultStream = patternStream.select(new PatternSelectFunction[PageEvent, String] {

    override def select(map: util.Map[String, util.List[PageEvent]]): String = {

        //                println("匹配数据:" + map)

        val start = map.get("start").get(0)

        val end = map.get("end").get(0)

        PageResult(end.device_id, start.session_id, end.session_id, start.menu_id, end.menu_id, start.event_time, end.event_time).toString

    }

})

 

2020-11-13 14:58
浏览量:0