当前位置: 首页 > article >正文

python 使用 watchdog 实现类似 Linux 中 tail -f 的功能

一、代码实现

import logging
import os
import threading
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

logger = logging.getLogger(__name__)


class LogWatcher(FileSystemEventHandler):
    def __init__(self, log_file, on_modified_callback=None):
        """"
        初始化 LogWatcher 类的实例。

        参数:
        - log_file:日志文件的路径
        - on_modified_callback:可选的回调函数,在文件修改时调用

        属性:
        - log_file:日志文件的路径
        - file_object:日志文件对象
        - on_modified_callback:文件修改回调函数
        - last_line:最后一行文本
        - observer:观察者对象
        - match_string:需要匹配的字符串
        - stop_watching:停止监视的标志
        """
        self.log_file = log_file
        self.file_object = open(log_file, 'rb')
        self.on_modified_callback = on_modified_callback
        self.last_line = self.get_last_line()  # 初始化时获取最后一行文本
        self.observer = Observer()
        self.observer.schedule(self, ".", recursive=False)

        self.match_string = None
        self.stop_watching = False

    def start(self):
        """
        启动观察者对象,开始监视文件变化。
        """
        self.observer.start()

    def stop(self):
        """
        停止观察者对象,结束监视文件变化。
        """
        self.observer.stop()
        self.observer.join()
        self.file_object.close()

    def get_last_line(self):
        """
        获取日志文件的最后一行文本。它通过将文件指针移动到文件末尾,然后逐个字符向前搜索,直到找到换行符为止。

        返回值:
        - 最后一行文本,如果文件为空则返回None
        """

        # 将文件指针移动到文件末尾
        self.file_object.seek(0, os.SEEK_END)

        # 获取当前文件指针的位置(此时指针在最后一行的末尾)
        position = self.file_object.tell()

        try:
            # 尝试向前移动两个字节
            new_position = max(position - 2, 0)
            self.file_object.seek(new_position, os.SEEK_SET)
        except OSError as e:
            # 如果发生错误,可能是文件太小,返回None
            return None

        # 逐个字符向前搜索,确保文件指针最终停在当前行的第一个字符处
        while True:
            # read(1)读取的是指针位置的下一个字符,每次调用read(1)都会读取一个字符,并将指针向后移动一个字符的位置。
            char = self.file_object.read(1).decode('utf-8', errors='ignore')
            if char == '\n':
                break
            if new_position == 0:
                # 如果已经到达文件开头,跳出循环
                break

            # 尝试向前移动一个字节位置,确保不越界到文件开头
            new_position = max(new_position - 1, 0)
            # 将文件指针移动到新的位置
            self.file_object.seek(new_position, os.SEEK_SET)

        # last_line = self.file_object.readline().decode('utf-8', errors='ignore').strip()
        last_line = self.file_object.read(position - new_position).decode('utf-8', errors='ignore').strip()

        # 输出调试信息
        logger.debug(f'Reading line: {last_line}')
        return last_line

    def on_modified(self, event):
        """
        on_modified方法是FileSystemEventHandler的回调方法,当日志文件发生变化时,都会调用这个方法。
        参数:
        - event:文件变化事件对象
        """
        # 注意,这里一个要用绝对路径比较,不能直接使用 event.src_path == self.log_file,
        # event.src_path == self.log_file 的值为false
        # if event.src_path == self.log_file:
        if os.path.abspath(event.src_path) == os.path.abspath(self.log_file):
            # 在文件发生变化时,实时获取最后一行文本
            self.last_line = self.get_last_line()

        # 用户可在外部传入一个回调方法,在文本发生变化时执行该事件
        if self.on_modified_callback:
            self.on_modified_callback()

        # 调用基类的同名方法,以便执行基类的默认行为
        super(LogWatcher, self).on_modified(event)

    def tail_last_line_and_match(self, match_string=None, max_match_seconds=10):
        """
        实时监控日志文件的变化,并实时获取最后一行文本。如果匹配到指定的字符串,停止监视。

        参数:
        - match_string:需要匹配的字符串
        """
        self.match_string = match_string
        self.start()

        end_time = time.time() + max_match_seconds

        try:
            while not self.stop_watching and time.time() <= end_time:

                if self.match_string and self.match_string in self.last_line:
                    self.stop_watching = True

        except KeyboardInterrupt:
            pass

        self.stop_watching = True  # 停止监视循环


def write_logs(log_file):
    """在新线程中写入日志"""
    for i in range(10):
        with open(log_file, 'a') as file:
            file.write(f'New log entry {i}\n')
        time.sleep(1)  # 每秒写入一次日志


if __name__ == '__main__':
    import logging

    logging.basicConfig(level=logging.DEBUG)

    log_file = 'demo.log'

    # 创建日志文件并写入示例日志
    with open(log_file, 'w') as file:
        file.write('This is the first line of the log.\n')
        file.write('This is the second line of the log.\n')

    log_watcher = LogWatcher(log_file)

    # 启动新线程写入日志
    write_thread = threading.Thread(target=write_logs, args=(log_file,))
    write_thread.start()

    # 启动实时监控日志文件变化,并打印最后一行文本,直到匹配到指定字符串或超时才停止监视
    log_watcher.tail_last_line_and_match(match_string='New log entry 9', max_match_seconds=20)

    # 等待写入线程结束
    write_thread.join()

三、Demo验证

运行代码,控制台的输出结果:

DEBUG:__main__:Reading line: This is the second line of the log.
DEBUG:__main__:Reading line: New log entry 0
DEBUG:__main__:Reading line: New log entry 1
DEBUG:__main__:Reading line: New log entry 2
DEBUG:__main__:Reading line: New log entry 3
DEBUG:__main__:Reading line: New log entry 4
DEBUG:__main__:Reading line: New log entry 5
DEBUG:__main__:Reading line: New log entry 6
DEBUG:__main__:Reading line: New log entry 7
DEBUG:__main__:Reading line: New log entry 8
DEBUG:__main__:Reading line: New log entry 9

Process finished with exit code 0

欢迎技术交流:


http://www.kler.cn/news/162604.html

相关文章:

  • MySQL 教程 2.1
  • 【React设计】React企业级设计模式
  • python-04(入门基础篇4——lists相关的部分语法)
  • IC入门必看| 数字IC前端设计学习路线与方法(内附学习视频)
  • keepalived 高可用主备
  • python基于轻量级GhostNet模型开发构建23种常见中草药图像识别系统
  • swagger入门
  • MySQL视图介绍与实验练习
  • QT 中 多线程(备查)
  • Spring基于注解开发
  • 剪映最新版的4.9,主要更新的功能(于2023年12月2日发布)
  • 3D Web轻量引擎HOOPS Communicator如何实现对大模型的渲染支持?
  • Java IO流(一) 基本知识
  • 你知道模拟养成游戏如何开发吗?
  • Gerber文件使用详解
  • LeetCode Hot100 17.电话号码的字母组合
  • MySQL高级--01_1--数据库缓冲池(buffer pool)
  • 【广州华锐互动VRAR】VR戒毒科普宣传系统有效提高戒毒成功率
  • 反序列化 [网鼎杯 2020 朱雀组]phpweb 1
  • A : DS静态查找之顺序查找
  • 视频后期特效处理软件 Motion 5 mac中文版
  • 使用java线程实现亿级数据处理
  • 【Python】pptx文件转pdf
  • 优秀案例 | 元宇宙双语财经科技主播“舒望”主持首届粤港澳大湾区元宇宙国际传播论坛
  • 多目标追踪评价指标
  • Android Kotlin 泛型:强大的类型抽象和重用利器
  • 智能优化算法应用:基于战争策略算法无线传感器网络(WSN)覆盖优化 - 附代码
  • 维基百科文章爬虫和聚类:高级聚类和可视化
  • 人工智能原理复习--搜索策略(一)
  • vue在哪个生命周期内调用异步请求