【第三方包】
pyhdfs(pypi,github,支持HA)
【功能】
重命名 hdfs 文件或目錄
# encoding: utf-8
# author: walker
# date: 2018-03-17
# summary: 利用 pyhdfs 重命名 hdfs 文件或目錄
import os, sys, time
from pyhdfs import HdfsClient
# Comma-separated NameNode addresses handed to HdfsClient(hosts=...).
NameNode = 'nn1.example.com:50070,nn2.example.com:50070'
# HDFS path to rename, and the name it should receive.
SrcPath, DstPath = '/test/xxx', '/test/yyy'
def Rename(SrcPath, DstPath):
    """Rename the HDFS file or directory ``SrcPath`` to ``DstPath``.

    A client is created for the hosts listed in the module-level
    ``NameNode`` string. The process exits with status -1 when ``SrcPath``
    does not exist or when HDFS reports that the rename did not happen.
    """
    fs = HdfsClient(hosts=NameNode)
    if not fs.exists(SrcPath):
        print('Error: not found %s' % SrcPath)
        sys.exit(-1)
    print('Rename ... \n%s\n -> \n%s \n' % (SrcPath, DstPath))
    # rename() reports a refused rename through its boolean result, so the
    # result has to be checked or the failure would go unnoticed.
    if not fs.rename(SrcPath, DstPath):
        print('Error: rename failed %s -> %s' % (SrcPath, DstPath))
        sys.exit(-1)
if __name__ == '__main__':
Rename(SrcPath, DstPath)
上傳文件
# encoding: utf-8
# author: walker
# date: 2018-01-23
# summary: 上傳本地文件到 hdfs 目錄
import os, sys, time
from pyhdfs import HdfsClient
from configparser import ConfigParser
# Directory that contains this script; ReadConfig() looks for config.ini here.
cur_dir_fullpath = os.path.split(os.path.abspath(__file__))[0]
# Reference timestamp for the elapsed-time and throughput reports.
StartTime = time.time()
# Total size in bytes of the files uploaded so far.
FileSize = 0
# Settings populated by ReadConfig(); empty strings until it has run.
LocalDir = HdfsDir = NameNode = UserName = ''
def ReadConfig():
    """Load the upload settings from ``config.ini`` into module globals.

    ``config.ini`` is read from ``cur_dir_fullpath``. The options
    ``LocalDir``, ``HdfsDir``, ``NameNode`` and ``UserName`` are taken from
    its ``config`` section, stripped of surrounding whitespace and stored in
    the globals of the same names; each value is echoed to stdout.

    If the config file is missing or cannot be read, or ``LocalDir`` does
    not exist, the message is shown through ``input()`` (which waits for
    Enter) and the process exits with status -1.
    """
    global LocalDir, HdfsDir, NameNode, UserName

    config = ConfigParser()
    ini_path = os.path.join(cur_dir_fullpath, 'config.ini')
    if not os.path.exists(ini_path):
        input(ini_path + ' not found')
        sys.exit(-1)

    # read() returns the list of files that were parsed successfully.
    loaded = config.read(ini_path)
    if not loaded:
        input('Read config.ini failed...')
        sys.exit(-1)

    LocalDir = config.get('config', 'LocalDir').strip()
    if not os.path.exists(LocalDir):
        input(LocalDir + ' not found')
        sys.exit(-1)
    print('LocalDir:' + LocalDir)

    HdfsDir = config.get('config', 'HdfsDir').strip()
    print('HdfsDir:' + HdfsDir)

    NameNode = config.get('config', 'NameNode').strip()
    print('NameNode:' + NameNode)

    UserName = config.get('config', 'UserName').strip()
    print('UserName:' + UserName)

    print('Read config.ini successed!')
def ProcOne(client, srcFile, dstFile):
    """Upload one local file to HDFS and verify the result by size.

    Returns True when ``dstFile`` already exists with the same length as
    ``srcFile`` (nothing is copied), or when the copy completes and both
    lengths match afterwards. Returns False when the lengths differ after
    the copy. Only a verified copy adds the file size to the global
    ``FileSize``; a skipped file does not.
    """
    global FileSize
    print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))

    # Skip the transfer when the destination is already there with the
    # same size.
    if client.exists(dstFile):
        local_len = os.path.getsize(srcFile)
        remote_len = client.list_status(dstFile)[0].length
        if local_len == remote_len:
            print('file exists: %s ' % dstFile)
            return True

    # Note: an existing destination is overwritten.
    client.copy_from_local(srcFile, dstFile, overwrite=True)

    # Verify the upload by comparing sizes.
    local_len = os.path.getsize(srcFile)
    remote_len = client.list_status(dstFile)[0].length
    if local_len != remote_len:
        return False
    FileSize += os.path.getsize(srcFile)
    return True
def ProcAll():
    """Upload every regular file directly inside ``LocalDir`` to ``HdfsDir``.

    Exits with status -1 when ``HdfsDir`` does not exist on HDFS. Each file
    is handed to ``ProcOne``; files for which it returns False are collected
    and listed at the end. Progress (total/processed/failed, elapsed time,
    throughput) is printed after every file and a summary after the loop.

    Sub-directories of ``LocalDir`` are skipped: ``ProcOne`` uploads single
    files only.
    """
    client = HdfsClient(hosts=NameNode, user_name=UserName)
    if not client.exists(HdfsDir):
        print(HdfsDir + ' not found')
        sys.exit(-1)

    # List the directory once so the reported total always matches the set
    # of files that is actually iterated.
    filenames = []
    for name in os.listdir(LocalDir):
        if os.path.isfile(os.path.join(LocalDir, name)):
            filenames.append(name)
        else:
            print('skip (not a regular file): %s' % os.path.join(LocalDir, name))
    total = len(filenames)

    # Avoid a doubled '/' when HdfsDir is configured with a trailing slash.
    hdfsBase = HdfsDir.rstrip('/')

    failedList = []
    for processed, filename in enumerate(filenames, 1):
        srcFile = os.path.join(LocalDir, filename)
        dstFile = hdfsBase + '/' + filename
        if not ProcOne(client, srcFile, dstFile):
            failedList.append(srcFile)
        print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
        print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))

    if failedList:
        print('failedList: %s' % repr(failedList))
    else:
        print('Good! No Error!')
    print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \
        (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))
if __name__ == '__main__':
ReadConfig()
ProcAll()
print('Time total: %.2f s' % (time.time()-StartTime))
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
下載 HDFS 文件到本地
# encoding: utf-8
# author: walker
# date: 2018-06-07
# summary: 下載 HDFS 文件(或目錄)到本地
import os, sys, time
from pyhdfs import HdfsClient
from configparser import ConfigParser
# Absolute path of this script, then its directory (home of config.ini).
cur_dir_fullpath = os.path.abspath(__file__)
cur_dir_fullpath = os.path.dirname(cur_dir_fullpath)
# Start timestamp used when reporting elapsed time and MB/s.
StartTime = time.time()
# Total size in bytes of the files downloaded so far.
FileSize = 0
# Configuration values; ReadConfig() replaces these empty defaults.
LocalDir, HdfsDir, NameNode, UserName = '', '', '', ''
def ReadConfig():
    """Load the download settings from ``config.ini`` into module globals.

    ``config.ini`` is looked up in ``cur_dir_fullpath``. ``LocalDir``,
    ``HdfsDir``, ``NameNode`` and ``UserName`` are read from its ``config``
    section, whitespace-stripped, echoed to stdout and stored in the globals
    of the same names. ``HdfsDir`` additionally loses any trailing ``/``.

    A missing or unreadable config file, or a ``LocalDir`` that does not
    exist, is reported through ``input()`` (so the console waits for Enter)
    and ends the process with status -1.
    """
    global LocalDir, HdfsDir, NameNode, UserName

    def abort(message):
        # Show the message, wait for Enter, then terminate.
        input(message)
        sys.exit(-1)

    def option(key):
        # Fetch one stripped value from the [config] section.
        return parser.get('config', key).strip()

    parser = ConfigParser()
    config_path = os.path.join(cur_dir_fullpath, 'config.ini')
    if not os.path.exists(config_path):
        abort(config_path + ' not found')
    if len(parser.read(config_path)) < 1:
        abort('Read config.ini failed...')

    LocalDir = option('LocalDir')
    if not os.path.exists(LocalDir):
        abort(LocalDir + ' not found')
    print('LocalDir:' + LocalDir)

    HdfsDir = option('HdfsDir').rstrip('/')
    print('HdfsDir:' + HdfsDir)

    NameNode = option('NameNode')
    print('NameNode:' + NameNode)

    UserName = option('UserName')
    print('UserName:' + UserName)

    print('Read config.ini successed!')
def ProcOne(client, srcFile, dstFile):
    """Download one HDFS file to ``dstFile`` and verify the result by size.

    The parent directory of ``dstFile`` is created when it is missing.
    Returns True when ``dstFile`` already exists with the same length as
    ``srcFile`` on HDFS (nothing is copied), or when the copy completes and
    the lengths match. Returns False when they differ after the copy. Only
    a verified copy adds the file size to the global ``FileSize``.
    """
    global FileSize
    print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))

    def same_size():
        # Local size compared with the length of the first list_status() entry.
        return os.path.getsize(dstFile) == client.list_status(srcFile)[0].length

    target_dir = os.path.dirname(dstFile)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    # Destination already present with the same size: nothing to do.
    if os.path.exists(dstFile) and same_size():
        print('file exists: %s ' % dstFile)
        return True

    # Note: an existing destination is overwritten.
    client.copy_to_local(srcFile, dstFile, overwrite=True)

    # Verify the download by comparing sizes.
    if same_size():
        FileSize += os.path.getsize(dstFile)
        return True
    return False
def ProcAll():
    """Download every file found under ``HdfsDir`` into ``LocalDir``.

    Exits with status -1 when ``HdfsDir`` does not exist on HDFS. The tree
    is walked with ``client.walk`` and each file is passed to ``ProcOne``
    with a local destination that mirrors its path relative to ``HdfsDir``.
    Files for which ``ProcOne`` returns False are collected and listed at
    the end. Progress is printed after every file and a summary after the
    loop.
    """
    client = HdfsClient(hosts=NameNode, user_name=UserName)
    if not client.exists(HdfsDir):
        print(HdfsDir + ' not found')
        sys.exit(-1)

    # Walk the tree once and keep the file paths, instead of a first pass
    # for counting and a second one for downloading.
    srcFiles = [
        '%s/%s' % (parent, filename)
        for parent, dirnames, filenames in client.walk(HdfsDir)
        for filename in filenames
    ]
    total = len(srcFiles)

    failedList = []
    for processed, srcFile in enumerate(srcFiles, 1):
        # Path relative to HdfsDir, converted to the local separator
        # (os.sep) rather than a hard-coded backslash, so the directory
        # layout is also reproduced on non-Windows hosts.
        relPath = srcFile[len(HdfsDir)+1:].replace('/', os.sep)
        dstFile = os.path.join(LocalDir, relPath)
        if not ProcOne(client, srcFile, dstFile):
            failedList.append(srcFile)
        print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
        print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))

    if failedList:
        print('failedList: %s' % repr(failedList))
    else:
        print('Good! No Error!')
    print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \
        (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))
if __name__ == '__main__':
ReadConfig()
ProcAll()
print('Time total: %.2f s' % (time.time()-StartTime))
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
*** walker ***
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。