文章目录
前言
项目需要使用pyspark将hive数据进行处理插入到hive表及hbase中,刚开始接触spark,记录了学习的过程帮助初学者。
一、基本思路
1.创建参数初始化方法ReadConfig();
读取key:value格式的配置文件,获取“表名”、“列名”、“日期”等相关的参数传递给数据处理类。
配置文件格式如下:
source_table:test_a #源表
target_table:test_b #目标表
columns:A,B,C #列名
base_hdfs_dir:/warehouse/tablespace/managed/hive/test.db/test_a/ #源表hdfs路径
inner_tb_loc:/warehouse/tablespace/managed/hive/test.db/test_b/ #目标表hdfs路径
2.创建一个数据处理类DataDeal():
包括sparksession的初始化方法:CreateSparkSession(self,SessionName);
包括hive数据读取方法:HiveExtenalRead(self);
数据处理方法:InnerTbDeal(self,DataFrame);
插入到hive方法:JsonToHive(self, DataFrame);
插入到hbase方法:JsonToHbaseShc(self, DataFrame);
二、实现过程及说明
DataDeal类的初始化
def __init__(self,source_tb,target_tb,data_dt,column,tb_loc):
self.source_tb = source_tb
self.target_tb = target_tb
self.data_dt = data_dt
self.column=column
self.tb_loc=tb_loc
1.初始化sparksession
def CreateSparkSession(self,SessionName):
spark = SparkSession.builder.appName(SessionName).config('spark.executor.memory',
'10g').config('hive.exec.dynamic.partition.mode','nonstrict').config('hive.exec.dynamic.partition','true').enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
return spark
2.读取hive数据
# 读取hive外部表,插入到hive内部表中,后续处理速度快
def HiveExtenalRead(self):
time_start = time.time()
sc = self.CreateSparkSession("HiveExtenalRead")
sql="select "+self.column+" from "+self.source_tb +' where data_dt='+self.data_dt
df=sc.sql(sql)
inner_tb=self.target_tb+"_inner"
#写入指定目录下
df.write.orc(self.tb_loc,'overwrite')
return df
3.数据处理
def InnerTbDeal(self,DataFrame):
sc = self.CreateSparkSession("InnerTbDeal")
DataFrame.createOrReplaceTempView('InnerTbDeal')
col=DataFrame.columns
strcol = 'select A,B,C from InnerTbDeal'
dt=sc.sql(strcol)
df.show(20)
return dt
4.插入到hive
def JsonToHive(self, DataFrame):
sc = self.CreateSparkSession("JsonToHive")
json_tb_loc=self.tb_loc
DataFrame.write.orc(json_tb_loc, 'overwrite')
return 0
5.插入到hbase
使用开源shc框架,搜索shc框架下载一份复制到spark各节点就行了。
# Shc框架写入到到hbase
def JsonToHbaseShc(self, DataFrame):
sc = self.CreateSparkSession("JsonToHbaseShc")
table = self.target_tb
cataconf="""{
"table":{"namespace":"名字空间", "name":\""""+表名+"""\"},
"rowkey":"key",
"columns":{
"A":{"cf":"rowkey", "col":"key", "type":"string"},
"B":{"cf":"jk_data", "col":"B", "type":"string"},
"C":{"cf":"jk_data", "col":"C", "type":"string"}
}
}"""
catalog = ''.join(cataconf.split())
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
DataFrame.write.options(catalog=catalog,newTable="5").format(data_source_format).save()
return 0
主函数及参数处理
参数处理方法
def ReadConfig(TbName):
dict = {}
config_loc='./Config/'+TbName+'.ini'
config = open(config_loc, 'r')
for line in config:
if line.find(':') > 0:
strs = line.replace('\n', '').split(':')
dict[strs[0]] = strs[1]
return dict
主函数
if __name__=='__main__':
#获取输入参数
prop = ReadConfig(sys.argv[1])
data_dt=sys.argv[2]
#内部表hdfs路径
tb_loc=prop['base_hdfs_dir']+prop['inner_tb_loc']+'data_dt='+data_dt
s1=DataDeal(prop['source_table'],prop['target_table'],data_dt,prop['columns'],tb_loc)
dt=s1.HiveExtenalRead()
dt2=s1.InnerTbDeal(dt)
s1.JsonToHive(dt2)
s1.JsonToHbaseShc(dt2)
总结
结果是简单的,过程是复杂的。