1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| """ Time : 2023/10/20 18:15 Author : JohnsonLiam Motto:What seems to us as bitter trials are often blessings in disguise! Description: Use Spark DataFrame to transfer txt file to pcap file. """ import hashlib
from pyspark.sql import SparkSession from pyspark.sql.types import StringType import os import pyspark.sql.functions as F
# Hard-coded install locations on the target host. They are set at import
# time, before the SparkSession is built in the __main__ block (presumably
# so PySpark can locate the JDK and Spark distribution — confirm per host).
os.environ.update({
    'JAVA_HOME': '/export/server/jdk1.8.0_241',
    'SPARK_HOME': '/export/server/spark',
})
def collect_data(path):
    """Unimplemented stub (presumably meant to gather input data from *path*).

    Currently ignores *path* and always returns ``None``.
    """
    return None
@F.udf
def get_md5(message):
    """Return the hex MD5 digest of *message* encoded as UTF-8.

    Wrapped with ``F.udf`` without an explicit return type (Spark's default
    is StringType).

    :param message: text to hash; ``None`` (a SQL NULL) is allowed.
    :return: the 32-character lowercase hex digest, or ``''`` when *message*
        is ``None`` or cannot be encoded.
    """
    if message is None:
        # NULL input: nothing to hash. Return the same '' the original
        # produced, without raising and printing an error for every row.
        return ''
    try:
        return hashlib.md5(message.encode(encoding='utf-8')).hexdigest()
    except (AttributeError, TypeError, UnicodeError) as ex:
        # Best effort: report the bad value and fall back to '' rather than
        # failing the whole Spark task. Unlike a `return` inside `finally`,
        # this does not swallow unrelated exceptions.
        print(ex)
        return ''
@F.udf()
def get_mac_value(md5_message):
    """Convert the first 24 hex characters of a digest into 12 byte values.

    Each pair of hex digits becomes one integer in ``range(256)``; e.g.
    ``'2ef8...' -> [46, 248, ...]`` (decimal ints, not binary strings).
    The result always has 12 entries. Positions that were not parsed stay
    ``0``: this covers input that is missing, empty, or shorter than 24
    characters, and every position from the first non-hex pair onward.

    NOTE(review): the UDF uses Spark's default StringType return type, so
    the column presumably holds a string rendering of this list rather than
    an array. Declare ``ArrayType(IntegerType())`` if an array is intended;
    confirm against downstream consumers first.

    :param md5_message: hex digest string (e.g. the output of ``get_md5``);
        may be ``None`` or ``''``.
    :return: list of 12 ints.
    """
    mac_value = [0] * 12
    if not md5_message:
        # NULL or empty digest (get_md5 yields '' on failure): all zeros.
        return mac_value
    try:
        hex_prefix = md5_message[:24]
        # 24 hex chars -> at most 12 two-char chunks, so index stays < 12.
        for index, cursor in enumerate(range(0, len(hex_prefix), 2)):
            mac_value[index] = int(hex_prefix[cursor:cursor + 2], 16)
    except (TypeError, ValueError) as ex:
        # Best effort: keep the bytes parsed so far; the rest stay 0.
        print(ex)
    return mac_value
if __name__ == '__main__':
    spark = (SparkSession
             .builder
             .appName('txt2pcap')
             # The key is "partitions" (plural). The previous spelling
             # "spark.sql.shuffle.partition" is not a Spark setting and
             # was silently ignored.
             .config('spark.sql.shuffle.partitions', 2)
             .getOrCreate())
    try:
        # Tab-separated export with a header row. Column names are Chinese
        # (`时间` = time, `返回值` = return value) and are used verbatim below.
        input_df = (spark
                    .read
                    .option('header', 'true')
                    .option('inferSchema', "true")
                    .option('sep', '\t')
                    .csv('file:///export/data/3517_20171206141000.res'))
        input_df.show()

        # SQL variant: rows with a return value; the payload is the second
        # '='-separated field. Replace the view so a reused session
        # (getOrCreate) does not fail on an already-registered name.
        input_df.createOrReplaceTempView('info')
        ssql = "select `时间` as data_time, split(`返回值`, '=')[1] as payload from info where `返回值` is not null"
        spark.sql(ssql).show()

        # DataFrame-API variant of the same query, plus a derived column:
        # MD5 of the payload's first 24 characters, reduced to its first
        # 12 byte values by get_mac_value.
        payload = F.split(F.col('返回值'), '=')[1]
        etl_df = (input_df
                  .filter(F.col('返回值').isNotNull())
                  .select(
                      F.col('时间').alias('data_time'),
                      payload.alias('payload'),
                      # Column.substr(pos, length) is 1-based. The previous
                      # `[0:24]` slice mapped to substr(0, 24), which Spark
                      # also reads as "from the first character". get_md5
                      # already returns a string, so no cast is needed.
                      get_mac_value(get_md5(payload.substr(1, 24))).alias('md5')))
        etl_df.show()
    finally:
        # Shut the session down even if a step above fails.
        spark.stop()
|