文章目录
前言
项目需要使用pyspark将hive数据进行处理插入到hive表及hbase中,刚开始接触spark,记录了学习的过程帮助初学者。
一、基本思路
1.创建参数初始化方法ReadConfig();
读取key:value格式的配置文件,获取“表名”、“列名”、“日期”等相关的参数传递给数据处理类。
 配置文件格式如下:
 source_table:test_a #源表
 target_table:test_b #目标表
 columns:A,B,C #列名
 base_hdfs_dir:/warehouse/tablespace/managed/hive/test.db/test_a/ #源表hdfs路径
 inner_tb_loc:/warehouse/tablespace/managed/hive/test.db/test_b/ #目标表hdfs路径
2.创建一个数据处理类DataDeal():
包括sparksession的初始化方法:CreateSparkSession(self,SessionName);
 包括hive数据读取方法:HiveExtenalRead(self);
 数据处理方法:InnerTbDeal(self,DataFrame);
 插入到hive方法:JsonToHive(self, DataFrame);
 插入到hbase方法:JsonToHbaseShc(self, DataFrame);
二、实现过程及说明
DataDeal类的初始化
    def __init__(self,source_tb,target_tb,data_dt,column,tb_loc):
        self.source_tb = source_tb
        self.target_tb = target_tb
        self.data_dt = data_dt
        self.column=column
        self.tb_loc=tb_loc
1.初始化sparksession
    def CreateSparkSession(self,SessionName):
        spark = SparkSession.builder.appName(SessionName).config('spark.executor.memory',
                                                             '10g').config('hive.exec.dynamic.partition.mode','nonstrict').config('hive.exec.dynamic.partition','true').enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        return spark
2.读取hive数据
# 读取hive外部表,插入到hive内部表中,后续处理速度快
    def HiveExtenalRead(self):
        time_start = time.time()
        sc = self.CreateSparkSession("HiveExtenalRead")
        sql="select "+self.column+" from "+self.source_tb +' where data_dt='+self.data_dt
        df=sc.sql(sql)
        inner_tb=self.target_tb+"_inner"
        #写入指定目录下
        df.write.orc(self.tb_loc,'overwrite')
        return df
3.数据处理
def InnerTbDeal(self,DataFrame):
        sc = self.CreateSparkSession("InnerTbDeal")
        DataFrame.createOrReplaceTempView('InnerTbDeal')
        col=DataFrame.columns
        strcol = 'select A,B,C from InnerTbDeal'
        dt=sc.sql(strcol)
        df.show(20)
		return dt
4.插入到hive
    def JsonToHive(self, DataFrame):
        sc = self.CreateSparkSession("JsonToHive")
        json_tb_loc=self.tb_loc
        DataFrame.write.orc(json_tb_loc, 'overwrite')
        return 0
5.插入到hbase
使用开源shc框架,搜索shc框架下载一份复制到spark各节点就行了。
# Shc框架写入到到hbase
    def JsonToHbaseShc(self, DataFrame):
        sc = self.CreateSparkSession("JsonToHbaseShc")
        table = self.target_tb
        cataconf="""{
            "table":{"namespace":"名字空间", "name":\""""+表名+"""\"},
            "rowkey":"key",
            "columns":{
                "A":{"cf":"rowkey", "col":"key", "type":"string"},
                "B":{"cf":"jk_data", "col":"B", "type":"string"},
                "C":{"cf":"jk_data", "col":"C", "type":"string"}
            }
        }"""
        catalog = ''.join(cataconf.split())
        data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
        DataFrame.write.options(catalog=catalog,newTable="5").format(data_source_format).save()
        return 0
主函数及参数处理
参数处理方法
def ReadConfig(TbName):
    dict = {}
    config_loc='./Config/'+TbName+'.ini'
    config = open(config_loc, 'r')
    for line in config:
        if line.find(':') > 0:
            strs = line.replace('\n', '').split(':')
            dict[strs[0]] = strs[1]
    return dict
主函数
if __name__=='__main__':
    #获取输入参数
    prop = ReadConfig(sys.argv[1])
    data_dt=sys.argv[2]
    #内部表hdfs路径
    tb_loc=prop['base_hdfs_dir']+prop['inner_tb_loc']+'data_dt='+data_dt
    s1=DataDeal(prop['source_table'],prop['target_table'],data_dt,prop['columns'],tb_loc)
    dt=s1.HiveExtenalRead()
    dt2=s1.InnerTbDeal(dt)
    s1.JsonToHive(dt2)
    s1.JsonToHbaseShc(dt2)
总结
结果是简单的,过程是复杂的。




