当前位置: 首页 > article >正文

使用Python实现系统时间跳变检测与日志记录

文章目录

    • 概述
    • 脚本实现
      • 参数解析
      • 时间戳获取与转换
      • 日志轮转
      • 时间跳变检测逻辑
      • 主循环
    • 使用方法
      • 运行脚本
        • 自定义参数
      • 手动修改系统时间以测试时间跳变检测
      • 检查日志文件

概述

之前写过:

  • 单线程版本的高精度时间日志记录小程序:C++编程:实现简单的高精度时间日志记录小程序(单线程)
  • 多线程版本的高精度时间日志记录小程序:使用C++实现高精度时间日志记录与时间跳变检测[多线程版本]
  • 使用shell实现高精度时间日志记录与时间跳变检测

本文将使用Python脚本实现类似的功能,该Python脚本主要实现以下功能:

  1. 定时记录时间戳:按照指定的毫秒间隔记录当前系统时间。
  2. 每1000次打印一次时间戳:在正常情况下,每经过1000次记录后,将当前时间戳写入日志文件,减少日志量。
  3. 检测时间跳变
    • 时间回退:如果当前时间戳小于上一个时间戳,标记为时间跳变。
    • 时间跳跃:如果当前时间戳与倒数第二个时间戳之间的实际间隔小于1.5倍的预期间隔,标记为时间跳变。
  4. 记录跳变前后的时间戳:在检测到时间跳变时,记录跳变前的10个时间戳和跳变后的10个时间戳,并在跳变的时间戳上标记[TIME_JUMP]
  5. 日志轮转:当日志文件大小达到指定阈值(默认100MB)时,自动进行日志轮转,避免日志文件过大。

脚本实现

以下是完整的Python脚本代码,随后将逐步解析其实现细节。

#!/usr/bin/env python3

import argparse
import time
from datetime import datetime
import os
import sys
from collections import deque

def parse_arguments():
    parser = argparse.ArgumentParser(description='Time Jump Detection Script')
    parser.add_argument('-i', '--interval', type=int, default=20, help='Set the time interval in milliseconds (default: 20)')
    parser.add_argument('-f', '--file', type=str, default='timestamps.txt', help='Set the output filename (default: timestamps.txt)')
    parser.add_argument('-t', '--time', type=int, default=7200, help='Set the run time in seconds (default: 7200)')
    parser.add_argument('--disable_selective_logging', action='store_true', help='Disable selective logging feature')
    return parser.parse_args()

def get_current_time_str():
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

def get_timestamp_microseconds(time_str):
    try:
        dt = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S.%f')
    except ValueError:
        # Handle cases where microseconds are missing
        dt = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    epoch = datetime(1970, 1, 1)
    delta = dt - epoch
    return int(delta.total_seconds() * 1_000_000)

def rotate_log(filename, max_size):
    if os.path.exists(filename):
        size = os.path.getsize(filename)
        if size >= max_size:
            new_filename = f"{filename}.{datetime.now().strftime('%Y%m%d%H%M%S')}"
            try:
                os.rename(filename, new_filename)
                print(f"Rotated log file to {new_filename}")
            except OSError as e:
                print(f"Failed to rotate log file: {e}", file=sys.stderr)

def main():
    args = parse_arguments()
    interval_sec = args.interval / 1000.0
    filename = args.file
    run_time = args.time
    selective_logging = not args.disable_selective_logging
    max_file_size = 100 * 1024 * 1024  # 100MB

    print(f"Time Interval: {args.interval} milliseconds")
    print(f"Output File: {filename}")
    print(f"Run Time: {run_time} seconds")
    print(f"Selective Logging: {'Enabled' if selective_logging else 'Disabled'}")

    # Initialize variables
    last_timestamp_us = 0
    second_last_timestamp_us = 0
    total_timestamps = 0
    in_jump_mode = False
    jump_remaining = 0
    pre_jump_timestamps = deque(maxlen=10)

    # Initialize log file
    try:
        with open(filename, 'w') as f:
            pass  # Just to create or clear the file
    except Exception as e:
        print(f"Failed to initialize log file: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        with open(filename, 'a') as f:
            start_time = time.time()
            while (time.time() - start_time) < run_time:
                current_time_str = get_current_time_str()
                current_timestamp_us = get_timestamp_microseconds(current_time_str)

                time_jump = False
                if last_timestamp_us != 0 and second_last_timestamp_us != 0 and selective_logging:
                    # Check for time regression (time going backwards)
                    if current_timestamp_us < last_timestamp_us:
                        time_jump = True
                    else:
                        # Check if the interval is too large based on second last timestamp
                        expected_interval_us = args.interval * 1000
                        actual_interval_us = current_timestamp_us - second_last_timestamp_us
                        threshold_us = int(expected_interval_us * 1.5)  # 1.5x threshold
                        if actual_interval_us < threshold_us:
                            time_jump = True

                # Update timestamps
                second_last_timestamp_us = last_timestamp_us
                last_timestamp_us = current_timestamp_us

                # Add current timestamp to pre-jump buffer
                pre_jump_timestamps.append(current_time_str)

                if selective_logging and time_jump and not in_jump_mode:
                    # Detected a time jump, enter jump mode
                    in_jump_mode = True
                    jump_remaining = 10  # Number of post-jump timestamps to record

                    # Log pre-jump timestamps
                    f.write("\n--- TIME JUMP DETECTED ---\n")
                    for ts in list(pre_jump_timestamps):
                        f.write(f"{ts}\n")

                    # Log current (jump) timestamp with [TIME_JUMP]
                    f.write(f"{current_time_str} [TIME_JUMP]\n")
                    f.flush()

                elif in_jump_mode:
                    # In jump mode, record post-jump timestamps
                    f.write(f"{current_time_str}\n")
                    f.flush()
                    jump_remaining -= 1
                    if jump_remaining <= 0:
                        in_jump_mode = False

                else:
                    # Normal logging: every 1000 timestamps
                    total_timestamps += 1
                    if total_timestamps % 1000 == 0:
                        f.write(f"{current_time_str}\n")
                        f.flush()

                # Rotate log if needed
                rotate_log(filename, max_file_size)

                # Sleep for the specified interval
                time.sleep(interval_sec)
    except KeyboardInterrupt:
        print("Program interrupted by user.")
    except Exception as e:
        print(f"An error occurred: {e}", file=sys.stderr)

    print("Program has ended.")

if __name__ == "__main__":
    main()

参数解析

脚本通过命令行参数进行配置,支持以下选项:

  • -i--interval:设置时间记录的间隔,单位为毫秒,默认值为20毫秒。
  • -f--file:设置输出日志文件的名称,默认值为timestamps.txt
  • -t--time:设置脚本运行的总时长,单位为秒,默认值为7200秒(2小时)。
  • --disable_selective_logging:禁用选择性记录功能,即不记录时间跳变前后的时间戳。

通过argparse模块,脚本能够灵活地接受和解析这些参数,使其更具通用性和可配置性。

时间戳获取与转换

脚本通过以下两个函数获取并转换当前时间:

  • get_current_time_str():获取当前系统时间,格式为YYYY-MM-DD HH:MM:SS.UUUUUU,精确到微秒。
  • get_timestamp_microseconds(time_str):将时间字符串转换为自1970年1月1日以来的微秒数,用于后续的时间跳变检测。

这种转换方式确保了时间戳的精确性和可比较性,为时间跳变检测提供了基础。

日志轮转

为了防止日志文件过大,脚本实现了日志轮转功能:

def rotate_log(filename, max_size):
    if os.path.exists(filename):
        size = os.path.getsize(filename)
        if size >= max_size:
            new_filename = f"{filename}.{datetime.now().strftime('%Y%m%d%H%M%S')}"
            try:
                os.rename(filename, new_filename)
                print(f"Rotated log file to {new_filename}")
            except OSError as e:
                print(f"Failed to rotate log file: {e}", file=sys.stderr)

当日志文件大小达到或超过max_size(默认100MB)时,脚本会将当前日志文件重命名为包含时间戳的文件名,并创建一个新的日志文件继续记录。这种机制确保了日志文件的可管理性和系统的稳定运行。

时间跳变检测逻辑

时间跳变检测是脚本的核心功能,主要通过以下逻辑实现:

  1. 时间回退检测:如果当前时间戳小于上一个时间戳,表示系统时间回退,标记为时间跳变。
  2. 时间跳跃检测:如果当前时间戳与倒数第二个时间戳之间的实际间隔小于1.5倍的预期间隔,标记为时间跳变。这种情况可能由于系统时间被大幅调整或其他异常导致。

检测到时间跳变后,脚本会记录跳变前的10个时间戳、跳变时的时间戳(标记为[TIME_JUMP]),以及跳变后的10个时间戳,以便后续分析和诊断。

主循环

脚本的主循环负责持续记录时间戳、检测时间跳变以及执行日志轮转。以下是主要步骤:

  1. 获取当前时间戳:通过get_current_time_str()get_timestamp_microseconds()获取并转换当前时间。
  2. 时间跳变检测:根据上述逻辑判断是否发生时间跳变。
  3. 记录时间戳
    • 正常情况:每1000次记录一次时间戳,减少日志量。
    • 检测到跳变:记录跳变前后的时间戳,并标记跳变事件。
  4. 日志轮转:检查日志文件大小,必要时执行轮转。
  5. 暂停:按照设定的时间间隔暂停,控制记录频率。

使用方法

运行脚本

使用默认参数运行脚本:

./time_jump_check.py
自定义参数
  • 设置时间间隔为10毫秒,运行时间为1小时,输出文件为output.txt

    ./time_jump_check.py -i 10 -t 3600 -f output.txt
    
  • 禁用选择性记录功能

    ./time_jump_check.py --disable_selective_logging
    

手动修改系统时间以测试时间跳变检测

在另一个终端中,使用以下命令手动修改系统时间:

sudo date -s "2024-10-09 12:30:00"

注意:手动修改系统时间需要管理员权限,并且会影响系统上的其他程序,请谨慎操作。

检查日志文件

查看日志文件(默认timestamps.txt或自定义名称):

cat timestamps.txt

应包含时间戳记录,并在时间跳变时标记[TIME_JUMP]。例如:

2024-10-09 13:10:49.672701
...
--- TIME JUMP DETECTED ---
2024-10-09 12:30:00.002615 [TIME_JUMP]
2024-10-09 12:30:00.011569
2024-10-09 12:30:00.013045
...

http://www.kler.cn/news/341558.html

相关文章:

  • 【斯坦福CS144】Lab7
  • 基于floor函数报错注入sqli-labs less-5和less-6
  • Ajax面试题:(第二天)
  • 毕设分享 基于协同过滤的电影推荐系统
  • 自动化测试框架(全)
  • 2024双十一究竟买什么比较好?为您精选五款双十一必购好物清单!
  • 汽车主机厂主数据管理中一物多码或多码一物问题的具体表现有哪些?
  • 游戏盾是如何解决游戏行业攻击问题
  • LangChain使用few shot template
  • RK3568驱动指南|第十六篇 SPI-第190章 配置模式下寄存器的配置
  • 《Spring Microservices in Action, 2nd Edition》读后总结
  • vue 权限分组
  • 实景三维赋能矿山安全风险监测预警
  • “万万没想到”,“人工智能”获得2024年诺贝尔物理学奖
  • 比较三组迭代次数的变化
  • ElasticSearch之网络配置
  • 独享动态IP是什么?它有什么独特优势吗?
  • 低级语言和高级语言、大小写敏感、静态语言和动态语言、链接
  • ArgoWorkflow教程(六)---无缝实现步骤间参数传递
  • Unity3d使用JsonUtility.FromJson读取json文件