python中,用pyspark读写Hive数据

2023年1月4日09:25:37

1、读Hive表数据

        pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语句从hive里面查询需要的数据,代码如下:

from pyspark.sql import HiveContext,SparkSession

_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()

hive_context= HiveContext(spark_session )

# 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from  {}.{}".format(hive_database, hive_table)

# 通过SQL语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)

 

2 、将数据写入hive表

        pyspark写hive表有两种方式:

        (1)通过SQL语句生成表

from pyspark.sql import SparkSession, HiveContext

_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"

spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()

data = [
    (1,"3","145"),
    (1,"4","146"),
    (1,"5","25"),
    (1,"6","26"),
    (2,"32","32"),
    (2,"8","134"),
    (2,"8","134"),
    (2,"9","137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])

# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registerTempTable('test_hive')
sqlContext.sql("create table default.write_test select * from test_hive")

(2)saveastable的方式

# method two

# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
#  mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')

 

tips:

spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:

spark-submit --conf spark.sql.catalogImplementation=hive test.py

 

  • 作者:_____miss
  • 原文链接:https://blog.csdn.net/u011412768/article/details/93426353
    更新时间:2023年1月4日09:25:37 ,共 1418 字。