Check to see if python script is running

我有一个 python 守护进程作为我的 web 应用程序的一部分运行/如果我的守护进程正在运行,我如何快速检查(使用 python) ,如果没有,启动它?

我想这样做,以修复任何崩溃的守护进程,这样的脚本不必手动运行,它会自动运行尽快调用,然后保持运行。

如果我的脚本正在运行,我如何检查(使用 python) ?

198772 次浏览

有无数种选择。一种方法是使用系统调用或为您执行此类调用的 python 库。另一种是简单地产生一个过程,比如:

ps ax | grep processName

and parse the output. Many people choose this approach, it isn't necessarily a bad approach in my view.

在 UNIX 上有非常好的重新启动进程的包。有一个关于构建和配置它的很棒的教程是 Monit。通过一些调整,您可以拥有一个坚如磐石的技术来维护您的守护进程。

放置一个 pidfile (例如/tmp)。然后可以通过检查文件中的 PID 是否存在来检查进程是否正在运行。不要忘记在干净地关闭时删除该文件,并在启动时检查它。

#/usr/bin/env python


import os
import sys


pid = str(os.getpid())
pidfile = "/tmp/mydaemon.pid"


if os.path.isfile(pidfile):
print "%s already exists, exiting" % pidfile
sys.exit()
file(pidfile, 'w').write(pid)
try:
# Do some actual work here
finally:
os.unlink(pidfile)

然后,可以通过检查/tmp/mydaemon.pid 的内容是否是现有进程来检查该进程是否正在运行。Monit (上面提到过)可以为您做到这一点,或者您可以编写一个简单的 shell 脚本,使用 ps 中的返回代码来检查它。

ps up `cat /tmp/mydaemon.pid ` >/dev/null && echo "Running" || echo "Not running"

为了获得额外的学分,您可以使用 atexit 模块来确保您的程序在任何情况下都能清除它的 pidfile (当被关闭、引发异常等)。

我是管理守护进程的 主管的超级粉丝。它是用 Python 编写的,因此有大量示例说明如何与 Python 交互或从 Python 扩展它。对于您的目的,过程控制 API应该工作得很好。

当然,来自丹的例子不会像它应该的那样起作用。

实际上,如果脚本崩溃、出现异常或不清理 pid 文件,脚本将运行多次。

我从另一个网站建议如下:

这是为了检查是否已经存在一个锁文件

\#/usr/bin/env python
import os
import sys
if os.access(os.path.expanduser("~/.lockfile.vestibular.lock"), os.F_OK):
#if the lockfile is already there then check the PID number
#in the lock file
pidfile = open(os.path.expanduser("~/.lockfile.vestibular.lock"), "r")
pidfile.seek(0)
old_pid = pidfile.readline()
# Now we check the PID from lock file matches to the current
# process PID
if os.path.exists("/proc/%s" % old_pid):
print "You already have an instance of the program running"
print "It is running as process %s," % old_pid
sys.exit(1)
else:
print "File is there but the program is not running"
print "Removing lock file for the: %s as it can be there because of the program last time it was run" % old_pid
os.remove(os.path.expanduser("~/.lockfile.vestibular.lock"))

这是我们在锁文件中放入 PID 文件的代码的一部分

pidfile = open(os.path.expanduser("~/.lockfile.vestibular.lock"), "w")
pidfile.write("%s" % os.getpid())
pidfile.close()

This code will check the value of pid compared to existing running process., avoiding double execution.

希望能有所帮助。

A technique that is handy on a Linux system is using domain sockets:

import socket
import sys
import time


def get_lock(process_name):
# Without holding a reference to our socket somewhere it gets garbage
# collected when the function exits
get_lock._lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)


try:
# The null byte (\0) means the socket is created
# in the abstract namespace instead of being created
# on the file system itself.
# Works only in Linux
get_lock._lock_socket.bind('\0' + process_name)
print 'I got the lock'
except socket.error:
print 'lock exists'
sys.exit()




get_lock('running_test')
while True:
time.sleep(3)

它是原子的,并且如果您的进程被发送了 SIGKILL,它可以避免在周围存在锁文件的问题

您可以使套接字在垃圾收集时自动关闭的 阅读 socket.close的文档

其他的答案对于 cron 作业这样的事情来说是非常好的,但是如果您正在运行一个守护进程,那么您应该使用类似 Daemontools的程序来监视它。

考虑下面的例子来解决你的问题:

#!/usr/bin/python
# -*- coding: latin-1 -*-


import os, sys, time, signal


def termination_handler (signum,frame):
global running
global pidfile
print 'You have requested to terminate the application...'
sys.stdout.flush()
running = 0
os.unlink(pidfile)


running = 1
signal.signal(signal.SIGINT,termination_handler)


pid = str(os.getpid())
pidfile = '/tmp/'+os.path.basename(__file__).split('.')[0]+'.pid'


if os.path.isfile(pidfile):
print "%s already exists, exiting" % pidfile
sys.exit()
else:
file(pidfile, 'w').write(pid)


# Do some actual work here


while running:
time.sleep(10)

I suggest this script because it can be executed one time only.

试试另一个版本

def checkPidRunning(pid):
'''Check For the existence of a unix pid.
'''
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True


# Entry point
if __name__ == '__main__':
pid = str(os.getpid())
pidfile = os.path.join("/", "tmp", __program__+".pid")


if os.path.isfile(pidfile) and checkPidRunning(int(file(pidfile,'r').readlines()[0])):
print "%s already exists, exiting" % pidfile
sys.exit()
else:
file(pidfile, 'w').write(pid)


# Do some actual work here
main()


os.unlink(pidfile)

使用 bash 查找具有当前脚本名称的进程。

import commands
import os
import time
import sys


def stop_if_already_running():
script_name = os.path.basename(__file__)
l = commands.getstatusoutput("ps aux | grep -e '%s' | grep -v grep | awk '{print $2}'| awk '{print $2}'" % script_name)
if l[1]:
sys.exit(0);

要测试,请添加

stop_if_already_running()
print "running normally"
while True:
time.sleep(3)
ps ax | grep processName

如果您在 Pycharm 中调试脚本总是退出

pydevd.py --multiproc --client 127.0.0.1 --port 33882 --file processName

与其开发您自己的 PID 文件解决方案(其中包含比您想象的更多的细微之处和角落情况) ,不如看看 主管——这是一个过程控制系统,可以很容易地将作业控制和守护进程行为包装到现有的 Python 脚本中。

pid库完全可以做到这一点。

from pid import PidFile


with PidFile():
do_something()

它还将自动处理 pidfile 存在但进程没有运行的情况。

试试这个:

#/usr/bin/env python
import os, sys, atexit


try:
# Set PID file
def set_pid_file():
pid = str(os.getpid())
f = open('myCode.pid', 'w')
f.write(pid)
f.close()


def goodby():
pid = str('myCode.pid')
os.remove(pid)


atexit.register(goodby)
set_pid_file()
# Place your code here


except KeyboardInterrupt:
sys.exit(0)

Here is more useful code (with checking if exactly python executes the script):

#! /usr/bin/env python


import os
from sys import exit




def checkPidRunning(pid):
global script_name
if pid<1:
print "Incorrect pid number!"
exit()
try:
os.kill(pid, 0)
except OSError:
print "Abnormal termination of previous process."
return False
else:
ps_command = "ps -o command= %s | grep -Eq 'python .*/%s'" % (pid,script_name)
process_exist = os.system(ps_command)
if process_exist == 0:
return True
else:
print "Process with pid %s is not a Python process. Continue..." % pid
return False




if __name__ == '__main__':
script_name = os.path.basename(__file__)
pid = str(os.getpid())
pidfile = os.path.join("/", "tmp/", script_name+".pid")
if os.path.isfile(pidfile):
print "Warning! Pid file %s existing. Checking for process..." % pidfile
r_pid = int(file(pidfile,'r').readlines()[0])
if checkPidRunning(r_pid):
print "Python process with pid = %s is already running. Exit!" % r_pid
exit()
else:
file(pidfile, 'w').write(pid)
else:
file(pidfile, 'w').write(pid)


# main programm
....
....


os.unlink(pidfile)

Here is string:

ps_command = "ps -o command= %s | grep -Eq 'python .*/%s'" % (pid,script_name)

如果“ grep”成功,并且进程“ python”当前正以脚本名作为参数运行,则返回0。

A simple example if you only are looking for a process name exist or not:

import os


def pname_exists(inp):
os.system('ps -ef > /tmp/psef')
lines=open('/tmp/psef', 'r').read().split('\n')
res=[i for i in lines if inp in i]
return True if res else False


Result:
In [21]: pname_exists('syslog')
Out[21]: True


In [22]: pname_exists('syslog_')
Out[22]: False

我自己也遇到了这个老问题,想找到解决的办法。

使用 Psutil:

import psutil
import sys
from subprocess import Popen


for process in psutil.process_iter():
if process.cmdline() == ['python', 'your_script.py']:
sys.exit('Process found: exiting.')


print('Process not found: starting it.')
Popen(['python', 'your_script.py'])

我的解决方案是检查进程和命令行参数 在 windows 和 ubuntu linux 上测试

import psutil
import os


def is_running(script):
for q in psutil.process_iter():
if q.name().startswith('python'):
if len(q.cmdline())>1 and script in q.cmdline()[1] and q.pid !=os.getpid():
print("'{}' Process is already running".format(script))
return True


return False




if not is_running("test.py"):
n = input("What is Your Name? ")
print ("Hello " + n)

这是我在 Linux 中用来避免启动已经运行的脚本的方法:

import os
import sys




script_name = os.path.basename(__file__)
pidfile = os.path.join("/tmp", os.path.splitext(script_name)[0]) + ".pid"




def create_pidfile():
if os.path.exists(pidfile):
with open(pidfile, "r") as _file:
last_pid = int(_file.read())


# Checking if process is still running
last_process_cmdline = "/proc/%d/cmdline" % last_pid
if os.path.exists(last_process_cmdline):
with open(last_process_cmdline, "r") as _file:
cmdline = _file.read()
if script_name in cmdline:
raise Exception("Script already running...")


with open(pidfile, "w") as _file:
pid = str(os.getpid())
_file.write(pid)




def main():
"""Your application logic goes here"""




if __name__ == "__main__":
create_pidfile()
main()

这种方法在不依赖外部模块的情况下工作得很好。

我一直在寻找这个问题的答案,在我的案例中,我想到了一个非常简单和非常好的解决方案,在我看来(因为不可能存在一个错误的肯定,我想——如果程序不这样做,TXT 上的时间戳如何更新) :

根据你的需要,在一个 TXT 上按照一定的时间间隔写下当前的时间戳(这里每半个小时都是完美的)。

如果 TXT 上的时间戳相对于当前的时间戳已经过时,那么在程序上就出现了问题,应该重新启动或者您更喜欢做什么。

依赖于 multiprocessing.shared_memory的便携式解决方案:

import atexit
from multiprocessing import shared_memory


_ensure_single_process_store = {}




def ensure_single_process(name: str):
if name in _ensure_single_process_store:
return
try:
shm = shared_memory.SharedMemory(name='ensure_single_process__' + name,
create=True,
size=1)
except FileExistsError:
print(f"{name} is already running!")
raise
_ensure_single_process_store[name] = shm
atexit.register(shm.unlink)

通常您不必使用 atexit,但有时它有助于清理异常退出。