【PythonScript(一)】脚本解析
Python脚本解析和KeyPoint
接触到的第一个脚本,实际开发周期为一周时间。
脚本的功能已经在代码之前贴上了。
其实这里的Python脚本无非就是用到了一些第三方类库,比如HDFS,Redis,FTP文件服务,还有比如说文件,文件的遍历什么的,把这些东西搞懂这些Python代码基本就没有什么问题了。
手动总结一下:
HDFS相关
首先必须安装HDFS
1
2
3
4
5
6
7
8
9
10 # 不指定版本号安装
pip install hdfs
# 指定版本号安装
pip install hdfs==2.7.0
# 首先导入hdfs或者from hdfs import InsecureClient
import hdfs
hdfs_url = 'http://172.17.4.9:9870'后续操作
- 创建HDFS连接对象,相当于pymysql中的conn连接对象:
link = InsecureClient(hdfs_url) - 获取HDFS路径下的所有文件和目录,返回值是一个列表:
link.list(path) - 创建文件夹:
link.makedirs(path) - 写文件:
link.write(hdfs_path='/opt/a.txt', overwrite=True, data='hello world'.encode('utf-8'))- 该方法还有一个参数(写入时的mode)
- append参数用于说明要不要覆盖已有的内容,默认为False,即在尾部添加。
- overwrite参数指明如果文件已经存在时的操作,True表示覆盖,False时如果文件已存在就抛异常
- 该方法还有一个参数(写入时的mode)
- 上传下载文件:
link.upload(hdfs_path='/', local_path='C:/Users/gzsqy/Desktop/Git.md') - 查看文件是否存在:
link.status(hdfs_path='/Git.mds', strict=False)- 说明:strict如果设置为True时,文件不存在就会抛出异常,如果为False文件不存在就会返回None。
- 如果文件存在,不管设置了什么都会返回改文件的block信息
- 查看文件的内容:
link.read('/niubi/shang') - 删除文件:
link.delete('/niubi/shang') # 指定要删除的文件位置 - 删除文件夹:
lian.delete('/niubi') # 指定要删除的文件夹位置 - 重命名文件夹:
lian.rename('/niubi', '/niniubi') # 旧文件夹名称位置,新文件夹名称位置 - 下载文件:
lian.download(hdfs_path='/niniubi/shang', overwrite=True, local_path='C:/Users/gzsqy/Desktop/')
SSH相关
- 通过paramiko创建ssh连接:
ssh = paramiko.SSHClient()- 设置自动添加主机密钥:
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1
2
3
4
5
6
7 # 自动添加主机密钥
# 使用 Paramiko 库进行 SSH 连接时,
# 当连接到一个之前没有连接过的主机时,会出现一个警告,提示用户主机的密钥没有被认证。
# 这时候可以使用 set_missing_host_key_policy() 方法来设置主机密钥的策略。
# AutoAddPolicy() 会自动将新的主机密钥添加到本地的 known_hosts 文件中,
# 以便下次连接时可以自动认证主机密钥,而不会再次出现警告。
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- 连接远程服务器:
ssh.connect('hostname', username='root', password='123456')- 创建一个新的通道:
channel = ssh.invoke_shell()
- 给建立通道一点时间,如果不加的话会发现下面的输出捕获会有问题:time.sleep(2)
- 获得ssh连接时的界面:
output = channel.recv(1024)
1
2
3
4
5
6
7 # recv()参数是一个整数,表示要接收的数据的最大字节数。
# 如果远程服务器发送的数据超过了指定的字节数,那么recv()方法会一直等待,直到收完为止。
# channel.recv() 方法返回一个 bytes 类型的对象,表示接收到的数据。
# 如果远程服务器已经关闭了连接,那么 channel.recv() 方法会返回一个空的 bytes 对象。
# print会看到类似下面的输出
print(output.decode()) # channel.recv() 方法返回一个 bytes 类型的对象,用decode()查看
- 在Channel中执行shell命令,并通过recv获得执行的结果,并且一定要decode解码才行。
1
2
3
4
5
6
7
8
9
10 # 在ssh通过channel执行shell命令
channel.send('echo hello\n')
time.sleep(1) # 执行命令如果不等待一下的话,那么output接收到的可能不完整
output = channel.recv(1024)
# 打印一下就可以看到输出了hello
print(output.decode())
# 关闭通道和SSH连接
channel.close()
ssh.close()
- 在 SSH 连接上打开 SFTP(SSH 文件传输协议)会话:
sftp = ssh.open_sftp()- 通过sftp协议上传文件:
sftp.put(local_file, remote_file)
- put()
方法用于将本地文件local_file上传到远程服务器的路径remote_file- 通过sftp协议下载远端文件到本地:
sftp.get(remote_file, local_file)- 通过sftp协议删除远端文件:
sftp.remove(remote_file)- 关闭sftp会话:
sftp.close()- 关闭ssh连接:
ssh.close()
文件相关
这一块基本是和os模块相关的
之前我有总结,这里不想多水了:
- os.path.join(a, b): 将a,b进行拼接,合成一个路径
- os.rmdir(path):删除指定路径(文件夹)
- os.remove(文件路径):删除指定文件
- os.listdir(文件夹路径):列出指定文件夹下面的所有文件
- os.stat(path):返回一个文件状态对象
- os.walk(path):非递归遍历某个文件夹下的所有文件及文件夹
- 一般用法是:for root, files, dirs in os.walk(directory):
递归获取某个目录下所有文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 ># 递归获取某个文件目录下的所有文件
>def get_dir_file_list(path='./', recursion=False):
# 定义列表用于存储所有文件
all_file_list = []
# 获取文件目录下的所有文件(文件夹)listdir返回值为列表(而且为文件名)
file_dir_list = os.listdir(path)
for file_name in file_dir_list:
# 用path和file_name拼接
abs_path = os.path.join(path, file_name)
if os.path.isfile(abs_path):
# 是文件的逻辑
all_file_list.append(file_name)
else:
# 是文件夹的逻辑
if recursion:
sub_path_list = get_dir_file_list(abs_path, True)
all_file_list += sub_path_list
return all_file_list
Redis相关
- 创建Redis连接对象:
r = redis.Redis(host=redis_host, port=redis_port, password=redis_password)- 将
redis_value对象转换为 JSON 字符串,并将其存储在 Redis 数据库中的指定键处:r.set(redis_key, json.dumps(redis_value))
- 这使得你可以使用 Redis 数据库来存储和检索结构化数据,如字典、列表等
杂项相关
- 将数据data计算哈希值并转换为16进制的数据:
file_md5 = hashlib.md5(data).hexdigest()- 时间相关:
import time
- 获取当前时间戳:
time.time()- 获取当前时间对象:
time.localtime()- 将时间对象转换为指定格式的时间字符串:
time.strftime('%Y-%m-%d %H:%M:%S', time_obj)- 好像都没有最后一个简单:
datetime.datetime.today()
FilePutHDFS脚本
脚本功能描述:
脚本功能描述:
脚本的功能是把文件从远端服务器下载到本地,本地读取文件上传到hdfs和redis,并根据文件中”c_event_id”参数通过grep_id.txt文件获取规则组id,将文件放在指定规则组id/规则id/日期的hdfs目录下,将文件已经上传的文件在save_ftp_pth参数指定的目录下按时间向文件中存入已处理的文件名, 并且将符合redis_rule_id参数上传到redis上
1 | # -*- encoding: utf-8 -*- |
脚本文件路径描述:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 读取文件路径:
path = /home/data
判断读取文件中规则组id的文件
group_id_path = r'/opt/script/file_put_hdfs/rule/grep_id.txt'
远程读取文件的路径:
remote_path = r'/opt/script/sj'
一次性处理多少个文件:
file_num = 500
中对于文件中s_boundary,d_boundary字段的转码
boundary_code = 'utf-8'
中对于文件中d_owner字段的转码
owner_code = 'utf-8'
脚本读取那天数据的参数(注:可写可不写,不写默认是读取前一天数据):
ftp_get_date = ''
随机存入redis的某个host:
redis_random_host = ['172.17.4.7','172.17.4.8','172.17.4.9']
只向redis存入一下集合中的规则id:
redis_rule_id = ["42162", "42171", "42172", "42240", "422517"]
远程ftp服务器的基本连接参数:
ftp_host, ftp_user, ftp_password = '172.17.71.103', 'root', '111111'
redis的基本连接参数:
redis_host, redis_port, redis_password = '172.17.68.116', 6379, None
上传hdfs路径:
hdfs_url = 'http://172.17.4.9:9870'
hdfs_path = /original/
在本地存储已上传文件目录的位置
save_ftp_pth = ''
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.


