Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

1300-黄同学

发表文章数:85

热门标签

首页 » 大数据 » 正文

SparkStreaming介绍

什么是Spark Streaming

Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据源有很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象操作如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

SparkStreaming与Storm的对比

  • SparkStreaming
    开发语言:Scala
    编程模型:DStream
    Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

  • Storm
    开发语言:Clojure
    编程模型:Spout/Bolt
    Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

Spark Streaming原理与架构

Spark Streaming原理

Spark Streaming 是基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。

Spark Streaming计算流程

Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行缓存或者存储到外部设备。下图显示了Spark Streaming的整个流程。
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

Spark Streaming实时性

对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.1秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。

spark的架构

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的。批次间隔一般设在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以 Spark 作业的方式处理并生成其他的 RDD。 处理的结果可以以批处理的方式传给外部系统。高层次的架构如图:
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例
Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据。
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例
Spark Streaming在Spark的驱动器程序—工作节点的结构的执行过程如下图所示。Spark Streaming为每个输入源启动对 应的接收器。接收器以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为 RDD。它们收集到输入数据后会把数据复制到另一个执行器进程来保障容错性(默 认行为)。数据保存在执行器进程的内存中,和缓存 RDD 的方式一样。驱动器程序中的 StreamingContext 会周期性地运行 Spark 作业来处理这些数据,把数据与之前时间区间中的 RDD 进行整合。
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

DStream介绍

什么是DStream

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例
对数据的操作也是按照RDD为单位来进行的
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例
Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。
Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例

DStream相关操作

Transformations on DStreams

Transformation Meaning
map(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream
flatMap(func) 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func) 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
repartition(numPartitions) 增加或减少DStream中的分区数,从而改变DStream的并行度
union(otherStream) 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count() 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStream
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func) 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func) 根据key的之前状态值和key的新值,对key进行更新,返回一个新状态的DStream

特殊的Transformations

  1. UpdateStateByKey Operation
    UpdateStateByKey用于记录历史记录,保存上次的状态
  2. Window Operations(开窗函数)
    滑动窗口转换操作:
    滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。
    Spark(第七节)SparkStreaming介绍,DStream介绍,SparkStreaming接收socket数据、文件数据、自定义数据源数据、RDD队列数据案例
    (1)红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。
    (2)这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。
    所以基于窗口的操作,需要指定2个参数:
    • window length – The duration of the window (3 in the figure)
    • slide interval – The interval at which the window-based operation is performed (2 in the figure).
    a.窗口大小,一段时间内数据的容器。
    b.滑动间隔,每隔多久计算一次。

SparkStreaming接收socket数据案例

需求:通过socket发送形如“hadoop spark”的数据,对数据中的每个单词计数,将计数结果打印在IDEA中。
首先创建maven项目,pom如下:

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>

</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!--    <verbal>true</verbal>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

创建好maven项目后,在node01上安装socket:

yum -y install socket

代码实现:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingTCP {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext
    val sparkContext=new SparkContext(new SparkConf()
      .setMaster("local").setAppName("SparkStreamingTCP"))
    sparkContext.setLogLevel("WARN")
    // 创建Streaming的操作类StreamingContext的实例
    val streamingContext=new StreamingContext(sparkContext,Seconds(5))
    // 接收数据
    val stream=streamingContext.socketTextStream("node01",9999)
    // 处理接收到数据的stream
    val result=stream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    // 打印输出
    result.print()

    // 启动Streaming
    streamingContext.start()
    // 设置Streaming一直运行
    streamingContext.awaitTermination()
  }
}

SparkStreaming接收文件数据案例

SparkStreaming接收文件类似flume采取文件夹中的数据的方式,都是监视某个目录,待目录下有新文件加入,就读取新文件的内容,接收进来。
SparkStreaming接收文件数据的特点:

  1. 文件需要有相同的数据格式。
  2. 文件进入目录方式需要通过移动或者重命名来实现。
  3. 一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据。
  4. 可以监视hdfs上的目录。

pom文件沿用之前“SparkStreaming接收socket数据”的。
代码:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingFile {

  // 定义所有历史数据累加的方法,传入到updateStateByKey中
  def  updateFunc(inputSum:Seq[Int], historySum:Option[Int]):Option[Int] = {
    // getOrElse(0)获取历史数据,如果没有值就赋值0
    val res=inputSum.sum+historySum.getOrElse(0)
    Option(res)
  }

  def main(args: Array[String]): Unit = {
    val sparkContext=new SparkContext(new SparkConf()
      .setAppName("StreamingFile").setMaster("local"))
    sparkContext.setLogLevel("WARN")
    val streamingContext=new StreamingContext(sparkContext,Seconds(5))
    // 事先设置checkpoint的保存路径,以免历史结果没有地方保存而报错
    streamingContext.checkpoint("./stream-checkpoint")
    // 监控hdfs上指定目录下的text文件
    val stream=streamingContext.textFileStream("hdfs://node01:8020/stream-data")
    val wordStream=stream.flatMap(_.split(" "))
    val wordAndOneStream=wordStream.map(x => (x,1))
    // updateStateByKey()可以保留历史数据
    val result=wordAndOneStream.updateStateByKey(updateFunc)
    result.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

在这个案例里,使用updateStateByKey来统计所有历史数据的结果,updateStateByKey就是reduceByKey在DStream中的升级版,它可以保留stream中的历史数据,当有新数据进入stream时,把还会把历史数据的统计结果和新数据的统计结果进行合并,使用这个方法要注意在streamingContext中设置checkpoint的保存数据,updateStateByKey之所以能记住历史数据就是因为checkpoint。

SparkStreaming接收自定义数据源数据案例

如果已经存在的数据源满足不了要求,还可以自定义sparkStreaming的数据源进行数据的采集处理。
通过继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
需求:自定义数据源,接收socket收据,并统计每个单词出现的次数。

pom文件沿用之前“SparkStreaming接收socket数据”的。
代码:
自定义Receiver:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver


class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2){
  // 定义一个receiverData方法,用于接收socket里面的数据
  def receiverData()={
    // 创建Socket对象
    val socket=new Socket(host,port)
    // 获取Socket输入流
    val inputStream=socket.getInputStream
    // 将流转为字符串
    val reader=new BufferedReader(new InputStreamReader(inputStream,StandardCharsets.UTF_8))
    // 创建空字符串对象来按行接收读到的数据
    var line:String=null
    // 判断读到的数据不为空且Receiver接收器没有被停止
    while ((line=reader.readLine()) != null && !isStopped()){
      // 将行字符串传送出去
      store(line)
    }
  }

  // 这个方法会反复调用,调用时间间隔在创建streamingcontext是定
  override def onStart(): Unit = {
    // 每隔5秒开启一个线程去接收socket里面的数据
    new Thread(){
      override def run(): Unit = {
        receiverData()
      }
    }.start()
  }
  // 停止的时候调用
  override def onStop(): Unit = {
  }
}

主类:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


//自定义数据源,接收socket收据,并统计每个单词出现的次数
object CustomReceiver {

  def updateFunc(inputSum:Seq[Int], historySum:Option[Int]):Option[Int]={
    val finalSum:Int=inputSum.sum+historySum.getOrElse(0)
    Option(finalSum)
  }

  def main(args: Array[String]): Unit = {
    val sparkContext=new SparkContext(new SparkConf()
      .setAppName("receiver").setMaster("local"))
    sparkContext.setLogLevel("WARN")
    val streamingContext=new StreamingContext(sparkContext,Seconds(5))
    // 设置checkpoint保存位置
    streamingContext.checkpoint("./CustomReceiver-checkpoint")
    // 从自定义数据源中接收数据
    val stream=streamingContext.receiverStream(new MyReceiver("node01",9999))
    // 处理接收到的数据
    val flat=stream.flatMap(_.split(" "))
    val map=flat.map(x => (x,1))
    val res=map.updateStateByKey(updateFunc)
    res.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

SparkStreaming接收RDD队列数据案例

SparkStreaming接收RDD队列数据主要用在测试中。

pom文件沿用之前“SparkStreaming接收socket数据”的。
代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueueStream {
  def main(args: Array[String]): Unit = {
    val sparkContext=new SparkContext(new SparkConf()
      .setAppName("QueueStream").setMaster("local"))
    sparkContext.setLogLevel("WARN")
    val streamingContext=new StreamingContext(sparkContext,Seconds(5))
    // 获取一个queue队列
    val queue= new mutable.SynchronizedQueue[RDD[Int]]()
    // 接收队列数据
    val stream=streamingContext.queueStream(queue)
    // 处理接收到的队列数据
    val res=stream.map(_*2)
    res.print()

    streamingContext.start()

    for (x <- 1 to 100){
      // 向队列添加数据
      queue+=sparkContext.parallelize(1 to 10)
      // 设置休眠,避免添加数据太快
      Thread.sleep(3000)
    }

    streamingContext.awaitTermination()
  }
}
标签:
分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录