""" 功能:将mysql从源数据表同步目标数据表 主要用于ocr 识别及自动脱敏程序 """ import configparser import logging #import sys import os import platform import re import time from datetime import datetime # from tkinter import EXCEPTION import requests import json import smtplib from binascii import a2b_base64, b2a_base64 # import pymssql import pymysql import xlrd # from datetime import datetime, time from Crypto.Cipher import AES #from Crypto.PublicKey import RSA from sshtunnel import SSHTunnelForwarder from ufile import config, filemanager from ufile.config import BLOCKSIZE, get_default from ufile.logger import logger, set_log_file from email.mime.multipart import MIMEMultipart # 构建邮件头信息,包括发件人,接收人,标题等 from email.mime.text import MIMEText # 构建邮件正文,可以是text,也可以是HTML from email.mime.application import MIMEApplication # 构建邮件附件,理论上,只要是文件即可,一般是图片,Excel表格,word文件等 from email.header import Header # 专门构建邮件标题的,这样做,可以支持标题中文 from auto_email.error_email import send_error_email #import pysnooper #import base64 class Prpcrypt(object): def __init__(self, key): self.mode = AES.MODE_CBC self.key = self.pad16(key) # str不是16的倍数就补足为16的倍数 def pad16(self, text): text = bytes(text, encoding="utf8") while len(text) % 16 != 0: text += b'\0' return text def encrypt(self, text): texts = self.pad16(text) aes = AES.new(self.key, self.mode, self.key) res = aes.encrypt(texts) return str(b2a_base64(res), encoding="utf-8") def decrypt(self, text): texts = a2b_base64(self.pad16(text)) aes = AES.new(self.key, self.mode, self.key) res = str(aes.decrypt(texts), encoding="utf8").replace('\0', '') return res # class Logger(object): # def __init__(self, filename="Default.log"): # self.terminal = sys.stdout # self.log = open(filename, "a") # def write(self, message): # self.terminal.write(message) # self.log.write(message) # def flush(self): # pass def send_data(html_msg, v_head, i): """发送邮件的脚本""" # 邮件服务信息,个人 smtp_server = 'smtp.fucb.cn' username = "xuzhengliang@fucb.cn" password = 'fu3223x6Zl' # 'fu3223xZl' # # 邮件发送和接收人yuhong@fucb.cn 
    sender = username
    receiver = ["jsxuzl@qq.com", "1515783401@qq.com"]
    # receiver = ["xuzhengliang@fucb.cn","jsxuzl@qq.com","1804996986@qq.com"]
    # receiver = ['jsxuzl@qq.com']
    # v_disdate = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # Mail headers
    # Message type: a multipart message is needed when attaching images etc.
    # if i > 0:
    #     msg = MIMEMultipart('related')
    #     content_html = MIMEText(html_msg, "html", "utf-8")
    #     msg.attach(content_html)
    # else:
    #     # msg = MIMEMultipart('mixed')
    msg = MIMEText(html_msg, 'plain', 'utf-8')
    if i >= 300:
        v_head = v_head + ' (' + str(i) + ' cases are in redaction status, please check OCR desensitization)'
    # msg['Subject'] = Header("{dept}每日提醒,提醒时间{dtype}".format(dept=v_head,dtype=v_disdate,))
    msg['Subject'] = Header(v_head, 'utf-8')
    msg["From"] = sender
    msg['To'] = ','.join(receiver)  # note: must be one comma-separated string
    logger.info("{user} sends mail to {receiver}".format(user=username, receiver=receiver))
    logger.info(v_head)
    # Send the mail. The flow is fixed: create client, log in, send, close.
    email_client = smtplib.SMTP(smtp_server)
    email_client.login(username, password)
    email_client.sendmail(sender, receiver, msg.as_string())
    email_client.quit()


def sync_dbtarget(source_cursor, source_conn, target_cursor, target_db,
                  i_sync_fields, i_query_sql, i_insert_sql, i_table_name,
                  i_sync_begindate, i_imp_table_name):
    """Copy the rows selected by i_query_sql into the target in batches of 1000,
    then update the status flags of the copied cases on the source side."""
    if i_query_sql.count('%s'):
        # Pass the parameter as a one-element tuple
        source_cursor.execute(i_query_sql, (i_sync_begindate,))
    else:
        source_cursor.execute(i_query_sql)
    from_data = source_cursor.fetchall()
    v_fiels_count = i_sync_fields.count(',') + 1
    v_fiels_list = i_sync_fields.replace('\n', '').replace(' ', '').split(',')
    # Number of rows returned by the query
    rowcount = len(from_data)
    v_pk_phhd = ''
    # Build the SQL so that 1000 rows are inserted at a time; j is the row marker
    j = 1
    sql = ""
    for row in from_data:
        v_fields_value = [row[v_fiels_list[k]] for k in range(0, v_fiels_count)]
        # sql2 = '("{}","{}","{}","{}","{}")'.format(i[0], i[1], i[2], i[3], i[4])
        if i_table_name == 'zx_phhd' or i_table_name == 'zx_ie_settlement' or i_table_name == 'zx_ie_discharge':
            v_pk_phhd = v_pk_phhd + ',' + str(row['pk_phhd'])
        elif i_table_name == 'zx_phrec':
            # Skip a pk_phhd that repeats the previous row's value
            if v_pk_phhd == '' or not v_pk_phhd.endswith(',' + str(row['pk_phhd'])):
                v_pk_phhd = v_pk_phhd + ',' + str(row['pk_phhd'])
        sql2 = (("(" + '"{}",' * v_fiels_count)[:-1] + ")").format(*v_fields_value)
        # Map "None" --> null, "b'\x00'" --> 0, "b'\x01'" --> 1,
        # so bit columns in the target table need not be changed to int
        sql2 = sql2.replace('"None"', "null")
        sql2 = sql2.replace('\"b\'\\x00\'\"', "0")
        sql2 = sql2.replace('\"b\'\\x01\'\"', "1")
        sql = sql + "," + sql2
        if j % 1000 == 0 or j == rowcount:
            # Roll back the current transaction if the batch fails
            try:
                target_cursor.execute(i_insert_sql + sql[1:])
                target_db.commit()
            except Exception as e:
                target_db.rollback()
                logging.error('Insert into imp_' + i_table_name + ' failed, reason: ' + str(e))
            # Reset the buffer even after a failure; the original `continue`
            # skipped this reset and the increment of j, so a failed batch was
            # re-sent with every following row.
            sql = ""
        # Log progress every thousand rows
        if j % 1000 == 0:
            logging.info("Inserted so far into " + i_imp_table_name + ": " + str(j) + " rows")
        j = j + 1
    logging.info("Inserted into " + i_imp_table_name + ": " + str(j - 1) + " rows")

    # Send a warning mail if no case was recognized during working hours
    if i_imp_table_name == "imp_ie_settlement" and j - 1 == 0:
        now = datetime.now()
        hour = now.hour
        if 9 <= hour < 17:
            send_error_email('AI database sync script',
                             'No automatic recognition results',
                             'Please check that the automatic recognition script is running normally')

    if v_pk_phhd[1:] != '':
        v_sql = ''
        v_mess = ''
        if i_imp_table_name == 'imp_zx_phrec':
            v_sql = 'update zx_phhd set exreq_times=47 where pk_phhd in (' + v_pk_phhd[1:] + ')'
            v_mess = "Update zx_phhd exreq_times=47 "
        elif i_imp_table_name == 'imp_zx_phhd':
            v_sql = 'update zx_phhd set filetype_id="1" where pk_phhd in (' + v_pk_phhd[1:] + ') and exreq_times=47 '
            v_mess = "Update zx_phhd filetype_id=1 flag "
        elif i_imp_table_name.startswith('imp_zx_phapply'):
            v_sql = 'update zx_phhd set exreq_times=if(cstatus="2",exreq_times+1,108) where pk_phhd in (' + v_pk_phhd[1:] + ') '
            v_mess = "Update zx_phhd exreq_times=108 "
        elif i_imp_table_name.startswith('imp_ie_discharge'):
            v_sql = 'update zx_phhd set exreq_times=exreq_times+1 where pk_phhd in (' + v_pk_phhd[1:] + ')'
            v_mess = "Update zx_phhd exreq_times=+1 "
        elif i_imp_table_name.startswith('imp_ie_settlement'):
            v_sql = 'update zx_phhd set exsuccess_flag="9" where pk_phhd in (' + v_pk_phhd[1:] + ') and exreq_times>0 '
            v_mess = "Update zx_phhd exsuccess_flag=9 flag "
        elif i_imp_table_name.startswith('imp_ie_paint'):
            v_sql = 'update zx_phhd set paint_flag="9" where pk_phhd in (' + v_pk_phhd[1:] + ') '
            v_mess = "Update zx_phhd paint_flag=9 flag "
        if v_sql != '':
            try:
                source_cursor.execute(v_sql)
                source_conn.commit()
                logging.info(v_mess + 'succeeded!' + ' from:' + i_table_name + ' ,to:' + i_imp_table_name)
            except Exception as e:
                source_conn.rollback()
                logging.error(v_mess + "failed, reason: " + str(e))


def main():
    # path = os.path.abspath(os.path.dirname(__file__))
    # sys.stdout = Logger('sync_log.txt')
    # Connection timeout for requests, in seconds
    config.set_default(connection_timeout=60)
    config.set_default(expires=60)
    since = time.time()
    cfg = configparser.ConfigParser()
    os_name = platform.system()
    if os_name == "Windows":
        cfg.read("D:\\myBaseDoc\\fcb_config.ini")
        key = cfg.get("system", "appname")[8:]
        cfg.read("D:\\myBaseDoc\\wzx_config3.ini")
    elif os_name == "Linux":
        cfg.read("/home/ucloud/config/fcb_config.ini")
        key = cfg.get("system", "appname")[8:]
        cfg.read("/home/ucloud/config/wzx_config3.ini")
    else:
        # Without a config file there is no decryption key, so stop early
        raise RuntimeError("Unsupported platform: " + os_name)
    # 1 = normal image, 2 = image before redaction
    down_type = cfg.get("system", "down_type")
    public_bucket = "pfucb"  # public bucket name
    open_file = cfg.get("system", "file_list")

    # Check that the connection succeeds: cursor = conn.cursor()
    target_conn = pymysql.connect(
        user='root',
        passwd='test9Root',
        host="192.168.5.226",
        db='wzxphoto',
        port=3308,
        cursorclass=pymysql.cursors.DictCursor,
    )

    # Read the (encrypted) database connection parameters
    v_ssh_address_or_host = cfg.get("database", "ssh_address_or_host")
    v_ssh_username = cfg.get("database", "ssh_username")
    v_ssh_password = cfg.get("database", "ssh_password")
    v_remote_bind_address = cfg.get("database", "remote_bind_address")
    v_mysql_user = cfg.get("database", "mysql_user")
    v_mysql_password = cfg.get("database", "mysql_password")
    v_mysql_database = cfg.get("database", "mysql_database")
    v_mysql_port = int(cfg.get("database", "mysql_port"))
    # Decrypt the parameters
    v_ssh_address_or_host = Prpcrypt(key).decrypt(v_ssh_address_or_host)
    # v_ssh_address_or_host = '106.75.249.245'
    v_ssh_username = Prpcrypt(key).decrypt(v_ssh_username)
    v_ssh_password = Prpcrypt(key).decrypt(v_ssh_password)
    v_remote_bind_address = Prpcrypt(key).decrypt(v_remote_bind_address)
    v_mysql_user = Prpcrypt(key).decrypt(v_mysql_user)
    v_mysql_password = Prpcrypt(key).decrypt(v_mysql_password)
    v_mysql_database = Prpcrypt(key).decrypt(v_mysql_database)
    # v_mysql_user = "drgphoto"
    # v_mysql_password = "PfcB8.67Xl"
    # v_mysql_database = "wzx2017"

    server = SSHTunnelForwarder(
        ssh_address_or_host=(v_ssh_address_or_host, 22),  # address of the SSH jump host
        ssh_username=v_ssh_username,  # jump host user
        ssh_password=v_ssh_password,  # jump host password
        remote_bind_address=(v_remote_bind_address, v_mysql_port),
    )
    server.start()
    source_conn = pymysql.connect(
        user=v_mysql_user,
        passwd=v_mysql_password,
        host="127.0.0.1",  # must be 127.0.0.1 (the local end of the tunnel)
        # host="10.23.42.171",
        db=v_mysql_database,
        port=server.local_bind_port,
        cursorclass=pymysql.cursors.DictCursor,
    )
    target_cursor = target_conn.cursor()

    # Run this before clearing the import tables
    # v_sql="select a.pk_phhd from imp_zx_phapply a left join zx_phhd b on a.pk_phhd=b.pk_phhd where a.cStatus='3' and b.pk_phhd is null"
    v_sql = '''
        SELECT a.pk_phhd
        FROM imp_zx_phapply a
        LEFT JOIN zx_phhd b ON a.pk_phhd = b.pk_phhd
        WHERE b.pk_phhd IS NULL
        UNION ALL
        SELECT a.pk_phhd
        FROM imp_zx_phapply a
        INNER JOIN zx_phhd b ON a.pk_phhd = b.pk_phhd
        WHERE a.billdate > b.billdate;
    '''
    # a.cStatus = '3' AND a.cStatus = '2' AND
    target_cursor.execute(v_sql)
    from_data = target_cursor.fetchall()
    c_pk_phhdlist = ','.join(str(rec['pk_phhd']) for rec in from_data)
    c_nopk_phhd = ''
    if target_cursor.rowcount > 0:
        c_nopk_phhd = " or (h.pk_phhd in (" + c_pk_phhdlist + ")) "

    # Clear the imp_ tables on the target side
    try:
        v_sql = 'select imp_table_name from sys_sync_tables where sync_direct="1" and exp_table_delfalg="1" AND imp_table_name<>exp_table_name ;'
        target_cursor.execute(v_sql)
        from_data = target_cursor.fetchall()
        if len(from_data) > 0:
            for rec in from_data:
                if rec['imp_table_name'].startswith('imp_'):
                    v_sql = 'delete from ' + rec['imp_table_name'] + ';'
                    target_cursor.execute(v_sql)
            target_conn.commit()  # the original omitted the call parentheses
    except Exception as e:
        target_conn.rollback()
        logging.error("Failed to clear the target imp_ tables, reason: " + str(e))

    # Clear the imp_ tables on the source side
    source_cursor = source_conn.cursor()
    try:
        v_sql = 'select imp_table_name from sys_sync_tables where sync_direct="2" and exp_table_delfalg="1" AND imp_table_name<>exp_table_name ;'
        target_cursor.execute(v_sql)
        from_data = target_cursor.fetchall()
        if len(from_data) > 0:
            for rec in from_data:
                if rec['imp_table_name'].startswith('imp_'):
                    v_sql = 'delete from ' + rec['imp_table_name'] + ';'
                    source_cursor.execute(v_sql)
            source_conn.commit()  # the original omitted the call parentheses
    except Exception as e:
        source_conn.rollback()
        logging.error("Failed to clear the source imp_ tables, reason: " + str(e))

    # v_sql ="select date_format(date_sub(max(sync_date),interval 1 day) , '%Y-%m-%d') pk_maxvalue from wzx2017.sys_sybnc_date"
    v_sql = "select date_format(ifnull(max(sync_date),CURRENT_DATE), '%Y-%m-%d') pk_maxvalue from sys_sync_date "
    target_cursor.execute(v_sql)
    results = target_cursor.fetchone()
    v_sync_begindate = results['pk_maxvalue']
    # v_sync_begindate ='2022-04-01'

    # Direction 1: source --> target
    v_sql = 'select * from sys_sync_tables where sync_direct="1" and del_flag=0 order by 1 '
    target_cursor.execute(v_sql)
    from_data = target_cursor.fetchall()
    for rec in from_data:
        v_exp_table_name = rec['exp_table_name']
        v_imp_table_name = rec['imp_table_name']
        v_imp_table_maxsql = rec['imp_table_maxsql']
        v_fields = rec['sync_fields'].replace('\r', '').replace('\n', '').replace(' ', '')
        v_query_table = rec['query_table'].replace('#fields', v_fields).replace('#exp_table_name', v_exp_table_name)
        v_insert_table = rec['insert_table'].replace('#fields', v_fields).replace('#imp_table_name', v_imp_table_name)
        if v_imp_table_name == 'imp_zx_phrec' or v_imp_table_name == 'imp_zx_phhd':
            v_query_table = v_query_table.replace('#nopk_phlist', c_nopk_phhd)
        # print(v_imp_table_maxsql)
        if v_imp_table_maxsql != '-':
            # NOTE: db_name is not defined anywhere in this section; it has to
            # be provided at module level before this branch can run.
            v_sql = v_imp_table_maxsql.replace('#dbname', db_name).replace('#imp_table_name', v_exp_table_name)
            target_cursor.execute(v_sql)
            results = target_cursor.fetchone()
            v_pk_maxvalue = results['pk_maxvalue']
            v_query_table = v_query_table.replace('#pk_maxvalue', str(v_pk_maxvalue))
        sync_dbtarget(source_cursor, source_conn, target_cursor, target_conn,
                      v_fields, v_query_table, v_insert_table, v_exp_table_name,
                      v_sync_begindate, v_imp_table_name)

    time_elapsed = time.time() - since
    v_title = "Sync succeeded in {mtime:.0f} min {stime:.0f} s".format(
        mtime=time_elapsed // 60, stime=time_elapsed % 60)
    logging.info(v_title)
    v_sql = 'insert into sys_sync_date(sync_date,sync_direct,depiction) values(CURRENT_TIMESTAMP,"1","{}") '.format(v_title)
    try:
        target_cursor.execute(v_sql)
        target_conn.commit()
    except Exception as e:
        logging.error('Failed to append a record to sys_sync_date, reason: ' + str(e))
        target_conn.rollback()

    v_sql = 'call pro_sync_waitfor_ocrdata(1,@dd);'
    try:
        target_cursor.execute(v_sql)
        target_conn.commit()
    except Exception as e:
        logging.error('Failed to finish syncing cases pending OCR (pro_sync_waitfor_ocrdata), reason: ' + str(e))
        target_conn.rollback()

    # Direction 2: target --> source (cursor and connection arguments are swapped)
    v_sql = 'select * from sys_sync_tables where sync_direct="2" and del_flag=0 order by 1 '
    target_cursor.execute(v_sql)
    from_data = target_cursor.fetchall()
    for rec in from_data:
        v_exp_table_name = rec['exp_table_name']
        v_imp_table_name = rec['imp_table_name']
        v_imp_table_maxsql = rec['imp_table_maxsql']
        v_fields = rec['sync_fields'].replace('\r', '').replace('\n', '').replace(' ', '')
        v_query_table = rec['query_table'].replace('#fields', v_fields).replace('#exp_table_name', v_exp_table_name)
        v_insert_table = rec['insert_table'].replace('#fields', v_fields).replace('#imp_table_name', v_imp_table_name)
        # print(v_imp_table_maxsql)
        if v_imp_table_maxsql != '-':
            # NOTE: db_name is not defined anywhere in this section; it has to
            # be provided at module level before this branch can run.
            v_sql = v_imp_table_maxsql.replace('#dbname', db_name).replace('#imp_table_name', v_exp_table_name)
            target_cursor.execute(v_sql)
            results = target_cursor.fetchone()
            v_pk_maxvalue = results['pk_maxvalue']
            v_query_table = v_query_table.replace('#pk_maxvalue', str(v_pk_maxvalue))
        sync_dbtarget(target_cursor, target_conn, source_cursor, source_conn,
                      v_fields, v_query_table, v_insert_table, v_exp_table_name,
                      v_sync_begindate, v_imp_table_name)

    time_elapsed = time.time() - since
    v_title = "Sync succeeded in {mtime:.0f} min {stime:.0f} s".format(
        mtime=time_elapsed // 60, stime=time_elapsed % 60)
    logging.info(v_title)
    v_sql = 'insert into sys_sync_date(sync_date,sync_direct,depiction) values(CURRENT_TIMESTAMP,"2","{}") '.format(v_title)
    try:
        target_cursor.execute(v_sql)
        target_conn.commit()
    except Exception as e:
        logging.error('Failed to append a record to sys_sync_date, reason: ' + str(e))
        target_conn.rollback()

    v_sql = 'call pro_photoocr_update(1,@dd);'
    try:
        source_cursor.execute(v_sql)
        source_conn.commit()
    except Exception as e:
        logging.error('Failed to finish syncing OCR results (pro_photoocr_update), reason: ' + str(e))
        source_conn.rollback()

    # Cases that were sent for OCR (cStatus='2') but are missing on the target:
    # reset their filetype_id on the source
    v_sql = "select a.pk_phhd from imp_zx_phapply a left join zx_phhd b on a.pk_phhd=b.pk_phhd where a.cStatus='2' and b.pk_phhd is null;"
    target_cursor.execute(v_sql)
    reccount = target_cursor.rowcount
    from_data = target_cursor.fetchall()
    v_pk_phhd = ''
    v_error = ''
    if reccount > 0:
        for row_num, row in enumerate(from_data):
            v_pk_phhd = v_pk_phhd + ',' + str(row['pk_phhd'])
        v_sql = "update zx_phhd set filetype_id='0' where pk_phhd in (" + v_pk_phhd[1:] + ") and cStatus='2' and filetype_id='1'; "
        try:
            source_cursor.execute(v_sql)
            source_conn.commit()
        except Exception as e:
            v_error = '(cases pending OCR that did not succeed exist; automatic handling failed, reason: ' + str(e) + ')'
            logging.error(v_error)
            source_conn.rollback()

    # Number of cases currently in redaction status
    v_sql = "select count(1) jl from zx_phhd where paint_flag in ('1','2');"
    target_cursor.execute(v_sql)
    from_data = target_cursor.fetchone()
    v_paint = from_data['jl']

    # Number of cases that need desensitization but are missing on the target
    v_sql = "select count(1) jl from imp_zx_phapply a left join zx_phhd b on a.pk_phhd=b.pk_phhd where a.cStatus='3' and b.pk_phhd is null;"
    target_cursor.execute(v_sql)
    from_data = target_cursor.fetchone()
    v_paintno = from_data['jl']
    v_defpaintno = ''
    if v_paintno > 0:
        v_defpaintno = "({fnum} cases require desensitization but do not exist)".format(fnum=v_paintno)

    # Close the database connections and the SSH tunnel
    source_conn.close()
    target_conn.close()
    server.close()

    html_msg = ' '
    v_disdate = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    v_head = ('{cerror}{nopaint}Cases pending OCR were synced to the database and OCR results '
              'were fed back successfully; took {mtime:.0f} min {stime:.0f} s, '
              'finished at {dtype}!').format(
        mtime=time_elapsed // 60,
        stime=time_elapsed % 60,
        dtype=v_disdate,
        cerror=v_error,
        nopaint=v_defpaintno,
    )
    # v_head=v_mess+v_head
    # Send the summary mail only in the 08:30-08:45, 11:00-11:15 and
    # 17:00-17:15 windows, or whenever more than 300 cases are in redaction status
    now = datetime.now()
    v_hour = now.hour
    v_minute = now.minute
    if ((v_hour == 8 and 30 <= v_minute < 45)
            or (v_hour == 11 and 0 <= v_minute < 15)
            or (v_hour == 17 and 0 <= v_minute < 15)) or v_paint > 300:
        send_data(html_msg, v_head, v_paint)
    # os._exit(0)
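

# The batching in sync_dbtarget builds every VALUES tuple with str.format, which
# can break on values that contain double quotes. Below is a minimal sketch of
# an alternative, assuming a DB-API driver that accepts %s placeholders (as
# pymysql does): split the rows into chunks and let cursor.executemany bind the
# values. iter_batches and build_insert_sql are hypothetical helpers for
# illustration only; they are not part of the original script.
def iter_batches(rows, size=1000):
    """Yield consecutive slices of rows, each holding at most size items."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def build_insert_sql(table, fields):
    """Return an INSERT statement with one %s placeholder per field."""
    placeholders = ','.join(['%s'] * len(fields))
    return 'insert into {}({}) values ({})'.format(table, ','.join(fields), placeholders)


# Possible usage inside sync_dbtarget (untested against the real schema):
#     insert_sql = build_insert_sql(i_imp_table_name, v_fiels_list)
#     for batch in iter_batches(from_data):
#         target_cursor.executemany(
#             insert_sql, [[row[f] for f in v_fiels_list] for row in batch])
#         target_db.commit()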