29 SparkStreaming

976-沈同学

发表文章数:64

热门标签

首页 » 大数据 » 正文

SparkStreaming

1.sparkStreaming的基本介绍

1、什么是Spark Streaming

sparkStreaming是spark当中用于实时处理的一个模块,主要是用于做一些实时处理的功能,类似于storm。
sparkStreaming支持各种各样的数据源,例如kakfa,flume,tcp套接字

29 SparkStreaming
2、为什么要学sparkStreaming

主要就是用于做实时处理

29 SparkStreaming
3、sparkStreaming与strom对比:

1:编程模型不同 sparkStreaming编程模型使用的是DStream

strom编程模型使用的Spout与Bolt

2:编程语言不同

	storm使用的cloujure

	spark使用scala语言

3:实时性不太一样:

	SparkStreaming的实时性不如storm好

4:数据吞吐量不太一样:

	SparkStreaming的数据吞吐量远远高于storm

5:数据驱动模型不一样

	strom是以数据为驱动的	

	sparkStreaming是以时间为驱动的。每隔一段时间去处理一批数据。如果间隔时间足够段,直到时间缩短到0.1s钟。
	基本上就可以当做实时处理的框架来使用。

2.Spark Streaming原理与架构

1:原理 只要间隔时间足够段,就可以当做实时处理的框架来使用

2:计算流程

29 SparkStreaming
3:sparkStreaming的容错性:可以通过rdd确定lineage血统关系,实现容错性,驱动器的容错

4:sparkStreaming的实时性:最小时间间隔可以达到0.1s

5:sparkStreaming的架构

sparkCore  ==>  RDD ==>  SparkContext

sparkSQL  ==>  DF/DS  ==>  SparkSession

SparkStreaming  ==>  DStream   ==>  StreamingContext

以数据为驱动的程序实时性更高一点

3.DStream

1.什么是DStream

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,
DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。

4.DStream相关操作

29 SparkStreaming
1、Transformations on DStreams
29 SparkStreaming
29 SparkStreaming
29 SparkStreaming
2、Output Operations on DStreams
29 SparkStreaming

5.DStream操作实战

需求:SparkStreaming接受socket数据,实现单词计数WordCount

29 SparkStreaming
第一步:创建maven工程并导入jar包

<properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

第二步:node1服务器安装并启动生产者

首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
node1服务器执行以下命令安装socket客户端工具,模拟发送数据
yum -y install nc

第三步:通过netcat工具向指定的端口发送数据

node1服务器执行以下命令,向指定的端口9999发送数据
	nc -lk 9999 

第四步:开发sparkStreaming程序,统计单词出现的次数

package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingDemo1 {
  /**
   * 实现实时数据统计,接收socket里面的数据
   * @param args
   * sparkContext: SparkContext, batchDuration: Duration
   */
  def main(args: Array[String]): Unit = {
    //数据抽象叫做DStream,数据操作对象StreamingContext

    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("streamingFirst")

    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("WARN")

    //获取StreamingContext,设置每隔5s处理一批数据
    val streamingContext = new StreamingContext(sparkContext,Seconds(5))
    //接收数据
    val stream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.91.110",9999)
    //数据处理
    val key: DStream[(String, Int)] = stream.flatMap(x => x.split(" ")).map((_,1)).reduceByKey(_ + _)
    //调用out_put operation实现数据输出
    key.print()
    //启动程序
    streamingContext.start()
    //等待程序结束
    streamingContext.awaitTermination()
  }
}

未保存历史数据
29 SparkStreaming

6.sparkStreaming数据源

6.1 基本数据源

6.1.1、文件数据源

29 SparkStreaming
29 SparkStreaming

package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStream {

  //使用updateStateByKey这个算子,需要传入一个updateFunc函数;
  //seq:传入数据值 option:历史累加值
  //返回值是option
  //option some none scala当中比较特殊的类型,为了解决null值的问题
  //option 是 some和none的父类
  //none: 类似于口袋里面什么也没有,但是有一张纸条,上面写着什么也没有
  /**
   * 定义一个方法,实现将历史数据全部保存下来
   * @param inputSum  Seq[Int] 将输入数据都累加
   * @param resultSum 保存历史的数据
   * @return
   */
  def updateFunc(inputSum:Seq[Int], resultSum:Option[Int]) :Option[Int] = {
    //getOrElse 表示获取历史数据,如果没有则给0
    val finalResult: Int = inputSum.sum + resultSum.getOrElse(0)
    Option(finalResult) //返回累加之后的结果
    //Some(finalResult)
  }

  /**
   * 监控hdfs的某一个目录,一旦有文件新生成,处理文件内容,实现单词统计
   * 并且将历史数据保存下来
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //获取StreamingContext
    val sparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("fileStream"))
    sparkContext.setLogLevel("WARN")
    val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
    //将历史结果保存至这个路径下面
    streamingContext.checkpoint("./stream-check1")
    //获取hdfs上面的路径,注意:如果写hdfs路径不好使,直接改成file:///
    val stream: DStream[String] = streamingContext.textFileStream("hdfs://node1:8020/stream-data")
//    val stream: DStream[String] = streamingContext.textFileStream("file:///E://大数据资料//大数据实时资料//3、Spark//spark第四天//data//")
    //updateStateByKey将历史累加结果保存起来
    val array: DStream[String] = stream.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = array.map((_,1))
    val byKey: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    byKey.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

6.1.2、自定义数据源 (socket数据,MySQL数据)

29 SparkStreaming

package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverStream {

  //定义updateFunc
  def updateFunc(inputSum:Seq[Int], resultSum:Option[Int]) :Option[Int] = {
    val finalResult: Int = inputSum.sum + resultSum.getOrElse(0)
    Option(finalResult)
  }

  /**
   * 自定义数据源,实现从socket当中接收数据
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {

    val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("receiver"))
    sparkContext.setLogLevel("WARN")
    //获取streamingContext
    val streamingContext = new StreamingContext(sparkContext,Seconds(5))
    streamingContext.checkpoint("./myreceiver")
    //receiverStream实现从自定义数据源接收数据
    //需要一个Receiver类
    val stream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.91.110",9999))
    //再对数据进行处理
    val key: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc)
    key.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

定义接收器类MyReceiver

package cn.itcast.sparkStreaming

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2){

  //接收socket里面的数据,然后调用store方法把数据保存起来
  def receiverDatas() = {
    //接收socket数据
    val socket: Socket = new Socket(host,port)
    //获取输入数据流
    val stream: InputStream = socket.getInputStream

    //通过BufferedReader将数据转换成为字符串
    val bufferedReader = new BufferedReader(new InputStreamReader(stream,StandardCharsets.UTF_8))
    var line:String = null
    //判读读取数据不为空,且receiver接收器没有被停掉
    while ((line=bufferedReader.readLine())!=null && !isStopped()){
      //将我们接收到的数据通过store传送出去,供下游继续处理
      store(line)
    }
    bufferedReader.close()
    stream.close()
    socket.close()
  }

  //每5s会不断被调用
  override def onStart(): Unit = {
    //每隔5s开启一个线程去接收socke里面数据
    new Thread(){
      override def run() = {
        //接收socket数据
        //定义一个receiverDatas方法,用于接收socket里面数据
        receiverDatas()
      }
    }.start()
  }
  //停止结束时被调用
  override def onStop(): Unit = {

  }
}

6.1.3、RDD队列

29 SparkStreaming
从队列当中接收数据源:主要用于做一些测试使用

package cn.itcast.sparkStreaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable


object QueueStream {
  /**
   * 从队列当中接收数据
   * @param args
   */
  def main(args: Array[String]): Unit = {

    val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("queue"))
    sparkContext.setLogLevel("WARN")
    //获取StreamingContext
    val streamingContext = new StreamingContext(sparkContext,Seconds(5))
    // queue: Queue[RDD[T]]

    val queue: mutable.SynchronizedQueue[RDD[Int]] = new mutable.SynchronizedQueue[RDD[Int]]
    val stream: InputDStream[Int] = streamingContext.queueStream(queue)
    val map: DStream[Int] = stream.map(_*2)
    map.print()
    streamingContext.start()
    for (x <- 1 to 100){
      //val parallelize: RDD[Int] = sparkContext.parallelize(1 to 10)
      //val d: RDD[Int] = sparkContext.makeRDD(1 to 10)
      queue += sparkContext.makeRDD(1 to 10)
      Thread.sleep(3000)
    }
    //等待停止
    streamingContext.awaitTermination()
  }
}

6.2 高级数据源

6.2.1 flume数据源

29 SparkStreaming

29 SparkStreaming

Poll方式

第一步:安装flume

node3安装flume-1.6.0-cdh5.14.0

第二步:下载spark-streaming与flume整合的依赖jar包

下载spark-streaming-flume-sink_2.11-2.2.0.jar放入到flume的lib目录下,直接去mvnrepository.com 
 这个网址搜索spark-streaming-flume-sink即可

第三步:替换flume的scala的jar包

node3替换flume自带的scala-library-2.10.5.jar 这个版本的jar包
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib
rm -rf scala-library-2.10.5.jar
cp /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/scala-library-2.11.8.jar /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib/

第四步:开发flume的配置文件

node3开发flume的配置文件,监控某一个文件夹的变化,产生数据变化,全部收集起来
node3执行以下命令开发配置文件
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
mkdir -p /export/servers/flume/flume-poll
vim flume-poll.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flume/flume-poll
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=node3
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

第五步:启动flume的程序

node3启动flume的进程
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
bin/flume-ng agent -c conf -f conf/flume-poll.conf -n a1 -Dflume.root.logger=DEBUG,CONSOLE

第六步:准备数据文件,上传到flume指定的文件夹

准备数据文件内容如下,上传文件到flume的采集目录

hadoop spark hive spark
hadoop sqoop spark storm

第七步:代码开发spark程序poll拉取flume数据
需要添加pom依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object SparkFlumePull {

  def updateFunc(inputValue:Seq[Int], resultValue:Option[Int]):Option[Int] = {
    Option(inputValue.sum + resultValue.getOrElse(0))
  }
  
  /**
   * 使用poll方式,从flume当中拉取数据
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {

    val sparkContxt: SparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sparkFlumePull"))

    val streamingContext: StreamingContext = new StreamingContext(sparkContxt,Seconds(5))
    streamingContext.checkpoint("./flumePoll")
    /**
     * ssc: StreamingContext,
     * hostname: String,
     * port: Int,
     * 所有flume采集的数据都封装到SparkFlumeEvent里面
     */
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(streamingContext,"node3",8888)
    val line: DStream[String] = stream.map(x => {
      //x代表 SparkFlumeEvent封装对象,这个对象封装了event数据
      val array: Array[Byte] = x.event.getBody.array()
      //将字节数组转换成为String
      val str: String = new String(array)
      str
    })
    val key: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc)
    key.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Push方式

第一步:开发flume的配置文件

停止之前的flume进程,然后重新开发flume的配置文件
mkdir -p /export/servers/flume/flume-push/
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim flume-push.conf

#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flume/flume-push
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
#注意这里的ip需要指定的是我们spark程序所运行的服务器的ip,也就是我们的win7的ip地址
a1.sinks.k1.hostname=192.168.1.26
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

第二步:启动flume进程

node3启动flume的进程
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
bin/flume-ng agent -c conf -f conf/flume-push.conf -n a1 -Dflume.root.logger=DEBUG,CONSOLE

第三步:代码实现push模式消费数据

开发sparkstreaming代码,通过push模式消费flume当中的数据
package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object SparkFlumePush {
  def main(args: Array[String]): Unit = {
    /**
     * ssc: StreamingContext,
     * hostname: String,
     * port: Int,
     */
    val sparkContext: SparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sparkFlumePush"))
    sparkContext.setLogLevel("WARN")
    val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
    //sparkStreaming运行在哪主机写哪台ip S
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(streamingContext,"192.168.1.26",8888)
    val line: DStream[String] = stream.map(x => {
      val bytes: Array[Byte] = x.event.getBody.array()
      val str = new String(bytes)
      str
    })
    line.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

第四步:上传数据文件

上传一些数据文件到node3服务器的/export/servers/flume/flume-push 路径下面去,然后查看idea的控制台输出

6.2.2 kafka数据源

一般不会与flume进行整合,会有弊端,flume采集的数据量可能比较大,可能有时候又比较小

一般都是与kafka进行整合,使用kafka实现数据的限流的作用
29 SparkStreaming
sparkStreaming与kafka的整合,两个版本

http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html		官方文档

29 SparkStreaming

kafka0.8版本

接收数据的两种方式

ReceiverDstream:使用HighLeveAPI进行消费,offset保存在zk当中,使用at  least  once消费模式,会造成数据的重复消费。

每隔一段时间,默认自动提交一次offset到zk当中去保存

DirectDstream:使用是LowLeveAPI进行消费,offset保存在默认的topic里面,使用at  most  once消费模式,会造成数据丢失

默认是按照最新的offset进行消费

kafka0.10版本

接收数据只有一种方式

DirectDStream:使用lowLeveAPI进行消费,offset默认保存在topic里面,配合手动提交offset,实现exactly once的消费模式

kafka的安装与使用

1、三台机器安装zookeeper
省略。。。。。
2、三台机器安装kafka集群
2.1 下载kafka安装压缩包
http://archive.apache.org/dist/kafka/

2.2 上传压缩包并解压
将kafka的安装包上传到第一台服务器的/export/softwares路径下面去,然后解压到/export/servers
这里统一使用  kafka_2.11-1.0.0.tgz 这个版本
2.3 修改kafka配置文件
第一台机器修改kafka配置文件server.properties
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node1

第二台机器修改kafka配置文件server.properties
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node2

第三台机器修改kafka配置文件server.properties
broker.id=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node3

2.4启动kafka集群
三台机器启动kafka服务
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &    后台启动命令
3、kafka的命令行的管理使用
创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic kafkatopic --zookeeper node1:2181,node2:2181,node3:2181
模拟生产者
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic kafkatopic
模拟消费者
kafka-console-consumer.sh --from-beginning --topic kafkatopic --zookeeper node1:2181,node2:2181,node3:2181

kafka0.8版本

第一种方式对接kafka之CreateDstream方式

第一步:导入jar包

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

第二步:创建kafka的topic

node1服务器执行以下命令创建kafka的topic sparkafka
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh  --create --partitions 3 --replication-factor 2 --topic sparkafka --zookeeper node1:2181,node2:2181,node3:2181

第三步:使用脚本启动kafka生产者

node1服务器执行以下命令通过脚本模拟kafka生产者
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic sparkafka

第四步:开发SparkStreaming对接kafka代码
29 SparkStreaming

package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.immutable

object SparkKafkaReceiverDStream {
  /**
   * 消费Kafka当中的数据,topic sparkkafka 有三个分区
   * 使用highLevelAPI进行消费,将offset保存到zk,消费模式at least one
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //使用一个集合调用map方法,获取集合当中的每一个数据

    val sparkContxt: SparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("sparkKafakReceiverDStream"))
    val streamingContext: StreamingContext = new StreamingContext(sparkContxt,Seconds(5))
    val zkQuorum:String = "node1:2181,node2:2181,node3:2181"
    val groupId:String = "ReceiverDSteamGroup"
    val mapTopic = Map("sparkafka "-> 3)
    //启用3个线程,分别去消费三个分区里面的数据
    //IndexedSeq 存放的是各个分区的数据
    val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, mapTopic)
      stream
    })
    //合并各个分区的数据
    /**
     *
     * (String, String)
     *   key    value
     */
    val union: DStream[(String, String)] = streamingContext.union(receiverDStream)
    //获取value的值
    val line: DStream[String] = union.map(x => {
      x._2
    })
    line.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

offset默认自动保存到zk中,但是未保存时,程序关闭,下次开启时导致数据的重复消费
29 SparkStreaming

第二种方式对接kafka之CreateDirectStream方式

使用kafka的低阶API进行消费,消费数据的offset全部维护在kafka当中的一个topic当中,会自动提交offset,同时也可以手动提交维护offset更加安全

package cn.itcast.sparkStreaming

import org.apache.commons.codec.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkKafkaDirectDStream {
  /**
   * 使用是Kafka的LowLevelAPI进行消费
   * 将Kafka的offset维护在Kafka默认的一个topic里面,会造成数据的丢失
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * [K:ClassTag,
     * V:ClassTag,
     * KD <: Decoder[K]:ClassTag,
     * VD <: Decoder[V]:ClassTag](
     * ssc:StreamingContext,
     * kafkaParams:Map[String,String],
     * topics:Set[String]
     * )
     *
     */

    val sparkContext:SparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("sparkKafkaDirectDStream"))
    sparkContext.setLogLevel("WARN")
    val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
    //配置Kafka相关参数
    val kafkaParams: Map[String, String] = Map("metadata.broker.list"-> "node1:9092,node2:9092,node3:9092","group.id"->"Kafka_Direct")

    //没有开启指定的线程数。默认是Kafka有多少分区就启动多少线程去消费对应分区的数据
    //(String, String) key(空) value
    val resultDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](streamingContext,kafkaParams,Set("sparkafka"))
    val resultLine: DStream[String] = resultDStream.map(x => {
      x._2
    })
    //直接输出结果值
    resultLine.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

默认是按照最新的offset进行消费,造成数据的丢失,如果指定offset可以避免
29 SparkStreaming

kafka0.10版本

Apche kafka数据源0.10版本对接

手动提交offset,进行offset的管理维护,保证数据不会丢失,推荐使用
第一步:导入jar包

<!-- <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
	<version>2.2.0</version>
</dependency>-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
	<version>2.2.0</version>
</dependency>
package cn.itcast.sparkStreaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies, LocationStrategy}

object SparkStreaming10Direct {
  def main(args: Array[String]): Unit = {
    /**
     * [K, V](
     * ssc: StreamingContext,
     * locationStrategy: LocationStrategy,
     * consumerStrategy: ConsumerStrategy[K, V],
     * perPartitionConfig: PerPartitionConfig)
     */
    val sparkContext: SparkContext = new SparkContext(new SparkConf().setMaster("local[8]").setAppName("sparkKafka10DirectStream"))
    sparkContext.setLogLevel("WARN")
    val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
    // sealed abstract class LocationStrategy 使用sealed关键字修饰的叫做密封类。不让使用,就是不能继承,目的是为了安全
    val consistent: LocationStrategy = LocationStrategies.PreferConsistent

    //Subscribe[K, V](
    //      topics: ju.Collection[jl.String],
    //      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V]
    //创建topic
    val brokers= "node1:9092,node2:9092,node3:9092"
    val sourcetopic="sparkafka";
    //创建消费者组
    var group="sparkafkaGroup"
    //消费者配置
    val kafkaParam = Map(
      "bootstrap.servers" -> brokers,//用于初始化链接到集群的地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      //用于标识这个消费者属于哪个消费团体
      "group.id" -> group,
      //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
      //可以使用这个配置,latest自动重置偏移量为最新的偏移量
      "auto.offset.reset" -> "latest",
      //如果是true,则这个消费者的偏移量会在后台自动提交
      //不要自动提交,配合手动消费
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //ConsumerStrategies.Subscribe订阅某个topic当中的数据
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String,String](Array("sparkafka"),kafkaParam)
    val resultDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](streamingContext,consistent,consumerStrategy)
    //循环遍历每个RDD当中的数据
    //一个RDD有多个分区,一个分区里面有多条数据
    resultDStream.foreachRDD(iter => {
      if(iter.count() >0){
        //如果RDD里面大于0,表示有数据
         //foreach 属于outputOperation算子
        //循环RDD里面的所有数据,一行行取出来
        iter.foreach(line => {
            val value: String = line.value()
            println(value)
          //为了更加精确的控制offset,我们可以在这里处理一条数据就提交一条数据的offset,效率会比下面这个低
        })
        //处理完一个RDD中的一批数据,手动提交这一批次offset
        //获取RDD当中所有数据的offset,将iter强制转换成为HasOffsetRanges,得到RDD中这一批次的所有数据的offset值
        val ranges: Array[OffsetRange] = iter.asInstanceOf[HasOffsetRanges].offsetRanges
        //提交offset值,将resultDStream强制转换成CanCommitOffsets这个类,然后调用commitAsync异步提交offset值
        val commitOffsets: CanCommitOffsets = resultDStream.asInstanceOf[CanCommitOffsets]
        commitOffsets.commitAsync(ranges)
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

7.DStreams的转换

有状态以 updateStageByKey,可以将历史的数据全部都保存起来,但是一定要设置checkpoint

无状态转换:map flatMap,filter join union

7.1.无状态转换操作

29 SparkStreaming

7.2 有状态转化操作

追踪状态变化UpdateStateByKey

读取socket里面的数据,然后将历史结果给保存起来,将第一个案例,给改造了,改造成为保存历史数据 作业
1.架构图
29 SparkStreaming
2.实现流程
29 SparkStreaming

package cn.test.spark

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * sparkStreaming流式处理,接受socket数据,实现单词统计并且每个批次数据结果累加
  */
object SparkStreamingTCPTotal {

  //newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1
  //runningCount 历史的所有相同key的value总和
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount =runningCount.getOrElse(0)+newValues.sum
    Some(newCount)
  }


  def main(args: Array[String]): Unit = {

    //配置sparkConf参数
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCPTotal").setMaster("local[2]")
    //构建sparkContext对象
    val sc: SparkContext = new SparkContext(sparkConf)
    //设置日志输出的级别
    sc.setLogLevel("WARN")
    //构建StreamingContext对象,每个批处理的时间间隔
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
   //设置checkpoint路径,当前项目下有一个ck目录
    scc.checkpoint("./ck")
    //注册一个监听的IP地址和端口  用来收集数据
    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.200.160", 9999)
    //切分每一行记录
    val words: DStream[String] = lines.flatMap(_.split(" "))
    //每个单词记为1
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    //累计统计单词出现的次数
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
    result.print()
    scc.start()
    scc.awaitTermination()
  }
}

通过函数updateStateByKey实现。根据key的当前值和key的之前批次值,对key进行更新,返回一个新状态的DStream

7.3 Window Operations

需求:sparkStreaming的程序是每隔5s钟执行一次。统计每个15S内的数据情况。以及每隔5S钟的数据情况
29 SparkStreaming
29 SparkStreaming
29 SparkStreaming

package cn.itcast.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSpark {
  def main(args: Array[String]): Unit = {
    //获取streamingContxt,从socket接收数据,实现单词统计

    val context: SparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("sparkWindow"))
    context.setLogLevel("WARN")
    val streamingContext:StreamingContext = new StreamingContext(context,Seconds(5))
    val stream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.91.110",9999)
    val wordAndOne: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_,1))
    //reduceFunc: (V, V) => V,
    //      windowDuration: Duration, 窗口宽度
    //      slideDuration: Duration   滑动窗口的时间间隔
    // 获取数据的时间间隔5s
    // 窗口长度10s
    // 滑动时间间隔 5s    ==>数据出现了两次
    //
    // 获取数据的时间间隔5s
    // 窗口长度5s
    // 滑动时间间隔 10s    ==>数据出现了一次
    val window: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(15),Seconds(15))
    window.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

8.sparkStreaming的容错

检查点容错:可以设置checkPoint路径,方便数据的恢复
29 SparkStreaming

驱动器的容错:不要使用new的方式创建streamingContext
29 SparkStreaming
工作节点的容错:将接收到的数据进行保存,然后使用RDD的血统进行数据的容错

接收器的容错:可以选择比较靠谱的消息源来接收数据,例如kafka等消息队列

处理的保证:尽量保证所有的数据,处理且仅处理一次。实现数据的不丢不漏不重不错

标签:

拜师教育学员文章:作者:976-沈同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《29 SparkStreaming》 发布于2020-08-06

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录