Spark(第六节—1)Spark中直接执行hive查询,Spark整合hive,SparkSQL的使用案例,Spark读取MySQL,Spark写入MySQL

1300-黄同学

发表文章数:85

热门标签

首页 » 大数据 » 正文

Spark中直接执行hive查询

在未整合hive的情况下,Spark中直接执行hive查询,spark读取和操作的元数据是在hive自带derby数据库。需要spark与hive整合后才能将读取和操作hive在mysql的元数据。为了避免hive元数据出故障,最好是整合hive后再操作。
创建maven工程,pom如下:

<properties>
	<scala.version>2.11.8</scala.version>
	<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-library</artifactId>
		<version>${scala.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-sql_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>2.7.5</version>
	</dependency>
	<!-- spark连接hive的依赖 -->
	 <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.2.0</version>
	</dependency>

</dependencies>
<build>
	<sourceDirectory>src/main/scala</sourceDirectory>
	<testSourceDirectory>src/test/scala</testSourceDirectory>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.0</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
				<encoding>UTF-8</encoding>
				<!--    <verbal>true</verbal>-->
			</configuration>
		</plugin>
		<plugin>
			<groupId>net.alchim31.maven</groupId>
			<artifactId>scala-maven-plugin</artifactId>
			<version>3.2.0</version>
			<executions>
				<execution>
					<goals>
						<goal>compile</goal>
						<goal>testCompile</goal>
					</goals>
					<configuration>
						<args>
							<arg>-dependencyfile</arg>
							<arg>${project.build.directory}/.scala_dependencies</arg>
						</args>
					</configuration>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>3.1.1</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
						<transformers>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<mainClass></mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

代码如下:

import org.apache.spark.sql.SparkSession

object SparkHive {
  def main(args: Array[String]): Unit = {
    // 获取带有hive支持的SparkSession
    val sparkSession=SparkSession.builder().appName("hive on spark")
      .master("local").enableHiveSupport().getOrCreate()
    val sc=sparkSession.sparkContext
    // 创建数据库
    sparkSession.sql("create database if not exists sparkstudy")
    // 进入数据库
    sparkSession.sql("use sparkstudy")
    // 创建hive表
    sparkSession.sql("create table if not exists student(id int,name string,score int) " +
     "row format delimited fields terminated by ','")
    // 加载数据
    sparkSession.sql("load data local inpath './datas/student.csv' " +
     "overwrite into table student")
    // 查询数据
    sparkSession.sql("select * from student").show()

    sc.stop()
    sparkSession.close()
  }
}

Spark整合hive

下面的node03就是集群中hive客户机。

第一步:将hive-site.xml拷贝到spark安装家路径的conf目录下

node03执行以下命令来拷贝hive-site.xml到所有的spark安装服务器上面去

cd /export/servers/hive-1.1.0-cdh5.14.0/conf
cp hive-site.xml  /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
scp hive-site.xml  node02://export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
scp hive-site.xml  node01://export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/

第二步:将mysql的连接驱动包拷贝到spark的jars目录下

node03执行以下命令将连接驱动包拷贝到spark的jars目录下,三台机器都要进行拷贝

cd /export/servers/hive-1.1.0-cdh5.14.0/lib
cp mysql-connector-java-5.1.38.jar /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
scp mysql-connector-java-5.1.38.jar node02://export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
scp mysql-connector-java-5.1.38.jar node01://export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/

第三步:测试sparksql整合hive是否成功

先启动hadoop集群,在启动spark集群,确保启动成功之后node01执行命令:

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
bin/spark-sql --master spark://node01:7077 --executor-memory 1G --total-executor-cores 2

指明master地址、每一个executor的内存大小、一共所需要的核数、
mysql数据库连接驱动。
执行成功后的界面:进入到spark-sql 客户端命令行界面

查看当前有哪些数据库, 并创建数据库

show databases;
create database sparkhive;

spark 2.x版本整合hive之bug解决

在spark2.0版本后由于出现了sparkSession,在初始化sqlContext的时候,会设置默认的spark.sql.warehouse.dir=spark-warehouse,
此时将hive与sparksql整合完成之后,在通过spark-sql脚本启动的时候,还是会在哪里启动spark-sql脚本,就会在当前目录下创建一个spark.sql.warehouse.dir为spark-warehouse的目录,存放由spark-sql创建数据库和创建表的数据信息,与之前hive的数据息不是放在同一个路径下(可以互相访问)。但是此时spark-sql中表的数据在本地,不利于操作,也不安全。

所有在启动的时候需要加上这样一个参数:

--conf  spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse

保证spark-sql启动时不在产生新的存放数据的目录,sparksql与hive最终使用的是hive同一存放数据的目录。
如果使用的是spark2.0之前的版本,由于没有sparkSession,不会有spark.sql.warehouse.dir配置项,不会出现上述问题。
最后的执行脚本;
node01执行以下命令重新进去spark-sql

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
bin/spark-sql /
 --master spark://node01:7077 /
 --executor-memory 1G --total-executor-cores 2 /
 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse

SparkSQL的使用案例

第一步:准备原始数据

node01服务器准备原始数据

cd /export/servers
hdfs dfs -mkdir -p /sparkhivedatas
vim sparkhive.txt

1 zhangsan 18
2 lisi 28
3 wangwu 20
4 zhaoliu 38
5 chenqi 45

将数据上传到hdfs上面去

hdfs dfs -mkdir /sparkhivedatas
hdfs dfs -put sparkhive.txt /sparkhivedatas

Spark连接MySQL

Spark从MySQL中读数据

导包

pom文件延用sparkhive项目中的pom,但是需要添加MySQL连接驱动jar包依赖:

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.38</version>
</dependency>

编写代码

import org.apache.spark.sql.SparkSession

object SparkMysql {
  val sparkSession=SparkSession.builder().appName("SparkMysql")
    .master("local").getOrCreate()
  val sc=sparkSession.sparkContext
  sc.setLogLevel("WARN")

  // spark读取mysql方法1
  def mysql2spark1: Unit = {
    val connectMap=Map("url" -> "jdbc:mysql://192.168.2.6:3306/userdb",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "emp",
      "user" -> "root", "password" -> "12345")
    val mysqlDF=sparkSession.read.format("jdbc").options(connectMap).load()
    mysqlDF.show()

    sc.stop()
    sparkSession.close()
  }

  // spark读取mysql方法2
  def mysql2spark2: Unit ={
    val url ="jdbc:mysql://192.168.2.6:3306/userdb"
    val tableName = "emp";
    val properties = new Properties()
    //注意:用户名和密码的属性只能是 user   password
    properties.setProperty("user","root")
    properties.setProperty("password","12345")
    val mysqlDF= sparkSession.read.jdbc(url,tableName,properties)
    mysqlDF.show()
    sc.stop()
    sparkSession.close()
  }

  def main(args: Array[String]): Unit = {
    //mysql2spark1
    mysql2spark2
  }
}

Spark将数据写入MySQL

Spark将数据写入MySQL实际是将DataFrame写入MySQL。
项目依赖与Spark读取MySQL的一样。
注意,Spark将数据写入MySQL在没有主键冲突的情况下是续写数据,不是覆写,在有主键冲突的情况下则会报错,不能写。

编写代码

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkWriteMysql {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("SparkWriteMysql").getOrCreate()
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")
    //读取文件内容,通过文件内容构建RDD
    val lineRDD = sc.textFile(args(0))
    val personRDD = lineRDD.map(_.split(" "))
      .map(x => Person(x(0).toInt, x(1), x(2).toInt))
    import sparkSession.implicits._
    val df = personRDD.toDF()
    val url = "jdbc:mysql://192.168.2.6:3306/sparkstudy"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "12345")
    // 将DataFrame写达到mysql,这个方法在主键不重复的情况是会插入数据到数据库,不是覆写
    df.write.mode(SaveMode.Append)
      .jdbc(url = url, table = "person", connectionProperties = properties)

    sc.stop()
    sparkSession.close()
  }
}

打包到Spark上运行

要运行“Spark将数据写入MySQL”的项目,有三种方式:
1、把含依赖的Jar包放到linux上跑,但是这个Jar包太大且里面只有Mysql驱动包是必须打进去的,不划算。
2、在进行spark-submit时,将hive里的MySQL驱动包加入到spark环境,并指定为driver-class,spark-submit的命令如下:

spark-submit --master spark://node01:7077 /
--class SparkWriteMysql /
--executor-memory 1g /
--total-executor-cores 2 /
--jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar /
--driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar /
original-day03-1.0-SNAPSHOT.jar hdfs://node01:8020/sparkdatas/person.txt

3、在Spark集群每台机器的spark的jars目录下放一个MySQL驱动包,然后直接spark-submit,对于MySQL这种常用的驱动包直接放到jars下,对于不常用的就可以用spark-submit再指定的方式,spark-submit的命令如下(尝试用yarn的cluster模式运行):

spark-submit --class SparkWriteMysql /
--master yarn /
--deploy-mode cluster /
--executor-memory 1g /
/export/servers/sparkdatas/spark-sql.jar /
hdfs://node01:8020/sparkdatas/person.txt
标签:
分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录