Running Hive queries directly in Spark, integrating Spark with Hive, a SparkSQL usage example, reading from MySQL with Spark, writing to MySQL with Spark

2022-09-02 13:55:47

Running Hive queries directly in Spark

Before integrating with Hive, Hive queries executed directly in Spark read and write metadata in an embedded Derby database (Hive's default metastore). Only after Spark is integrated with Hive does it read and write the Hive metadata stored in MySQL. To avoid problems with the Hive metadata, it is best to do the integration first and then run your queries.
Create a Maven project with the following pom:

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
    <!-- dependency for connecting Spark to Hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

The code is as follows:

import org.apache.spark.sql.SparkSession

object SparkHive {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession with Hive support enabled
    val sparkSession = SparkSession.builder()
      .appName("hive on spark")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()
    val sc = sparkSession.sparkContext

    // Create the database
    sparkSession.sql("create database if not exists sparkstudy")
    // Switch to the database
    sparkSession.sql("use sparkstudy")
    // Create the Hive table
    sparkSession.sql("create table if not exists student(id int, name string, score int) " +
      "row format delimited fields terminated by ','")
    // Load the data
    sparkSession.sql("load data local inpath './datas/student.csv' " +
      "overwrite into table student")
    // Query the data
    sparkSession.sql("select * from student").show()

    sc.stop()
    sparkSession.close()
  }
}

Integrating Spark with Hive

In the steps below, node03 is the machine in the cluster that acts as the Hive client.

Step 1: copy hive-site.xml into the conf directory under the Spark installation directory

On node03, run the following commands to copy hive-site.xml to every server where Spark is installed:

cd /export/servers/hive-1.1.0-cdh5.14.0/conf
cp hive-site.xml /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
scp hive-site.xml node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
scp hive-site.xml node01:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/

Step 2: copy the MySQL JDBC driver jar into Spark's jars directory

On node03, run the following commands to copy the driver jar into Spark's jars directory; all three machines need a copy:

cd /export/servers/hive-1.1.0-cdh5.14.0/lib
cp mysql-connector-java-5.1.38.jar /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
scp mysql-connector-java-5.1.38.jar node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
scp mysql-connector-java-5.1.38.jar node01:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/

Step 3: verify that the Spark SQL and Hive integration works

Start the Hadoop cluster first, then the Spark cluster. Once both are up, run the following on node01:

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
bin/spark-sql --master spark://node01:7077 --executor-memory 1G --total-executor-cores 2

The options specify the master address, the memory for each executor, and the total number of cores; the MySQL driver copied into the jars directory earlier is picked up automatically.
If the command succeeds, you are dropped into the spark-sql client command-line interface.

List the existing databases, then create one:

show databases;
create database sparkhive;

Fixing an issue when integrating Hive with Spark 2.x

Starting with Spark 2.0, SparkSession is used, and the SQLContext initialization sets spark.sql.warehouse.dir to spark-warehouse by default.
So even after Hive and Spark SQL have been integrated, every launch of the spark-sql script creates a spark-warehouse directory under whatever directory the script was started from, and the databases and tables created through spark-sql are stored there rather than under Hive's warehouse path (the metadata is still shared, so each side can see the other's tables). Keeping spark-sql's table data on the local filesystem like this is inconvenient and unsafe.

So an extra parameter is needed at launch time:

--conf  spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse

This guarantees that spark-sql no longer creates a new data directory on startup, and that Spark SQL and Hive end up using the same warehouse directory.
Versions before Spark 2.0 have no SparkSession and no spark.sql.warehouse.dir setting, so they do not have this problem.
The final launch command:
On node01, run the following to start spark-sql again:

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
bin/spark-sql \
 --master spark://node01:7077 \
 --executor-memory 1G --total-executor-cores 2 \
 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse
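
If Spark SQL is launched from application code rather than through the spark-sql script, the same setting can be applied when building the SparkSession. Below is a minimal sketch, assuming the same HDFS warehouse path as above; the object name SparkHiveWarehouse is only for illustration.

import org.apache.spark.sql.SparkSession

object SparkHiveWarehouse {
  def main(args: Array[String]): Unit = {
    // Point Spark SQL at Hive's warehouse directory on HDFS instead of the
    // default local spark-warehouse directory (path taken from the cluster above)
    val sparkSession = SparkSession.builder()
      .appName("hive on spark")
      .config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    // List the databases to confirm the shared Hive metadata is visible
    sparkSession.sql("show databases").show()

    sparkSession.close()
  }
}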

A SparkSQL usage example

Step 1: prepare the raw data

Prepare the raw data on node01:

cd /export/servers
vim sparkhive.txt

1 zhangsan 18
2 lisi 28
3 wangwu 20
4 zhaoliu 38
5 chenqi 45

Upload the data to HDFS:

hdfs dfs -mkdir /sparkhivedatas
hdfs dfs -put sparkhive.txt /sparkhivedatas
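
The original walkthrough stops after the upload, but with the Hive integration in place the file can be mapped to a Hive table and queried from Spark SQL. The sketch below is only an illustration meant to be run with spark-submit against the cluster; the database name (sparkhive), the table name (student_info), and the space-delimited id/name/age schema are assumptions based on the sample rows above.

import org.apache.spark.sql.SparkSession

object SparkHiveCase {
  def main(args: Array[String]): Unit = {
    // Hive support is required so the table lands in the shared warehouse
    val sparkSession = SparkSession.builder()
      .appName("sparksql case")
      .enableHiveSupport()
      .getOrCreate()

    sparkSession.sql("create database if not exists sparkhive")
    sparkSession.sql("use sparkhive")
    // The sample rows are space separated: id name age
    sparkSession.sql("create table if not exists student_info(id int, name string, age int) " +
      "row format delimited fields terminated by ' '")
    // Load the file that was uploaded to HDFS above
    sparkSession.sql("load data inpath '/sparkhivedatas/sparkhive.txt' " +
      "overwrite into table student_info")
    // Query the data, oldest first
    sparkSession.sql("select * from student_info order by age desc").show()

    sparkSession.close()
  }
}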

Connecting Spark to MySQL

Reading data from MySQL with Spark

Add the dependency

The pom is the same as in the SparkHive project above, but the MySQL JDBC driver dependency needs to be added:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>

Write the code

import java.util.Properties

import org.apache.spark.sql.SparkSession

object SparkMysql {
  val sparkSession = SparkSession.builder()
    .appName("SparkMysql")
    .master("local")
    .getOrCreate()
  val sc = sparkSession.sparkContext
  sc.setLogLevel("WARN")

  // Way 1: read from MySQL with format("jdbc") and an options map
  def mysql2spark1(): Unit = {
    val connectMap = Map(
      "url" -> "jdbc:mysql://192.168.2.6:3306/userdb",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "emp",
      "user" -> "root",
      "password" -> "12345"
    )
    val mysqlDF = sparkSession.read.format("jdbc").options(connectMap).load()
    mysqlDF.show()

    sc.stop()
    sparkSession.close()
  }

  // Way 2: read from MySQL with read.jdbc(url, table, properties)
  def mysql2spark2(): Unit = {
    val url = "jdbc:mysql://192.168.2.6:3306/userdb"
    val tableName = "emp"
    val properties = new Properties()
    // Note: the property keys must be exactly "user" and "password"
    properties.setProperty("user", "root")
    properties.setProperty("password", "12345")
    val mysqlDF = sparkSession.read.jdbc(url, tableName, properties)
    mysqlDF.show()
    sc.stop()
    sparkSession.close()
  }

  def main(args: Array[String]): Unit = {
    // mysql2spark1()
    mysql2spark2()
  }
}

Writing data from Spark to MySQL

Writing data from Spark to MySQL really means writing a DataFrame to MySQL.
The project dependencies are the same as for reading from MySQL.
Note that when there is no primary key conflict, the write appends rows rather than overwriting the table; when there is a primary key conflict, the write fails with an error.

Write the code

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkWriteMysql {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("SparkWriteMysql")
      .getOrCreate()
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")

    // Read the input file and build an RDD from its lines
    val lineRDD = sc.textFile(args(0))
    val personRDD = lineRDD.map(_.split(" ")).map(x => Person(x(0).toInt, x(1), x(2).toInt))

    import sparkSession.implicits._
    val df = personRDD.toDF()

    val url = "jdbc:mysql://192.168.2.6:3306/sparkstudy"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "12345")

    // Write the DataFrame to MySQL; with SaveMode.Append rows are inserted
    // (appended) when there is no primary key conflict, not overwritten
    df.write.mode(SaveMode.Append).jdbc(url = url, table = "person", connectionProperties = properties)

    sc.stop()
    sparkSession.close()
  }
}
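
As a quick sanity check, the table can be read back with the read.jdbc approach from the previous section. A short sketch, assuming the same connection details as the write example; the object name SparkCheckMysql is only for illustration.

import java.util.Properties

import org.apache.spark.sql.SparkSession

object SparkCheckMysql {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("SparkCheckMysql")
      .master("local")
      .getOrCreate()

    // Same connection details as in the write example above
    val url = "jdbc:mysql://192.168.2.6:3306/sparkstudy"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "12345")

    // Read the person table back to confirm the rows were appended
    sparkSession.read.jdbc(url, "person", properties).show()

    sparkSession.close()
  }
}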

Packaging and running on Spark

There are three ways to run the "write data from Spark to MySQL" project:
1. Build a fat jar with all dependencies and run it on Linux. The jar ends up huge even though the MySQL driver is the only dependency that really has to be bundled, so this is wasteful.
2. At spark-submit time, add Hive's MySQL driver jar to the Spark environment with --jars and also put it on the driver classpath. The spark-submit command is:

spark-submit --master spark://node01:7077 \
--class SparkWriteMysql \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
--driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
original-day03-1.0-SNAPSHOT.jar hdfs://node01:8020/sparkdatas/person.txt

3. Put a MySQL driver jar into Spark's jars directory on every machine in the cluster, then run spark-submit directly. For a driver as common as MySQL's it makes sense to keep it in jars; for less common drivers, passing them at spark-submit time as above is the better option. The spark-submit command (this time running in YARN cluster mode) is:

spark-submit --class SparkWriteMysql \
 --master yarn \
 --deploy-mode cluster \
 --executor-memory 1g \
 /export/servers/sparkdatas/spark-sql.jar \
 hdfs://node01:8020/sparkdatas/person.txt
  • Author: hwq317622817
  • Original link: https://blog.csdn.net/hwq317622817/article/details/113870669