CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
"""
Time : 2023/10/20 18:15
Author : JohnsonLiam
Motto:What seems to us as bitter trials are often blessings in disguise!
Description: Use Spark DataFrame to transfer txt file to pcap file.
"""
import hashlib

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import os
import pyspark.sql.functions as F

# Set environment variables: point PySpark at the local JDK and Spark
# installation before any SparkSession is created.
os.environ.update({
    'JAVA_HOME': '/export/server/jdk1.8.0_241',
    'SPARK_HOME': '/export/server/spark',
})


# TODO: read the .res file on Linux and convert it into a DataFrame.
def collect_data(path):
    """Placeholder for loading the .res file at *path* into a DataFrame.

    Not implemented yet: it performs no work and returns None.

    :param path: location of the .res file (currently unused).
    :return: None.
    """
    return None


@F.udf(returnType=StringType())
def get_md5(message):
    """Return the hexadecimal MD5 digest of *message* encoded as UTF-8.

    Best-effort: returns '' instead of raising for null or non-string input
    and for text that cannot be UTF-8 encoded, so a bad row produces an
    empty string rather than failing the Spark task.

    :param message: text to hash (null column values arrive as None).
    :return: 32-character lowercase hex digest, or '' on bad input.
    """
    if not isinstance(message, str):
        # Nothing to hash; previously this went through an exception + print.
        return ''
    try:
        # Feed the UTF-8 bytes of the text to the MD5 digest.
        data = message.encode(encoding='utf-8')
    except UnicodeEncodeError as ex:
        # e.g. lone surrogates in the input; keep the best-effort '' result.
        print(ex)
        return ''
    # hexdigest() renders the 16-byte digest as 32 hex characters.
    return hashlib.md5(data).hexdigest()


@F.udf(returnType=StringType())
def get_mac_value(md5_message):
    """Turn the first 24 hex characters of an MD5 digest into 12 byte values.

    Each pair of hex digits is parsed as one integer, e.g.
    ``'2ef8...'`` -> ``[46, 248, ...]``. The result always has 12 entries;
    positions that cannot be filled (null/non-string, short or empty input,
    or a non-hex pair, after which parsing stops) stay 0.

    NOTE(review): the UDF is declared with StringType (also the default for
    ``F.udf``), so Spark stores a string rendering of this list rather than
    an array column. If downstream code needs the numbers, consider
    ``ArrayType(IntegerType())`` - confirm with the consumer before changing.

    :param md5_message: hex digest string as produced by ``get_md5``
        (may be null or empty).
    :return: list of 12 ints.
    """
    mac_value = [0] * 12
    if not isinstance(md5_message, str):
        # Null (None) or unexpected input: nothing to parse.
        return mac_value
    # Only the first 24 hex characters (12 pairs) are used.
    hex_prefix = md5_message[:24]
    for index, cursor in enumerate(range(0, len(hex_prefix), 2)):
        try:
            # Two hex digits -> one integer byte value.
            mac_value[index] = int(hex_prefix[cursor:cursor + 2], 16)
        except ValueError as ex:
            # Non-hex pair: keep the values parsed so far and stop.
            print(ex)
            break
    return mac_value


if __name__ == '__main__':
    # Create (or reuse) the SparkSession.
    # The setting is "spark.sql.shuffle.partitions" (plural); the previous key
    # "spark.sql.shuffle.partition" is not a Spark setting and had no effect.
    spark = (SparkSession
             .builder
             .appName('txt2pcap')
             .config("spark.sql.shuffle.partitions", 2)
             .getOrCreate())

    # Load the tab-separated .res file: the first row is the header and
    # column types are inferred by Spark.
    input_df = (spark
                .read
                .option('header', 'true')
                .option('inferSchema', "true")
                .option('sep', '\t')
                .csv('file:///export/data/3517_20171206141000.res'))
    input_df.show()

    # ---------------------------- Spark SQL style ----------------------------
    # Register the DataFrame as a temp view. createOrReplaceTempView, unlike
    # createTempView, does not fail if a view named 'info' already exists
    # (e.g. when getOrCreate() returns a session that registered it before).
    input_df.createOrReplaceTempView('info')

    ssql = "select `时间` as data_time, split(`返回值`, '=')[1] as payload from info where `返回值` is not null"

    spark.sql(ssql).show()

    # ---------------------------- DSL style ----------------------------------
    # Same projection as the SQL above: payload is element 1 (0-based) of
    # `返回值` split on '='. Built once and reused for both output columns.
    payload = F.split(F.col('返回值'), '=')[1]
    etl_df = (input_df
              .filter(F.col('返回值').isNotNull())
              .select(
                  F.col('时间').alias('data_time'),
                  payload.alias('payload'),
                  # MD5 of the first 24 characters of payload, then the first
                  # 12 hex byte pairs of that digest. get_md5 is a string UDF,
                  # so no extra cast is needed.
                  # NOTE(review): the column is named 'md5' but holds the
                  # get_mac_value result, not the digest itself.
                  get_mac_value(get_md5(payload[0:24])).alias('md5')))
    etl_df.show()

未完待续…