Spark原理篇之Spark Streaming转化操作和输出操作

2022年6月6日12:47:25

1 转化操作

      DStream的转化操作可以分为无状态和有状态两种。
      ① 在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。例如map()、filter()和reduceByKey()等都是无状态转化操作。
      ② 有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

1.1无状态转化操作

      无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下面:Spark原理篇之Spark Streaming转化操作和输出操作
注意:尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作时分别应用到每个RDD上的。例如,reduceByKey会规约每个时间区间中的数据,但不会规约不同时间区间之间的数据。
      无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。例如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()、join()和leftOuterJoin()等。
      最后,如果这些无状态转化操作不够用,DStream还提供了一个叫作transform()的高级操作符,可以让你直接操作其内部的RDD。这个transform()操作允许你对DStream提供任意一个RDD到RDD的函数。这个函数会在数据流中的每个批次中被调用,生成一个新的流。transform()的一个常见应用就是重用你为RDD写的批处理代码。例如,如果你有一个叫做extractOutliers()的函数,用来从一个日志记录的RDD中提取出异常值的RDD(可能通过对消息进行一些统计),你就可以在transform()中重用它。

1.2 有状态转化操作

      DStream的有状态转化操作是跨时间区间跟踪数据的操作;也就是说,一些先前批次的数据也被用来在新的批次中计算结果。主要的两种类型是华东窗口和updateStateByKey(),前者以一个时间阶段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化(例如构建一个代表用户会话的对象)。
      有状态转化操作需要在你的StreamingContext中打开检查点机制来确保容错性。

1.2.1 基于窗口的转化操作

      基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是StreamingContext的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的windowDuration/batchInterval个批次。如果有一个以10秒为批次间隔的源DStream,要创建一个最近30秒的时间窗口(即最近3个批次),就应当把windowDuration设为30秒。而滑动步长的默认值与批次间隔相等,用来控制对新的DStream进行计算的间隔。如果源DStream批次间隔为10秒,并且我们只希望每两个批次计算一次窗口结果,就应该把华东步长设置为20秒。
      对DStream可以用的最简单窗口操作时window(),它返回一个新的DStream来表示所请求的窗口操作的结果数据。换句话说,window()生成的DStream中的每个RDD会包含多个批次中的数据,可以对这些数据进行count()和transform()操作。Spark原理篇之Spark Streaming转化操作和输出操作
      尽管可以使用window()写出所有的窗口操作,Spark Streaming还是提供了一些其他的窗口操作,让用户可以高效而方便地使用。首先,reduceByWindow()和reduceByKeyAndWindow()让我们可以对每个窗口更高效地进行规约操作。它们接收一个规约函数,在整个窗口上执行,比如+。除此以外,它们还有一种特殊的形式,通过只考虑进入窗口的数据和离开窗口的数据,让Spark增量计算规约结果。这种特殊形式需要提供归约函数的一个逆函数,比如+对应的逆函数为-。对于较大的窗口,提供逆函数可以大大提高执行效率。
Spark原理篇之Spark Streaming转化操作和输出操作

1.2.2 UpdateStateByKey转化操作

      updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键,状态)对。例如,在网络服务器日志中,事件可能是对网站的访问,此时键是用户的ID。使用updateStateByKey()可以跟踪每个用户最近访问的10个界面。这个列表就是“状态”对象,我们会在每个时间到来时更新这个状态。
      要使用updateStateByKey(),提供了一个update(events,oldState)函数,接收与某键相关的时间以及该键之前对应的状态,返回这个键对应的新状态。
      ① events:是在当前批次中收到的事件的列表(可能为空)。
      ② oldState:是一个可选的状态对象,存放在Option内;如果一个键没有之前的状态,这个值可以空缺。
      ③ newState:由函数返回,也以Option形式存在;我们可以返回一个空的Option来表示想要删除该状态。
      updateStateByKey()的结果会是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。举个简单的例子,使用updateStateByKey()来跟踪日志消息中各HTTP响应代码的计数。这里的键是响应代码,状态是代表响应代码计数的整数,事件则是页面访问。

2 输出操作

      输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。常用的一种调试性输出操作是print(),它会在每个批次中抓取DStream的前十个元素打印出来。当然还有saveAsTextFiles()和foreachRDD()等输出操作。

参考文章:
[1]《Spark快速大数据分析》

  • 作者:听挽风讲大数据
  • 原文链接:https://blog.csdn.net/huahuaxiaoshao/article/details/91044751
    更新时间:2022年6月6日12:47:25 ,共 2727 字。