Python脚本解析和KeyPoint

接触到的第一个脚本,实际开发周期为一周时间。

脚本的功能已经在代码之前贴上了。

其实这里的Python脚本无非就是用到了一些第三方类库,比如HDFS,Redis,FTP文件服务,还有比如说文件,文件的遍历什么的,把这些东西搞懂这些Python代码基本就没有什么问题了。

手动总结一下:

HDFS相关

首先必须安装HDFS

1
2
3
4
5
6
7
8
9
10
# 不指定版本号安装
pip install hdfs

# 指定版本号安装
pip install hdfs==2.7.0

# 首先导入hdfs或者from hdfs import InsecureClient
import hdfs

hdfs_url = 'http://172.17.4.9:9870'

后续操作

  • 创建HDFS连接对象,相当于pymysql中的conn连接对象:link = InsecureClient(hdfs_url)
  • 获取HDFS路径下的所有文件和目录,返回值是一个列表:link.list(path)
  • 创建文件夹:link.makedirs(path)
  • 写文件:link.write(hdfs_path='/opt/a.txt', overwrite=True, data='hello world'.encode('utf-8'))
    • 该方法还有一个参数(写入时的mode)
      • append参数用于说明要不要覆盖已有的内容,默认为False,即在尾部添加。
      • overwrite参数指明如果文件已经存在时的操作,True表示覆盖,False时如果文件已存在就抛异常
  • 上传下载文件:link.upload(hdfs_path='/', local_path='C:/Users/gzsqy/Desktop/Git.md')
  • 查看文件是否存在:link.status(hdfs_path='/Git.mds', strict=False)
    • 说明:strict如果设置为True时,文件不存在就会抛出异常,如果为False文件不存在就会返回None。
    • 如果文件存在,不管设置了什么都会返回改文件的block信息
  • 查看文件的内容:link.read('/niubi/shang')
  • 删除文件:link.delete('/niubi/shang') # 指定要删除的文件位置
  • 删除文件夹:lian.delete('/niubi') # 指定要删除的文件夹位置
  • 重命名文件夹:lian.rename('/niubi', '/niniubi') # 旧文件夹名称位置,新文件夹名称位置
  • 下载文件:lian.download(hdfs_path='/niniubi/shang', overwrite=True, local_path='C:/Users/gzsqy/Desktop/')

SSH相关

  • 通过paramiko创建ssh连接:ssh = paramiko.SSHClient()
  • 设置自动添加主机密钥:ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1
2
3
4
5
6
7
# 自动添加主机密钥
# 使用 Paramiko 库进行 SSH 连接时,
# 当连接到一个之前没有连接过的主机时,会出现一个警告,提示用户主机的密钥没有被认证。
# 这时候可以使用 set_missing_host_key_policy() 方法来设置主机密钥的策略。
# AutoAddPolicy() 会自动将新的主机密钥添加到本地的 known_hosts 文件中,
# 以便下次连接时可以自动认证主机密钥,而不会再次出现警告。
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  • 连接远程服务器:ssh.connect('hostname', username='root', password='123456')
  • 创建一个新的通道:channel = ssh.invoke_shell()
    • 给建立通道一点时间,如果不加的话会发现下面的输出捕获会有问题:time.sleep(2)
  • 获得ssh连接时的界面: output = channel.recv(1024)
1
2
3
4
5
6
7
# recv()参数是一个整数,表示要接收的数据的最大字节数。
# 如果远程服务器发送的数据超过了指定的字节数,那么recv()方法会一直等待,直到收完为止。
# channel.recv() 方法返回一个 bytes 类型的对象,表示接收到的数据。
# 如果远程服务器已经关闭了连接,那么 channel.recv() 方法会返回一个空的 bytes 对象。
# print会看到类似下面的输出

print(output.decode()) # channel.recv() 方法返回一个 bytes 类型的对象,用decode()查看
  • 在Channel中执行shell命令,并通过recv获得执行的结果,并且一定要decode解码才行。
1
2
3
4
5
6
7
8
9
10
# 在ssh通过channel执行shell命令
channel.send('echo hello\n')
time.sleep(1) # 执行命令如果不等待一下的话,那么output接收到的可能不完整
output = channel.recv(1024)
# 打印一下就可以看到输出了hello
print(output.decode())

# 关闭通道和SSH连接
channel.close()
ssh.close()
  • 在 SSH 连接上打开 SFTP(SSH 文件传输协议)会话:sftp = ssh.open_sftp()
  • 通过sftp协议上传文件:sftp.put(local_file, remote_file)
    • put()方法用于将本地文件local_file上传到远程服务器的路径remote_file
  • 通过sftp协议下载远端文件到本地:sftp.get(remote_file, local_file)
  • 通过sftp协议删除远端文件:sftp.remove(remote_file)
  • 关闭sftp会话:sftp.close()
  • 关闭ssh连接:ssh.close()

文件相关

这一块基本是和os模块相关的

之前我有总结,这里不想多水了:

url

  • os.path.join(a, b): 将a,b进行拼接,合成一个路径
  • os.rmdir(path):删除指定路径(文件夹)
  • os.remove(文件路径):删除指定文件
  • os.listdir(文件夹路径):列出指定文件夹下面的所有文件
  • os.stat(path):返回一个文件状态对象
  • os.walk(path):非递归遍历某个文件夹下的所有文件及文件夹
  • 一般用法是:for root, files, dirs in os.walk(directory):

递归获取某个目录下所有文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
># 递归获取某个文件目录下的所有文件
>def get_dir_file_list(path='./', recursion=False):
# 定义列表用于存储所有文件
all_file_list = []
# 获取文件目录下的所有文件(文件夹)listdir返回值为列表(而且为文件名)
file_dir_list = os.listdir(path)
for file_name in file_dir_list:
# 用path和file_name拼接
abs_path = os.path.join(path, file_name)
if os.path.isfile(abs_path):
# 是文件的逻辑
all_file_list.append(file_name)
else:
# 是文件夹的逻辑
if recursion:
sub_path_list = get_dir_file_list(abs_path, True)
all_file_list += sub_path_list
return all_file_list

Redis相关

  • 创建Redis连接对象:r = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
  • redis_value 对象转换为 JSON 字符串,并将其存储在 Redis 数据库中的指定键处:r.set(redis_key, json.dumps(redis_value))
    • 这使得你可以使用 Redis 数据库来存储和检索结构化数据,如字典、列表等

杂项相关

  • 将数据data计算哈希值并转换为16进制的数据:file_md5 = hashlib.md5(data).hexdigest()
  • 时间相关:import time
    • 获取当前时间戳:time.time()
    • 获取当前时间对象:time.localtime()
    • 将时间对象转换为指定格式的时间字符串:time.strftime('%Y-%m-%d %H:%M:%S', time_obj)
    • 好像都没有最后一个简单:datetime.datetime.today()

FilePutHDFS脚本

脚本功能描述:

脚本功能描述:

脚本的功能是把文件从远端服务器下载到本地,本地读取文件上传到hdfs和redis,并根据文件中”c_event_id”参数通过grep_id.txt文件获取规则组id,将文件放在指定规则组id/规则id/日期的hdfs目录下,将文件已经上传的文件在save_ftp_pth参数指定的目录下按时间向文件中存入已处理的文件名, 并且将符合redis_rule_id参数上传到redis上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# -*- encoding: utf-8 -*-

import os
import json
import time

import redis
import random
import hashlib
import paramiko
import datetime
from hdfs import InsecureClient

# 格式化代码 command alt l
path = r'/opt/script/file_put_hdfs/sj'
# 脚本判断读取文件中规则组id的文件
group_id_path = r'/opt/script/file_put_hdfs/rule/grep_id.txt'
# 脚本远程读取文件的路径
remote_path = r'/opt/script/sj'
# 设置500为一批
file_num = 500
boundary_code = 'utf-8'
owner_code = 'utf-8'

ftp_get_date = ''

# 定义Redis写入服务器列表
redis_random_host = ['172.17.4.7', '172.17.4.8', '172.17.4.9']
# Redis写入规则
redis_rule_id = ["42162", "42171", "42172", "42240", "422517"]

ftp_host, ftp_user, ftp_password = '172.17.71.103', 'root', '111111'
redis_host, redis_port, redis_password = '172.17.68.116', 6379, None
# 服务器HDFS的Url
hdfs_url = 'http://172.17.4.9:9870'
# 定义HDFS路径
hdfs_path = '/test'
save_ftp_pth = ''


def traverse_dir(save_file, link):
"""

:param save_file:
:param link:
:return:
"""
# 非递归获取某个目录下的 文件夹 和 文件,多层遍历,但实际和递归是一样的
for root, dirs, files in os.walk(path):
for file in files:
file_path = os.path.join(root, file)
# 获取到文件的每一行数据
for b_line in open(file_path, 'rb').readlines():
line = (b_line
.decode('unicode_escape')
.replace('\n', '')
.replace('\t', '')
.replace(' ', '')
.replace('"', '')
.replace("'", ""))
if line == '{{':
json_value = {}
elif line == '},{' or line == '}}':
id_time = file.split('_')[0].replace('-', '')
# 从json_value 字典中获取键为c_event_id 的值,如果值不存在则会报KeyError的错误
rule_id = json_value['c_event_id']
# 根据rule_id 获取规则的组id
rule_group_id = get_grou_id(rule_id)
# 一个列表而已
ls_value = ['NULL', rule_id, json_value['c_src_ip'], json_value['c_dest_ip'],
json_value['c_src_port'], json_value['c_dest_port'], json_value['c_proto_type'],
'NULL', json_value['c_return_info'], 'NULL', 'NULL', 'NULL', 'NULL', 'NULL',
'NULL',
'NULL', 'NULL', json_value['s_boundary'], json_value['s_region'],
json_value['s_district'],
json_value['s_city'],
json_value['s_operators'],
json_value['s_owner'], json_value['d_boundary'], json_value['d_region'],
json_value['d_district'],
json_value['d_city'], json_value['d_operators'], json_value['d_owner'], 'NULL',
'NULL', 'NULL', 'NULL', 'NULL', 'NULL', 'NULL', 'NULL', 'NULL']
# 在列表中每个元素之间添加上制表符
str_vale = '\t'.join(ls_value)

# try:
print(str_vale)
put_hdfs(rule_group_id, rule_id, id_time, str_vale, link)
# except:
# print(f'{file_path}文件传输有问题')
json_value = {}

# 这个逻辑是处理文件的内容的
else:
line_type, line_value = '', ''
for ln in line.split(','):
# 不为空而且是一个键值对的形式
if ln != '' and len(ln.split(':')) == 2:
line_type, line_value = ln.split(':')[0], ln.split(':')[1]
if 's_boundary' == line_type or 'd_boundary' == line_type:
u_line = (b_line.decode(boundary_code)
.replace('\n', '')
.replace('\t', '')
.replace(' ', '')
.replace('"', '')
.replace("'", ""))
for u_ln in u_line.split(','):
if u_ln != '' and len(u_ln.split(':')) == 2:
line_type, line_value = u_ln.split(':')[0], u_ln.split(':')[1]
json_value[line_type] = line_value
elif u_ln != '' and len(u_ln.split(':')) != 2:
json_value[line_type] = json_value[line_type] + ',' + u_ln


elif 'd_owner' == line_type:
u_line = (b_line.decode(owner_code)
.replace('\n', '')
.replace('\t', '')
.replace(' ', '')
.replace('"', '')
.replace("'", ""))
for u_ln in u_line.split(','):
if u_ln != '' and len(u_ln.split(':')) == 2:
line_type, line_value = u_ln.split(':')[0], u_ln.split(':')[1]
json_value[line_type] = line_value
elif u_ln != '' and len(u_ln.split(':')) != 2:
json_value[line_type] = json_value[line_type] + ',' + u_ln
else:
json_value[line_type] = line_value

elif ln != '' and len(ln.split(':')) != 2:
json_value[line_type] = json_value[line_type] + ',' + ln

# 删除root下名为file的文件,说实话os.remove 用的真不多
os.remove(os.path.join(root, file))
# 将file写入到save_file 并在结尾添加一个换行符
save_file.write(file + '\n')


def get_grou_id(rule_id):
rule_group_id = '00001'
# group_id_path = r'/opt/script/file_put_hdfs/rule/grep_id.txt'、
# 循环结构每次读取文件的一行数据
for line in open(group_id_path, 'r', encoding='utf8'):
if rule_id in line:
rule_group_id = line.split('|++|')[0]
return rule_group_id


def get_dir_ls(rule_group_id, group_id, id_time):
"""
根据上面的三个参数获取文件夹的一个列表
:param rule_group_id:
:param group_id:
:param id_time:
:return:
"""
dir_ls = []
if rule_group_id != '':
dir_ls.append(rule_group_id)
dir_ls.append(group_id)
dir_ls.append(id_time)

return dir_ls


def put_hdfs(rule_group_id, rule_id, id_time, str_vale, link):
"""
将文件写入到HDFS上,写入HDFS的几种方式,
:param rule_group_id:
:param rule_id:
:param id_time:
:param str_vale:
:param link:
:return:
"""
dir_ls = get_dir_ls(rule_group_id, rule_id, id_time)
hdfs_path_dir = hdfs_path

for dir_name in dir_ls:
if dir_name not in link.list(hdfs_path_dir):
hdfs_path_dir = hdfs_path_dir + '/' + dir_name
link.makedirs(hdfs_path_dir)
else:
hdfs_path_dir = hdfs_path_dir + '/' + dir_name

# HDFS上传文件
file_name = f"0000000001.log"
hdfs_path_path = hdfs_path_dir + '/' + file_name
print(hdfs_path_path)
try:
link.write(hdfs_path_path, data=str_vale + '\n', encoding='utf-8', append=True)
except:
link.write(hdfs_path_path, data=str_vale + '\n', encoding='utf-8')


def get_file_md5(file_name):
"""
通过第三方类库,hashlib获取文件的md5值并返回
:param file_name:
:return:
"""
with open(file_name, 'rb') as fp:
data = fp.read()
file_md5 = hashlib.md5(data).hexdigest()
return file_md5


def ftp_file(ftp_get_date, link):
"""
传入昨天的日期和一个HDFS链接
:param ftp_get_date: 昨日的日期
:param link: HDFS客户端实例化对象,用于实现对HDFS的增删改查操作
:return:
"""
# 将两个路径拼接成一个路径,格式为 保存文件的路径 + 昨天的日期.log 日志文件
save_dir = os.path.join(save_ftp_pth, f"{ftp_get_date}.log")
# 创建文件
save_file = open(save_dir, 'a+', encoding='utf8')
# todo: 通过调用类名调用方法,实例化一个SSH链接对象,通过该对象可以连接到远程主机,建立SSH连接、执行远程命令、传输文件
# 使用paramiko库创建SSHClient对象(paramiko是用于Python中进行SSH连接的库,用于提供SSH客户端和服务器连接)
ssh = paramiko.SSHClient()
# 需要设置SSHClient对象的策略,用于接受SFTP服务器的主机密钥。在开发环境中,可以使用AutoAddPolicy策略,它会自动接受所有主机密钥
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 使用connect方法连接到SFTP服务器,通过ssh对象指定 用户名 和 密码 来连接到远程主机
ssh.connect(ftp_host, username=ftp_user, password=ftp_password)
# 成功连接到SFTP服务器后,我们需要创建一个SFTP会话。通过SSHClient对象ssh的open_sftp方法创建一个SFTPClient对象
sftp = ssh.open_sftp()
# 使用SFTPClient对象的listdir方法可以列出指定目录下的文件和目录。需要提供目标路径
file_ls = sftp.listdir(remote_path)

# 如果文件的个数 <= 500 没有到阈值
if len(file_ls) <= file_num:
# 遍历目录中的每个文件
for file_name in file_ls:
# 文件名不包含tmp,即不是临时文件,并且是昨天(默认)的文件
if 'tmp' not in file_name and ftp_get_date in file_name:
# 将两个路径和文件名拼接为一个指定路径
remote_file = os.path.join(remote_path, file_name)
# 同上
local_file = os.path.join(path, file_name)
# 从远程ftp下载文件
sftp.get(remote_file, local_file)
# 删除远程ftp指定的文件
sftp.remove(remote_file)

traverse_dir(save_file, link)
# 目录下文件的个数大于文件的数量
else:
# 从0开始到远端文件个数,步长为文件的数量(500),range是左闭右开
for i in range(0, len(file_ls), file_num):
# 遍历0到500个文件
for file_name in file_ls[i:i + file_num]:
if 'tmp' not in file_name and ftp_get_date in file_name:
remote_file = os.path.join(remote_path, file_name)
local_file = os.path.join(path, file_name)
sftp.get(remote_file, local_file)
sftp.remove(remote_file)
traverse_dir(save_file, link)
# 关闭文件所有的文件以及ssh
save_file.close()
sftp.close()
ssh.close()


def put_redis(link):
"""
将从实时服务器读取到本地的文件上传到Redis服务器上面
:param link:
:return:
"""
r = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
for grep_id_name in link.list(hdfs_path):
file_path = os.path.join(hdfs_path, grep_id_name)
for rule_id_name in link.list(file_path):
if rule_id_name in redis_rule_id:
redis_key = f"{grep_id_name}_{rule_id_name}_{ftp_get_date.replace('-', '')}"
redis_value = {"host": redis_random_host[random.randint(0, len(redis_random_host) - 1)],
"file_path": os.path.join(file_path, rule_id_name, ftp_get_date.replace('-', ''),
'0000000001.log'),
"state": "ready",
"username": "xlkh",
"is_zip": False,
"id_query": rule_id_name,
"data_time": ftp_get_date.replace('-', '')}
print(redis_value)
r.set(redis_key, json.dumps(redis_value))
r.close()


if __name__ == '__main__':
"""
脚本的功能是把文件从远端服务器下载到本地,本地读取文件上传到hdfs和redis,
并根据文件中”c_event_id”参数通过grep_id.txt文件获取规则组id,将文件放在指定规则组id/规则id/日期的hdfs目录下,
将文件已经上传的文件在save_ftp_pth参数指定的目录下按时间向文件中存入已处理的文件名, 并且将符合redis_rule_id参数上传到redis上。
"""
# 实例化客户端 - 创建客户端连接对象
link = InsecureClient(hdfs_url)
# 如果ftp_get_date 为空
if ftp_get_date == '':
today = datetime.datetime.today()
# 在当前时间的基础上时间向后推移一天
yesterday = today - datetime.timedelta(days=1)
# time.strftime() 将日期对象转为指定日期格式
ftp_get_date = yesterday.strftime('%Y-%m-%d')
# todo 调用ftp_file 方法(传参为日期 , HDFS客户端连接对象)
ftp_file(ftp_get_date, link)
else:
ftp_file(ftp_get_date, link)
# todo 最终将HDFS中的文件写到Redis中(先给一个链接)
put_redis(link)

脚本文件路径描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
读取文件路径:  
path = /home/data
判断读取文件中规则组id的文件
group_id_path = r'/opt/script/file_put_hdfs/rule/grep_id.txt'
远程读取文件的路径:
remote_path = r'/opt/script/sj'
一次性处理多少个文件:
file_num = 500
中对于文件中s_boundary,d_boundary字段的转码
boundary_code = 'utf-8'
中对于文件中d_owner字段的转码
owner_code = 'utf-8'
脚本读取那天数据的参数(注:可写可不写,不写默认是读取前一天数据):
ftp_get_date = ''
随机存入redis的某个host:
redis_random_host = ['172.17.4.7','172.17.4.8','172.17.4.9']
只向redis存入一下集合中的规则id:
redis_rule_id = ["42162", "42171", "42172", "42240", "422517"]
远程ftp服务器的基本连接参数:
ftp_host, ftp_user, ftp_password = '172.17.71.103', 'root', '111111'
redis的基本连接参数:
redis_host, redis_port, redis_password = '172.17.68.116', 6379, None
上传hdfs路径:
hdfs_url = 'http://172.17.4.9:9870'
hdfs_path = /original/
在本地存储已上传文件目录的位置
save_ftp_pth = ''