Python3操作HDFS-创新互联
【第三方包】
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:国际域名空间、虚拟空间、营销软件、网站建设、化德网站维护、网站推广。pyhdfs(pypi,github,支持HA)
【功能】
重命名 hdfs 文件或目录
# encoding: utf-8 # author: walker # date: 2018-03-17 # summary: 利用 pyhdfs 重命名 hdfs 文件或目录 import os, sys, time from pyhdfs import HdfsClient SrcPath = '/test/xxx' DstPath = '/test/yyy' NameNode = 'nn1.example.com:50070,nn2.example.com:50070' # 将 SrcPath 改名为 DstPath def Rename(SrcPath, DstPath): fs = HdfsClient(hosts=NameNode) if not fs.exists(SrcPath): print('Error: not found %s' % SrcPath) sys.exit(-1) print('Reanme ... \n%s\n -> \n%s \n' % (SrcPath, DstPath)) fs.rename(SrcPath, DstPath) if __name__ == '__main__': Rename(SrcPath, DstPath)
上传文件
# encoding: utf-8 # author: walker # date: 2018-01-23 # summary: 上传本地文件到 hdfs 目录 import os, sys, time from pyhdfs import HdfsClient from configparser import ConfigParser cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__)) StartTime = time.time() FileSize = 0 #文件总大小 LocalDir = '' HdfsDir = '' NameNode = '' UserName = '' #读取配置文件 def ReadConfig(): global LocalDir, HdfsDir, NameNode, UserName cfg = ConfigParser() cfgFile = os.path.join(cur_dir_fullpath, 'config.ini') if not os.path.exists(cfgFile): input(cfgFile + ' not found') sys.exit(-1) cfgLst = cfg.read(cfgFile) if len(cfgLst) < 1: input('Read config.ini failed...') sys.exit(-1) LocalDir = cfg.get('config', 'LocalDir').strip() if not os.path.exists(LocalDir): input(LocalDir + ' not found') sys.exit(-1) print('LocalDir:' + LocalDir) HdfsDir = cfg.get('config', 'HdfsDir').strip() print('HdfsDir:' + HdfsDir) NameNode = cfg.get('config', 'NameNode').strip() print('NameNode:' + NameNode) UserName = cfg.get('config', 'UserName').strip() print('UserName:' + UserName) print('Read config.ini successed!') #处理一个 def ProcOne(client, srcFile, dstFile): global FileSize print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile)) #目标文件已经存在且大小相同 if client.exists(dstFile) and \ (os.path.getsize(srcFile) == client.list_status(dstFile)[0].length): print('file exists: %s ' % dstFile) return True #注意,如果已存在会被覆盖 client.copy_from_local(srcFile, dstFile, overwrite=True) #校验文件大小 if os.path.getsize(srcFile) == client.list_status(dstFile)[0].length: FileSize += os.path.getsize(srcFile) return True return False #处理所有 def ProcAll(): client = HdfsClient(hosts=NameNode, user_name=UserName) if not client.exists(HdfsDir): print(HdfsDir + ' not found') sys.exit(-1) total = len(os.listdir(LocalDir)) processed = 0 failedList = list() for filename in os.listdir(LocalDir): srcFile = os.path.join(LocalDir, filename) dstFile = HdfsDir + '/' + filename if not ProcOne(client, srcFile, dstFile): failedList.append(srcFile) processed += 1 print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime)) print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime))) if failedList: print('failedList: %s' % repr(failedList)) else: print('Good! No Error!') print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \ (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__': ReadConfig() ProcAll() print('Time total: %.2f s' % (time.time()-StartTime)) print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
下载 HDFS 文件到本地
# encoding: utf-8 # author: walker # date: 2018-06-07 # summary: 下载 HDFS 文件(或目录)到本地 import os, sys, time from pyhdfs import HdfsClient from configparser import ConfigParser cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__)) StartTime = time.time() FileSize = 0 #文件总大小 LocalDir = '' HdfsDir = '' NameNode = '' UserName = '' #读取配置文件 def ReadConfig(): global LocalDir, HdfsDir, NameNode, UserName cfg = ConfigParser() cfgFile = os.path.join(cur_dir_fullpath, 'config.ini') if not os.path.exists(cfgFile): input(cfgFile + ' not found') sys.exit(-1) cfgLst = cfg.read(cfgFile) if len(cfgLst) < 1: input('Read config.ini failed...') sys.exit(-1) LocalDir = cfg.get('config', 'LocalDir').strip() if not os.path.exists(LocalDir): input(LocalDir + ' not found') sys.exit(-1) print('LocalDir:' + LocalDir) HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/') print('HdfsDir:' + HdfsDir) NameNode = cfg.get('config', 'NameNode').strip() print('NameNode:' + NameNode) UserName = cfg.get('config', 'UserName').strip() print('UserName:' + UserName) print('Read config.ini successed!') #处理一个 def ProcOne(client, srcFile, dstFile): global FileSize print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile)) dstDir = os.path.dirname(dstFile) if not os.path.exists(dstDir): os.makedirs(dstDir) # 目标文件已经存在且大小相同 if os.path.exists(dstFile) and \ (os.path.getsize(dstFile) == client.list_status(srcFile)[0].length): print('file exists: %s ' % dstFile) return True # 注意,如果已存在会被覆盖 client.copy_to_local(srcFile, dstFile, overwrite=True) if os.path.getsize(dstFile) != client.list_status(srcFile)[0].length: #校验文件大小 return False FileSize += os.path.getsize(dstFile) return True #处理所有 def ProcAll(): client = HdfsClient(hosts=NameNode, user_name=UserName) if not client.exists(HdfsDir): print(HdfsDir + ' not found') sys.exit(-1) total = 0 # 先遍历一遍,得到总文件个数 for parent, dirnames, filenames in client.walk(HdfsDir): for filename in filenames: total += 1 processed = 0 failedList = list() for parent, dirnames, filenames in client.walk(HdfsDir): for filename in filenames: srcFile = '%s/%s' % (parent, filename) relPath = srcFile[len(HdfsDir)+1:].replace('/', '\\') # 相对于根目录的路径 dstFile = os.path.join(LocalDir, relPath) if not ProcOne(client, srcFile, dstFile): failedList.append(srcFile) processed += 1 print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime)) print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime))) if failedList: print('failedList: %s' % repr(failedList)) else: print('Good! No Error!') print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \ (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__': ReadConfig() ProcAll() print('Time total: %.2f s' % (time.time()-StartTime)) print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
*** walker ***
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
分享文章:Python3操作HDFS-创新互联
文章源于:http://pwwzsj.com/article/ddihed.html