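How do I watch a file for changes?

I have a log file being written by another process, and I want to watch it for changes. Each time a change occurs, I'd like to read the new data in and do some processing on it.

What's the best way to do this? I was hoping there would be some sort of hook in the PyWin32 library. I've found the win32file.FindNextChangeNotification function, but I have no idea how to ask it to watch a specific file.

If anyone has done anything like this, I'd be really grateful to hear how...

(Edit) I should have mentioned that I was after a solution that does not require polling.

(Edit) Curses! It seems this doesn't work over a mapped network drive. I'm guessing Windows doesn't "hear" any updates to the file the way it does on a local disk.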


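I don't know of any Windows-specific function. You could try computing the MD5 hash of the file every second/minute/hour (depending on how quickly you need to know) and comparing it to the last hash. When it differs, you know the file has been changed, and you can read the newest lines.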
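A minimal sketch of the hash-polling idea, using only the standard library (hashlib.md5). The helper names file_md5 and poll_for_change, the chunk size and the max_checks cut-off are illustrative assumptions, not part of the answer:

```python
import hashlib
import time


def file_md5(path, chunk_size=65536):
    """Return the hex MD5 digest of the file at `path`, reading it in chunks."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


def poll_for_change(path, last_hash, interval=1.0, max_checks=None):
    """Re-hash `path` every `interval` seconds until the digest differs from
    `last_hash`; return the new digest, or None after `max_checks` unchanged checks."""
    checks = 0
    while max_checks is None or checks < max_checks:
        current = file_md5(path)
        if current != last_hash:
            return current
        checks += 1
        time.sleep(interval)
    return None
```

Hashing reads the whole file on every check, so for a large, append-only log it is usually cheaper to compare os.stat() fields first and only hash when they differ.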

I'd try something like this:

    import time

    filePath = "some.log"  # placeholder: path to the log file

    try:
        f = open(filePath)
    except IOError:
        print("No such file: %s" % filePath)
        input("Press Enter to close window")
        raise SystemExit(1)
    try:
        lines = f.readlines()  # skip the lines already in the file
        while True:
            line = f.readline()
            try:
                if not line:
                    time.sleep(1)
                else:
                    functionThatAnalisesTheLine(line)  # your own processing function
            except Exception:
                # handle the exception somehow (for example, log the trace) and raise the same exception again
                input("Press Enter to close window")
                raise
    finally:
        f.close()

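The loop checks whether there are new lines since the file was last read. If there are, they are read and passed to the functionThatAnalisesTheLine function. If not, the script waits 1 second and tries again.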

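Check my answer to a similar question. You could try the same loop in Python. This page suggests:

    import time

    file = open("file.log")  # placeholder: the file being followed

    while 1:
        where = file.tell()
        line = file.readline()
        if not line:
            time.sleep(1)
            file.seek(where)
        else:
            print(line, end="")  # already has newline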

See also the question "tail() a file with Python".

If polling is good enough for you, I'd just watch whether the "modified time" file stat changes. To read it:

    os.stat(filename).st_mtime

(Also note that the Windows native change-event solution does not work in all circumstances, e.g. on network drives.)

    import os

    class Monkey(object):
        def __init__(self):
            self._cached_stamp = 0
            self.filename = '/path/to/file'

        def ook(self):
            stamp = os.stat(self.filename).st_mtime
            if stamp != self._cached_stamp:
                self._cached_stamp = stamp
                # File has changed, so do something...
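The same os.stat() polling can be made a little more robust by comparing both the modification time and the size: st_mtime_ns (Python 3.3+) avoids float rounding, and the size also catches appends that land within the filesystem's timestamp resolution. This is a sketch; stat_signature and changed_since are illustrative names, not part of the answer:

```python
import os


def stat_signature(path):
    """Return a (modification time in nanoseconds, size in bytes) tuple for `path`."""
    st = os.stat(path)
    return (st.st_mtime_ns, st.st_size)


def changed_since(path, previous_signature):
    """Return (changed, new_signature), comparing the current stat with `previous_signature`."""
    current = stat_signature(path)
    return current != previous_signature, current
```

A caller keeps the returned signature and passes it back on the next poll, much like _cached_stamp in the Monkey class.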

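As you can see in Tim Golden's article, pointed to by Horst Gutmann, WIN32 is relatively complex and watches directories, not a single file.

I'd suggest you look at IronPython, which is a .NET Python implementation. With IronPython you can use all the .NET functionality, including

    System.IO.FileSystemWatcher

which handles single files with a simple event interface.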

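Well, since you're using Python, you can just open a file and keep reading lines from it.

    f = open('file.log')

If the line read is not empty, you process it.

    line = f.readline()
    if line:
        pass  # Do what you want with the line

What you may be missing is that it is fine to keep calling readline at EOF: in that case it just keeps returning an empty string. And when something is appended to the log file, reading continues from where it stopped, as you need.

If you are looking for a solution that uses events or a particular library, please specify that in your question. Otherwise, I think this solution is just fine.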
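A small self-contained check of the behaviour described in this answer: once a text file is at EOF, readline() returns '', and after another writer appends data the same file object picks up where it stopped. A temporary file stands in for the real log, and read_new_lines is a hypothetical helper name:

```python
import os
import tempfile


def read_new_lines(f):
    """Return every line currently available from the open file `f` (may be empty)."""
    lines = []
    while True:
        line = f.readline()
        if not line:          # '' means "at EOF for now", not "file closed"
            return lines
        lines.append(line)


# A temporary file stands in for the log written by the other process.
path = os.path.join(tempfile.mkdtemp(), "file.log")
with open(path, "w") as writer:
    writer.write("first\n")

reader = open(path)
first_batch = read_new_lines(reader)    # ['first\n']
at_eof = read_new_lines(reader)         # [] (nothing new yet)

with open(path, "a") as writer:         # simulate the other process appending
    writer.write("second\n")

second_batch = read_new_lines(reader)   # ['second\n']
reader.close()
```

If the writer can leave a half-written line at the end, readline() returns that fragment, so a real tail loop should buffer any line that does not yet end in a newline.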

After a bit of hacking of Tim Golden's script, I have the following, which seems to work quite well:

    import os

    import win32file
    import win32con

    path_to_watch = "."  # look at the current directory
    file_to_watch = "test.txt"  # look for changes to a file called test.txt

    def ProcessNewData(newData):
        print("Text added: %s" % newData)

    # Set up the bits we'll need for output
    ACTIONS = {
        1: "Created",
        2: "Deleted",
        3: "Updated",
        4: "Renamed from something",
        5: "Renamed to something"
    }
    FILE_LIST_DIRECTORY = 0x0001
    hDir = win32file.CreateFile(
        path_to_watch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )

    # Open the file we're interested in
    a = open(file_to_watch, "r")

    # Throw away any existing log data
    a.read()

    # Wait for new data and call ProcessNewData for each new chunk that's written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW(
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )

        # For each change, check to see if it's updating the file we're interested in
        for action, file in results:
            full_filename = os.path.join(path_to_watch, file)
            # print(file, ACTIONS.get(action, "Unknown"))
            if file == file_to_watch:
                newText = a.read()
                if newText != "":
                    ProcessNewData(newText)

It probably needs more error checking, but for simply watching a log file and doing some processing on it before printing it to the screen, this works well.

Thanks everyone for your input, great stuff!

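This won't work on Windows (maybe with cygwin?), but on Linux you can use the "fcntl" system call with F_NOTIFY (dnotify). Note that dnotify watches a directory and the files in it, not a single file. Here is an example in Python. If you need to write it in C, it's basically the same code (same function names).

    import time
    import fcntl
    import os
    import signal

    FNAME = "/HOME/TOTO/DIRTOWATCH"  # must be a directory: F_NOTIFY does not watch single files

    def handler(signum, frame):
        print("Directory %s modified" % (FNAME,))

    signal.signal(signal.SIGIO, handler)
    fd = os.open(FNAME, os.O_RDONLY)
    fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
    fcntl.fcntl(fd, fcntl.F_NOTIFY,
                fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

    while True:
        time.sleep(10000)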

Here is a simplified version of Kender's code that seems to do the same trick and does not read in the entire file:

    # Check file for new data.

    import time

    f = open(r'c:\temp\test.txt', 'r')

    while True:
        line = f.readline()
        if not line:
            time.sleep(1)
            print('Nothing New')
        else:
            print('Call Function: ', line)

Check out pyinotify.

inotify replaces dnotify (from an earlier answer) in newer Linux kernels and allows file-level rather than directory-level monitoring.

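Have you tried Watchdog?

Python API library and shell utilities to monitor file system events.

Directory monitoring made easy with

  • A cross-platform API.
  • A shell tool to run commands in response to directory changes.

Get started quickly with a simple example in Quickstart...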

If you want a multiplatform solution, then check QFileSystemWatcher. Here is some example code (not sanitized):

    import sys

    from PyQt4 import QtCore

    @QtCore.pyqtSlot(str)
    def directory_changed(path):
        print('Directory Changed!!!')

    @QtCore.pyqtSlot(str)
    def file_changed(path):
        print('File Changed!!!')

    app = QtCore.QCoreApplication(sys.argv)  # the signals are delivered by the Qt event loop

    fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

    fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
    fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

    sys.exit(app.exec_())

Here is another modification of Tim Golden's script that runs on Unix-like systems and adds a simple watcher for file modifications by using a dict (file => time).

Usage: whateverName.py path_to_dir_to_watch

    #!/usr/bin/env python

    import os, sys, time

    def files_to_timestamp(path):
        files = [os.path.join(path, f) for f in os.listdir(path)]
        return dict([(f, os.path.getmtime(f)) for f in files])

    if __name__ == "__main__":

        path_to_watch = sys.argv[1]
        print('Watching {}..'.format(path_to_watch))

        before = files_to_timestamp(path_to_watch)

        while 1:
            time.sleep(2)
            after = files_to_timestamp(path_to_watch)

            added = [f for f in after.keys() if not f in before.keys()]
            removed = [f for f in before.keys() if not f in after.keys()]
            modified = []

            for f in before.keys():
                if not f in removed:
                    # use the new snapshot rather than calling getmtime() again,
                    # which would fail if the file vanished in the meantime
                    if after[f] != before[f]:
                        modified.append(f)

            if added: print('Added: {}'.format(', '.join(added)))
            if removed: print('Removed: {}'.format(', '.join(removed)))
            if modified: print('Modified: {}'.format(', '.join(modified)))

            before = after
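The added/removed/modified bookkeeping in that loop can also be written as a pure function over the two {path: mtime} snapshots, which makes it easy to test without touching the filesystem. diff_snapshots is an illustrative name, not part of the answer:

```python
def diff_snapshots(before, after):
    """Compare two {path: mtime} snapshots.

    Returns (added, removed, modified), each a sorted list of paths.
    """
    added = sorted(p for p in after if p not in before)
    removed = sorted(p for p in before if p not in after)
    modified = sorted(p for p in before if p in after and after[p] != before[p])
    return added, removed, modified


# Example with made-up snapshots (no filesystem access needed):
old = {"a.txt": 1.0, "b.txt": 2.0, "c.txt": 5.0}
new = {"b.txt": 3.0, "c.txt": 5.0, "d.txt": 4.0}
print(diff_snapshots(old, new))   # (['d.txt'], ['a.txt'], ['b.txt'])
```

In the watcher loop, files_to_timestamp() would supply before and after, and the three lists replace the inline comprehensions.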
A threaded variant of the win32 approach runs the ReadDirectoryChangesW loop in a daemon thread:

    import os
    import threading

    import win32con
    import win32file

    ACTIONS = {
        1: "Created",
        2: "Deleted",
        3: "Updated",
        4: "Renamed from something",
        5: "Renamed to something"
    }
    FILE_LIST_DIRECTORY = 0x0001

    class myThread(threading.Thread):
        def __init__(self, threadID, fileName, directory, origin):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.fileName = fileName
            self.daemon = True
            self.dir = directory
            self.originalFile = origin

        def run(self):
            startMonitor(self.fileName, self.dir, self.originalFile)

    def startMonitor(fileMonitoring, dirPath, originalFile):
        hDir = win32file.CreateFile(
            dirPath,
            FILE_LIST_DIRECTORY,
            win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
            None,
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_BACKUP_SEMANTICS,
            None
        )
        # Wait for new data and call ProcessNewData for each new chunk that's
        # written
        while 1:
            # Wait for a change to occur
            results = win32file.ReadDirectoryChangesW(
                hDir,
                1024,
                False,
                win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
                None,
                None
            )
            # For each change, check to see if it's updating the file we're
            # interested in
            for action, file_M in results:
                full_filename = os.path.join(dirPath, file_M)
                # print(file_M, ACTIONS.get(action, "Unknown"))
                # compare the paths themselves; comparing only their lengths matches unrelated files
                if os.path.abspath(full_filename) == os.path.abspath(fileMonitoring) and action == 3:
                    # copy to main file
                    ...

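Here is an example of checking a file for changes. It may not be the best way, but it is certainly a short one.

It's a handy tool for restarting an application when changes have been made to its source. I made it while playing with pygame, so that I could see the effects immediately after saving the file.

When using it with pygame, make sure the contents of the 'while' loop are placed inside your game loop, i.e. in update or whatever you call it. Otherwise your application will get stuck in an infinite loop and you won't see your game update.

    import os
    import time

    file_size_stored = os.stat('neuron.py').st_size

    while True:
        try:
            # note: only edits that change the file size are detected
            file_size_current = os.stat('neuron.py').st_size
            if file_size_stored != file_size_current:
                restart_program()
        except OSError:
            pass  # e.g. the file is briefly missing while the editor saves it
        time.sleep(0.5)  # avoid spinning the CPU between checks

In case you want the restart code, here it is; I found it online. (Not related to the question, but it may come in handy.)

    import os
    import sys

    def restart_program():  # restart application
        python = sys.executable
        os.execl(python, python, *sys.argv)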

Make the electrons do what you want them to do.

For me, the simplest solution is to use watchdog's tool watchmedo.

From https://pypi.python.org/pypi/watchdog I now have a process that looks for sql files in a directory and executes them if necessary.

    watchmedo shell-command \
        --patterns="*.sql" \
        --recursive \
        --command='~/Desktop/load_files_into_mysql_database.sh' \
        .

Here is an example geared toward watching input files that are written no more than once per second, but usually much less often. The goal is to append the last line (the most recent write) to a specified output file. I copied this from one of my projects and just deleted all the irrelevant lines. You'll have to fill in or change the missing symbols.

    from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
    from PyQt5.QtWidgets import QMainWindow
    from ui_main_window import Ui_MainWindow   # Qt Creator gen'd

    class MainWindow(QMainWindow, Ui_MainWindow):
        def __init__(self, parent=None):
            QMainWindow.__init__(self, parent)
            Ui_MainWindow.__init__(self)
            self._fileWatcher = QFileSystemWatcher()
            self._fileWatcher.fileChanged.connect(self.fileChanged)

        def fileChanged(self, filepath):
            QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
            # ^^ About to test this, may need more sophisticated solution
            with open(filepath) as file:
                lastLine = list(file)[-1]
            destPath = self._filemap[filepath]['dest file']
            with open(destPath, 'a') as out_file:               # a= append
                out_file.writelines([lastLine])

Of course, the QMainWindow class isn't strictly required. You can use QFileSystemWatcher on its own.
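list(file)[-1] in fileChanged builds a list of every line just to take the last one, and raises IndexError on an empty file. A lighter standard-library sketch keeps at most one line in memory with collections.deque(maxlen=1) and returns None for an empty file; last_line is a hypothetical helper name:

```python
from collections import deque


def last_line(path):
    """Return the final line of the text file at `path`, or None if it is empty."""
    with open(path) as f:
        tail = deque(f, maxlen=1)   # iterates the file, keeping only the newest line
    return tail[0] if tail else None
```

It still reads the file from the start; for very large files, seeking backwards from the end would avoid that.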

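The best and simplest solution is to use pygtail: https://pypi.python.org/pypi/pygtail

    import sys
    import time

    from pygtail import Pygtail

    while True:
        for line in Pygtail("some.log"):
            sys.stdout.write(line)
        time.sleep(1)  # wait before checking the log again instead of spinning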
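Pygtail works by remembering how far into the log it has already read, storing that position in an offset file, so each pass yields only the lines added since the previous one. The sketch below illustrates that idea with the standard library only. It is not Pygtail's implementation: the read_new_lines_since name and the ".offset" filename are assumptions, and it does not handle log rotation, which Pygtail does:

```python
import io
import os


def read_new_lines_since(log_path, offset_path=None):
    """Return the complete lines appended to `log_path` since the previous call.

    The byte offset reached so far is persisted in `offset_path`
    (by default log_path + ".offset", an assumption of this sketch).
    """
    if offset_path is None:
        offset_path = log_path + ".offset"
    offset = 0
    if os.path.exists(offset_path):
        with open(offset_path) as f:
            offset = int(f.read().strip() or 0)
    if os.path.getsize(log_path) < offset:
        offset = 0  # the log shrank (truncated or replaced): start from the top
    with open(log_path, "rb") as log:
        log.seek(offset)
        data = log.read()
    # Only consume up to the last newline; a trailing partial line waits for next time.
    end = data.rfind(b"\n") + 1
    with open(offset_path, "w") as f:
        f.write(str(offset + end))
    return [line.decode("utf-8") for line in io.BytesIO(data[:end])]
```

Calling it from a while True: loop with a time.sleep() gives roughly the same "only new lines" behaviour as the Pygtail loop, and because the offset lives on disk it also survives a restart of the watcher.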

To watch a single file with polling and minimal dependencies, here is a fully fleshed-out example based on Deestan's answer (above):

    import os
    import sys
    import time

    class Watcher(object):
        running = True
        refresh_delay_secs = 1

        # Constructor
        def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
            self._cached_stamp = 0
            self.filename = watch_file
            self.call_func_on_change = call_func_on_change
            self.args = args
            self.kwargs = kwargs

        # Look for changes
        def look(self):
            stamp = os.stat(self.filename).st_mtime
            if stamp != self._cached_stamp:
                self._cached_stamp = stamp
                # File has changed, so do something...
                print('File changed')
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

        # Keep watching in a loop
        def watch(self):
            while self.running:
                try:
                    # Look for changes
                    time.sleep(self.refresh_delay_secs)
                    self.look()
                except KeyboardInterrupt:
                    print('\nDone')
                    break
                except FileNotFoundError:
                    # Action on file not found
                    pass
                except:
                    print('Unhandled error: %s' % sys.exc_info()[0])

    # Call this function each time a change happens
    def custom_action(text):
        print(text)

    watch_file = 'my_file.txt'

    # watcher = Watcher(watch_file)  # simple
    watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going

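You can also use a simple library called repyt; here is an example:

    repyt ./app.py

Nobody seems to have posted fswatch. It's a cross-platform file system monitor. Just install it, run it and follow the prompts.

I've used it with Python and Golang programs, and it just works.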

Building on @4Oh4's solution, here is a smooth change that watches a list of files:

    import os
    import sys
    import time

    class Watcher(object):
        running = True
        refresh_delay_secs = 1

        # Constructor
        def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
            self._cached_stamp_files = {}
            self.filenames = watch_files
            self.call_func_on_change = call_func_on_change
            self.args = args
            self.kwargs = kwargs

        # Look for changes
        def look(self):
            for file in self.filenames:
                stamp = os.stat(file).st_mtime
                if file not in self._cached_stamp_files:
                    self._cached_stamp_files[file] = 0
                if stamp != self._cached_stamp_files[file]:
                    self._cached_stamp_files[file] = stamp
                    # File has changed, so do something...
                    with open(file, 'r') as file_to_read:  # closed automatically
                        value = file_to_read.read()
                    print("value from file", value)
                    if self.call_func_on_change is not None:
                        self.call_func_on_change(*self.args, **self.kwargs)

        # Keep watching in a loop
        def watch(self):
            while self.running:
                try:
                    # Look for changes
                    time.sleep(self.refresh_delay_secs)
                    self.look()
                except KeyboardInterrupt:
                    print('\nDone')
                    break
                except FileNotFoundError:
                    # Action on file not found
                    pass
                except Exception as e:
                    print(e)
                    print('Unhandled error: %s' % sys.exc_info()[0])

    # Call this function each time a change happens
    def custom_action(text):
        print(text)
        # pass

    watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

    # watcher = Watcher(watch_files)  # simple

    if __name__ == "__main__":
        watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
        watcher.watch()  # start the watch going

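If you're using Windows, create this POLL.CMD file:

    @echo off
    :top
    xcopy /m /y %1 %2 | find /v "File(s) copied"
    timeout /T 1 > nul
    goto :top

Then you can type "poll dir1 dir2", and it will copy all the files from dir1 to dir2 and check for updates once per second.

The "find" is optional; it just makes the console less noisy.

This is not recursive. Maybe you could make it recursive by using /e with xcopy.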

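Since I already have it installed globally, my favorite approach is to use nodemon. If your source code is in src and your entry point is src/app.py, it's as easy as:

    nodemon -w 'src/**' -e py,html --exec python src/app.py

... where -e py,html lets you control which file types are watched for changes.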

    import inotify.adapters
    from datetime import datetime

    LOG_FILE = '/var/log/mysql/server_audit.log'

    def main():
        start_time = datetime.now()
        while True:
            # Block until the first inotify event on LOG_FILE, then drop the watch
            i = inotify.adapters.Inotify()
            i.add_watch(LOG_FILE)
            for event in i.event_gen(yield_nones=False):
                break
            del i

            # Print only the entries newer than the last one already printed
            with open(LOG_FILE, 'r') as f:
                for line in f:
                    entry = line.split(',')
                    entry_time = datetime.strptime(entry[0],
                                                   '%Y%m%d %H:%M:%S')
                    if entry_time > start_time:
                        start_time = entry_time
                        print(entry)

    if __name__ == '__main__':
        main()

The simplest solution is to read the same file twice, with an interval in between, and compare the two copies. You could try something like this:

    import time

    app_name = "myapp"  # hypothetical placeholder: the app directory containing the file

    while True:
        # Capturing the two instances of the file after a certain interval of time
        print("Looking for changes in " + app_name.capitalize() + " models.py\nPress 'CTRL + C' to stop the program")
        with open(app_name.capitalize() + '/filename', 'r') as app_models_file:
            filename_content = app_models_file.read()
        time.sleep(5)
        with open(app_name.capitalize() + '/filename', 'r') as app_models_file_1:
            filename_content_1 = app_models_file_1.read()
        # Comparing the file contents after a certain interval of time
        if filename_content == filename_content_1:
            pass
        else:
            print("You made a change in " + app_name.capitalize() + " filename.\n")
            cmd = str(input("Do something with the file?(y/n):"))
            if cmd == 'y':
                pass  # Do something
            elif cmd == 'n':
                pass  # pass or do something
            else:
                print("Invalid input")

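Since nobody has mentioned it, I'll bring it up: the standard library has a Python module called filecmp, which has a cmp() function for comparing two files.

Just make sure you don't use from filecmp import cmp and shadow the built-in cmp() function in Python 2.x. That's fine in Python 3.x, since there is no longer such a built-in cmp() function.

Anyway, this is how it's used:

    import filecmp
    filecmp.cmp(path_to_file_1, path_to_file_2, shallow=True)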

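The shallow argument defaults to True. If shallow is true and the os.stat() signatures of both files (file type, size and modification time) are identical, the files are taken to be equal without reading them. Otherwise (including when shallow is False), the files are treated as different if their sizes or contents differ.

Maybe this information is useful to someone.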
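One way to use filecmp.cmp for change detection is to keep a snapshot copy of the watched file and compare against it. This is a sketch under assumptions: the temporary snapshot file and the make_snapshot / has_changed_since_snapshot names are illustrative, and shallow=False is passed so the contents are always compared, because shutil.copy2 copies the original's modification time onto the snapshot:

```python
import filecmp
import os
import shutil
import tempfile


def make_snapshot(path):
    """Copy `path` (contents and metadata) to a new temporary file; return its path."""
    fd, snapshot = tempfile.mkstemp(suffix=".snapshot")
    os.close(fd)
    shutil.copy2(path, snapshot)
    return snapshot


def has_changed_since_snapshot(path, snapshot):
    """Return True if `path` differs from `snapshot`, refreshing the snapshot if so."""
    # filecmp caches results keyed on os.stat() signatures; clearing the cache
    # keeps an edit that leaves size and mtime unchanged from being masked.
    filecmp.clear_cache()
    if filecmp.cmp(path, snapshot, shallow=False):
        return False
    shutil.copy2(path, snapshot)  # remember the new version for the next check
    return True
```

In a polling loop you would call has_changed_since_snapshot() every second or so, and delete the snapshot with os.remove() when you are done.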

watchfiles (https://github.com/samuelcolvin/watchfiles) is a Python API and CLI that uses the Notify (https://github.com/notify-rs/notify) library, which is written in Rust.

The Rust implementation currently (2022-10-09) supports:

  • Linux / Android: inotify
  • macOS: FSEvents or kqueue, see features
  • Windows: ReadDirectoryChangesW
  • FreeBSD / NetBSD / OpenBSD / DragonflyBSD: kqueue
  • All platforms: polling

Binaries are available on PyPI (https://pypi.org/project/watchfiles/) and conda-forge (https://github.com/conda-forge/watchfiles-feedstock).