今天只看了一部分,主要还是有关以连接数据库的一部分代码。

Python代码写的还是比较6的,很多东西我是可能写不粗来,不过能看懂,加上之前自己写过工具类,这里大差不差,无非就是表名,数据库名,一些逻辑发生了变化,这里简单记录一下。

当然代码量是比较大的,先从工具类开始记录。

DBHandler数据库工具类

这个类的作用就是连接MySQL数据库,然后在实例化对象之后就可以对MySQL数据库做一些增删改查之类的操作了。

以下方法都需要有日志的产生,所以后面会阐述日志工具类。

  • 关闭连接方法- close方法
  • 将查询到的数据转为DownloadModel对象- query_for_download方法
  • 向表中插入当前时间的数据- update_download_info方法
  • 更新表中时间- update_business_time方法
  • 将查询到的结果封装成一个ExecutorModel对象- query_for_operation方法
  • 更新数据表business_rule_operation中update_time字段和操作人 - update_for_operation方法
  • 更新表business_rule_sumary中规则状态rule_state - update_state_for_rule方法
  • 向表stat_business_analysis中插入数据(更新)update_business_analysis方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class DBHelper:
# todo: 就是一个数据库工具类

def __init__(self):
try:
self.conn = pymysql.connect(host=DBConfig.HOST, user=DBConfig.USER, password=DBConfig.PASS,
database=DBConfig.NAME, charset=DBConfig.CHARSET)
except Exception as ex:
self.conn = None

# 将出错内容输出到logger日志中
logger.exception(ex)

def close(self):
try:
# 判断连接是否存在,如果存在就关闭,不存在就返回一个空字符串
self.conn.close() if self.conn is not None else ''
except Exception as ex:
# 记录错误日志
logger.exception(ex)

def query_for_download(self):
query_result = list()
try:
cursor = self.conn.cursor()
query = "SELECT d.data_query_id,d.data_time_stamp,d.interval_cycle,u.user_name FROM business_rule_sumary s," \
"business_rule_detail d,sys_user u WHERE s.id=d.rule_sumary_id AND s.rule_state >= 7 AND " \
"s.create_user=u.user_id GROUP BY d.data_query_id;"
cursor.execute(query)

# todo: 将SQL查询到的结果(元组) 将每个元组 转为 DownloadModel对象,并产生对象列表
query_result = [DownloadModel(item) for item in cursor.fetchall()]
except Exception as ex:
logger.exception(ex)
finally:
self.close()
return query_result

def update_download_info(self, id_query, data_time, download_state):
try:
cursor = self.conn.cursor()
# 通过时间工具获取当前时间
storage_time = TimeUtil().get_current_time()
cursor.execute("INSERT INTO stat_download_info(id_query,data_time,download_state,storage_time) "
"VALUES('%s','%s',%d,'%s')" % (id_query, data_time, int(download_state), storage_time))
self.conn.commit()
except Exception as ex:
logger.exception(ex)

def update_business_time(self, id_query, next_time_repr, next_time_str):
try:
cursor = self.conn.cursor()
update_time = TimeUtil().get_current_time()
cursor.execute("SELECT id, rule_sumary_id FROM business_rule_detail WHERE data_query_id='%s'" % id_query)
for rule_info in cursor.fetchall():
command = "UPDATE business_rule_detail d, business_rule_sumary s SET d.data_time_stamp='%s'," \
"d.update_time='%s', s.data_time='%s',s.update_time='%s' WHERE d.id=%d AND s.id=%d" % \
(next_time_str, update_time, next_time_repr, update_time, rule_info[0], rule_info[1])
cursor.execute(command)
self.conn.commit()
except Exception as ex:
logger.exception(ex)

def query_for_operation(self, rule_id, batch_id, query_id):
executor_model = None
try:
cursor = self.conn.cursor()
query_operations_cmd = "SELECT b.rule_sumary_id, b.create_user,s.user_name,k.operation_name,o.operation_id, " \
"o.operation_attach FROM business_rule_detail b, knowledge_rule_operation k, " \
"business_rule_operation o, sys_user s WHERE b.data_rule_id = '%s' AND b.rule_sumary_id" \
"=o.rule_sumary_id AND b.create_user=s.user_id AND o.operation_id=k.id;" % rule_id
cursor.execute(query_operations_cmd)
query_result = cursor.fetchall()
if len(query_result) > 0:
# 将查询到的结果封装成一个ExecutorModel对象
executor_model = ExecutorModel(query_result[0][0], rule_id, query_id, batch_id, query_result[0][1],
query_result[0][2])
for query_item in query_result:
# todo: 向字典中添加一个键值对,键为query_item[3],值为一个元组
executor_model.operations[query_item[3]] = (query_item[4], query_item[5])
except Exception as ex:
logger.exception(ex)
finally:
self.close()
return executor_model

def update_for_operation(self, rule_id, operation_id, user_id, **kwargs):
try:
storage_time = TimeUtil().get_current_time()
cursor = self.conn.cursor()
# 通过占位符的形式将SQL语句给传入参数
update = "UPDATE business_rule_operation SET update_time='%s',update_user=%d" % (storage_time, int(user_id))
# todo: 首先判断kwargs字典(传递的参数如果是*args则多余的参数会以元组的形式存放起来, 如果是**kwargs则多余的参数会以字典的形式存放起来)
if kwargs.get('data_store_detail') is not None:
# 占位符
update += ", data_store_detail='%s'" % kwargs.get('data_store_detail')
# todo: 还是占位符
update += " WHERE rule_sumary_id=%d AND operation_id=%d;" % (int(rule_id), int(operation_id))
cursor.execute(update)
self.conn.commit()
except Exception as ex:
logger.exception(ex)
finally:
self.close()

def update_state_for_rule(self, rule_summary_id, rule_state):
try:
storage_time = TimeUtil().get_current_time()
cursor = self.conn.cursor()
update = "UPDATE business_rule_sumary SET rule_state=%d, update_time='%s' WHERE id=%d;" % \
(int(rule_state), storage_time, int(rule_summary_id))
cursor.execute(update)
self.conn.commit()
except Exception as ex:
logger.exception(ex)
finally:
self.close()

def update_business_analysis(self, id_query, id_batch):
try:
storage_time = TimeUtil().get_current_time()
cursor = self.conn.cursor()
update = "INSERT INTO stat_business_analysis(query_id, batch_id, create_time, update_time) " \
"VALUES('%s','%s','%s','%s')" % (id_query, id_batch, storage_time, storage_time)
cursor.execute(update)
self.conn.commit()
except Exception as ex:
# 将报错日志给记录到日志文件中
logging.exception(ex)
finally:
self.close()