Reading Hive Data with PySpark

January 16, 2023, 07:58:41

I previously tried reading Hive data directly with PyHive, but a few of its dependencies simply would not install. After several days of fumbling around, I finally managed to read Hive data using Spark's built-in SQL support.

There were plenty of pitfalls along the way, and quite a few configuration files to write, so I'm recording the process here.

Step 1: set the global environment variables:

vim ~/.bash_profile   (macOS)
vim ~/.bashrc         (Linux)

Set the environment variables for Hadoop, Spark, Hive, and Java, as well as the default Python interpreter for PySpark and the IPython path for the pyspark shell; otherwise you will get errors later.

## homebrew mirror
export HOMEBREW_BOTTLE_DOMAIN=https://mirrors.ustc.edu.cn/homebrew-bottles

## hadoop path
export PATH=$PATH:/usr/local/Cellar/hadoop/3.2.1_1/libexec/sbin:/usr/local/Cellar/hadoop/3.2.1_1/libexec
export HADOOP_HOME=/usr/local/Cellar/hadoop/3.2.1_1/libexec

## scala_home
export SCALA_HOME=/usr/local/Cellar/scala/2.13.2
export PATH=$PATH:$SCALA_HOME/bin

## spark_home and spark path
export PATH=$PATH:/usr/local/Cellar/spark/bin
export SPARK_HOME=/usr/local/Cellar/spark

## flink path and flink home path

## kafka path and kafka home path

## zookeeper path and home path

## hive path and home path
export HIVE_HOME=/usr/local/Cellar/hive/3.1.2_1
export PATH="$HIVE_HOME/bin:$PATH"

## JAVA_HOME
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip:$PYTHONPATH

export PYSPARK_PYTHON=/usr/bin/python3
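After sourcing the profile, a quick sanity check is to import pyspark from the plain system interpreter. This is a minimal sketch; it assumes the py4j zip version on PYTHONPATH matches your Spark distribution:

# check_pyspark_path.py -- verify PYTHONPATH exposes Spark's Python bindings
import pyspark

# If the import succeeds, PYTHONPATH points at $SPARK_HOME/python correctly.
print(pyspark.__version__)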



First of all, Hadoop and Hive must be installed and Hive's metastore database (i.e. MySQL) configured. Edit:

vim $HIVE_HOME/libexec/conf/hive-site.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>mysql-username</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>mysql-password</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/hive</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property>
        <name>datanucleus.autoCreateSchema</name>
        <value>true</value>
    </property>
    <property>
        <name>datanucleus.fixedDatastore</name>
        <value>true</value>
    </property>
    <property>
        <name>datanucleus.autoCreateTables</name>
        <value>true</value>
    </property>
</configuration>

Initialize the metastore database:

mysql -uusername -ppassword
create database hive default character set utf8mb4 collate utf8mb4_unicode_ci;

Create the warehouse directories Hive needs in HDFS:

$HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$HADOOP_HOME/bin/hadoop fs -mkdir -p /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

Download the MySQL JDBC driver jar into Hive's lib directory (adjust the path to your own installation):

cd /home/hadoop/hive-2.3.0/lib

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

Note that hive-site.xml above names com.mysql.cj.jdbc.Driver, which is the Connector/J 8.x driver class; the 5.1.x jar ships com.mysql.jdbc.Driver instead, so keep the driver class and the jar version consistent.

Have Hive initialize its schema in the MySQL database hive:

$HIVE_HOME/bin/schematool -dbType mysql -initSchema

Copy hive-site.xml into $SPARK_HOME/conf, and put the MySQL driver jar into $SPARK_HOME/jars.
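A quick way to confirm that Spark actually picked up hive-site.xml is to list the databases the metastore knows about. A minimal sketch, assuming the metastore MySQL database is reachable:

from pyspark.sql import SparkSession

# Build a Hive-enabled session; it reads hive-site.xml from $SPARK_HOME/conf.
spark = (SparkSession.builder
         .appName("hive-config-check")
         .enableHiveSupport()
         .getOrCreate())

# Should list at least 'default' if the metastore connection works.
spark.sql("show databases").show()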

Then configure PySpark's environment variables in $SPARK_HOME/conf/spark-env.sh:

export SCALA_HOME=/usr/local/Cellar/scala/2.13.2

export SPARK_MASTER_IP=localhost

export SPARK_WORKER_MEMORY=20G
export PYSPARK_PYTHON=/usr/bin/python3

export PYSPARK_DRIVER_PYTHON=/Users/andrew/Library/Python/3.7/bin/ipython

Once everything is configured, launch pyspark:


andrew@hackintosh:~#pyspark
Python 3.7.3 (default, Apr  7 2020, 14:06:47) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.14.0 -- An enhanced Interactive Python. Type '?' for help.
20/05/20 11:16:56 WARN Utils: Your hostname, hackintosh.local resolves to a loopback address: 127.0.0.1; using 192.168.31.46 instead (on interface en0)
20/05/20 11:16:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/05/20 11:16:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/05/20 11:16:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/05/20 11:16:57 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
20/05/20 11:16:57 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
20/05/20 11:16:57 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Apr  7 2020 14:06:47)
SparkSession available as 'spark'.

In [1]:  
   ...: from pyspark import SparkConf, SparkContext 
   ...: from pyspark.sql import SparkSession                                                                  

In [2]: spark = SparkSession \ 
   ...:     .builder \ 
   ...:     .appName("Python Spark SQL Hive integration example78") \ 
   ...:     .enableHiveSupport() \ 
   ...:     .getOrCreate()                                                                                    

In [3]: spark.sql("select * from test.stu")                                                                   
20/05/20 11:19:21 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/05/20 11:19:21 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/05/20 11:19:22 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Out[3]: DataFrame[id: int, name: string, score: int]

In [4]: spark.sql("select * from test.stu").show()                                                            
+---+----------+-----+
| id|      name|score|
+---+----------+-----+
|  1|      zeng|  100|
|  2|      zeng|  100|
|  3|zengjunjie|   99|
|106|      zhao|  100|
| 65|      zhao|  100|
| 37|      zhao|  100|
| 18|      zhao|  100|
| 94|      zhao|  100|
| 95|      zhao|  100|
| 79|      zhao|  100|
| 11|      zhao|  100|
| 12|      zhao|  100|
| 78|      zhao|  100|
| 50|      zhao|  100|
| 56|      zhao|  100|
| 53|      zhao|  100|
| 29|      zhao|  100|
| 40|      zhao|  100|
| 28|      zhao|  100|
| 59|      zhao|  100|
+---+----------+-----+
only showing top 20 rows

Note: since Spark 3.0 there is no HiveContext; you must use a SparkSession with .enableHiveSupport().
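For non-interactive jobs the same pattern works as a standalone script run with spark-submit. A minimal sketch, reusing the test.stu table queried above (the toPandas() step additionally assumes pandas is installed on the driver):

from pyspark.sql import SparkSession

# Spark 3.x replacement for the old HiveContext: a Hive-enabled SparkSession.
spark = (SparkSession.builder
         .appName("read-hive-example")
         .enableHiveSupport()
         .getOrCreate())

# Hive tables are queried with plain SQL and come back as Spark DataFrames.
df = spark.sql("select id, name, score from test.stu where score >= 100")
df.show()

# Convert to pandas for local analysis (requires pandas on the driver).
pdf = df.toPandas()
print(pdf.describe())

spark.stop()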

Done.

  • Author: weixin_45939774
  • Original link: https://blog.csdn.net/weixin_45939774/article/details/106231006