# Purge expired records from the local database.
import logging.config
from datetime import datetime, timedelta

from db import MysqlSession
from db.mysql import ZxPhhd, ZxIeCost, ZxIeDischarge, ZxIeResult, ZxIeSettlement, ZxPhrec
from log import LOGGING_CONFIG

# Retention window in days: ZxPhhd rows whose billdate is older than this are
# selected for removal.
EXPIRATION_DAYS = 180
# Maximum number of ZxPhhd keys fetched (and then deleted) per loop iteration.
BATCH_SIZE = 5000
# Database session shared by the whole module; it stays None until the script
# entry point below creates it.
session = None


def batch_delete_by_pk_phhd(model, pk_phhds):
    """Delete the rows of ``model`` whose ``pk_phhd`` is in ``pk_phhds``.

    The delete is issued through the module-level ``session`` and committed
    immediately, so every call is its own transaction. The number of removed
    rows is written to the "sql" logger.

    Args:
        model: SQLAlchemy model class (mapped table) exposing a ``pk_phhd``
            column.
        pk_phhds: List of ``pk_phhd`` values whose rows should be removed.

    Returns:
        The row count reported by ``Query.delete``.
    """
    matching = session.query(model).filter(model.pk_phhd.in_(pk_phhds))
    # Bulk delete straight in the database; objects already loaded into the
    # session are not synchronized.
    delete_count = matching.delete(synchronize_session=False)
    session.commit()
    logging.getLogger("sql").info(f"{model.__tablename__}成功删除{delete_count}条数据")
    return delete_count


if __name__ == '__main__':
    logging.config.dictConfig(LOGGING_CONFIG)
    deadline = datetime.now() - timedelta(days=EXPIRATION_DAYS)
    session = MysqlSession()

    # Deletion order matters: ZxPhhd is the table the expired keys are
    # selected from, so it goes last. Every table is committed separately; if
    # a batch stops half-way, its ZxPhhd rows are still present and get
    # picked up again by the next run.
    purge_order = (
        ZxPhrec,
        ZxIeResult,
        ZxIeSettlement,
        ZxIeDischarge,
        ZxIeCost,
        ZxPhhd,
    )

    try:
        while True:
            # Next batch of expired keys: paint_flag "9" and billdate before
            # the deadline.
            expired_rows = (
                session.query(ZxPhhd.pk_phhd)
                .filter(ZxPhhd.paint_flag == "9", ZxPhhd.billdate < deadline)
                .limit(BATCH_SIZE)
                .all()
            )
            if not expired_rows:
                # Nothing left that matches the criteria; stop looping.
                break

            pk_phhd_values = [row.pk_phhd for row in expired_rows]
            logging.getLogger("sql").info(f"过期的pk_phhd有{','.join(map(str, pk_phhd_values))}")

            for table in purge_order:
                batch_delete_by_pk_phhd(table, pk_phhd_values)
    except Exception as e:
        # Undo the statement that failed (earlier tables are already
        # committed) and record the failure with its traceback.
        session.rollback()
        logging.getLogger('error').error('过期数据删除失败!', exc_info=e)
    finally:
        session.close()