sparkStreaming的数据源与容错性

1227-李同学

发表文章数:84

热门标签

首页 » 大数据 » 正文

数据源

基本数据源

Spark Streaming原生支持一些不同的数据源。一些“核心”数据源已经被打包到Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用local或者local[1]

文件数据源

Socket数据流前面的例子已经看到过。
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。

  1. 文件需要有相同的数据格式
  2. 文件进入 dataDirectory的方式需要通过移动或者重命名来实现。
  3. 一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据。

如果文件比较简单,则可以使用 streamingContext.textFileStream(dataDirectory)方法来读取文件。文件流不需要接收器,不需要单独分配CPU核。
案例
注意,需先在hdfs上创建目录

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFile {

  def updateFunc(newValues:Seq[Int],runnintCount:Option[Int]):Option[Int] = {
    val finalResult = runnintCount.getOrElse(0) + newValues.sum
    Option(finalResult)
  }
  def main(args: Array[String]): Unit = {
    //设置sparkConf配置
    val sparkConf: SparkConf = new SparkConf().setAppName("streamingFile").setMaster("local[2]")
    //通过sparkConf得到sparkContext
    val sparkContext = new SparkContext(sparkConf)
    //设置日志输出级别
    sparkContext.setLogLevel("WARN")
    //通过sparkContext得到streamingContext
    val streamingContext = new StreamingContext(sparkContext,Seconds(5))
    //设置sparkStreaming保存目录
    streamingContext.checkpoint("./hdfs-data")
    //读取hdfs某一个目录下的所有的文件
    val fileStream: DStream[String] = streamingContext.textFileStream("hdfs://node01:8020/stream-data")
    //文件内容按照空格进行切分
    val words: DStream[String] = fileStream.flatMap(x => x.split(" "))
    //每个单词记作为1
    val wordAndOne: DStream[(String, Int)] = words.map(x => (x,1))
    //更新每个单词的状态,传入一个我们自定义的updateFunction
    val key: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    key.print()

    streamingContext.start()

    streamingContext.awaitTermination()

  }
}

自定义数据源

如果已经存在的数据源满足不了我们的要求,我们还可以自定义sparkStreaming的数据源进行数据的采集处理
通过继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

RDD队列

测试过程中,可以通过使用streamingContext.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueRdd {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
    //创建RDD队列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

    // Create the QueueInputDStream and use it do some processing
    // 创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue)
    //处理队列中的RDD数据
    val mappedStream = inputStream.map(x => (x % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //打印结果
    reducedStream.print()

    //启动计算
    ssc.start()

    // Create and push some RDDs into
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)

      //通过程序停止StreamingContext的运行
      //ssc.stop()
    }
  }
}

高级数据源

除核心数据源外,还可以用附加数据源接收器来从一些知名数据获取系统中接收的数据,这些接收器都作为Spark Streaming的组件进行独立打包了。它们仍然是Spark的一部分,不过你需要在构建文件中添加额外的包才能使用它们。现有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及ZeroMQ。可以通过添加与Spark版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 来引入这些附加接收器。

flume数据源

flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。
Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Poll拉取数据。

Apache Kafka数据源

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时地读取kafka中的数据,然后进行相关计算。

容错

检查点机制

检查点机制是我们在Spark Streaming中用来保障容错性的主要机制。与应用程序逻辑无关的错误(即系统错位,JVM崩溃等)有迅速恢复的能力.
它可以使Spark Streaming阶段性地把应用数据存储到诸如HDFS或Amazon S3这样的可靠存储系统中, 以供恢复时使用。具体来说,检查点机制主要为以下两个目的服务。

  1. 控制发生失败时需要重算的状态数。SparkStreaming可以通 过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
  2. 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序 并让驱动器程序从检查点恢复,这样Spark Streaming就可以读取之前运行的程序处理 数据的进度,并从那里继续 了。实现这个,Spark Streaming需要为容错存储系统checkpoint足够的信息从而使得其可以从失败中恢复过来。有两种类型的数据设置检查点。
    Metadata checkpointing:将定义流计算的信息存入容错的系统如HDFS。元数据包括:
    配置 – 用于创建流应用的配置。
    DStreams操作 – 定义流应用的DStreams操作集合。
    不完整批次 – 批次的工作已进行排队但是并未完成。
    Data checkpointing: 将产生的RDDs存入可靠的存储空间。对于在多批次间合并数据的状态转换,这个很有必要。在这样的转换中,RDDs的产生基于之前批次的RDDs,这样依赖链长度随着时间递增。为了避免在恢复期这种无限的时间增长(和链长度成比例),状态转换中间的RDDs周期性写入可靠地存储空间(如HDFS)从而切短依赖链。

总而言之,元数据检查点在由驱动失效中恢复是首要需要的。而数据或者RDD检查点甚至在使用了状态转换的基础函数中也是必要的。
出于这些原因,检查点机制对于任何生产环境中的流计算应用都至关重要。你可以通过向 ssc.checkpoint() 方法传递一个路径参数(HDFS、S3 或者本地路径均可)来配置检查点机制,同时你的应用应该能够使用检查点的数据

  1. 当程序首次启动,其将创建一个新的StreamingContext,设置所有的流并调用start()。
  2. 当程序在失效后重启,其将依据检查点目录的检查点数据重新创建一个StreamingContext。 通过使用StraemingContext.getOrCreate很容易获得这个性能。
ssc.checkpoint("hdfs://...") 


#创建和设置一个新的StreamingContext
def functionToCreateContext():
    sc = SparkContext(...) # new context
    ssc = new StreamingContext(...)
    lines = ssc.socketTextStream(...) # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory) # 设置检查点目录
    return ssc
# 从检查点数据中获取StreamingContext或者重新创建一个
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# 在需要完成的context上做额外的配置
# 无论其有没有启动
context ...
# 启动context
context.start()
contaxt.awaitTermination()

如果检查点目录(checkpointDirectory)存在,那么context将会由检查点数据重新创建。如果目录不存在(首次运行),那么函数functionToCreateContext将会被调用来创建一个新的context并设置DStreams。
注意RDDs的检查点引起存入可靠内存的开销。在RDDs需要检查点的批次里,处理的时间会因此而延长。所以,检查点的间隔需要很仔细地设置。在小尺寸批次(1秒钟)。每一批次检查点会显著减少操作吞吐量。反之,检查点设置的过于频繁导致“血统”和任务尺寸增长,这会有很不好的影响对于需要RDD检查点设置的状态转换,默认间隔是批次间隔的乘数一般至少为10秒钟。可以通过dstream.checkpoint(checkpointInterval)。通常,检查点设置间隔是5-10个DStream的滑动间隔。

驱动器程序容错

驱动器程序的容错要求我们以特殊的方式创建 StreamingContext。我们需要把检查 点目录提供给 StreamingContext。与直接调用 new StreamingContext 不同,应该使用 StreamingContext.getOrCreate() 函数。

def createStreamingContext() = {
  ...
  val sc = new SparkContext(conf)
// 以1秒作为批次大小创建StreamingContext

val ssc = new StreamingContext(sc, Seconds(1)) ssc.checkpoint(checkpointDir) 
}
...
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

工作节点容错

为了应对工作节点失败的问题,Spark Streaming使用与Spark的容错机制相同的方法。所 有从外部数据源中收到的数据都在多个工作节点上备份。所有从备份数据转化操作的过程 中创建出来的 RDD 都能容忍一个工作节点的失败,因为根据 RDD 谱系图,系统可以把丢 失的数据从幸存的输入数据备份中重算出来。

接收器容错

运行接收器的工作节点的容错也是很重要的。如果这样的节点发生错误,Spark Streaming 会在集群中别的节点上重启失败的接收器。然而,这种情况会不会导致数据的丢失取决于 数据源的行为(数据源是否会重发数据)以及接收器的实现(接收器是否会向数据源确认 收到数据)。举个例子,使用 Flume 作为数据源时,两种接收器的主要区别在于数据丢失 时的保障。在“接收器从数据池中拉取数据”的模型中,Spark 只会在数据已经在集群中 备份时才会从数据池中移除元素。而在“向接收器推数据”的模型中,如果接收器在数据 备份之前失败,一些数据可能就会丢失。总的来说,对于任意一个接收器,你必须同时考 虑上游数据源的容错性(是否支持事务)来确保零数据丢失。
总的来说,接收器提供以下保证。
• 所有从可靠文件系统中读取的数据(比如通过StreamingContext.hadoopFiles读取的) 都是可靠的,因为底层的文件系统是有备份的。Spark Streaming会记住哪些数据存放到 了检查点中,并在应用崩溃后从检查点处继续执行。
• 对于像Kafka、推式Flume、Twitter这样的不可靠数据源,Spark会把输入数据复制到其 他节点上,但是如果接收器任务崩溃,Spark 还是会丢失数据。在 Spark 1.1 以及更早的版 本中,收到的数据只被备份到执行器进程的内存中,所以一旦驱动器程序崩溃(此时所 有的执行器进程都会丢失连接),数据也会丢失。在 Spark 1.2 中,收到的数据被记录到诸 如 HDFS 这样的可靠的文件系统中,这样即使驱动器程序重启也不会导致数据丢失。
综上所述,确保所有数据都被处理的最佳方式是使用可靠的数据源(例如 HDFS、拉式 Flume 等)。如果你还要在批处理作业中处理这些数据,使用可靠数据源是最佳方式,因为 这种方式确保了你的批处理作业和流计算作业能读取到相同的数据,因而可以得到相同的 结果。

处理保证

由于Spark Streaming工作节点的容错保障,Spark Streaming可以为所有的转化操作提供 “精确一次”执行的语义,即使一个工作节点在处理部分数据时发生失败,最终的转化结果(即转化操作得到的 RDD)仍然与数据只被处理一次得到的结果一样。
然而,当把转化操作得到的结果使用输出操作推入外部系统中时,写结果的任务可能因故 障而执行多次,一些数据可能也就被写了多次。由于这引入了外部系统,因此我们需要专 门针对各系统的代码来处理这样的情况。我们可以使用事务操作来写入外部系统(即原子 化地将一个 RDD 分区一次写入),或者设计幂等的更新操作(即多次运行同一个更新操作 仍生成相同的结果)。比如 Spark Streaming 的 saveAs…File 操作会在一个文件写完时自动 将其原子化地移动到最终位置上,以此确保每个输出文件只存在一份。

标签:

未经允许不得转载:作者:1227-李同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《sparkStreaming的数据源与容错性》 发布于2020-11-15

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录