import subprocess
p = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)for line in p.stdout.readlines():print line,retval = p.wait()
from subprocess import call
# Using listcall(["echo", "Hello", "world"])
# Single string argument varies across platforms so better split itcall("echo Hello world".split(" "))
import subprocess
..process = subprocess.Popen(..) # Pass command and arguments to the functionstdout, stderr = process.communicate() # Get command output and error..
subprocess.run(["ls", "-l"]) # Run commandsubprocess.run(["ls", "-l"], stdout=subprocess.PIPE) # This will run the command and return any outputsubprocess.run(shlex.split("ls -l")) # You can also use the shlex library to split the command
pexpect.run("ls -l") # Run command as normalchild = pexpect.spawn('scp foo user@example.com:.') # Spawns child applicationchild.expect('Password:') # When this is the outputchild.sendline('mypassword')
import shleximport psutilimport subprocess
def call_cmd(cmd, stdout=sys.stdout, quiet=False, shell=False, raise_exceptions=True, use_shlex=True, timeout=None):"""Exec command by command line like 'ln -ls "/var/log"'"""if not quiet:print("Run %s", str(cmd))if use_shlex and isinstance(cmd, (str, unicode)):cmd = shlex.split(cmd)if timeout is None:process = subprocess.Popen(cmd, stdout=stdout, stderr=sys.stderr, shell=shell)retcode = process.wait()else:process = subprocess.Popen(cmd, stdout=stdout, stderr=sys.stderr, shell=shell)p = psutil.Process(process.pid)finish, alive = psutil.wait_procs([p], timeout)if len(alive) > 0:ps = p.children()ps.insert(0, p)print('waiting for timeout again due to child process check')finish, alive = psutil.wait_procs(ps, 0)if len(alive) > 0:print('process {} will be killed'.format([p.pid for p in alive]))for p in alive:p.kill()if raise_exceptions:print('External program timeout at {} {}'.format(timeout, cmd))raise CalledProcessTimeout(1, cmd)retcode = process.wait()if retcode and raise_exceptions:print("External program failed %s", str(cmd))raise subprocess.CalledProcessError(retcode, cmd)
import subprocess,sys
def exec_long_running_proc(command, args):cmd = "{} {}".format(command, " ".join(str(arg) if ' ' not in arg else arg.replace(' ','\ ') for arg in args))print(cmd)process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Poll process for new output until finishedwhile True:nextline = process.stdout.readline().decode('UTF-8')if nextline == '' and process.poll() is not None:breaksys.stdout.write(nextline)sys.stdout.flush()
output = process.communicate()[0]exitCode = process.returncode
if (exitCode == 0):return outputelse:raise Exception(command, exitCode, output)
proc = subprocess.Popen(["./some_long_run_cmd.sh"], stdout=subprocess.PIPE)# Do something else# Now some_long_run_cmd.sh exeuction is no longer needed, so kill itos.system('kill -15 ' + str(proc.pid))print 'Output : ' proc.communicate()[0]
import subprocess
# subprocess.run() returns a completed process object that can be inspectedc = subprocess.run(["ls", "-ltrh"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)print(c.stdout.decode('utf-8'))
total 113M-rwxr-xr-x 1 farzad farzad 307 Jan 15 2018 vpnscript-rwxrwxr-x 1 farzad farzad 204 Jan 15 2018 exdrwxrwxr-x 4 farzad farzad 4.0K Jan 22 2018 scripts.... # Some other lines
import subprocessresult = subprocess.run(["ls", "-a"], capture_output=True, text=True)if "stackoverflow-logo.png" in result.stdout:print("You're a fan!")else:print("You're not a fan?")
if result.returncode == 127: print("The program failed for some weird reason")elif result.returncode == 0: print("The program succeeded")else: print("The program failed unexpectedly")
如果你只是想检查程序是否成功(返回代码==0)并抛出Exception,有一个更方便的函数:
result.check_returncode()
但它是Python,所以有一个更方便的参数check会自动为你做同样的事情:
result = subprocess.run(..., check=True)
stderr应该在stdout里面
您可能希望将所有程序输出都包含在标准输出中,甚至是错误。要做到这一点,请运行
result = subprocess.run(..., stderr=subprocess.STDOUT)
def _run(command, timeout_s=False, shell=False):### run a process, capture the output and wait for it to finish. if timeout is specified then Kill the subprocess and its children when the timeout is reached (if parent did not detach)## usage: _run(arg1, arg2, arg3)# arg1: command + arguments. Always pass a string; the function will split it when needed# arg2: (optional) timeout in seconds before force killing# arg3: (optional) shell usage. default shell=False## return: a list containing: exit code, output, and if timeout was reached or not
# - Tested on Python 2 and 3 on Windows XP, Windows 7, Cygwin and Linux.# - preexec_fn=os.setsid (py2) is equivalent to start_new_session (py3) (works on Linux only), in Windows and Cygwin we use TASKKILL# - we use stderr=subprocess.STDOUT to merge standard error and standard outputimport sys, subprocess, os, signal, shlex, time
def _runPY3(command, timeout_s=None, shell=False):# py3.3+ because: timeout was added to communicate() in py3.3.new_session=Falseif sys.platform.startswith('linux'): new_session=Truep = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, start_new_session=new_session, shell=shell)
try:out = p.communicate(timeout=timeout_s)[0].decode('utf-8')is_timeout_reached = Falseexcept subprocess.TimeoutExpired:print('Timeout reached: Killing the whole process group...')killAll(p.pid)out = p.communicate()[0].decode('utf-8')is_timeout_reached = Truereturn p.returncode, out, is_timeout_reached
def _runPY2(command, timeout_s=0, shell=False):preexec=Noneif sys.platform.startswith('linux'): preexec=os.setsidp = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=preexec, shell=shell)
start_time = time.time()is_timeout_reached = Falsewhile timeout_s and p.poll() == None:if time.time()-start_time >= timeout_s:print('Timeout reached: Killing the whole process group...')killAll(p.pid)is_timeout_reached = Truebreaktime.sleep(1)out = p.communicate()[0].decode('utf-8')return p.returncode, out, is_timeout_reached
def killAll(ParentPid):if sys.platform.startswith('linux'):os.killpg(os.getpgid(ParentPid), signal.SIGTERM)elif sys.platform.startswith('cygwin'):# subprocess.Popen(shlex.split('bash -c "TASKKILL /F /PID $(</proc/{pid}/winpid) /T"'.format(pid=ParentPid)))winpid=int(open("/proc/{pid}/winpid".format(pid=ParentPid)).read())subprocess.Popen(['TASKKILL', '/F', '/PID', str(winpid), '/T'])elif sys.platform.startswith('win32'):subprocess.Popen(['TASKKILL', '/F', '/PID', str(ParentPid), '/T'])
# - In Windows, we never need to split the command, but in Cygwin and Linux we need to split if shell=False (default), shlex will split the command for usif shell==False and (sys.platform.startswith('cygwin') or sys.platform.startswith('linux')):command=shlex.split(command)
if sys.version_info >= (3, 3): # py3.3+if timeout_s==False:returnCode, output, is_timeout_reached = _runPY3(command, timeout_s=None, shell=shell)else:returnCode, output, is_timeout_reached = _runPY3(command, timeout_s=timeout_s, shell=shell)else: # Python 2 and up to 3.2if timeout_s==False:returnCode, output, is_timeout_reached = _runPY2(command, timeout_s=0, shell=shell)else:returnCode, output, is_timeout_reached = _runPY2(command, timeout_s=timeout_s, shell=shell)
return returnCode, output, is_timeout_reached
a=_run('cmd /c echo 11111 & echo 22222 & calc',3)for i in a[1].splitlines(): print(i)
或者没有超时:
b=_run('cmd /c echo 11111 & echo 22222 & calc')
更多示例:
b=_run('''wmic nic where 'NetConnectionID="Local Area Connection"' get NetConnectionStatus /value''')print(b)
c=_run('cmd /C netsh interface ip show address "Local Area Connection"')print(c)
d=_run('printf "<%s>\n" "{foo}"')print(d)