Loading...

python向hive中写入数据

需求:如果直接写SQL语句向hive中插入,会非常慢,而且不能批量插入。

(1)将python的dataframe数据落地到磁盘.

(2)将磁盘文件upload到hdfs集群。

(3)将hdfs上的该文件映射成hive外表。

python操作hdfs,参考博文:https://blog.csdn.net/u010916338/article/details/105249271

client = getHDFSClient() putLocalFileToHDFS(client, '/user/hive/warehouse/risk.db/gd_quantity_month_adf_all', '/app/qy_model/HRH/hive_external_table/Quantity_month_ADF_all.csv') 

 python操作hive参考博文:https://blog.csdn.net/u010916338/article/details/105249388

#创建hive外表 def run():     conn = getHiveConn()     #写到数据所在的文件夹即可     sql = '''              create external table hn_power_all_1_no_lable              (id  string,                name string)               row format delimited fields terminated by ','               lines terminated by '\n'              location '/user/model/hrh/power/power_1'           '''     print(sql)     curosr = conn.cursor()     curosr.execute(sql)     conn.close()  

能不能将本地文件直接上传到hive默认的/hive/warehouse/数据库名.db/表名  目录下(类似这种,具体路径可能不同) 

(1)创建hive表,此时会在hive默认路径下创建一个文件夹。

(2)将本地数据文件上传到与表名同名的文件夹下。

(3)不用再做关联,直接可查。

创建表语句如下,有几点需要注意:

1,必须附带stored as textfile,意为行读取文件。
2,python保存到本地的文件可能带有列名和行索引,hive会将其当成是数据,会造成数据多一行,多一列。       

对于列名,建表时附带如下参数tblproperties("skip.header.line.count" = "1"),意为省略第一行。

对于行索引,暂时没有好的办法,建议dataframe落地时就不存行索引  df.to_csv(index_col = False)

conn = getHiveConn() #textfile指的是行存储 #tblproperties("skip.header.line.count" = "1")跳过第一行 sql = '''          create table gd_quantity_month_adf_all          (cons_no string,           run_cap string,           orgno string,           trade_code string,           adf_Pvalue string,           cov_24m string,           avg_adfcov string,           upamount string,           num_0quantity string)            row format delimited fields terminated by ','            lines terminated by '\n'            stored as textfile           tblproperties("skip.header.line.count" = "1")       ''' print(sql) executeHiveSQL(conn, sql) closeHiveConn(conn)

上传文件

client = getHDFSClient() putLocalFileToHDFS(client, '/user/hive/warehouse/risk.db/gd_quantity_month_adf_all', '/app/qy_model/HRH/hive_external_table/Quantity_month_ADF_all.csv') 

可不可以将python的dataframe直接上传到hdfs,中间可以省略一步落地操作。

参考python操作hdfs博文:https://blog.csdn.net/u010916338/article/details/105249271

#DF写入到初次创建文件或者覆盖文件 def writeDFtoHDFS(client, hdfs_path, df):     client.write(hdfs_path, df.to_csv(index=False, header=False, sep=','), encoding='utf-8', overwrite=True, append=False)        #追加DF数据到hdfs文件 def appendWriteDFtoHDFS(client, hdfs_path, df):     client.write(hdfs_path, df.to_csv(index=False, header=False, sep=','), encoding='utf-8', overwrite=False, append=True) 

但是可能会报错:connectionreseterror:[Error 104] connection reset by peer

具体原因参考博文:https://www.cnblogs.com/satty/p/8491839.html 

若,中间无问题,hive表中无数据,可能需要更新元数据信息

def run():     conn = getHiveConn()     #注意带上数据名risk     sql = '''            msck repair table risk.gd_quantity_month_adf_all           '''     curosr = conn.curosr()     curosr.execute(sql)     conn.close()