SparkStreaming
1.sparkStreaming的基本介绍
1、什么是Spark Streaming
sparkStreaming是spark当中用于实时处理的一个模块,主要是用于做一些实时处理的功能,类似于storm。
sparkStreaming支持各种各样的数据源,例如kakfa,flume,tcp套接字
2、为什么要学sparkStreaming
主要就是用于做实时处理
3、sparkStreaming与strom对比:
1:编程模型不同 sparkStreaming编程模型使用的是DStream
strom编程模型使用的Spout与Bolt
2:编程语言不同
storm使用的cloujure
spark使用scala语言
3:实时性不太一样:
SparkStreaming的实时性不如storm好
4:数据吞吐量不太一样:
SparkStreaming的数据吞吐量远远高于storm
5:数据驱动模型不一样
strom是以数据为驱动的
sparkStreaming是以时间为驱动的。每隔一段时间去处理一批数据。如果间隔时间足够段,直到时间缩短到0.1s钟。
基本上就可以当做实时处理的框架来使用。
2.Spark Streaming原理与架构
1:原理 只要间隔时间足够段,就可以当做实时处理的框架来使用
2:计算流程
3:sparkStreaming的容错性:可以通过rdd确定lineage血统关系,实现容错性,驱动器的容错
4:sparkStreaming的实时性:最小时间间隔可以达到0.1s
5:sparkStreaming的架构
sparkCore ==> RDD ==> SparkContext
sparkSQL ==> DF/DS ==> SparkSession
SparkStreaming ==> DStream ==> StreamingContext
以数据为驱动的程序实时性更高一点
3.DStream
1.什么是DStream
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,
DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。
4.DStream相关操作
1、Transformations on DStreams
2、Output Operations on DStreams
5.DStream操作实战
需求:SparkStreaming接受socket数据,实现单词计数WordCount
第一步:创建maven工程并导入jar包
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
第二步:node1服务器安装并启动生产者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
node1服务器执行以下命令安装socket客户端工具,模拟发送数据
yum -y install nc
第三步:通过netcat工具向指定的端口发送数据
node1服务器执行以下命令,向指定的端口9999发送数据
nc -lk 9999
第四步:开发sparkStreaming程序,统计单词出现的次数
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreamingDemo1 {
/**
* 实现实时数据统计,接收socket里面的数据
* @param args
* sparkContext: SparkContext, batchDuration: Duration
*/
def main(args: Array[String]): Unit = {
//数据抽象叫做DStream,数据操作对象StreamingContext
val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("streamingFirst")
val sparkContext = new SparkContext(sparkConf)
sparkContext.setLogLevel("WARN")
//获取StreamingContext,设置每隔5s处理一批数据
val streamingContext = new StreamingContext(sparkContext,Seconds(5))
//接收数据
val stream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.91.110",9999)
//数据处理
val key: DStream[(String, Int)] = stream.flatMap(x => x.split(" ")).map((_,1)).reduceByKey(_ + _)
//调用out_put operation实现数据输出
key.print()
//启动程序
streamingContext.start()
//等待程序结束
streamingContext.awaitTermination()
}
}
未保存历史数据
6.sparkStreaming数据源
6.1 基本数据源
6.1.1、文件数据源
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileStream {
//使用updateStateByKey这个算子,需要传入一个updateFunc函数;
//seq:传入数据值 option:历史累加值
//返回值是option
//option some none scala当中比较特殊的类型,为了解决null值的问题
//option 是 some和none的父类
//none: 类似于口袋里面什么也没有,但是有一张纸条,上面写着什么也没有
/**
* 定义一个方法,实现将历史数据全部保存下来
* @param inputSum Seq[Int] 将输入数据都累加
* @param resultSum 保存历史的数据
* @return
*/
def updateFunc(inputSum:Seq[Int], resultSum:Option[Int]) :Option[Int] = {
//getOrElse 表示获取历史数据,如果没有则给0
val finalResult: Int = inputSum.sum + resultSum.getOrElse(0)
Option(finalResult) //返回累加之后的结果
//Some(finalResult)
}
/**
* 监控hdfs的某一个目录,一旦有文件新生成,处理文件内容,实现单词统计
* 并且将历史数据保存下来
*
* @param args
*/
def main(args: Array[String]): Unit = {
//获取StreamingContext
val sparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("fileStream"))
sparkContext.setLogLevel("WARN")
val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
//将历史结果保存至这个路径下面
streamingContext.checkpoint("./stream-check1")
//获取hdfs上面的路径,注意:如果写hdfs路径不好使,直接改成file:///
val stream: DStream[String] = streamingContext.textFileStream("hdfs://node1:8020/stream-data")
// val stream: DStream[String] = streamingContext.textFileStream("file:///E://大数据资料//大数据实时资料//3、Spark//spark第四天//data//")
//updateStateByKey将历史累加结果保存起来
val array: DStream[String] = stream.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = array.map((_,1))
val byKey: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
byKey.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
6.1.2、自定义数据源 (socket数据,MySQL数据)
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ReceiverStream {
//定义updateFunc
def updateFunc(inputSum:Seq[Int], resultSum:Option[Int]) :Option[Int] = {
val finalResult: Int = inputSum.sum + resultSum.getOrElse(0)
Option(finalResult)
}
/**
* 自定义数据源,实现从socket当中接收数据
*
* @param args
*/
def main(args: Array[String]): Unit = {
val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("receiver"))
sparkContext.setLogLevel("WARN")
//获取streamingContext
val streamingContext = new StreamingContext(sparkContext,Seconds(5))
streamingContext.checkpoint("./myreceiver")
//receiverStream实现从自定义数据源接收数据
//需要一个Receiver类
val stream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.91.110",9999))
//再对数据进行处理
val key: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc)
key.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
定义接收器类MyReceiver
package cn.itcast.sparkStreaming
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2){
//接收socket里面的数据,然后调用store方法把数据保存起来
def receiverDatas() = {
//接收socket数据
val socket: Socket = new Socket(host,port)
//获取输入数据流
val stream: InputStream = socket.getInputStream
//通过BufferedReader将数据转换成为字符串
val bufferedReader = new BufferedReader(new InputStreamReader(stream,StandardCharsets.UTF_8))
var line:String = null
//判读读取数据不为空,且receiver接收器没有被停掉
while ((line=bufferedReader.readLine())!=null && !isStopped()){
//将我们接收到的数据通过store传送出去,供下游继续处理
store(line)
}
bufferedReader.close()
stream.close()
socket.close()
}
//每5s会不断被调用
override def onStart(): Unit = {
//每隔5s开启一个线程去接收socke里面数据
new Thread(){
override def run() = {
//接收socket数据
//定义一个receiverDatas方法,用于接收socket里面数据
receiverDatas()
}
}.start()
}
//停止结束时被调用
override def onStop(): Unit = {
}
}
6.1.3、RDD队列
从队列当中接收数据源:主要用于做一些测试使用
package cn.itcast.sparkStreaming
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueStream {
/**
* 从队列当中接收数据
* @param args
*/
def main(args: Array[String]): Unit = {
val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("queue"))
sparkContext.setLogLevel("WARN")
//获取StreamingContext
val streamingContext = new StreamingContext(sparkContext,Seconds(5))
// queue: Queue[RDD[T]]
val queue: mutable.SynchronizedQueue[RDD[Int]] = new mutable.SynchronizedQueue[RDD[Int]]
val stream: InputDStream[Int] = streamingContext.queueStream(queue)
val map: DStream[Int] = stream.map(_*2)
map.print()
streamingContext.start()
for (x <- 1 to 100){
//val parallelize: RDD[Int] = sparkContext.parallelize(1 to 10)
//val d: RDD[Int] = sparkContext.makeRDD(1 to 10)
queue += sparkContext.makeRDD(1 to 10)
Thread.sleep(3000)
}
//等待停止
streamingContext.awaitTermination()
}
}
6.2 高级数据源
6.2.1 flume数据源
Poll方式
第一步:安装flume
node3安装flume-1.6.0-cdh5.14.0
第二步:下载spark-streaming与flume整合的依赖jar包
下载spark-streaming-flume-sink_2.11-2.2.0.jar放入到flume的lib目录下,直接去mvnrepository.com
这个网址搜索spark-streaming-flume-sink即可
第三步:替换flume的scala的jar包
node3替换flume自带的scala-library-2.10.5.jar 这个版本的jar包
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib
rm -rf scala-library-2.10.5.jar
cp /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/scala-library-2.11.8.jar /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib/
第四步:开发flume的配置文件
node3开发flume的配置文件,监控某一个文件夹的变化,产生数据变化,全部收集起来
node3执行以下命令开发配置文件
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
mkdir -p /export/servers/flume/flume-poll
vim flume-poll.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flume/flume-poll
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=node3
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000
第五步:启动flume的程序
node3启动flume的进程
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
bin/flume-ng agent -c conf -f conf/flume-poll.conf -n a1 -Dflume.root.logger=DEBUG,CONSOLE
第六步:准备数据文件,上传到flume指定的文件夹
准备数据文件内容如下,上传文件到flume的采集目录
hadoop spark hive spark
hadoop sqoop spark storm
第七步:代码开发spark程序poll拉取flume数据
需要添加pom依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.2.0</version>
</dependency>
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
object SparkFlumePull {
def updateFunc(inputValue:Seq[Int], resultValue:Option[Int]):Option[Int] = {
Option(inputValue.sum + resultValue.getOrElse(0))
}
/**
* 使用poll方式,从flume当中拉取数据
*
* @param args
*/
def main(args: Array[String]): Unit = {
val sparkContxt: SparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sparkFlumePull"))
val streamingContext: StreamingContext = new StreamingContext(sparkContxt,Seconds(5))
streamingContext.checkpoint("./flumePoll")
/**
* ssc: StreamingContext,
* hostname: String,
* port: Int,
* 所有flume采集的数据都封装到SparkFlumeEvent里面
*/
val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(streamingContext,"node3",8888)
val line: DStream[String] = stream.map(x => {
//x代表 SparkFlumeEvent封装对象,这个对象封装了event数据
val array: Array[Byte] = x.event.getBody.array()
//将字节数组转换成为String
val str: String = new String(array)
str
})
val key: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc)
key.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
Push方式
第一步:开发flume的配置文件
停止之前的flume进程,然后重新开发flume的配置文件
mkdir -p /export/servers/flume/flume-push/
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim flume-push.conf
#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flume/flume-push
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
#注意这里的ip需要指定的是我们spark程序所运行的服务器的ip,也就是我们的win7的ip地址
a1.sinks.k1.hostname=192.168.1.26
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000
第二步:启动flume进程
node3启动flume的进程
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
bin/flume-ng agent -c conf -f conf/flume-push.conf -n a1 -Dflume.root.logger=DEBUG,CONSOLE
第三步:代码实现push模式消费数据
开发sparkstreaming代码,通过push模式消费flume当中的数据
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
object SparkFlumePush {
def main(args: Array[String]): Unit = {
/**
* ssc: StreamingContext,
* hostname: String,
* port: Int,
*/
val sparkContext: SparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sparkFlumePush"))
sparkContext.setLogLevel("WARN")
val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
//sparkStreaming运行在哪主机写哪台ip S
val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(streamingContext,"192.168.1.26",8888)
val line: DStream[String] = stream.map(x => {
val bytes: Array[Byte] = x.event.getBody.array()
val str = new String(bytes)
str
})
line.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
第四步:上传数据文件
上传一些数据文件到node3服务器的/export/servers/flume/flume-push 路径下面去,然后查看idea的控制台输出
6.2.2 kafka数据源
一般不会与flume进行整合,会有弊端,flume采集的数据量可能比较大,可能有时候又比较小
一般都是与kafka进行整合,使用kafka实现数据的限流的作用
sparkStreaming与kafka的整合,两个版本
http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html 官方文档
kafka0.8版本
接收数据的两种方式
ReceiverDstream:使用HighLeveAPI进行消费,offset保存在zk当中,使用at least once消费模式,会造成数据的重复消费。
每隔一段时间,默认自动提交一次offset到zk当中去保存
DirectDstream:使用是LowLeveAPI进行消费,offset保存在默认的topic里面,使用at most once消费模式,会造成数据丢失
默认是按照最新的offset进行消费
kafka0.10版本
接收数据只有一种方式
DirectDStream:使用lowLeveAPI进行消费,offset默认保存在topic里面,配合手动提交offset,实现exactly once的消费模式
kafka的安装与使用
1、三台机器安装zookeeper
省略。。。。。
2、三台机器安装kafka集群
2.1 下载kafka安装压缩包
http://archive.apache.org/dist/kafka/
2.2 上传压缩包并解压
将kafka的安装包上传到第一台服务器的/export/softwares路径下面去,然后解压到/export/servers
这里统一使用 kafka_2.11-1.0.0.tgz 这个版本
2.3 修改kafka配置文件
第一台机器修改kafka配置文件server.properties
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node1
第二台机器修改kafka配置文件server.properties
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node2
第三台机器修改kafka配置文件server.properties
broker.id=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node3
2.4启动kafka集群
三台机器启动kafka服务
nohup bin/kafka-server-start.sh config/server.properties 2>&1 & 后台启动命令
3、kafka的命令行的管理使用
创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic kafkatopic --zookeeper node1:2181,node2:2181,node3:2181
模拟生产者
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic kafkatopic
模拟消费者
kafka-console-consumer.sh --from-beginning --topic kafkatopic --zookeeper node1:2181,node2:2181,node3:2181
kafka0.8版本
第一种方式对接kafka之CreateDstream方式
第一步:导入jar包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
第二步:创建kafka的topic
node1服务器执行以下命令创建kafka的topic sparkafka
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic sparkafka --zookeeper node1:2181,node2:2181,node3:2181
第三步:使用脚本启动kafka生产者
node1服务器执行以下命令通过脚本模拟kafka生产者
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic sparkafka
第四步:开发SparkStreaming对接kafka代码
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.immutable
object SparkKafkaReceiverDStream {
/**
* 消费Kafka当中的数据,topic sparkkafka 有三个分区
* 使用highLevelAPI进行消费,将offset保存到zk,消费模式at least one
* @param args
*/
def main(args: Array[String]): Unit = {
//使用一个集合调用map方法,获取集合当中的每一个数据
val sparkContxt: SparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("sparkKafakReceiverDStream"))
val streamingContext: StreamingContext = new StreamingContext(sparkContxt,Seconds(5))
val zkQuorum:String = "node1:2181,node2:2181,node3:2181"
val groupId:String = "ReceiverDSteamGroup"
val mapTopic = Map("sparkafka "-> 3)
//启用3个线程,分别去消费三个分区里面的数据
//IndexedSeq 存放的是各个分区的数据
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, mapTopic)
stream
})
//合并各个分区的数据
/**
*
* (String, String)
* key value
*/
val union: DStream[(String, String)] = streamingContext.union(receiverDStream)
//获取value的值
val line: DStream[String] = union.map(x => {
x._2
})
line.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
offset默认自动保存到zk中,但是未保存时,程序关闭,下次开启时导致数据的重复消费
第二种方式对接kafka之CreateDirectStream方式
使用kafka的低阶API进行消费,消费数据的offset全部维护在kafka当中的一个topic当中,会自动提交offset,同时也可以手动提交维护offset更加安全
package cn.itcast.sparkStreaming
import org.apache.commons.codec.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object SparkKafkaDirectDStream {
/**
* 使用是Kafka的LowLevelAPI进行消费
* 将Kafka的offset维护在Kafka默认的一个topic里面,会造成数据的丢失
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* [K:ClassTag,
* V:ClassTag,
* KD <: Decoder[K]:ClassTag,
* VD <: Decoder[V]:ClassTag](
* ssc:StreamingContext,
* kafkaParams:Map[String,String],
* topics:Set[String]
* )
*
*/
val sparkContext:SparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("sparkKafkaDirectDStream"))
sparkContext.setLogLevel("WARN")
val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
//配置Kafka相关参数
val kafkaParams: Map[String, String] = Map("metadata.broker.list"-> "node1:9092,node2:9092,node3:9092","group.id"->"Kafka_Direct")
//没有开启指定的线程数。默认是Kafka有多少分区就启动多少线程去消费对应分区的数据
//(String, String) key(空) value
val resultDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](streamingContext,kafkaParams,Set("sparkafka"))
val resultLine: DStream[String] = resultDStream.map(x => {
x._2
})
//直接输出结果值
resultLine.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
默认是按照最新的offset进行消费,造成数据的丢失,如果指定offset可以避免
kafka0.10版本
Apche kafka数据源0.10版本对接
手动提交offset,进行offset的管理维护,保证数据不会丢失,推荐使用
第一步:导入jar包
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
package cn.itcast.sparkStreaming
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies, LocationStrategy}
object SparkStreaming10Direct {
def main(args: Array[String]): Unit = {
/**
* [K, V](
* ssc: StreamingContext,
* locationStrategy: LocationStrategy,
* consumerStrategy: ConsumerStrategy[K, V],
* perPartitionConfig: PerPartitionConfig)
*/
val sparkContext: SparkContext = new SparkContext(new SparkConf().setMaster("local[8]").setAppName("sparkKafka10DirectStream"))
sparkContext.setLogLevel("WARN")
val streamingContext: StreamingContext = new StreamingContext(sparkContext,Seconds(5))
// sealed abstract class LocationStrategy 使用sealed关键字修饰的叫做密封类。不让使用,就是不能继承,目的是为了安全
val consistent: LocationStrategy = LocationStrategies.PreferConsistent
//Subscribe[K, V](
// topics: ju.Collection[jl.String],
// kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V]
//创建topic
val brokers= "node1:9092,node2:9092,node3:9092"
val sourcetopic="sparkafka";
//创建消费者组
var group="sparkafkaGroup"
//消费者配置
val kafkaParam = Map(
"bootstrap.servers" -> brokers,//用于初始化链接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用于标识这个消费者属于哪个消费团体
"group.id" -> group,
//如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
//可以使用这个配置,latest自动重置偏移量为最新的偏移量
"auto.offset.reset" -> "latest",
//如果是true,则这个消费者的偏移量会在后台自动提交
//不要自动提交,配合手动消费
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//ConsumerStrategies.Subscribe订阅某个topic当中的数据
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String,String](Array("sparkafka"),kafkaParam)
val resultDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](streamingContext,consistent,consumerStrategy)
//循环遍历每个RDD当中的数据
//一个RDD有多个分区,一个分区里面有多条数据
resultDStream.foreachRDD(iter => {
if(iter.count() >0){
//如果RDD里面大于0,表示有数据
//foreach 属于outputOperation算子
//循环RDD里面的所有数据,一行行取出来
iter.foreach(line => {
val value: String = line.value()
println(value)
//为了更加精确的控制offset,我们可以在这里处理一条数据就提交一条数据的offset,效率会比下面这个低
})
//处理完一个RDD中的一批数据,手动提交这一批次offset
//获取RDD当中所有数据的offset,将iter强制转换成为HasOffsetRanges,得到RDD中这一批次的所有数据的offset值
val ranges: Array[OffsetRange] = iter.asInstanceOf[HasOffsetRanges].offsetRanges
//提交offset值,将resultDStream强制转换成CanCommitOffsets这个类,然后调用commitAsync异步提交offset值
val commitOffsets: CanCommitOffsets = resultDStream.asInstanceOf[CanCommitOffsets]
commitOffsets.commitAsync(ranges)
}
})
streamingContext.start()
streamingContext.awaitTermination()
}
}
7.DStreams的转换
有状态以 updateStageByKey,可以将历史的数据全部都保存起来,但是一定要设置checkpoint
无状态转换:map flatMap,filter join union
7.1.无状态转换操作
7.2 有状态转化操作
追踪状态变化UpdateStateByKey
读取socket里面的数据,然后将历史结果给保存起来,将第一个案例,给改造了,改造成为保存历史数据 作业
1.架构图
2.实现流程
package cn.test.spark
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* sparkStreaming流式处理,接受socket数据,实现单词统计并且每个批次数据结果累加
*/
object SparkStreamingTCPTotal {
//newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1
//runningCount 历史的所有相同key的value总和
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount =runningCount.getOrElse(0)+newValues.sum
Some(newCount)
}
def main(args: Array[String]): Unit = {
//配置sparkConf参数
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCPTotal").setMaster("local[2]")
//构建sparkContext对象
val sc: SparkContext = new SparkContext(sparkConf)
//设置日志输出的级别
sc.setLogLevel("WARN")
//构建StreamingContext对象,每个批处理的时间间隔
val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
//设置checkpoint路径,当前项目下有一个ck目录
scc.checkpoint("./ck")
//注册一个监听的IP地址和端口 用来收集数据
val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.200.160", 9999)
//切分每一行记录
val words: DStream[String] = lines.flatMap(_.split(" "))
//每个单词记为1
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
//累计统计单词出现的次数
val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
result.print()
scc.start()
scc.awaitTermination()
}
}
通过函数updateStateByKey实现。根据key的当前值和key的之前批次值,对key进行更新,返回一个新状态的DStream
7.3 Window Operations
需求:sparkStreaming的程序是每隔5s钟执行一次。统计每个15S内的数据情况。以及每隔5S钟的数据情况
package cn.itcast.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowSpark {
def main(args: Array[String]): Unit = {
//获取streamingContxt,从socket接收数据,实现单词统计
val context: SparkContext = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("sparkWindow"))
context.setLogLevel("WARN")
val streamingContext:StreamingContext = new StreamingContext(context,Seconds(5))
val stream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.91.110",9999)
val wordAndOne: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_,1))
//reduceFunc: (V, V) => V,
// windowDuration: Duration, 窗口宽度
// slideDuration: Duration 滑动窗口的时间间隔
// 获取数据的时间间隔5s
// 窗口长度10s
// 滑动时间间隔 5s ==>数据出现了两次
//
// 获取数据的时间间隔5s
// 窗口长度5s
// 滑动时间间隔 10s ==>数据出现了一次
val window: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(15),Seconds(15))
window.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
8.sparkStreaming的容错
检查点容错:可以设置checkPoint路径,方便数据的恢复
驱动器的容错:不要使用new的方式创建streamingContext
工作节点的容错:将接收到的数据进行保存,然后使用RDD的血统进行数据的容错
接收器的容错:可以选择比较靠谱的消息源来接收数据,例如kafka等消息队列
处理的保证:尽量保证所有的数据,处理且仅处理一次。实现数据的不丢不漏不重不错
拜师教育学员文章:作者:976-沈同学,
转载或复制请以 超链接形式 并注明出处 拜师资源博客。
原文地址:《29 SparkStreaming》 发布于2020-08-06
评论 抢沙发