mongodb清理删除历史数据
批量清理mongodb历史数据
清理程序的由来
-
目前项目组的很多平台上线后历史数据不断积压,导致数据入库和查询变得缓慢;其中部分历史数据已经归档,因此需要对历史数据进行清理删除。 -
之前临时编写的 shell 脚本过于简陋,现改用 Python 重新实现,新增了备份功能,并支持通过配置文件指定过滤字段和需要删除的时间范围。
代码篇
#!/usr/local/python3/bin/python3
import configparser,logging.config,sys,os,subprocess
import pymongo,ast
# from pymongo import MongoClient
from datetime import datetime,timedelta
from urllib import parse
def init_mongodb(MongoDBAuth):
    """Create a MongoDB client and check that the server answers.

    Host, port and credentials are read from the module-level settings
    ``mongodb_ip``, ``mongodb_port``, ``mongodb_user`` and ``mongodb_passwd``.

    Args:
        MongoDBAuth: truthy to connect with username/password, falsy to
            connect without credentials.

    Returns:
        The ``pymongo.MongoClient`` on success, or ``False`` when the client
        could not be created or the server did not answer a ``ping``.
    """
    endpoint = mongodb_ip + ":" + str(mongodb_port)
    clients = None
    try:
        if MongoDBAuth:
            # Credentials must be percent-encoded to be valid inside a URI.
            username = parse.quote_plus(mongodb_user)
            password = parse.quote_plus(mongodb_passwd)
            clients = pymongo.MongoClient(
                "mongodb://" + username + ":" + password + "@" + endpoint + "/"
            )
            # Log the target with the password masked; never write secrets to the log.
            logger.info("init mongodb conn: mongodb://" + username + ":******@" + endpoint + "/")
        else:
            clients = pymongo.MongoClient(mongodb_ip, int(mongodb_port))
            logger.info("init mongodb conn: " + endpoint)
        # MongoClient connects lazily, so force one round trip here; otherwise
        # a wrong host or bad credentials would only surface much later.
        clients.admin.command("ping")
        return clients
    except Exception as e:
        logger.error("init mongodb conn err: " + str(e))
        if clients is not None:
            clients.close()
        return False
def get_mongodb_dbname():
    """Return the server's database names minus the configured exclusions.

    Names listed in the module-level ``need_filter_dbname_list`` are removed
    from the result, and each removal is logged.

    Returns:
        list: the remaining database names, in the order the server
        reported them.
    """
    remaining = list(mongo_client.list_database_names())
    for excluded in need_filter_dbname_list:
        try:
            remaining.remove(excluded)
        except ValueError:
            # The excluded name does not exist on this server; nothing to drop.
            continue
        logger.info("delete need filter dbname: " + excluded)
    return remaining
def get_mongodb_tables(entid):
    """Return the names of all collections in the database ``entid``.

    Args:
        entid: database name.

    Returns:
        list: collection names as reported by ``list_collection_names``.
    """
    names = list(mongo_client[entid].list_collection_names())
    logger.debug("get " + entid + " all collections: " + str(names))
    return names
def get_index_key_tables(entid, collection_name):
    """Return a collection's non-``_id`` index keys and its sharded flag.

    Args:
        entid: database name.
        collection_name: collection to inspect.

    Returns:
        tuple: ``(index_list, collection_sharded)`` where ``index_list`` is a
        list of index key dicts (any index whose key contains ``_id: 1`` is
        left out) and ``collection_sharded`` is the ``sharded`` field of the
        ``collstats`` reply (``False`` when the reply has no such field).
    """
    db = mongo_client[entid]
    collection = db[collection_name]
    # Default namespace for the log line. An index document may carry its own
    # "ns" value, but it is not guaranteed to be present, so never let a
    # missing value replace this with None (that broke the log concatenation).
    ns_name = entid + "." + collection_name
    index_list = []
    for index_doc in collection.list_indexes():
        upper_doc = {k.upper(): v for k, v in index_doc.items()}
        ns_name = upper_doc.get("NS") or ns_name
        index_list.append(dict(upper_doc.get("KEY") or {}))
    # Drop the default _id index; only user-defined indexes are of interest.
    index_list = [key for key in index_list if key.get("_id") != 1]
    collection_stats = db.command("collstats", collection_name)
    collection_sharded = collection_stats.get("sharded", False)
    if index_list:
        logger.debug("get collection " + ns_name + " index: " + str(index_list))
    return index_list, collection_sharded
def craete_index(entid, collection_name, index):
    """Create an index on ``entid.collection_name``.

    Args:
        entid: database name.
        collection_name: collection to index.
        index: dict mapping field name to index direction/type; its items are
            handed to ``create_index`` as a list of ``(field, value)`` pairs.
    """
    target = mongo_client[entid][collection_name]
    namespace = entid + "." + collection_name
    logger.info("need craete index: " + namespace + " : " + str(index))
    key_spec = list(index.items())
    created = target.create_index(key_spec)
    logger.info("mongodb " + namespace + " create index return msg: " + str(created))
def is_database_sharded(database_name):
    """Report whether ``database_name`` exists on a cluster that has shards.

    The original comment marks this helper as deprecated.

    NOTE(review): this only checks that ``listshards`` reports at least one
    shard and that ``listdatabases`` knows the database; it does not verify
    that sharding was explicitly enabled for that database - confirm this is
    the intended meaning before relying on it.

    Args:
        database_name: name of the database to look for.

    Returns:
        bool: ``True`` if at least one shard exists and the database is
        listed, otherwise ``False``.
    """
    admin_db = mongo_client["admin"]
    if not admin_db.command("listshards")["shards"]:
        return False
    # "databases" is a list of documents, so compare against each document's
    # "name" field; testing the bare string against the list never matches.
    databases = admin_db.command("listdatabases")["databases"]
    return any(entry.get("name") == database_name for entry in databases)
def create_sharded_func(entid, collection_name, shard_key):
    """Enable sharding on a database and shard one of its collections.

    Failures of either admin command are logged and swallowed, so the caller
    can carry on with the next collection.

    Args:
        entid: database name.
        collection_name: collection to shard.
        shard_key: dict mapping the shard-key field to its type. Only the
            last item is used.
            NOTE(review): a multi-field dict is therefore NOT turned into a
            compound shard key - confirm whether that is intended.
    """
    db = mongo_client["admin"]
    collection_path = '{}.{}'.format(entid, collection_name)
    logger.info("need craete sharded key : " + collection_path + " : " + str(shard_key))
    if not shard_key:
        # Without a key field there is nothing valid to send to the server.
        logger.error("create sharded key error: return: empty shard key for " + collection_path)
        return
    sharding_colunm, sharding_type = list(shard_key.items())[-1]
    try:
        db.command('enableSharding', entid)
    except Exception as e:
        logger.error("create dbname sharded key error: return: " + str(e))
    try:
        result = db.command('shardCollection', collection_path, key={sharding_colunm: sharding_type})
        # collection_path already contains the database name; do not prefix it again.
        logger.info(collection_path + " create sharded key return: " + str(result))
    except Exception as e:
        logger.error("create sharded key error: return: " + str(e))
def read_file_index(index_file):
    """Parse index and shard-key definitions from a text file.

    Each non-blank line is one of:

    * ``<table> = <python literal>`` - an index definition, returned as
      ``{table: literal}``;
    * a line containing ``mongodbShard:`` - everything after that marker is
      evaluated as a Python literal and returned as a shard-key definition.

    All spaces are stripped from a line before it is parsed, and blank lines
    are skipped.

    Args:
        index_file: path of the definition file.

    Returns:
        tuple: ``(index_list, Shard_list)``.

    Raises:
        ValueError: a non-blank index line has no ``=`` or its value is not a
            valid literal.
        SyntaxError: a value cannot be parsed by ``ast.literal_eval``.
    """
    index_list = []
    Shard_list = []
    with open(index_file, 'r') as f:
        for raw_line in f:
            line = raw_line.replace(" ", "").strip()
            if not line:
                # Blank lines (including a trailing newline) carry no definition.
                continue
            if "mongodbShard:" in line:
                shard_text = line.split("mongodbShard:", 1)[1]
                Shard_list.append(ast.literal_eval(shard_text))
            else:
                # Split on the first "=" only so the literal may itself contain "=".
                table, key_str = line.split("=", 1)
                index_list.append({table: ast.literal_eval(key_str)})
    return index_list, Shard_list
def get_timestamp_days_ago(get_days):
    """Return the Unix timestamp of local midnight ``get_days`` days ago.

    Args:
        get_days: number of days to go back; anything ``int()`` accepts.

    Returns:
        int: whole seconds since the epoch for 00:00:00 (local time) of the
        day that lies ``get_days`` days before now.
    """
    offset = timedelta(days=int(get_days))
    # Step back the requested number of days, then snap to the start of that day.
    midnight = (datetime.now() - offset).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int(midnight.timestamp())
def if_string_type(data_stamp):
    """Build the deletion cut-off in the same representation as a sample value.

    The cut-off is midnight ``Del_day`` days ago (``Del_day`` is a
    module-level setting). It is rendered to match ``data_stamp``:

    * 10-character ``str``  -> seconds as ``str``
    * 13-character ``str``  -> seconds as ``str`` followed by ``"000"``
    * 10-digit ``int``      -> seconds as ``int``
    * 13-digit ``int``      -> seconds multiplied by 1000 as ``int``

    Args:
        data_stamp: a timestamp value sampled from the collection.

    Returns:
        The cut-off in the matching representation, or ``""`` when
        ``data_stamp`` fits none of the four forms.
    """
    cutoff_seconds = get_timestamp_days_ago(int(Del_day))
    if isinstance(data_stamp, str):
        as_text = True
    elif isinstance(data_stamp, int):
        as_text = False
    else:
        return ""
    width = len(str(data_stamp))
    if width == 10:
        return str(cutoff_seconds) if as_text else cutoff_seconds
    if width == 13:
        if as_text:
            return str(cutoff_seconds) + "000"
        return int(cutoff_seconds) * 1000
    return ""
def get_one_data(entid, collection_name):
    """Fetch the filter field's value from one document of a collection.

    The field name comes from the module-level ``need_del_table_field``.

    Args:
        entid: database name.
        collection_name: collection to sample.

    Returns:
        The field's value from the document returned by ``find_one``, or
        ``False`` when no document was returned or it lacks the field.
    """
    field = str(need_del_table_field)
    sample = mongo_client[entid][collection_name].find_one({}, {field: 1, '_id': 0})
    if not sample or field not in sample:
        return False
    value = sample.get(field)
    logger.debug("get " + entid + "." + collection_name + " Corresponding " + field + " field value: " + str(value))
    return value
def del_data(entid, collection_name, get_del_timestamp):
    """Delete documents whose filter field is below a cut-off value.

    The filter field is the module-level ``need_del_table_field``. Errors
    raised while deleting are logged, not propagated.

    Args:
        entid: database name.
        collection_name: collection to clean.
        get_del_timestamp: cut-off; documents with a smaller field value
            are deleted.
    """
    field = str(need_del_table_field)
    target = mongo_client[entid][collection_name]
    try:
        outcome = target.delete_many({field: {"$lt": get_del_timestamp}})
        statement = "db." + collection_name + ".remove({" + field + ":{$lt:" + str(get_del_timestamp) + "})"
        logger.info(entid + " run sql: " + statement)
        removed = outcome.deleted_count
        if removed > 0:
            logger.info(
                "By date delete " + str(entid) + "." + collection_name
                + " less than " + str(get_del_timestamp)
                + " del document count: " + str(removed)
            )
    except Exception as e:
        logger.error("Error occurred while deleting documents: " + str(e))
def del_all_data(entid, collection_name):
    """Delete every document of ``entid.collection_name``.

    Errors raised while deleting are logged, not propagated.

    Args:
        entid: database name.
        collection_name: collection to empty.
    """
    db = mongo_client[entid]
    collection = db[collection_name]
    try:
        result = collection.delete_many({})
        if result.deleted_count > 0:
            logger.info(entid + " run sql: db" + "." + collection_name + ".remove({})")
            logger.info(entid + "." + collection_name + " del all document count: " + str(result.deleted_count))
    except Exception as e:
        # Report the exception itself: `result` is unbound when delete_many
        # raised, so referencing it here would crash the handler.
        logger.error(entid + "." + collection_name + " del all document error: " + str(e))
def dump_mongodb_data(dbname, table, not_quiet_dump, del_time):
    """Back up one collection with ``mongodump`` before it is cleaned.

    Nothing is run when the module-level ``is_del_bakcup_data`` is false or
    when ``mongodump_command_path`` does not exist.

    Args:
        dbname: database to dump.
        table: collection to dump.
        not_quiet_dump: when truthy, dump only the documents whose
            ``need_del_table_field`` is ``$lt`` ``del_time``; otherwise dump
            the whole collection.
        del_time: cut-off value for the query. Its Python type is kept in the
            query (``int`` -> JSON number, ``str`` -> JSON string) so the
            backup selects the same documents as the later delete.

    Returns:
        list: one-element list holding the exit status as a string; ``"0"``
        means the dump succeeded. ``["1"]`` is returned when no dump was run
        or the command could not be started.
    """
    import json  # local import: only needed to render the -q query document

    status_info = ["1"]
    if not is_del_bakcup_data:
        logger.debug("config file not set is_del_bakcup_data = True, not dump data")
        return status_info
    if not os.path.exists(mongodump_command_path):
        logger.info("mongodump command file not exists ," + mongodump_command_path)
        return status_info

    head = [mongodump_command_path, "-h", mongodb_ip + ":" + str(mongodb_port)]
    auth_args = []
    shown_auth_args = []
    if mongodb_auth:
        auth_args = ["--authenticationDatabase=" + mongodb_auth_db, "-u", mongodb_user, "-p", mongodb_passwd]
        # Same arguments with the password masked, for the log line only.
        shown_auth_args = auth_args[:-1] + ["******"]
    tail = ["-d", dbname, "-c", table]
    if not_quiet_dump:
        tail += ["-q", json.dumps({need_del_table_field: {"$lt": del_time}})]
    tail += ["-o", bakcup_dir_path]

    logger.info("run command: " + " ".join(head + shown_auth_args + tail))
    try:
        # An argument list (no shell) keeps configuration values from being
        # interpreted by the shell, and the return code is available even
        # when mongodump fails.
        completed = subprocess.run(head + auth_args + tail)
        status_info = [str(completed.returncode)]
        logger.info("mongodump command result: " + str(status_info))
    except Exception as e:
        logger.error("mongodump command error: " + str(e))
    return status_info
if __name__ == "__main__":
    # Every setting below is deliberately a module-level global: the helper
    # functions in this file read them by name, so they must not be moved
    # into a function's local scope.
    cfgpath = "./cfg/config.ini"
    conf = configparser.ConfigParser()
    conf.read(cfgpath)
    mongodb_ip = conf.get("main", "mongodb_ip")
    mongodb_port = conf.get("main", "mongodb_port")
    mongodb_auth = conf.getboolean("main", "mongodb_auth")
    mongodb_user = conf.get("main", "mongodb_user")
    mongodb_passwd = conf.get("main", "mongodb_passwd")
    mongodb_auth_db = conf.get("main", "mongodb_auth_db")
    need_filter_dbname = conf.get("main", "need_filter_dbname")
    is_del_bakcup_data = conf.getboolean("main", "is_del_bakcup_data")
    bakcup_dir_path = conf.get("main", "bakcup_dir_path")
    mongodump_command_path = conf.get("main", "mongodump_command_path")
    Del_day = conf.get("main", "Del_day")
    need_del_table_field = conf.get("main", "need_del_table_field")
    need_del_table_list = conf.get("main", "need_del_table_list")
    need_del_table_list = [item for item in need_del_table_list.split(",") if item != '']
    need_del_null_table_list = conf.get("main", "need_del_null_table_list")
    need_del_null_table_list = [item for item in need_del_null_table_list.split(",") if item != '']
    auth_get_entid = conf.getboolean("main", "auth_get_entid")
    need_filter_dbname_list = [item for item in need_filter_dbname.split(",") if item != '']
    # Database names taken from the config file; empty items (e.g. from a
    # trailing comma) are dropped like in the other comma-separated settings.
    all_ent_id = conf.get("main", "ent_id")
    get_dbname_list = [item for item in all_ent_id.split(",") if item != '']
    logging.config.fileConfig("./cfg/logger.conf")
    logger = logging.getLogger("rotatfile")

    # Connect to MongoDB; give up with exit status 10 when that fails.
    mongo_client = init_mongodb(mongodb_auth)
    if not mongo_client:
        logger.error("Failed to initialize MongoDB")
        sys.exit(10)
    logger.info("MongoDB init successfully")

    try:
        if auth_get_entid:
            get_dbname_list = get_mongodb_dbname()
            logger.info("get all dbname list: " + str(get_dbname_list))
        else:
            logger.info("file get dbname list: " + str(get_dbname_list))

        for dbname in get_dbname_list:
            get_end_all_table = get_mongodb_tables(dbname)

            # Collections cleaned by date: back up, then delete old documents.
            for table in need_del_table_list:
                if table not in get_end_all_table:
                    logger.error(dbname + " not have table: " + table)
                    continue
                get_one_data_mes = get_one_data(dbname, table)
                get_index_key_tables(dbname, table)
                if not get_one_data_mes:
                    # No sample value for the filter field, so there is no
                    # representation to build the cut-off from.
                    continue
                get_del_timestmap = if_string_type(get_one_data_mes)
                if not get_del_timestmap:
                    # Check the cut-off before dumping so a backup is never
                    # taken with an empty query value.
                    logger.error("get del timestmap fail")
                    continue
                dump_status = dump_mongodb_data(dbname, table, True, get_del_timestmap)
                # Delete when backups are disabled or the dump reported "0";
                # an empty status list counts as a failed dump.
                if not is_del_bakcup_data or (dump_status and dump_status[0] == '0'):
                    del_data(dbname, table, get_del_timestmap)
                else:
                    logger.error("dump mongodb data fail, but is del backup data")

            # Collections that are emptied completely.
            for null_table in need_del_null_table_list:
                if null_table not in get_end_all_table:
                    logger.error(dbname + " not have table: " + null_table)
                    continue
                dump_status = dump_mongodb_data(dbname, null_table, False, "1")
                if not is_del_bakcup_data or (dump_status and dump_status[0] == '0'):
                    del_all_data(dbname, null_table)
                else:
                    logger.error("dump mongodb data fail, but is del backup data")
    finally:
        # Release the connection even when a database or collection step raised.
        mongo_client.close()
        logger.info("MongoDB closed")
配置文件篇
-
该配置项大概使用说明 -
支持在删除指定时间之前的数据时,先进行数据备份再删除,通过相应配置项开启; -
也支持不进行备份、直接清理删除,同样通过配置项控制; -
根据字段来查询过滤。
-
[DEFAULT]
mongodb_ip = 10.130.47.197
mongodb_port = 40000
mongodb_auth = False
mongodb_user = admin
mongodb_passwd = test@123
mongodb_auth_db = admin
#从全部dbname中进行过滤不需要处理的dbname,使用逗号分割
need_filter_dbname = local,config,admin
#指定需要按照日期删除的集合,使用逗号分割
need_del_table_list = new_r_ags_e_back,call_detail_back
#指定需要按照日期删除的集合字段过滤
need_del_table_field = start_time
#指定清空删除的集合,使用逗号分割
need_del_null_table_list = call_duration_cache,duration_cache
[main]
#是否自动获取对应mongodb中全部dbname
auth_get_entid = False
#从配置文件中获取dbname
ent_id = 20241205,20250107
#需要删除多少天以前的数据
Del_day = 97
#是否需要备份数据
is_del_bakcup_data = False
#备份目录
bakcup_dir_path = ./data
#备份命令路径
mongodump_command_path = /home/devops/Python/Mongodb_del_history/mongodump
脚本运行
-
脚本运行
[devops@db1 Mongodb_del_history]$ tar xf Mongodb_del_history.tar.gz
[devops@db1 Mongodb_del_history]$ cd Mongodb_del_history
[devops@db1 Mongodb_del_history]$ nohup ./del_history_data &
2025-01-06 14:15:01 139749303605056 del_history_data.py:24 INFO init mongodb conn: 10.130.47.197:40000
2025-01-06 14:15:01 139749303605056 del_history_data.py:303 INFO MongoDB init successfully
2025-01-06 14:15:01 139749303605056 del_history_data.py:39 INFO delete need filter dbname: local
2025-01-06 14:15:01 139749303605056 del_history_data.py:310 INFO get all dbname list: ['0103290010', '0103290012', '0103290013', '0103290015']
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: jhk_task_status
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: sd_call_detail_back
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c call_duration_cache -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c duration_cache -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:347 INFO MongoDB closed
二进制文件程序下载
-
使用链接下载
wget https://zhao138969.com/LinuxPackage/Python/del_history_data
本文由 mdnice 多平台发布