python 读文件并处理成DataFrame

2022-07-20 11:59:47

1. 从hive表中批量读取数据,处理成DataFrame。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pyhive import hive
import pandas as pd

def get_batch_data_from_hive(sql_str, batch_size=100):
    # sql_str = 'select * from zx_dm.lx_user_wt_gjc_contact_4_predict_data_d where pt="{}"'.format(last_dt)
    conn = hive.Connection(host='localhost', port=10000, username='zx_dm', auth='NOSASL', database='zx_dm')
    cursor = conn.cursor()
    cursor.execute(sql_str)
    while True:
        batch_data = cursor.fetchmany(size=batch_size)
        if len(batch_data) == 0:
             break
        df = pd.DataFrame(batch_data, columns=cols)
        # 处理一个batch
        fun(df)

    cursor.close()
    conn.close()

2.  使用pandas从hive表中读取DataFrame数据。

def get_data_from_hive(sql_str):
    conn = hive.Connection(host='sjpt-other-standby-9.wxxdc',\
port=10000,auth='KERBEROS', kerberos_service_name="hive")
    cursor = conn.cursor()
    df = pd.read_sql(sql_str, conn)
    cursor.close()
    conn.close()
    return df
pd.read_sql(sql_str, conn, index_col=None, coerce_float=True, 
    params=None, parse_dates=None, columns=None, chunksize=None)
conn:连接sql数据库的engine,可以用SQLalchemy或者pymysql之类的包建立
index_col: 选择某一列作为index
coerce_float: 将数字形式的字符串直接以float型读入
parse_dates: 将某一列日期型字符串转换为datetime型数据,与pd.to_datetime函数功能类似。
             可以直接提供需要转换的列名以默认的日期形式转换,也可以用字典的格式提供列名和转换的日期格式,
             比如{column_name: format "%Y:%m:%H:%M:%S"}。
columns:要选取的列。
chunksize:如果提供了一个整数值,那么就会返回一个generator,每次输出的行数就是提供的值的大小。

3. 使用pyarrow包读orc文件

import pyarrow.orc as orc

def get_data_in_orc(filename):
    with open(filename, 'rb') as f:
        data = orc.ORCFile(f)
        print(type(data))
        # <class 'pyarrow.orc.ORCFile'>
        df = data.read().to_pandas()
    df.columns = cols
    return df

4. 使用spark读orc文件

def get_data_with_spark_in_orc(filename):
    import findspark
    from pyspark.sql import SparkSession

    findspark.init()
    # spark = SparkSession.builder.getOrCreate()
    spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .master("yarn") \
        .appName("test_lr") \
        .config('spark.driver.maxResultSize', '10g') \
        .config('spark.driver.memory', '4g') \
        .config('spark.excutor.memory', '3g') \
        .config('spark.kryoserializer.buffer.max', '1g') \
        .config('spark.kryoserializer.buffer', '1g') \
        .getOrCreate()

    df1 = spark.read.orc(filename)
    df = df1.toPandas()
    df.columns=cols
    return df

5.使用spark读hive表

6.用矢量方法转换DataFrame数据类型(速度快!!!!

import numpy as np

start = int(time.time())
test_data = df.loc[:, feature_names]
test_data = test_data.values.astype(np.float64)
# test_data = test_data.values.astype(np.float32)
print(type(test_data))
# <class 'numpy.ndarray'>
end = int(time.time())
print("****耗时: " + str(end - start) + "s")
  • 作者:MusicDancing
  • 原文链接:https://blog.csdn.net/MusicDancing/article/details/117290505
    更新时间:2022-07-20 11:59:47