# Python 操作 HDFS:pyhdfs

# 2023年1月5日10:25:24
import pyhdfs
from pyhdfs import HdfsException
import os
import telnetlib  as tn
from flask import Flask,request,Response
from gevent.pywsgi import WSGIServer
import werkzeug

'''
python 操作HDFS。
1.访问文件系统列表(目录、文件)
FileStatus(accessTime=0, 
           blockSize=0, 
           childrenNum=15, 
           fileId=16395, 
           group='supergroup', 
           length=0, 
           modificationTime=1572431013356, 
           owner='hdfs', 
           pathSuffix='.cloudera_health_monitoring_canary_files', 
           permission='0', 
           replication=0, 
           storagePolicy=0, 
           type='DIRECTORY'/'FILE')
2.读取文件内容
3.下载HDFS文件到本地
'''

# The ''' note above is a no-op expression (it follows the imports, so it is
# not the module docstring). In English it says: operate HDFS from Python --
# 1. list file-system entries (directories, files), each a FileStatus like the
# sample shown; 2. read file contents; 3. download HDFS files locally.

# NameNode list as comma-separated "ip:port" entries; the values are
# placeholders. The HTTP routes in this file take nameNodes from each request
# instead, and this constant is only used by commented-out examples.
namenodes = 'ip:9870,ip:9870'
# FileStatus.type values (see the sample in the note above) used to tell
# files from directories.
file_type = 'FILE'
dir_type = 'DIRECTORY'

# Flask application exposing the HDFS operations below as HTTP routes.
app = Flask(__name__)

@app.route('/uploadBytes', methods=['POST'])
def upload_bytes():
    """Upload the multipart field ``file`` to HDFS under an explicit file name.

    Form fields: ``dirPath`` (target HDFS directory), ``nameNodes``
    (comma-separated ``ip:port`` list) and ``fileName`` (name to store as).
    :return: the string form of ``MyHdfs.upload_to_hdfs2``'s result dict, or
        ``{0: message}`` when the NameNodes cannot be reached.
    """
    # No manual method check: Flask only routes POST here.
    # Indexing request.files / request.form raises werkzeug's BadRequestKeyError
    # (HTTP 400) for a missing field, instead of a plain KeyError (HTTP 500).
    file = request.files['file']
    dir_path = request.form['dirPath']
    nodes = request.form['nameNodes']
    filename = request.form['fileName']
    # Connect to HDFS; an unreachable host is reported like other failures.
    try:
        myhdfs = MyHdfs(nodes)
    except MyHdfsException as e:
        return Response(str({0: e.value}))
    result = myhdfs.upload_to_hdfs2(file, filename, dir_path)
    return Response(str(result))

@app.route('/upload', methods=['POST'])
def upload():
    """Upload the multipart field ``file`` to HDFS, keeping its client file name.

    Form fields: ``dirPath`` (target HDFS directory) and ``nameNodes``
    (comma-separated ``ip:port`` list).
    :return: the string form of ``MyHdfs.upload_to_hdfs``'s result dict, or
        ``{0: message}`` when the NameNodes cannot be reached.
    """
    # No manual method check: Flask only routes POST here.
    # Indexing request.files / request.form raises werkzeug's BadRequestKeyError
    # (HTTP 400) for a missing field, instead of a plain KeyError (HTTP 500).
    file = request.files['file']
    dir_path = request.form['dirPath']
    nodes = request.form['nameNodes']
    # Connect to HDFS; an unreachable host is reported like other failures.
    try:
        myhdfs = MyHdfs(nodes)
    except MyHdfsException as e:
        return Response(str({0: e.value}))
    result = myhdfs.upload_to_hdfs(file, dir_path)
    return Response(str(result))

@app.route('/download', methods=['GET'])
def download():
    """Stream an HDFS file back to the client.

    Query parameters: ``nameNodes`` (comma-separated ``ip:port`` list) and
    ``filePath`` (HDFS file path); a missing one yields HTTP 400.
    :return: the file bytes as ``application/octet-stream``, or the string
        form of ``{0: message}`` on failure.
    """
    # No manual method check. The old "!= 'GET'" test was dead for other
    # methods, and it rejected the HEAD requests werkzeug adds automatically
    # to GET routes.
    nodes = request.args['nameNodes']
    file_path = request.args['filePath']
    try:
        myhdfs = MyHdfs(nodes)
    except MyHdfsException as e:
        return Response(str({0: e.value}))
    result = myhdfs.get_hdfs_filestream(file_path)
    if isinstance(result, dict):
        return Response(str(result))
    # Serve raw bytes so browsers download arbitrary HDFS content rather than
    # render it as HTML. Werkzeug iterates the stream and closes it afterwards.
    return Response(result, mimetype='application/octet-stream')
@app.route('/read', methods=['GET'])
def readfile():
    """Return the first lines of an HDFS text file.

    Query parameters:
      ``nameNodes``: comma-separated ``ip:port`` list.
      ``filePath``: HDFS file path.
      ``readLines``: optional line count.
    A missing ``nameNodes`` or ``filePath`` yields HTTP 400. A missing or
    non-integer ``readLines`` falls back to 100 lines.
    :return: the string form of ``MyHdfs.read_hdfs_file``'s result dict.
    """
    # No manual method check. The old "!= 'GET'" test was dead for other
    # methods, and it rejected the HEAD requests werkzeug adds automatically.
    nodes = request.args['nameNodes']
    file_path = request.args['filePath']
    # Previously a missing readLines was passed on as None and int(None) crashed.
    lines = request.args.get("readLines", default=100, type=int)
    try:
        myhdfs = MyHdfs(nodes)
    except MyHdfsException as e:
        return Response(str({0: e.value}))
    result = myhdfs.read_hdfs_file(file_path, lines)
    return Response(str(result))

@app.route('/delete', methods=['GET'])
def delfile():
    """Delete an HDFS file.

    Query parameters: ``nameNodes`` (comma-separated ``ip:port`` list) and
    ``filePath`` (HDFS file path); a missing one yields HTTP 400.
    :return: the string form of ``MyHdfs.delete_file``'s result dict.
    """
    # NOTE(review): a state-changing GET can be triggered by link prefetchers
    # or cross-site requests; consider POST/DELETE once callers can change.
    # No manual method check. The old "!= 'GET'" test was dead for other
    # methods, and it rejected the HEAD requests werkzeug adds automatically.
    nodes = request.args['nameNodes']
    file_path = request.args['filePath']
    try:
        myhdfs = MyHdfs(nodes)
    except MyHdfsException as e:
        return Response(str({0: e.value}))
    result = myhdfs.delete_file(file_path)
    return Response(str(result))

class PathInfo(object):
    """Tree node holding an HDFS path and the entries one level below it.

    ``name`` is the node's own path. ``pathInfos`` lists the next level:
    ``MyHdfs.get_subpath`` fills it with path strings for files and nested
    ``PathInfo`` nodes for directories (recursive mode).
    """
    def __init__(self, name, pathInfos=None):
        # This node's own path.
        self.name = name
        # Children of this node. A fresh list per instance: a shared ``[]``
        # default would leak children between unrelated nodes.
        self.pathInfos = [] if pathInfos is None else pathInfos

    def __str__(self):
        return "{'name':'%s','sub':%s}" % (self.name, self.pathInfos)

    # Lists format their items with repr(). Mirroring __str__ makes nested
    # nodes print readably instead of as "<PathInfo object at 0x...>".
    __repr__ = __str__

class MyHdfsException(HdfsException):
    """Raised when a NameNode host/port or an HDFS path check fails.

    ``value`` holds the human-readable message. The default asks the user to
    check the host and port.
    """
    def __init__(self, value="请检查主机、端口的正确性!"):
        # Pass the message to Exception so str(exc) and exc.args carry it.
        super().__init__(value)
        self.value = value

    def raise_exception(self):
        """Print ``value`` and raise this very instance.

        Raising ``self`` keeps the custom message. Raising the bare class
        would create a new exception holding only the default message.
        """
        print(self.value)
        raise self

class MyHdfs(object):
    """Wrapper around ``pyhdfs.HdfsClient`` used by the HTTP routes.

    Most methods report ``{1: payload}`` on success and ``{0: message}`` on
    failure, so the routes can echo the result back.
    """

    # Seconds allowed for each NameNode TCP probe in the reachability check.
    # Without a limit an unreachable host blocks for the OS connect timeout.
    CONNECT_TIMEOUT = 5

    def __init__(self, hosts, user_name='hdfs'):
        """Check that the NameNodes are reachable, then create the client.

        :param hosts: NameNode addresses, either a comma-separated
            ``"ip:port,ip:port"`` string or an iterable of ``"ip:port"`` strings.
        :param user_name: HDFS user the client acts as.
        :raises MyHdfsException: if ``hosts`` is empty, malformed or unreachable.
        """
        self.hosts = hosts
        self.username = user_name
        # Validate before building the client so bad input fails fast.
        self.__check_server_available()
        # Keyword arguments are required: HdfsClient's second positional
        # parameter is ``randomize_hosts``, so a positional user name was
        # silently ignored.
        self.fs = pyhdfs.HdfsClient(hosts=self.hosts, user_name=self.username)

    @staticmethod
    def _join_hdfs_path(parent, name):
        """Join an HDFS directory and a child name with exactly one ``/``."""
        return parent.rstrip("/") + "/" + name

    def __check_server_available(self):
        """Verify that every NameNode in ``self.hosts`` accepts a TCP connection.

        :raises MyHdfsException: if no hosts are given, an entry is not
            ``ip:port``, or a connection cannot be opened.
        """
        import socket  # only this probe needs raw sockets

        hosts = self.hosts
        if not hosts or (isinstance(hosts, str) and not hosts.strip()):
            MyHdfsException().raise_exception()
        host_list = hosts.split(",") if isinstance(hosts, str) else list(hosts)
        try:
            for host in host_list:
                ip, port = host.strip().rsplit(":", 1)
                # Open a probe connection; the ``with`` block closes the socket.
                with socket.create_connection((ip, int(port)),
                                              timeout=self.CONNECT_TIMEOUT):
                    pass
        except (OSError, ValueError):
            # OSError: refused, timed out or unresolvable host.
            # ValueError: entry without ":" or with a non-numeric port.
            MyHdfsException().raise_exception()
        else:
            print('IP和端口可用!')

    def upload_to_hdfs(self, data, dirPath="/"):
        """Upload a file object to HDFS, keeping its ``filename`` attribute.

        :param data: file-like object with a ``filename`` attribute, e.g. a
            werkzeug ``FileStorage``.
        :param dirPath: HDFS directory (existing or not); defaults to the root.
        :return: ``{1: message}`` on success, ``{0: message}`` on failure.
        """
        filename = getattr(data, "filename", None)
        if not filename:
            # Without a name there is no target path to write to.
            print("上传文件失败!")
            return {0: "上传文件失败!"}
        return self.upload_to_hdfs2(data, filename, dirPath)

    def upload_to_hdfs2(self, data, filename, dirPath="/"):
        """Upload data to ``dirPath/filename`` on HDFS without overwriting.

        :param data: ``bytes`` or a file-like object.
        :param filename: name of the file to create.
        :param dirPath: HDFS directory (existing or not); defaults to the root.
        :return: ``{1: message}`` on success; ``{0: message}`` if the file
            already exists or the upload fails.
        """
        try:
            print("filename: ", filename)
            filePath = self._join_hdfs_path(dirPath, filename)
            if self.fs.exists(filePath):
                print("文件已存在!")
                return {0: "文件已存在!"}
            print("开始上传文件...")
            self.fs.create(filePath, data, overwrite=False, buffersize=2048)
            print("上传文件完成...")
            return {1: "成功上传文件!"}
        except Exception as e:
            # Catch Exception rather than everything: a bare except would also
            # swallow KeyboardInterrupt, SystemExit and greenlet.GreenletExit.
            print("上传文件失败!", e)
            return {0: "上传文件失败!"}

    def get_hdfs_filestream(self, filePath):
        """Open an HDFS file for streaming.

        :param filePath: HDFS file path.
        :return: the stream from ``self.fs.open`` (a urllib3 HTTPResponse, per
            the original note; the caller must close it), or ``{0: message}``
            if the path is missing or not a file.
        """
        try:
            if not self.__check_hdfs_path(filePath, file_type):
                return {0: "请指定文件路径!"}
            return self.fs.open(filePath)
        except MyHdfsException:
            return {0: "文件路径不存在!"}

    def download_to_local(self, hdfsSource, localDest):
        """Download an HDFS file to a local file or directory.

        :param hdfsSource: HDFS file path.
        :param localDest: local file path or directory. Missing directories
            are created (see ``check_local_path``).
        :return: 1 after downloading; 0 if ``hdfsSource`` is not a file.
        :raises MyHdfsException: if ``hdfsSource`` does not exist.
        """
        if not self.__check_hdfs_path(hdfsSource, file_type):
            print("HDFS路径必须是文件路径!")
            return 0
        localDest, localDest_type = self.check_local_path(localDest)
        if localDest_type == dir_type:
            # Keep the HDFS file name inside the local directory; take only the
            # last component so no doubled "/" is produced.
            localDest = localDest + "/" + hdfsSource.rsplit("/", 1)[-1]
        self.fs.copy_to_local(hdfsSource, localDest)
        return 1

    def __check_hdfs_path(self, hdfsPath, isFileOrDir):
        """Return whether an existing HDFS path has the expected type.

        :param hdfsPath: HDFS path to inspect.
        :param isFileOrDir: expected type, ``'DIRECTORY'`` or ``'FILE'``.
        :return: True if the path's FileStatus type matches, else False.
        :raises MyHdfsException: if the path does not exist.
        """
        if not self.fs.exists(hdfsPath):
            MyHdfsException("路径(%s)不存在!" % (hdfsPath)).raise_exception()
        file_status = self.fs.get_file_status(hdfsPath)
        return file_status.type == isFileOrDir

    def check_local_path(self, givenpath):
        """Classify a local path as file or directory, creating folders as needed.

        Existing paths are classified by what they are on disk. For a path
        that does not exist:
          - it is a directory if its last component has no "." (or it ends
            with "/"); the directory is then created;
          - otherwise it is a file, and its parent directory is created.

        :param givenpath: local file or directory path.
        :return: ``(path, type)``, where trailing "/" is removed from the path
            and type is ``dir_type`` or ``file_type``.
        """
        os_path = os.path
        # Taken before trimming, so a trailing "/" (empty basename) means directory.
        base_name = os_path.basename(givenpath)
        dir_name = os_path.dirname(givenpath)
        exist_flag = os_path.exists(givenpath)
        trimmed = givenpath.rstrip("/")
        if exist_flag:
            is_dir = os_path.isdir(givenpath)
        else:
            is_dir = "." not in base_name
        if is_dir:
            if not exist_flag:
                os.makedirs(trimmed, exist_ok=True)
            return trimmed, dir_type
        # A file: make sure its parent exists; "" means the current directory,
        # which needs no creation (os.makedirs("") would raise).
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        return trimmed, file_type

    def delete_file(self, filePath):
        """Delete an HDFS file.

        :param filePath: HDFS file path.
        :return: ``{1: message}`` if deleted; ``{0: message}`` if the path is
            missing, is not a file, or the delete fails.
        """
        try:
            if not self.__check_hdfs_path(filePath, file_type):
                return {0: "请指定文件路径!"}
        except MyHdfsException:
            return {0: "文件路径不存在!"}
        # fs.delete returns True if the delete succeeded, else False.
        if self.fs.delete(filePath):
            return {1: "删除成功"}
        return {0: "删除失败"}

    def read_hdfs_file(self, path, lines=100, encoding="utf-8", errors="replace"):
        """Read the first ``lines`` lines of an HDFS file as text.

        :param path: HDFS file path.
        :param lines: number of lines to read. Anything ``int()`` accepts;
            ``None`` means 100; values <= 0 read nothing.
        :param encoding: codec for each line. UTF-8 by default; decoding as
            ISO-8859-1 turned non-ASCII UTF-8 text (e.g. Chinese) into mojibake.
            Pass ``"ISO-8859-1"`` for the old byte-preserving behaviour.
        :param errors: codec error handler. ``"replace"`` keeps undecodable
            bytes (e.g. binary files) from raising.
        :return: ``{1: [line, ...]}`` with each line stripped of surrounding
            whitespace, or ``{0: message}`` if the path is missing or not a file.
        """
        if not self.fs.exists(path):
            return {0: "路径(%s)不存在!" % (path)}
        file_status = self.fs.get_file_status(path)
        if file_status.type != file_type:
            return {0: "请输入文件路径!"}
        limit = 100 if lines is None else int(lines)
        resp = self.fs.open(path)  # urllib3.response.HTTPResponse
        try:
            # zip() exhausts range() first, so no line past the limit is read.
            # iter(readline, b"") stops at end of file.
            return {1: [raw.decode(encoding, errors).strip()
                        for _, raw in zip(range(limit), iter(resp.readline, b""))]}
        finally:
            # Always release the HTTP stream, even if reading or decoding fails.
            resp.close()

    def get_subpath(self, pathInfo=None, recursive=False):
        """List the entries under ``pathInfo.name``, optionally recursively.

        :param pathInfo: node to fill in. ``None`` (the default) starts from a
            new root ``PathInfo('/', [])``. The node is built per call because
            a default node created once would be shared, and keep growing,
            across calls.
        :param recursive: controls how children are recorded.
            False: replace ``pathInfo.pathInfos`` with the full paths of the
            direct children.
            True: append files as path strings and directories as nested
            ``PathInfo`` nodes, descending into each directory.
        :return: ``pathInfo`` once filled, or an error string if the path
            does not exist.
        """
        if pathInfo is None:
            pathInfo = PathInfo('/', [])
        self_path = pathInfo.name
        if not self.fs.exists(self_path):
            return "路径(%s)不存在!" % (self_path)
        statuses = self.fs.list_status(self_path)
        if not recursive:
            # Reuse the statuses just fetched instead of a second listdir call.
            pathInfo.pathInfos = [self._join_hdfs_path(self_path, status.pathSuffix)
                                  for status in statuses]
            return pathInfo
        sub_pathes = pathInfo.pathInfos
        for status in statuses:
            child_path = self._join_hdfs_path(self_path, status.pathSuffix)
            if status.type == dir_type:
                # Directory: record it as a node and descend into it.
                child = PathInfo(child_path, [])
                sub_pathes.append(child)
                self.get_subpath(child, recursive)
            else:
                # File: record its full path.
                sub_pathes.append(child_path)
        return pathInfo

if __name__ == '__main__':
    # Example of direct (non-HTTP) use of MyHdfs, kept for reference; the
    # HDFS and local paths are sample values:
    #   myhdfs = MyHdfs(namenodes)
    #   tree = PathInfo('/', [])
    #   myhdfs.get_subpath(tree, True)
    #   print(tree)
    #   print(myhdfs.read_hdfs_file("/testdata/2013_trip_data_test.csv", 3))
    #   myhdfs.download_to_local("/testdata/2013_trip_data_test.csv", "d:/datas/gh/tg")
    #
    # Serve the Flask app with gevent's WSGIServer on all interfaces, port
    # 5010, instead of Flask's development server (app.run()).
    http_server = WSGIServer(("0.0.0.0",5010),app)
    http_server.serve_forever()


 

#   • 作者:koukan3
#   • 原文链接:https://blog.csdn.net/koukan3/article/details/102836081
#     更新时间:2023年1月5日10:25:24,共 8555 字。