需求:如果直接写SQL语句向hive中插入,会非常慢,而且不能批量插入。
思路1:
(1)将python的dataframe数据落地到磁盘.
(2)将磁盘文件upload到hdfs集群。
(3)将hdfs上的该文件映射成hive外表。
python操作hdfs,参考博文:https://blog.csdn.net/u010916338/article/details/105249271
client = getHDFSClient()
putLocalFileToHDFS(client, '/user/hive/warehouse/risk.db/gd_quantity_month_adf_all', '/app/qy_model/HRH/hive_external_table/Quantity_month_ADF_all.csv')
python操作hive参考博文:https://blog.csdn.net/u010916338/article/details/105249388
#创建hive外表
def run():
conn = getHiveConn()
#写到数据所在的文件夹即可
sql = '''
create external table hn_power_all_1_no_lable
(id string,
name string)
row format delimited fields terminated by ','
lines terminated by '\n'
location '/user/model/hrh/power/power_1'
'''
print(sql)
curosr = conn.cursor()
curosr.execute(sql)
conn.close()
思路2:
能不能将本地文件直接上传到hive默认的/hive/warehouse/数据库名.db/表名 目录下(类似这种,具体路径可能不同)
(1)创建hive表,此时会在hive默认路径下创建一个文件夹。
(2)将本地数据文件上传到与表名同名的文件夹下。
(3)不用再做关联,直接可查。
创建表语句如下,有几点需要注意:
1,必须附带stored as textfile,意为行读取文件。
2,python保存到本地的文件可能带有列名和行索引,hive会将其当成是数据,会造成数据多一行,多一列。
对于列名,建表时附带如下参数tblproperties("skip.header.line.count" = "1"),意为省略第一行。
对于行索引,暂时没有好的办法,建议dataframe落地时就不存行索引 df.to_csv(index_col = False)
conn = getHiveConn()
#textfile指的是行存储
#tblproperties("skip.header.line.count" = "1")跳过第一行
sql = '''
create table gd_quantity_month_adf_all
(cons_no string,
run_cap string,
orgno string,
trade_code string,
adf_Pvalue string,
cov_24m string,
avg_adfcov string,
upamount string,
num_0quantity string)
row format delimited fields terminated by ','
lines terminated by '\n'
stored as textfile
tblproperties("skip.header.line.count" = "1")
'''
print(sql)
executeHiveSQL(conn, sql)
closeHiveConn(conn)
上传文件
client = getHDFSClient()
putLocalFileToHDFS(client, '/user/hive/warehouse/risk.db/gd_quantity_month_adf_all', '/app/qy_model/HRH/hive_external_table/Quantity_month_ADF_all.csv')
思路3:
可不可以将python的dataframe直接上传到hdfs,中间可以省略一步落地操作。
参考python操作hdfs博文:https://blog.csdn.net/u010916338/article/details/105249271
#DF写入到初次创建文件或者覆盖文件
def writeDFtoHDFS(client, hdfs_path, df):
client.write(hdfs_path, df.to_csv(index=False, header=False, sep=','), encoding='utf-8', overwrite=True, append=False)
#追加DF数据到hdfs文件
def appendWriteDFtoHDFS(client, hdfs_path, df):
client.write(hdfs_path, df.to_csv(index=False, header=False, sep=','), encoding='utf-8', overwrite=False, append=True)
但是可能会报错:connectionreseterror:[Error 104] connection reset by peer
具体原因参考博文:https://www.cnblogs.com/satty/p/8491839.html
备注:
若,中间无问题,hive表中无数据,可能需要更新元数据信息
def run():
conn = getHiveConn()
#注意带上数据名risk
sql = '''
msck repair table risk.gd_quantity_month_adf_all
'''
curosr = conn.curosr()
curosr.execute(sql)
conn.close()