需求:如果直接写SQL语句向hive中插入,会非常慢,而且不能批量插入。
(1)将python的dataframe数据落地到磁盘.
(2)将磁盘文件upload到hdfs集群。
(3)将hdfs上的该文件映射成hive外表。
python操作hdfs,参考博文:https://blog.csdn.net/u010916338/article/details/105249271
client = getHDFSClient() putLocalFileToHDFS(client, '/user/hive/warehouse/risk.db/gd_quantity_month_adf_all', '/app/qy_model/HRH/hive_external_table/Quantity_month_ADF_all.csv')
python操作hive参考博文:https://blog.csdn.net/u010916338/article/details/105249388
#创建hive外表 def run(): conn = getHiveConn() #写到数据所在的文件夹即可 sql = ''' create external table hn_power_all_1_no_lable (id string, name string) row format delimited fields terminated by ',' lines terminated by '\n' location '/user/model/hrh/power/power_1' ''' print(sql) curosr = conn.cursor() curosr.execute(sql) conn.close()
能不能将本地文件直接上传到hive默认的/hive/warehouse/数据库名.db/表名 目录下(类似这种,具体路径可能不同)
(1)创建hive表,此时会在hive默认路径下创建一个文件夹。
(2)将本地数据文件上传到与表名同名的文件夹下。
(3)不用再做关联,直接可查。
创建表语句如下,有几点需要注意:
1,必须附带stored as textfile,意为行读取文件。
2,python保存到本地的文件可能带有列名和行索引,hive会将其当成是数据,会造成数据多一行,多一列。
对于列名,建表时附带如下参数tblproperties("skip.header.line.count" = "1"),意为省略第一行。
对于行索引,暂时没有好的办法,建议dataframe落地时就不存行索引 df.to_csv(index_col = False)
conn = getHiveConn() #textfile指的是行存储 #tblproperties("skip.header.line.count" = "1")跳过第一行 sql = ''' create table gd_quantity_month_adf_all (cons_no string, run_cap string, orgno string, trade_code string, adf_Pvalue string, cov_24m string, avg_adfcov string, upamount string, num_0quantity string) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile tblproperties("skip.header.line.count" = "1") ''' print(sql) executeHiveSQL(conn, sql) closeHiveConn(conn)
上传文件
client = getHDFSClient() putLocalFileToHDFS(client, '/user/hive/warehouse/risk.db/gd_quantity_month_adf_all', '/app/qy_model/HRH/hive_external_table/Quantity_month_ADF_all.csv')
可不可以将python的dataframe直接上传到hdfs,中间可以省略一步落地操作。
参考python操作hdfs博文:https://blog.csdn.net/u010916338/article/details/105249271
#DF写入到初次创建文件或者覆盖文件 def writeDFtoHDFS(client, hdfs_path, df): client.write(hdfs_path, df.to_csv(index=False, header=False, sep=','), encoding='utf-8', overwrite=True, append=False) #追加DF数据到hdfs文件 def appendWriteDFtoHDFS(client, hdfs_path, df): client.write(hdfs_path, df.to_csv(index=False, header=False, sep=','), encoding='utf-8', overwrite=False, append=True)
但是可能会报错:connectionreseterror:[Error 104] connection reset by peer
具体原因参考博文:https://www.cnblogs.com/satty/p/8491839.html
若,中间无问题,hive表中无数据,可能需要更新元数据信息
def run(): conn = getHiveConn() #注意带上数据名risk sql = ''' msck repair table risk.gd_quantity_month_adf_all ''' curosr = conn.curosr() curosr.execute(sql) conn.close()