Asynchronous requests with Python requests

I tried the example provided in the documentation of Python's requests library.

With async.map(rs) I get the response codes, but I want to get the content of each page requested. This, for example, does not work:

out = async.map(rs)
print out[0].content

I also tried some things using the async methods in Python, however I have had much better luck using Twisted for asynchronous programming. It has fewer problems and is well documented. Here is a link to something similar to what you are trying, in Twisted:

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html

Note

The answer below is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you could just replace requests with grequests below and it should work.

I'm leaving this answer as is to reflect the original question, which was about using requests < v0.13.0.


To do multiple tasks with async.map asynchronously you have to:

  1. Define a function for what you want to do with each object (your task)
  2. Add that function as an event hook in your request
  3. Call async.map on a list of all the requests / actions

Example:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    #
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks={'response': do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

I have been using python requests for async calls against github's gist API for some time.

For an example, see the code here:

https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72

That style of Python may not be the clearest example, but I can assure you that the code works. Let me know if this is confusing to you and I will document it.

async is now an independent module: grequests.

See it here: https://github.com/kennethreitz/grequests

Also see: Ideal method for sending multiple HTTP requests over Python?

Installation:

$ pip install grequests

Usage:

Build a stack:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

Send the stack:

grequests.map(rs)

The result looks like this:

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests doesn't seem to set a limit on concurrent requests, i.e. when multiple requests are sent to the same server.
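If you do want to cap concurrency yourself, grequests.map accepts (as far as I know) a size argument that limits how many requests are in flight at once via a gevent pool; a minimal sketch:

import grequests

urls = ['http://httpbin.org/delay/1'] * 10
rs = (grequests.get(u) for u in urls)

# size is assumed to be the gevent pool size, i.e. at most
# 5 requests run at the same time
responses = grequests.map(rs, size=5)
print(responses)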

from threading import Thread

threads = list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...
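For reference, here is a self-contained sketch of the same thread-per-request pattern outside a class, using requests instead of urllib2; the URLs and the open_url helper are placeholders of my own:

from threading import Thread
import requests

def open_url(request_uri):
    # each thread performs one blocking GET
    response = requests.get(request_uri, timeout=600)
    print(response.status_code, request_uri)

request_uris = ["http://httpbin.org/get", "http://python-requests.org"]
threads = []

for request_uri in request_uris:
    t = Thread(target=open_url, args=(request_uri,))
    t.start()
    threads.append(t)

# wait for every request to finish
for thread in threads:
    thread.join()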

I know this has been closed for a while, but I thought it might be useful to promote another async solution built on the requests library.

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

The docs are here: http://pythonhosted.org/simple-requests/

Maybe requests-futures is another choice.

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second request is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

It is also recommended in the official documentation. If you don't want to pull in gevent, it's a good option.
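The session's thread pool is fairly small by default; if I remember the requests-futures interface correctly, it can be widened with a max_workers argument (treat that parameter name as an assumption and double-check the project README):

from requests_futures.sessions import FuturesSession

# max_workers is assumed from the requests-futures README
session = FuturesSession(max_workers=10)

futures = [session.get('http://httpbin.org/get') for _ in range(10)]
for future in futures:
    print(future.result().status_code)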

I tested both requests-futures and grequests. Grequests is faster, but it brings monkey patching and additional problems with dependencies. requests-futures is several times slower than grequests. I decided to write my own, simply wrapping requests in a ThreadPoolExecutor, and it turned out to be almost as fast as grequests, but without the external dependencies.

import requests
import concurrent.futures

def get_urls():
    return ["url1", "url2"]

def load_url(url, timeout):
    return requests.get(url, timeout=timeout)

# counters for successful and failed responses
resp_ok = 0
resp_err = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

If you want to use asyncio, then requests-async provides async/await functionality for requests - https://github.com/encode/requests-async
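A minimal sketch of what that looks like, based on my recollection of the requests-async README (the project has since been archived in favour of httpx, so treat the exact API as an assumption):

import asyncio
import requests_async as requests

async def main():
    # the interface mirrors requests, but get() is awaitable
    response = await requests.get('https://httpbin.org/get')
    print(response.status_code)
    print(response.text)

asyncio.run(main())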

I have a lot of issues with most of the answers posted - they either use deprecated libraries that have been ported over with limited features, or provide a solution with too much magic on the execution of the request, making error handling difficult. If they do not fall into one of the above categories, they're 3rd party libraries or deprecated.

Some of the solutions work alright purely for HTTP requests, but they fall short for any other kind of request, which is ludicrous. A highly customized solution is not necessary here.

Simply using the Python built-in library asyncio is sufficient to perform asynchronous requests of any kind, and it provides enough fluidity for complex, use-case-specific error handling.

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    # perform_grpc_call, do_chores, URL, id and ch_id are placeholders
    # for your own application code
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
        async_tasks.append(loop.create_task(get_rpc_info_and_do_chores(id)))
        async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

How it works is simple: you create a series of tasks you would like to happen asynchronously, then ask a loop to execute those tasks and exit upon completion. There are no extra libraries to keep maintained and no missing functionality.
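Since the snippet above relies on placeholder helpers (perform_grpc_call, do_chores and so on), here is a small self-contained sketch of the same idea, with the blocking requests.get pushed onto the event loop's thread pool so the calls can overlap; the URLs are arbitrary examples and asyncio.run is used instead of managing the loop by hand:

import asyncio
import requests

urls = ['https://httpbin.org/get', 'https://httpbin.org/ip']

async def fetch_and_do_chores(url):
    loop = asyncio.get_running_loop()
    # run the blocking requests call in the default thread pool executor
    response = await loop.run_in_executor(None, requests.get, url)
    # "chores": whatever you want to do with the response
    print(url, response.status_code)

async def main():
    await asyncio.gather(*(fetch_and_do_chores(url) for url in urls))

asyncio.run(main())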

You can use httpx for that.

import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
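If you are not already inside an async context (for example in a plain script rather than a notebook), one way to get one is to wrap the call in a coroutine and hand it to asyncio.run; a minimal sketch reusing get_async and urls from the snippet above:

async def main():
    return await asyncio.gather(*map(get_async, urls))

responses = asyncio.run(main())
print([r.status_code for r in responses])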

If you want a functional syntax, the gamla library wraps this into get_async.

Then you can do:


await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])

The 10 is the timeout in seconds.

(Disclaimer: I am its author.)

Unfortunately, as far as I'm aware, the requests library is not equipped for performing asynchronous requests. You can wrap async/await syntax around requests, but that will make the underlying requests no less synchronous. If you want true async requests, you must use other tooling that provides it. One such solution is aiohttp (Python 3.5.3+). It works well in my experience using it with the Python 3.7 async/await syntax. Below I provide three implementations of performing n web requests:

  1. Purely synchronous requests (sync_requests_get_all) using the Python requests library
  2. Synchronous requests (async_requests_get_all) using the Python requests library wrapped in Python 3.7 async/await syntax and asyncio
  3. A truly asynchronous implementation (async_aiohttp_get_all) with the Python aiohttp library wrapped in Python 3.7 async/await syntax and asyncio
"""
Tested in Python 3.5.10
"""


import time
import asyncio
import requests
import aiohttp


from asgiref import sync


def timed(func):
"""
records approximate durations of function calls
"""
def wrapper(*args, **kwargs):
start = time.time()
print('{name:<30} started'.format(name=func.__name__))
result = func(*args, **kwargs)
duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
name=func.__name__, elapsed=time.time() - start
)
print(duration)
timed.durations.append(duration)
return result
return wrapper


timed.durations = []




@timed
def sync_requests_get_all(urls):
"""
performs synchronous get requests
"""
# use session to reduce network overhead
session = requests.Session()
return [session.get(url).json() for url in urls]




@timed
def async_requests_get_all(urls):
"""
asynchronous wrapper around synchronous requests
"""
session = requests.Session()
# wrap requests.get into an async function
def get(url):
return session.get(url).json()
async_get = sync.sync_to_async(get)


async def get_all(urls):
return await asyncio.gather(*[
async_get(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)


@timed
def async_aiohttp_get_all(urls):
"""
performs asynchronous get requests
"""
async def get_all(urls):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.json()
return await asyncio.gather(*[
fetch(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)




if __name__ == '__main__':
# this endpoint takes ~3 seconds to respond,
# so a purely synchronous implementation should take
# little more than 30 seconds and a purely asynchronous
# implementation should take little more than 3 seconds.
urls = ['https://postman-echo.com/delay/3']*10


async_aiohttp_get_all(urls)
async_requests_get_all(urls)
sync_requests_get_all(urls)
print('----------------------')
[print(duration) for duration in timed.durations]

On my machine, this is the output:

async_aiohttp_get_all          started
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         started
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          started
sync_requests_get_all          finished in 30.59 seconds
----------------------
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          finished in 30.59 seconds

DISCLAIMER: The following code creates a different thread for each function.

This might be useful for some cases, as it is simpler to use. But be aware that it is not async; it just gives the illusion of async by using multiple threads, even though the decorator name suggests otherwise.

You can use the following decorator to run a callback once the decorated function has finished executing; the callback must handle the data returned by the function.

Please note that once the function is decorated it will return a Future object.

import asyncio

## Decorator implementation of async runner !!
def run_async(callback, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def inner(func):
        def wrapper(*args, **kwargs):
            def __exec():
                out = func(*args, **kwargs)
                callback(out)
                return out

            return loop.run_in_executor(None, __exec)

        return wrapper

    return inner

Implementation example:

urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = []  # OPTIONAL, used for showing realtime, which urls are loaded !!




def _callback(resp):
print(resp.url)
print(resp)
loaded_urls.append((resp.url, resp))  # OPTIONAL, used for showing realtime, which urls are loaded !!




# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
return requests.get(url)




for url in urls:
get(url)

If you want to see in real time which URLs have been loaded, you can add the following code at the end:

while True:
    print(loaded_urls)
    if len(loaded_urls) == len(urls):
        break

I second the suggestion above to use HTTPX, but I often use it in a different way, so I'm adding my answer.

I personally use asyncio.run (introduced in Python 3.7) rather than asyncio.gather, and also prefer the aiostream approach, which can be used in combination with asyncio and httpx.

As in this example I just published, this style is helpful for processing a set of URLs asynchronously even when errors occur (which is common). I particularly like how this style clarifies where the response processing happens and how it eases error handling (something I find async calls tend to need more of).

It's easier to post a simple example of firing off a bunch of requests asynchronously, but often you also want to handle the response content (compute something with it, perhaps with reference to the original object that the requested URL relates to).

The core of that approach looks like:

async with httpx.AsyncClient(timeout=timeout) as session:
    ws = stream.repeat(session)
    xs = stream.zip(ws, stream.iterate(urls))
    ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
    process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
    zs = stream.map(ys, process)
    return await zs

where:

  • process_thing is an async response-content processing function
  • things is the input list (which the urls generator of URL strings came from), e.g. a list of objects/dicts
  • pbar is a progress bar (e.g. tqdm.tqdm) [optional but handy]

All of that goes into an async function async_fetch_urlset, which is then run by calling a synchronous "top-level" function named e.g. fetch_things, which runs the coroutine [this is what an async function returns] and manages the event loop:

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

Since a list passed as input (here, things) can be modified in place, you can effectively get output back out of it (as we're used to from synchronous function calls).
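The fetch coroutine referenced in stream.starmap above isn't shown in the snippet; a minimal sketch of what it might look like (my own assumption, not the original author's code) is simply an awaited GET over the (session, url) pairs produced by stream.zip:

import httpx

async def fetch(session: httpx.AsyncClient, url: str):
    # one GET per (session, url) pair; raise early on HTTP errors
    response = await session.get(url)
    response.raise_for_status()
    return url, response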

None of the answers above helped me, because they assume you have a predefined list of requests, while in my case I need to be able to listen for requests and respond asynchronously (similar to the way it works in nodejs).

import grequests

def handle_finished_request(r, **kwargs):
    print(r)


# while True:
def main():
    while True:
        address = listen_to_new_msg()  # based on your server

        # schedule async requests and run 'handle_finished_request' on response
        req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
        job = grequests.send(req)  # does not block! for more info see https://stackoverflow.com/a/16016635/10577976


main()

The handle_finished_request callback will be called when a response is received. Note: for some reason a timeout (or no response) does not trigger an error here.

This simple loop can trigger async requests similarly to how it would work in a nodejs server.