运行shell命令并捕获输出

我想写一个函数来执行shell命令并返回其输出作为字符串,不管它是错误还是成功消息。我只想得到与使用命令行相同的结果。

什么代码示例会做这样的事情?

例如:

def run_command(cmd):# ??????
print run_command('mysqladmin create test -uroot -pmysqladmin12')# Should output something like:# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'
1622800 次浏览

像这样的东西:

def runProcess(exe):p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)while(True):# returns None while subprocess is runningretcode = p.poll()line = p.stdout.readline()yield lineif retcode is not None:break

请注意,我正在将stderr重定向到stdout,它可能不是您想要的,但我也想要错误消息。

此函数一行一行地产生(通常您必须等待子进程完成才能获得整个输出)。

对于您的情况,用法将是:

for line in runProcess('mysqladmin create test -uroot -pmysqladmin12'.split()):print line,

在所有官方维护的Python版本中,最简单的方法是使用#0函数:

>>> subprocess.check_output(['ls', '-l'])b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

check_output运行一个只接受参数作为输入的程序。1它返回的结果与打印到stdout的结果完全相同。如果您需要将输入写入stdin,请跳到runPopen部分。如果您想执行复杂的shell命令,请参阅本答案末尾的shell=True注释。

check_output函数适用于所有官方维护的Python版本。但对于更新的版本,可以使用更灵活的方法。

现代版本的Python(3.5或更高版本):run

如果您使用的是python3.5+不需要向后兼容,那么对于大多数任务,官方留档推荐使用新的#0函数。它为#1模块提供了一个非常通用的高级API。要捕获程序的输出,请将subprocess.PIPE标志传递给stdout关键字参数。然后访问返回的subprocess0对象的stdout属性:

>>> import subprocess>>> result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)>>> result.stdoutb'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

返回值是一个bytes对象,所以如果你想要一个合适的字符串,你需要decode它。假设被调用的进程返回一个UTF-8编码的字符串:

>>> result.stdout.decode('utf-8')'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

如果需要,这一切都可以压缩为单行代码:

>>> subprocess.run(['ls', '-l'], stdout=subprocess.PIPE).stdout.decode('utf-8')'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

如果您想将输入传递给进程的stdin,您可以将bytes对象传递给input关键字参数:

>>> cmd = ['awk', 'length($0) > 5']>>> ip = 'foo\nfoofoo\n'.encode('utf-8')>>> result = subprocess.run(cmd, stdout=subprocess.PIPE, input=ip)>>> result.stdout.decode('utf-8')'foofoo\n'

您可以通过传递stderr=subprocess.PIPE(捕获到result.stderr)或stderr=subprocess.STDOUT(捕获到result.stdout以及常规输出)来捕获错误。如果您希望run在进程返回非零退出代码时抛出异常,您可以传递check=True。(或者您可以检查上面resultreturncode属性。)当安全不是问题时,您还可以通过传递shell=True来运行更复杂的shell命令,如本答案末尾所述。

Python的后续版本进一步简化了上述内容。在Python 3.7+中,上述单行代码可以这样拼写:

>>> subprocess.run(['ls', '-l'], capture_output=True, text=True).stdout'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

与旧的做事方式相比,以这种方式使用run只会增加一点复杂性。但是现在您几乎可以单独使用run函数完成任何需要做的事情。

旧版本的Python(3-3.4):更多关于check_output

如果您使用的是较旧版本的Python,或者需要适度的向后兼容性,您可以使用上面简要描述的check_output函数。它从Python 2.7开始可用。

subprocess.check_output(*popenargs, **kwargs)

它接受与Popen相同的参数(见下文),并返回一个包含程序输出的字符串。这个答案的开头有一个更详细的用法示例。在Python 3.5+中,check_output等效于用check=Truestdout=PIPE执行run,并仅返回stdout属性。

您可以传递stderr=subprocess.STDOUT以确保返回的输出中包含错误消息。当安全不是问题时,您还可以通过传递shell=True来运行更复杂的shell命令,如本答案末尾所述。

如果您需要从stderr管道或将输入传递给进程,check_output将无法完成任务。在这种情况下,请参阅下面的Popen示例。

复杂的应用程序和Python的遗留版本(2.6及以下):Popen

如果您需要深度向后兼容性,或者您需要比check_outputrun提供的更复杂的功能,您必须直接使用Popen对象,它封装了子进程的低级API。

Popen构造函数接受一个命令不带参数,或者一个列表包含一个命令作为其第一项,后跟任意数量的参数,每个参数作为列表中的一个单独项。#1可以帮助将字符串解析为适当格式的列表。Popen对象也接受许多不同的论点用于进程IO管理和低级配置。

要发送输入和捕获输出,communicate几乎总是首选方法。如:

output = subprocess.Popen(["mycmd", "myarg"],stdout=subprocess.PIPE).communicate()[0]

>>> import subprocess>>> p = subprocess.Popen(['ls', '-a'], stdout=subprocess.PIPE,...                                    stderr=subprocess.PIPE)>>> out, err = p.communicate()>>> print out...foo

如果设置stdin=PIPEcommunicate还允许您通过stdin将数据传递给进程:

>>> cmd = ['awk', 'length($0) > 5']>>> p = subprocess.Popen(cmd, stdout=subprocess.PIPE,...                           stderr=subprocess.PIPE,...                           stdin=subprocess.PIPE)>>> out, err = p.communicate('foo\nfoofoo\n')>>> print outfoofoo

注意Aaron Hall的回答,这表明在某些系统上,您可能需要将stdoutstderrstdin全部设置为PIPE(或DEVNULL)才能使communicate完全工作。

在一些罕见的情况下,您可能需要复杂的实时输出捕获。Vartec的答案提出了前进的方向,但communicate以外的方法如果不小心使用,很容易出现死锁。

与上述所有函数一样,当安全不是问题时,您可以通过传递shell=True来运行更复杂的shell命令。

备注

1.运行shell命令:shell=True参数

通常,每次调用runcheck_outputPopen构造函数都会执行单一程序。这意味着没有花哨的bash风格的管道。如果你想运行复杂的shell命令,你可以传递shell=True,这三个函数都支持。例如:

>>> subprocess.check_output('cat books/* | wc', shell=True, text=True)' 1299377 17005208 101299376\n'

但是,这样做会引发安全问题。如果你做的不仅仅是简单的脚本编写,你最好单独调用每个进程,并将每个进程的输出作为输入传递给下一个进程,通过

run(cmd, [stdout=etc...], input=other_output)

Popen(cmd, [stdout=etc...]).communicate(other_output)

直接连接管道的诱惑是强烈的;抵制它。否则,您可能会看到死锁或不得不做类似这个的事情。

这更容易,但仅适用于Unix(包括Cygwin)和Python2.7。

import commandsprint commands.getstatusoutput('wc -l file')

它返回一个带有(return_value,输出)的元组。

对于同时适用于Python2和Python3的解决方案,请改用subprocess模块:

from subprocess import Popen, PIPEoutput = Popen(["date"],stdout=PIPE)response = output.communicate()print response

Vartec的答案不会读取所有行,所以我做了一个版本:

def run_command(command):p = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)return iter(p.stdout.readline, b'')

用法与接受的答案相同:

command = 'mysqladmin create test -uroot -pmysqladmin12'.split()for line in run_command(command):print(line)

您的里程可能会有所不同,我在Python 2.6.5的Windows中尝试@senderle对Vartec的解决方案进行旋转,但我遇到错误,没有其他解决方案有效。我的错误是:WindowsError: [Error 6] The handle is invalid

我发现我必须将PIPE分配给每个句柄以使其返回我期望的输出-以下方法对我有用。

import subprocess
def run_command(cmd):"""given shell command, returns communication tuple of stdout and stderr"""return subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.PIPE).communicate()

并像这样调用,([0]获取元组的第一个元素,stdout):

run_command('tracert 11.1.0.1')[0]

了解更多之后,我相信我需要这些管道参数,因为我正在使用不同的句柄的自定义系统上工作,所以我必须直接控制所有的std。

要停止控制台弹出窗口(使用Windows),请执行以下操作:

def run_command(cmd):"""given shell command, returns communication tuple of stdout and stderr"""# instantiate a startupinfo obj:startupinfo = subprocess.STARTUPINFO()# set the use show window flag, might make conditional on being in Windows:startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW# pass as the startupinfo keyword argument:return subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.PIPE,startupinfo=startupinfo).communicate()
run_command('tracert 11.1.0.1')

如果您需要在多个文件上运行shell命令,这对我来说很有用。

import osimport subprocess
# Define a function for running commands and capturing stdout line by line# (Modified from Vartec's solution because it wasn't printing all lines)def runProcess(exe):p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)return iter(p.stdout.readline, b'')
# Get all filenames in working directoryfor filename in os.listdir('./'):# This command will be run on each filecmd = 'nm ' + filename
# Run the command and capture the output line by line.for line in runProcess(cmd.split()):# Eliminate leading and trailing whitespaceline.strip()# Split the outputoutput = line.split()
# Filter the output and print relevant linesif len(output) > 2:if ((output[2] == 'set_program_name')):print filenameprint line

编辑:刚刚看到马克斯·佩尔松的解决方案和J. F.塞巴斯蒂安的建议。继续前进并纳入其中。

这是一个棘手超级简单的解决方案,适用于许多情况:

import osos.system('sample_cmd > tmp')print(open('tmp', 'r').read())

使用命令的输出创建一个临时文件(这里是tmp),您可以从中读取所需的输出。

订阅关于评论的评论:一次性作业的情况下可以删除tmp文件。如果需要多次执行此操作,则无需删除tmp。

os.remove('tmp')

我有一个稍微不同的味道相同的问题与以下要求:

  1. 捕获并返回STDOUT消息,因为它们在STDOUT缓冲区中累积(即实时)。
    • @vartec通过使用生成器和“产量”以Python方式解决了这个问题
      上面的关键字
  2. 打印所有STDOUT行(即使进程在STDOUT缓冲区可以完全读取之前退出
  3. 不要浪费CPU周期以高频轮询进程
  4. 检查子进程的返回代码
  5. 如果我们得到非零错误返回码,则打印STDERR(与STDOUT分开)。

我结合并调整了之前的答案,得出以下结论:

import subprocessfrom time import sleep
def run_command(command):p = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)# Read stdout from subprocess until the buffer is empty !for line in iter(p.stdout.readline, b''):if line: # Don't print blank linesyield line# This ensures the process has completed, AND sets the 'returncode' attrwhile p.poll() is None:sleep(.1) #Don't waste CPU-cycles# Empty STDERR buffererr = p.stderr.read()if p.returncode != 0:# The run_command() function is responsible for logging STDERRprint("Error: " + str(err))

此代码将与之前的答案相同地执行:

for line in run_command(cmd):print(line)

我有同样的问题,但想出了一个非常简单的方法:

import subprocessoutput = subprocess.getoutput("ls -l")print(output)

希望能帮上忙

注意:此解决方案是Python3特定的,因为subprocess.getoutput()在Python2中不起作用

您可以使用以下命令来运行任何shell命令。我在ubuntu上使用过它们。

import osos.popen('your command here').read()

备注:自python 2.6以来已弃用。现在您必须使用subprocess.Popen。下面是示例

import subprocess
p = subprocess.Popen("Your command", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]print p.split("\n")

例如,执行('ls-ahl')区分三种/四种可能的回报和操作系统平台:

  1. 没有输出,但运行成功
  2. 输出空行,运行成功
  3. 运行失败
  4. 输出点东西,运行成功

下面的功能

def execute(cmd, output=True, DEBUG_MODE=False):"""Executes a bash command.(cmd, output=True)output: whether print shell output to screen, only affects screen display, does not affect returned valuesreturn: ...regardless of output=True/False...returns shell output as a list with each elment is a line of string (whitespace stripped both sides) from outputcould be[], ie, len()=0 --> no output;[''] --> output empty line;None --> error occured, see below
if error ocurs, returns None (ie, is None), print out the error message to screen"""if not DEBUG_MODE:print "Command: " + cmd
# https://stackoverflow.com/a/40139101/2292993def _execute_cmd(cmd):if os.name == 'nt' or platform.system() == 'Windows':# set stdin, out, err all to PIPE to get results (other than None) after run the Popen() instancep = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)else:# Use bash; the default is shp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable="/bin/bash")
# the Popen() instance starts running once instantiated (??)# additionally, communicate(), or poll() and wait process to terminate# communicate() accepts optional input as stdin to the pipe (requires setting stdin=subprocess.PIPE above), return out, err as tuple# if communicate(), the results are buffered in memory
# Read stdout from subprocess until the buffer is empty !# if error occurs, the stdout is '', which means the below loop is essentially skipped# A prefix of 'b' or 'B' is ignored in Python 2;# it indicates that the literal should become a bytes literal in Python 3# (e.g. when code is automatically converted with 2to3).# return iter(p.stdout.readline, b'')for line in iter(p.stdout.readline, b''):# # Windows has \r\n, Unix has \n, Old mac has \r# if line not in ['','\n','\r','\r\n']: # Don't print blank linesyield linewhile p.poll() is None:sleep(.1) #Don't waste CPU-cycles# Empty STDERR buffererr = p.stderr.read()if p.returncode != 0:# responsible for logging STDERRprint("Error: " + str(err))yield None
out = []for line in _execute_cmd(cmd):# error did not occur earlierif line is not None:# trailing comma to avoid a newline (by print itself) being printedif output: print line,out.append(line.strip())else:# error occured earlierout = Nonereturn outelse:print "Simulation! The command is " + cmdprint ""

拆分subprocess的初始命令可能很棘手和麻烦。

使用shlex.split()来帮助自己。

示例命令

git log -n 5 --since "5 years ago" --until "2 year ago"

该代码

from subprocess import check_outputfrom shlex import split
res = check_output(split('git log -n 5 --since "5 years ago" --until "2 year ago"'))print(res)>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

如果没有shlex.split(),代码将如下所示

res = check_output(['git','log','-n','5','--since','5 years ago','--until','2 year ago'])print(res)>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

根据@senderle,如果你像我一样使用python3.6:

def sh(cmd, input=""):rst = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, input=input.encode("utf-8"))assert rst.returncode == 0, rst.stderr.decode("utf-8")return rst.stdout.decode("utf-8")
sh("ls -a")

将完全像您在bash中运行命令一样

如果您使用subprocess python模块,您可以单独处理STDOUT、STDERR和返回命令代码。您可以看到完整命令调用者实现的示例。当然,如果您愿意,您可以使用try..except扩展它。

下面的函数返回STDOUT、STDERR和返回代码,以便您可以在其他脚本中处理它们。

import subprocess
def command_caller(command=None)sp = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=False)out, err = sp.communicate()if sp.returncode:print("Return code: %(ret_code)s Error message: %(err_msg)s"% {"ret_code": sp.returncode, "err_msg": err})return sp.returncode, out, err

输出可以重定向到文本文件,然后读回。

import subprocessimport osimport tempfile
def execute_to_file(command):"""This function execute the commandand pass its output to a tempfile then read it backIt is usefull for process that deploy child process"""temp_file = tempfile.NamedTemporaryFile(delete=False)temp_file.close()path = temp_file.namecommand = command + " > " + pathproc = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)if proc.stderr:# if command failed returnos.unlink(path)returnwith open(path, 'r') as f:data = f.read()os.unlink(path)return data
if __name__ == "__main__":path = "Somepath"command = 'ecls.exe /files ' + pathprint(execute(command))

这里有一个解决方案,如果您想在进程运行或不运行时打印输出,则可以使用。


我还添加了当前工作目录,它对我不止一次有用。


希望解决方案能帮助某人:)。

import subprocess
def run_command(cmd_and_args, print_constantly=False, cwd=None):"""Runs a system command.
:param cmd_and_args: the command to run with or without a Pipe (|).:param print_constantly: If True then the output is logged in continuous until the command ended.:param cwd: the current working directory (the directory from which you will like to execute the command):return: - a tuple containing the return code, the stdout and the stderr of the command"""output = []
process = subprocess.Popen(cmd_and_args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
while True:next_line = process.stdout.readline()if next_line:output.append(str(next_line))if print_constantly:print(next_line)elif not process.poll():break
error = process.communicate()[1]
return process.returncode, '\n'.join(output), error

我想建议simppl作为考虑的选项。它是一个可以通过pypi:pip install simppl获得的模块,并在python3上运行。

#0允许用户运行shell命令并从屏幕读取输出。

开发人员建议三种类型的用例:

  1. 最简单的用法看起来像这样:
    from simppl.simple_pipeline import SimplePipelinesp = SimplePipeline(start=0, end=100):sp.print_and_run('<YOUR_FIRST_OS_COMMAND>')sp.print_and_run('<YOUR_SECOND_OS_COMMAND>') ```

  1. 要同时运行多个命令,请使用:
    commands = ['<YOUR_FIRST_OS_COMMAND>', '<YOUR_SECOND_OS_COMMAND>']max_number_of_processes = 4sp.run_parallel(commands, max_number_of_processes) ```

  1. 最后,如果您的项目使用cli模块,您可以直接运行另一个command_line_tool作为管道的一部分从相同的进程运行,但它将从日志中显示为管道中的另一个命令。这可以更顺畅地调试和重构调用其他工具的工具。
    from example_module import example_toolsp.print_and_run_clt(example_tool.run, ['first_number', 'second_nmber'],{'-key1': 'val1', '-key2': 'val2'},{'--flag'}) ```

请注意,打印到STDOUT/STDERR是通过python的logging模块。


以下是展示simppl工作原理的完整代码:

import loggingfrom logging.config import dictConfig
logging_config = dict(version = 1,formatters = {'f': {'format':'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}},handlers = {'h': {'class': 'logging.StreamHandler','formatter': 'f','level': logging.DEBUG}},root = {'handlers': ['h'],'level': logging.DEBUG,},)dictConfig(logging_config)
from simppl.simple_pipeline import SimplePipelinesp = SimplePipeline(0, 100)sp.print_and_run('ls')

python3.7+上,使用#0并传递capture_output=True

import subprocessresult = subprocess.run(['echo', 'hello', 'world'], capture_output=True)print(repr(result.stdout))

这将返回字节:

b'hello world\n'

如果您希望它将字节转换为字符串,请添加text=True

result = subprocess.run(['echo', 'hello', 'world'], capture_output=True, text=True)print(repr(result.stdout))

这将使用您的默认编码读取字节:

'hello world\n'

如果您需要手动指定不同的编码,请使用encoding="your encoding"而不是text=True

result = subprocess.run(['echo', 'hello', 'world'], capture_output=True, encoding="utf8")print(repr(result.stdout))

这是一个简单灵活的解决方案,适用于各种操作系统版本,以及Python 2和3,在shell模式下使用IPython:

from IPython.terminal.embed import InteractiveShellEmbedmy_shell = InteractiveShellEmbed()result = my_shell.getoutput("echo hello world")print(result)
Out: ['hello world']

它有几个优点

  1. 它只需要IPython安装,所以你在使用它时真的不需要担心你的特定Python或OS版本,它带有Jupyter-它有广泛的支持
  2. 默认情况下它需要一个简单的字符串-所以不需要使用shell模式arg或字符串拆分,使其在IMO中稍微清晰
  3. 它还可以更轻松地在字符串本身中替换变量甚至整个Python命令

演示:

var = "hello world "result = my_shell.getoutput("echo {var*2}")print(result)
Out: ['hello world hello world']

只是想给你一个额外的选择,特别是如果你已经安装了Jupyter

当然,如果您使用的是实际的Jupyter笔记本而不是. py脚本,您也可以始终执行以下操作:

result = !echo hello worldprint(result)

为了达到同样的目的。

出于某种原因,这个可以在Python 2.7上运行,你只需要导入os!

import os
def bash(command):output = os.popen(command).read()return output
print_me = bash('ls -l')print(print_me)

改进以更好地记录。
为了更好的输出,你可以使用迭代器。从下面开始,我们变得更好

from subprocess import Popen, getstatusoutput, PIPEdef shell_command(cmd):result = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
output = iter(result.stdout.readline, b'')error = iter(result.stderr.readline, b'')print("##### OutPut ###")for line in output:print(line.decode("utf-8"))print("###### Error ########")for line in error:print(error.decode("utf-8")) # Convert bytes to str
status, terminal_output = run_command(cmd)print(terminal_output)
shell_command("ls") # this will display all the files & folders in directory

其他使用getstatus输出的方法(容易理解)

from subprocess import Popen, getstatusoutput, PIPE
status_Code, output = getstausoutput(command)print(output) # this will give the terminal output
# status_code, output = getstatusoutput("ls") # this will print the all files & folder available in the directory