Pyspark读取hive表数据进行处理,将结果插入到hive及hbase

2023年1月28日11:25:40


前言

项目需要使用pyspark将hive数据进行处理插入到hive表及hbase中,刚开始接触spark,记录了学习的过程帮助初学者。


一、基本思路

1.创建参数初始化方法ReadConfig();

读取key:value格式的配置文件,获取“表名”、“列名”、“日期”等相关的参数传递给数据处理类。
配置文件格式如下:
source_table:test_a #源表
target_table:test_b #目标表
columns:A,B,C #列名
base_hdfs_dir:/warehouse/tablespace/managed/hive/test.db/test_a/ #源表hdfs路径
inner_tb_loc:/warehouse/tablespace/managed/hive/test.db/test_b/ #目标表hdfs路径

2.创建一个数据处理类DataDeal():

包括sparksession的初始化方法:CreateSparkSession(self,SessionName);
包括hive数据读取方法:HiveExtenalRead(self);
数据处理方法:InnerTbDeal(self,DataFrame);
插入到hive方法:JsonToHive(self, DataFrame);
插入到hbase方法:JsonToHbaseShc(self, DataFrame);

二、实现过程及说明

DataDeal类的初始化

    def __init__(self,source_tb,target_tb,data_dt,column,tb_loc):
        self.source_tb = source_tb
        self.target_tb = target_tb
        self.data_dt = data_dt
        self.column=column
        self.tb_loc=tb_loc

1.初始化sparksession

    def CreateSparkSession(self,SessionName):
        spark = SparkSession.builder.appName(SessionName).config('spark.executor.memory',
                                                             '10g').config('hive.exec.dynamic.partition.mode','nonstrict').config('hive.exec.dynamic.partition','true').enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        return spark

2.读取hive数据

# 读取hive外部表,插入到hive内部表中,后续处理速度快
    def HiveExtenalRead(self):
        time_start = time.time()
        sc = self.CreateSparkSession("HiveExtenalRead")
        sql="select "+self.column+" from "+self.source_tb +' where data_dt='+self.data_dt
        df=sc.sql(sql)
        inner_tb=self.target_tb+"_inner"
        #写入指定目录下
        df.write.orc(self.tb_loc,'overwrite')
        return df

3.数据处理

def InnerTbDeal(self,DataFrame):
        sc = self.CreateSparkSession("InnerTbDeal")
        DataFrame.createOrReplaceTempView('InnerTbDeal')
        col=DataFrame.columns
        strcol = 'select A,B,C from InnerTbDeal'
        dt=sc.sql(strcol)
        df.show(20)
		return dt

4.插入到hive

    def JsonToHive(self, DataFrame):
        sc = self.CreateSparkSession("JsonToHive")
        json_tb_loc=self.tb_loc
        DataFrame.write.orc(json_tb_loc, 'overwrite')
        return 0

5.插入到hbase

使用开源shc框架,搜索shc框架下载一份复制到spark各节点就行了。

# Shc框架写入到到hbase
    def JsonToHbaseShc(self, DataFrame):
        sc = self.CreateSparkSession("JsonToHbaseShc")
        table = self.target_tb
        cataconf="""{
            "table":{"namespace":"名字空间", "name":\""""+表名+"""\"},
            "rowkey":"key",
            "columns":{
                "A":{"cf":"rowkey", "col":"key", "type":"string"},
                "B":{"cf":"jk_data", "col":"B", "type":"string"},
                "C":{"cf":"jk_data", "col":"C", "type":"string"}
            }
        }"""
        catalog = ''.join(cataconf.split())
        data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
        DataFrame.write.options(catalog=catalog,newTable="5").format(data_source_format).save()
        return 0

主函数及参数处理

参数处理方法

def ReadConfig(TbName):
    dict = {}
    config_loc='./Config/'+TbName+'.ini'
    config = open(config_loc, 'r')
    for line in config:
        if line.find(':') > 0:
            strs = line.replace('\n', '').split(':')
            dict[strs[0]] = strs[1]
    return dict

主函数

if __name__=='__main__':
    #获取输入参数
    prop = ReadConfig(sys.argv[1])
    data_dt=sys.argv[2]
    #内部表hdfs路径
    tb_loc=prop['base_hdfs_dir']+prop['inner_tb_loc']+'data_dt='+data_dt
    s1=DataDeal(prop['source_table'],prop['target_table'],data_dt,prop['columns'],tb_loc)
    dt=s1.HiveExtenalRead()
    dt2=s1.InnerTbDeal(dt)
    s1.JsonToHive(dt2)
    s1.JsonToHbaseShc(dt2)

总结

结果是简单的,过程是复杂的。

  • 作者:u010489820
  • 原文链接:https://blog.csdn.net/u010489820/article/details/112008502
    更新时间:2023年1月28日11:25:40 ,共 3053 字。