确保只有一个程序实例在运行

有没有一种 Python 的方法只让一个程序的一个实例运行?

我想到的唯一合理的解决方案是尝试将它作为服务器在某个端口上运行,然后第二个程序尝试绑定到相同的端口——失败。但这不是个好主意也许还有比这更轻量级的东西?

(考虑到程序有时候可能会失败,比如 Segfault ——所以像“ lock file”这样的东西不会起作用)

97894 次浏览

我不知道它是否足够简洁,但是在 Java 世界中,监听一个已定义的端口是一个非常广泛使用的解决方案,因为它可以在所有主要平台上工作,并且不存在程序崩溃的问题。

侦听端口的另一个好处是可以向正在运行的实例发送命令。例如,当用户第二次启动程序时,您可以向正在运行的实例发送一个命令,告诉它打开另一个窗口(例如,Firefox 就是这样做的)。但我不知道他们是否使用 TCP 端口或命名管道之类的东西。

这个可能有用。

  1. 尝试创建一个 PID 文件到一个已知位置。如果您失败,有人已经锁定了文件,您就完成了。

  2. 当您正常完成后,关闭并删除 PID 文件,以便其他人可以覆盖它。

您可以将程序包装在 shell 脚本中,即使程序崩溃,该脚本也会删除 PID 文件。

您还可以使用 PID 文件来终止挂起的程序。

在 unix 上使用锁文件是一种非常常见的方法。如果它崩溃了,你必须手动清理。您可以将 PID 存储在文件中,并在启动时检查是否有具有此 PID 的进程,如果没有,则重写锁文件。(但是,您还需要在 read-file-check-pid-rewrite-file 周围设置一个锁)。您将在 奥斯软件包中找到获取和检查 pid 所需的内容。检查是否存在具有给定 PID 的进程的常见方法是向其发送非致命信号。

其他替代方案可以将其与群或 posx 信号量相结合。

正如 Saua 所提议的,打开一个网络套接字可能是最简单和最便携的。

使用 pid 文件。你有一些已知的位置,“/path/to/pidfile”,在启动的时候你可以这样做(部分是伪代码,因为我还没喝咖啡,不想那么辛苦地工作) :

import os, os.path
pidfilePath = """/path/to/pidfile"""
if os.path.exists(pidfilePath):
pidfile = open(pidfilePath,"r")
pidString = pidfile.read()
if <pidString is equal to os.getpid()>:
# something is real weird
Sys.exit(BADCODE)
else:
<use ps or pidof to see if the process with pid pidString is still running>
if  <process with pid == 'pidString' is still running>:
Sys.exit(ALREADAYRUNNING)
else:
# the previous server must have crashed
<log server had crashed>
<reopen pidfilePath for writing>
pidfile.write(os.getpid())
else:
<open pidfilePath for writing>
pidfile.write(os.getpid())

因此,换句话说,您要检查 pidfile 是否存在; 如果不存在,则将 pid 写入该文件。如果 pidfile 确实存在,那么检查 pid 是否是正在运行的进程的 pid; 如果是,那么您已经有另一个正在运行的进程,因此只需关闭。如果没有,那么前一个进程就崩溃了,所以记录它,然后将您自己的 pid 写入到文件中以取代旧的 pid。那就继续。

简单,跨平台解决方案,在 另一个问题Zgoda发现:

import fcntl
import os
import sys


def instance_already_running(label="default"):
"""
Detect if an an instance with the label is already running, globally
at the operating system level.


Using `os.open` ensures that the file pointer won't be closed
by Python's garbage collector after the function's scope is exited.


The lock will be released when the program exits, or could be
released if the file pointer were closed.
"""


lock_file_pointer = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY)


try:
fcntl.lockf(lock_file_pointer, fcntl.LOCK_EX | fcntl.LOCK_NB)
already_running = False
except IOError:
already_running = True


return already_running

很像 S · 洛特的建议,但有密码。

下面的代码应该可以完成这项工作,它是跨平台的,在 Python 2.4-3.2上运行。我在 Windows,OS X 和 Linux 上测试过。

from tendo import singleton
me = singleton.SingleInstance() # will sys.exit(-1) if other instance is running

最新的代码版本是可用的 Singleton Py。请 这里有窃听器

您可以使用下列方法之一进行安装:

这段代码是 Linux 特有的。它使用“抽象的”UNIX 域套接字,但它很简单,不会留下陈旧的锁文件。相对于上面的解决方案,我更喜欢它,因为它不需要特别保留的 TCP 端口。

try:
import socket
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
## Create an abstract socket, by prefixing it with null.
s.bind( '\0postconnect_gateway_notify_lock')
except socket.error as e:
error_code = e.args[0]
error_string = e.args[1]
print "Process already running (%d:%s ). Exiting" % ( error_code, error_string)
sys.exit (0)

可以更改唯一字符串 postconnect_gateway_notify_lock以允许执行需要单个实例的多个程序。

我发布这个作为一个答案,因为我是一个新用户和堆栈溢出不会让我投票了。

索林 · 斯巴尼亚的解决方案在 OS X、 Linux 和 Windows 下都适用于我,我对此表示感谢。

然而,temfile.gettemdir ()在 OS X 和 Windows 下表现出一种方式,在其他 some/many/all (?)下表现出另一种方式* nixes (忽略 OS X 也是 Unix 的事实!).这个差异对这个代码很重要。

OS X 和 Windows 都有特定于用户的临时目录,因此一个用户创建的临时文件对另一个用户不可见。相比之下,在很多版本的 * nix 下(我测试过 Ubuntu 9、 RHEL 5、 OpenSolaris 2008和 FreeBSD 8) ,所有用户的临时目录都是/tmp。

这意味着当锁文件在多用户机器上创建时,它是在/tmp 中创建的,只有第一次创建锁文件的用户才能运行应用程序。

一个可能的解决方案是将当前用户名嵌入到锁文件的名称中。

值得注意的是,OP 抓取端口的解决方案在多用户机器上也会出错。

以前从来没有编写过 python,但是这是我刚刚在我的检查点中实现的,以防止它被 crond 启动两次或更多次:

import os
import sys
import fcntl
fh=0
def run_once():
global fh
fh=open(os.path.realpath(__file__),'r')
try:
fcntl.flock(fh,fcntl.LOCK_EX|fcntl.LOCK_NB)
except:
os._exit(0)


run_once()

在另一期( http://stackoverflow.com/questions/2959474)中发布了这篇文章后,我们发现了斯拉瓦-恩的建议。这个函数被调用,锁定正在执行的脚本文件(而不是 pid 文件) ,并维护锁定,直到脚本结束(正常或错误)。

我一直怀疑应该有一个使用进程组的很好的 POSIXy 解决方案,而不必访问文件系统,但是我不能完全确定。比如:

在启动时,您的进程向特定组中的所有进程发送一个“ kill -0”。如果存在这样的进程,它就会退出。然后它加入了小组。没有其他进程使用该组。

然而,这有一个竞争条件——多个进程可以同时完成这项工作,并最终加入组并同时运行。当您添加了某种互斥对象以使其防水时,就不再需要进程组了。

如果您的流程仅由 cron 启动,每分钟或每小时启动一次,那么这可能是可以接受的,但是这会让我有点紧张,因为它会在您不希望它出错的那一天出错。

我想这毕竟不是一个很好的解决方案,除非有人可以改进它?

我在我的巴布亚企鹅上使用 single_process;

pip install single_process

例子 :

from single_process import single_process


@single_process
def main():
print 1


if __name__ == "__main__":
main()

参考资料: https://pypi.python.org/pypi/single_process/

上周我碰到了这个问题,尽管我确实找到了一些好的解决方案,但是我决定编写一个非常简单干净的 python 包,并将它上传到 PyPI。它与 tendo 的不同之处在于它可以锁定任何字符串资源名。虽然你可以锁定 __file__来达到同样的效果。

使用: pip install quicklock安装

使用它非常简单:

[nate@Nates-MacBook-Pro-3 ~/live] python
Python 2.7.6 (default, Sep  9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from quicklock import singleton
>>> # Let's create a lock so that only one instance of a script will run
...
>>> singleton('hello world')
>>>
>>> # Let's try to do that again, this should fail
...
>>> singleton('hello world')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/nate/live/gallery/env/lib/python2.7/site-packages/quicklock/quicklock.py", line 47, in singleton
raise RuntimeError('Resource <{}> is currently locked by <Process {}: "{}">'.format(resource, other_process.pid, other_process.name()))
RuntimeError: Resource <hello world> is currently locked by <Process 24801: "python">
>>>
>>> # But if we quit this process, we release the lock automatically
...
>>> ^D
[nate@Nates-MacBook-Pro-3 ~/live] python
Python 2.7.6 (default, Sep  9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from quicklock import singleton
>>> singleton('hello world')
>>>
>>> # No exception was thrown, we own 'hello world'!

看一下: https://pypi.python.org/pypi/quicklock

Linux 例子

此方法基于在关闭应用程序后自动删除的临时文件的创建。 程序启动时我们验证文件的存在; 如果文件存在(有一个挂起的执行) ,程序将关闭; 否则它将创建文件并继续执行程序。

from tempfile import *
import time
import os
import sys




f = NamedTemporaryFile( prefix='lock01_', delete=True) if not [f  for f in     os.listdir('/tmp') if f.find('lock01_')!=-1] else sys.exit()


YOUR CODE COMES HERE

对于任何使用 WxPython作为应用程序的人,可以使用函数 wx.SingleInstanceChecker 记录在案

我个人使用 wx.App的一个子类,它使用 wx.SingleInstanceChecker,如果有一个应用程序的现有实例已经像这样执行,它就从 OnInit()返回 False:

import wx


class SingleApp(wx.App):
"""
class that extends wx.App and only permits a single running instance.
"""


def OnInit(self):
"""
wx.App init function that returns False if the app is already running.
"""
self.name = "SingleApp-%s".format(wx.GetUserId())
self.instance = wx.SingleInstanceChecker(self.name)
if self.instance.IsAnotherRunning():
wx.MessageBox(
"An instance of the application is already running",
"Error",
wx.OK | wx.ICON_WARNING
)
return False
return True

这是禁止多个实例的 wx.App的简单插入式替换。要使用它,只需将代码中的 wx.App替换为 SingleApp,如下所示:

app = SingleApp(redirect=False)
frame = wx.Frame(None, wx.ID_ANY, "Hello World")
frame.Show(True)
app.MainLoop()

以下是我最终的仅适用于 Windows 的解决方案。将以下内容放入一个模块中,可能称为“ onlyone.py”,或者其他名称。将该模块直接包含到 _ _ main _ _ python 脚本文件中。

import win32event, win32api, winerror, time, sys, os
main_path = os.path.abspath(sys.modules['__main__'].__file__).replace("\\", "/")


first = True
while True:
mutex = win32event.CreateMutex(None, False, main_path + "_{<paste YOUR GUID HERE>}")
if win32api.GetLastError() == 0:
break
win32api.CloseHandle(mutex)
if first:
print "Another instance of %s running, please wait for completion" % main_path
first = False
time.sleep(1)

解释

代码尝试创建一个互斥对象,该互斥对象的名称派生自脚本的完整路径。我们使用正斜杠来避免与真实文件系统的潜在混淆。

好处

  • 不需要配置或“魔术”标识符,可以根据需要在不同的脚本中使用它。
  • 没有过时的文件留下,互斥体随你一起死。
  • 等待时打印有用的消息
import sys,os


# start program
try:  # (1)
os.unlink('lock')  # (2)
fd=os.open("lock", os.O_CREAT|os.O_EXCL) # (3)
except:
try: fd=os.open("lock", os.O_CREAT|os.O_EXCL) # (4)
except:
print "Another Program running !.."  # (5)
sys.exit()


# your program  ...
# ...


# exit program
try: os.close(fd)  # (6)
except: pass
try: os.unlink('lock')
except: pass
sys.exit()

在 Linux 系统上,人们也可以问 为了数量,剧本 在进程列表中找到(选项 -a 显示 完整的命令行字符串)。

import os
import sys
import subprocess


procOut = subprocess.check_output( "/bin/pgrep -u $UID -a python", shell=True,
executable="/bin/bash", universal_newlines=True)


if procOut.count( os.path.basename(__file__)) > 1 :
sys.exit( ("found another instance of >{}<, quitting."
).format( os.path.basename(__file__)))


如果限制应该适用于 所有用户,则删除 -u $UID。 免责声明: a)假定脚本的(基本)名称是唯一的,b)可能存在竞态条件。

在 windows 上实现这一点的最佳解决方案是按照@zgoda 的建议使用互斥对象。

import win32event
import win32api
from winerror import ERROR_ALREADY_EXISTS


mutex = win32event.CreateMutex(None, False, 'name')
last_error = win32api.GetLastError()


if last_error == ERROR_ALREADY_EXISTS:
print("App instance already running")

有些答案使用的是 fctnl(也包含在@sorin tendo 包中) ,这在 windows 上是不可用的,如果你尝试使用像 pyinstaller这样的包来冻结你的 python 应用,它会抛出一个错误。

此外,使用 lock file 方法,在数据库文件中创建了一个 read-only问题(在 sqlite3中遇到过这种情况)。

基于罗伯托 · 罗萨里奥的回答,我想出了以下函数:

SOCKET = None
def run_single_instance(uniq_name):
try:
import socket
global SOCKET
SOCKET = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
## Create an abstract socket, by prefixing it with null.
# this relies on a feature only in linux, when current process quits, the
# socket will be deleted.
SOCKET.bind('\0' + uniq_name)
return True
except socket.error as e:
return False

我们需要定义全局 SOCKET可用性,因为它只有在整个进程退出时才会被垃圾收集。如果我们在函数中声明一个局部变量,它将在函数退出后超出作用域,因此套接字将被删除。

所有的功劳都应该归功于罗伯托 · 罗萨里奥,因为我只是澄清和阐述了他的代码。这段代码只能在 Linux 上使用,正如下面引用的 https://troydhanson.github.io/network/Unix_domain_sockets.html文本所解释的:

Linux 有一个特殊的功能: 如果一个 Unix domain socket 的路径名 以空字节0开头,则其名称不映射到 因此它不会与文件系统中的其他名称冲突。 此外,当服务器在 抽象命名空间,其文件将被删除; 使用常规 UNIX 域 套接字,文件在服务器关闭它之后仍然存在。

回答晚了,但是对于窗户你可以使用:

from win32event import CreateMutex
from win32api import CloseHandle, GetLastError
from winerror import ERROR_ALREADY_EXISTS
import sys


class singleinstance:
""" Limits application to single instance """


def __init__(self):
self.mutexname = "testmutex_{D0E858DF-985E-4907-B7FB-8D732C3FC3B9}"
self.mutex = CreateMutex(None, False, self.mutexname)
self.lasterror = GetLastError()
    

def alreadyrunning(self):
return (self.lasterror == ERROR_ALREADY_EXISTS)
        

def __del__(self):
if self.mutex:
CloseHandle(self.mutex)

用法

# do this at beginnig of your application
myapp = singleinstance()


# check is another instance of same program running
if myapp.alreadyrunning():
print ("Another instance of this program is already running")
sys.exit(1)

下面是我用 Python 3.7.9在 Windows Server 2016和 Ubuntu 20.04上测试的 跨平台的例子:

import os


class SingleInstanceChecker:
def __init__(self, id):
if isWin():
ensure_win32api()
self.mutexname = id
self.lock = win32event.CreateMutex(None, False, self.mutexname)
self.running = (win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS)


else:
ensure_fcntl()
self.lock = open(f"/tmp/isnstance_{id}.lock", 'wb')
try:
fcntl.lockf(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.running = False
except IOError:
self.running = True




def already_running(self):
return self.running
        

def __del__(self):
if self.lock:
try:
if isWin():
win32api.CloseHandle(self.lock)
else:
os.close(self.lock)
except Exception as ex:
pass


# ---------------------------------------
# Utility Functions
# Dynamically load win32api on demand
# Install with: pip install pywin32
win32api=winerror=win32event=None
def ensure_win32api():
global win32api,winerror,win32event
if win32api is None:
import win32api
import winerror
import win32event




# Dynamically load fcntl on demand
# Install with: pip install fcntl
fcntl=None
def ensure_fcntl():
global fcntl
if fcntl is None:
import fcntl




def isWin():
return (os.name == 'nt')
# ---------------------------------------

下面是正在使用的:

import time, sys


def main(argv):
_timeout = 10
print("main() called. sleeping for %s seconds" % _timeout)
time.sleep(_timeout)
print("DONE")




if __name__ == '__main__':
SCR_NAME = "my_script"
sic = SingleInstanceChecker(SCR_NAME)
if sic.already_running():
print("An instance of {} is already running.".format(SCR_NAME))
sys.exit(1)
else:
main(sys.argv[1:])

下面是一个带有 contextmanager 和 memcached 的 django 示例: Https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html

可用于保护不同主机上的同时操作。 可用于管理多个任务。 也可以为简单的 Python 脚本进行更改。

我对以上代码的修改如下:

import time
from contextlib import contextmanager
from django.core.cache import cache




@contextmanager
def memcache_lock(lock_key, lock_value, lock_expire):
timeout_at = time.monotonic() + lock_expire - 3


# cache.add fails if the key already exists
status = cache.add(lock_key, lock_value, lock_expire)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if time.monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_key)




LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes




def main():
lock_name, lock_value = "lock_1", "locked"
with memcache_lock(lock_name, lock_value, LOCK_EXPIRE) as acquired:
if acquired:
# single instance code here:
pass




if __name__ == "__main__":
main()

下面是一个跨平台实现,它使用上下文管理器创建一个临时锁文件。

可用于管理多个任务。

import os
from contextlib import contextmanager
from time import sleep




class ExceptionTaskInProgress(Exception):
pass




# Context manager for suppressing exceptions
class SuppressException:
def __init__(self):
pass


def __enter__(self):
return self


def __exit__(self, *exc):
return True




# Context manager for task
class TaskSingleInstance:
def __init__(self, task_name, lock_path):
self.task_name = task_name
self.lock_path = lock_path
self.lock_filename = os.path.join(self.lock_path, self.task_name + ".lock")


if os.path.exists(self.lock_filename):
raise ExceptionTaskInProgress("Resource already in use")


def __enter__(self):
self.fl = open(self.lock_filename, "w")
return self


def __exit__(self, exc_type, exc_val, exc_tb):
self.fl.close()
os.unlink(self.lock_filename)




# Here the task is silently interrupted
# if it is already running on another instance.
def main1():
task_name = "task1"
tmp_filename_path = "."
with SuppressException():
with TaskSingleInstance(task_name, tmp_filename_path):
print("The task `{}` has started.".format(task_name))
# The single task instance code is here.
sleep(5)
print("The task `{}` has completed.".format(task_name))




# Here the task is interrupted with a message
# if it is already running in another instance.
def main2():
task_name = "task1"
tmp_filename_path = "."
try:
with TaskSingleInstance(task_name, tmp_filename_path):
print("The task `{}` has started.".format(task_name))
# The single task instance code is here.
sleep(5)
print("Task `{}` completed.".format(task_name))
except ExceptionTaskInProgress as ex:
print("The task `{}` is already running.".format(task_name))




if __name__ == "__main__":
main1()
main2()