What is the fastest way to send 100,000 HTTP requests in Python?

I'm opening a file which has 100,000 URLs in it. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far I have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the Python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible, which I suppose means 'concurrently'.


The simplest way is to use Python's built-in threading library. Its threads are not "real"/kernel threads and have issues (such as serialization), but they are good enough. You want a queue and a thread pool. One option is here, but it is trivial to write your own. You can't parallelize all 100,000 calls, but you can fire off 100 (or so) of them at a time.
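
As a rough illustration of that queue-plus-thread-pool idea (my own sketch, not from the linked option; it assumes Python 3 module names, uses plain GET requests via urllib, and keeps roughly 100 worker threads pulling URLs off a queue):

import threading
import queue
import urllib.request

NUM_WORKERS = 100  # roughly 100 requests in flight at a time
q = queue.Queue()

def worker():
    while True:
        url = q.get()
        try:
            # A plain GET; we only look at the status code.
            status = urllib.request.urlopen(url, timeout=10).getcode()
        except Exception as exc:
            status = str(exc)
        print(status, url)
        q.task_done()

for _ in range(NUM_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

with open('urllist.txt') as f:
    for line in f:
        q.put(line.strip())

q.join()  # block until every URL has been processed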

Using a thread pool is a good option, and it will make this fairly easy. Unfortunately, Python doesn't have a standard library that makes thread pools trivial, but here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/

Code example from their site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Hope this helps.

Threads will probably do the trick in your case, since you will likely spend most of your time waiting for a response. There are helpful modules in the standard library, such as Queue, that might help.

I have done a similar thing with parallel downloading of files before, and it was good enough for me, but it was not on the scale you are talking about.

If your task were more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes won't block each other, since locking is per process).
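
If you did go the multiprocessing route, it might look roughly like the sketch below (my own illustration; the check_status helper and the pool size of 8 are assumptions, and this task is I/O-bound, so the sketch is only to show the API):

import multiprocessing
import urllib.request

def check_status(url):
    # Runs in a separate worker process, so locking is per process
    # and the workers do not block one another.
    try:
        return url, urllib.request.urlopen(url, timeout=10).getcode()
    except Exception as exc:
        return url, str(exc)

if __name__ == '__main__':
    with open('urllist.txt') as f:
        urls = [line.strip() for line in f]
    with multiprocessing.Pool(processes=8) as pool:
        for url, status in pool.imap_unordered(check_status, urls):
            print(status, url)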

A good approach to solving this problem is to first write the code required to get one result, and then incorporate threading code to parallelize the application.

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits on how many sockets you can open concurrently and how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

You can follow a design pattern like the following to resolve the issue described above (a rough sketch follows the list below):

  1. Start a thread that launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until you are finished.
  2. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require more complex cross-thread data interaction, you should use a mutual exclusion lock to protect this state from corruption.
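
A rough sketch of that launcher pattern, purely as an illustration (the fetch_status helper, the polling intervals, and the cap of 100 are my own assumptions, not part of the answer):

import threading
import time
import urllib.request

MAX_CONCURRENT = 100
results = {}  # a CPython dict: safe to insert unique keys from threads

def fetch_status(url):
    try:
        results[url] = urllib.request.urlopen(url, timeout=10).getcode()
    except Exception as exc:
        results[url] = str(exc)

def launcher(urls):
    # Keep starting request threads until the concurrency cap is hit,
    # then sleep briefly and retry; return once every URL is dispatched.
    for url in urls:
        while threading.active_count() >= MAX_CONCURRENT:
            time.sleep(0.05)
        threading.Thread(target=fetch_status, args=(url,)).start()

with open('urllist.txt') as f:
    launcher(line.strip() for line in f)
while threading.active_count() > 1:  # wait for outstanding request threads
    time.sleep(0.1)
for url, status in results.items():
    print(status, url)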

I would suggest you use the threading module for this. You can use it to launch and track running threads. Python's threading support is fairly bare-bones, but the description of your problem suggests that it is completely sufficient for your needs.

Finally, if you would like to see a fairly straightforward parallel network application written in Python, check out ssh.py. It is a small library that uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it a good resource.

If you are looking to get the best performance possible, you may want to consider using asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial, and the context switching within the Python interpreter adds even more on top of that. Threading will certainly get the job done, but I suspect that an asynchronous route will provide better overall performance.

Specifically, I would suggest the asynchronous web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve, but it is quite easy to use once you get a good handle on Twisted's style of asynchronous programming.

A HowTo on Twisted's asynchronous web client API is available at:

http://twistedmatrix.com/documents/current/web/howto/client.html
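
For orientation, here is a minimal sketch of the Agent-based client that the HowTo covers (my own condensation, assuming a reasonably recent Twisted; it issues a single HEAD request and prints the response code):

from twisted.internet import reactor
from twisted.web.client import Agent

agent = Agent(reactor)
d = agent.request(b'HEAD', b'http://example.com/')
d.addCallback(lambda response: print(response.code))
d.addErrback(lambda failure: print('error:', failure.value))
d.addBoth(lambda _: reactor.stop())
reactor.run()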

Threads are absolutely not the answer here. They will produce both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is "the fastest way".

A little bit of twisted and its asynchronous HTTP client would easily give you much better results.

Consider using Windmill, although Windmill probably can't do that many threads.

You could use a hand-rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.

Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.

Also, look into simple Perl with the LWP::ConnCache class. You will probably get more performance (more connections) that way.

A solution:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished = itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)


def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status


def processResponse(response, url):
    print response, url
    processedOne()


def processError(error, url):
    print "error", url  #, error
    processedOne()


def processedOne():
    if finished.next() == added:
        reactor.stop()


def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)


added = 0
for url in open('urllist.txt'):
    added += 1
    addTask(url.strip())


try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null


real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

Twisted-less solution:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue


concurrent = 200


def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()


def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl


def doSomethingWithResult(status, url):
    print status, url


q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

This solution is slightly faster than the twisted one and uses less CPU.

Use grequests; it is a combination of requests and the Gevent module.

GRequests allows you to use Requests with Gevent to make asynchronous HTTP requests easily.

Usage is simple:

import grequests


urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

Create a set of unsent Requests:

>>> rs = (grequests.get(u) for u in urls)

Send them all at the same time:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

This twisted asynchronous web client goes pretty fast.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput


pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}


def getLock(url, simultaneous=1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]


@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost
    lock = getLock(url, 4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())


reactor.run()
pprint(codes)

A solution using the tornado asynchronous networking library:

from tornado import ioloop, httpclient


i = 0


def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()


http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

This code uses non-blocking network I/O and does not have any restriction. It can scale to tens of thousands of open connections. It runs in a single thread, but it will be way faster than any threading solution. Check out non-blocking I/O.

Things have changed quite a bit since 2010 when this was posted. I have not tried all the other answers, but I did try a few, and I found that this works best for me using Python 3.6.

I was able to fetch about 150 unique domains per second running on AWS.

import concurrent.futures
import requests
import time


out = []
CONNECTIONS = 100
TIMEOUT = 5


tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]


def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code


with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)
            print(str(len(out)), end="\r")

    time2 = time.time()


print(f'Took {time2-time1:.2f} s')

  1. Create an epoll object,
  2. open many client TCP sockets,
  3. adjust their send buffers to be a bit larger than the request header,
  4. send a request header - it should be immediate, just placing it into a buffer - and register the socket in the epoll object,
  5. do .poll on the epoll object,
  6. read the first 3 bytes from each socket returned by .poll,
  7. write them to sys.stdout followed by \n (don't flush),
  8. close the client socket.

Limit the number of simultaneously open sockets - handle errors when sockets are created. Create a new socket only when another one is closed, and adjust the OS limits.

Try forking into a few (not many) processes: this may help to use the CPU a bit more effectively.
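
As a hedged sketch of those steps (my own code, not the answerer's: it assumes full http:// URLs, HTTP/1.0 over plain sockets so the status line arrives in the first read, uses HEAD requests, skips the send-buffer tuning, and keeps only minimal error handling):

import select
import socket
from urllib.parse import urlparse

MAX_OPEN = 100  # cap on simultaneously open sockets

def start_request(url):
    # Begin a non-blocking connect and prepare the request header bytes.
    parts = urlparse(url)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setblocking(False)
    s.connect_ex((parts.hostname, parts.port or 80))  # returns immediately
    header = 'HEAD {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(
        parts.path or '/', parts.hostname).encode()
    return s, header

with open('urllist.txt') as f:
    pending = [line.strip() for line in f]

ep = select.epoll()
states = {}  # fileno -> [socket, url, header (None once it has been sent)]

while pending or states:
    # Only create a new socket when there is room under the cap.
    while pending and len(states) < MAX_OPEN:
        url = pending.pop()
        try:
            s, header = start_request(url)
        except OSError as exc:
            print('error', url, exc)
            continue
        states[s.fileno()] = [s, url, header]
        ep.register(s.fileno(), select.EPOLLOUT)  # wait until writable (connected)

    for fileno, event in ep.poll(1.0):
        s, url, header = states[fileno]
        done = False
        try:
            if header is not None and event & select.EPOLLOUT:
                s.send(header)  # connection is up: place the header in the buffer
                states[fileno][2] = None
                ep.modify(fileno, select.EPOLLIN)  # now wait for the response
            else:
                data = s.recv(64)  # enough to cover the HTTP status line
                print(data.split(b'\r\n')[0].decode('latin-1', 'replace'), url)
                done = True
        except OSError as exc:
            print('error', url, exc)
            done = True
        if done:
            ep.unregister(fileno)
            s.close()
            del states[fileno]

ep.close()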

I know this is an old question, but in Python 3.7 you can do this using asyncio and aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError


async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)


async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')


if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

You can read more about this approach and see an example here.

I found that using the tornado package was the fastest and simplest way to achieve this:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages


my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])

[Tool]

Apache Bench is all you need - a command line (CLI) program for measuring the performance of HTTP web servers.

https://www.petefreitag.com/item/689.cfm (from Pete Freitag)

(Note to self for my next project)

A Python 3 solution using only requests. It is the simplest and it is fast; no multiprocessing or complicated asynchronous libraries are required.

The most important aspect is to reuse connections, especially for HTTPS (TLS requires an extra round trip to open). Note that a connection is specific to a subdomain. If you scrape many pages across many domains, you can sort the list of URLs to maximize connection reuse (it effectively sorts by domain).

It will be as fast as any asynchronous code when given enough threads (requests releases the Python GIL while waiting for the response).

[Production-grade code with some logging and error handling]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: https://stackoverflow.com/a/68583332/5994461

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)


def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response


def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)


def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)


if __name__ == "__main__":
    main()

pip install requests-threads

Example usage with async/await - send 100 concurrent requests

from requests_threads import AsyncSession


session = AsyncSession(n=100)


async def _main():
    rs = []
    for _ in range(100):
        rs.append(await session.get('http://httpbin.org/get'))
    print(rs)


if __name__ == '__main__':
    session.run(_main)

This example works only on Python 3. You can also provide your own asyncio event loop!

Example usage with Twisted

from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession


session = AsyncSession(n=100)


@inlineCallbacks
def main(reactor):
    responses = []
    for i in range(100):
        responses.append(session.get('http://httpbin.org/get'))

    for response in responses:
        r = yield response
        print(r)


if __name__ == '__main__':
    react(main)

This example works on both Python 2 and Python 3.

Maybe my repo can also help; it has a basic example: write fast async HTTP requests in Python

Here is an "async" solution that does not use asyncio, but the lower-level mechanism that asyncio uses (on Linux): select(). (Or asyncio may use poll or epoll, but it is a similar principle.)

It is a slightly modified version of the example from PyCurl.

(For simplicity it requests the same URL multiple times, but you can easily modify it to retrieve a series of distinct URLs.)

(Another slight modification would make this retrieve the same URL over and over as an infinite loop. Hint: change while urls and handles to while handles, and change while nprocessed<nurls to while 1.)

import pycurl, io, gzip, signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info


NCONNS = 2  # Number of concurrent GET requests
url    = 'example.com'
urls   = [url for i in range(0x7*NCONNS)]  # Copy the same URL over and over


# Check args
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')


# Pre-allocate a list of curl objects
m         = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
    c = pycurl.Curl()
    c.setopt(pycurl.FOLLOWLOCATION,  1)
    c.setopt(pycurl.MAXREDIRS,       5)
    c.setopt(pycurl.CONNECTTIMEOUT,  30)
    c.setopt(pycurl.TIMEOUT,         300)
    c.setopt(pycurl.NOSIGNAL,        1)
    m.handles.append(c)


handles    = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:

    while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
        url   = urls.pop(0)
        c     = handles.pop()
        c.buf = io.BytesIO()
        c.url = url  # store some info
        c.t0  = time.perf_counter()
        c.setopt(pycurl.URL,        c.url)
        c.setopt(pycurl.WRITEDATA,  c.buf)
        c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
        m.add_handle(c)

    while 1:  # Run the internal curl state machine for the multi stack
        ret, num_handles = m.perform()
        if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

    while 1:  # Check for curl objects which have terminated, and add them to the handles
        nq, ok_list, ko_list = m.info_read()
        for c in ok_list:
            m.remove_handle(c)
            t1 = time.perf_counter()
            reply = gzip.decompress(c.buf.getvalue())
            print(f'\x1b[33mGET  \x1b[32m{t1-c.t0:.3f}  \x1b[37m{len(reply):9,}  \x1b[0m{reply[:32]}...')  # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
            handles.append(c)
        for c, errno, errmsg in ko_list:
            m.remove_handle(c)
            print(f'\x1b[31mFAIL {c.url} {errno} {errmsg}')
            handles.append(c)
        nprocessed = nprocessed + len(ok_list) + len(ko_list)
        if nq==0: break

    m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.


for c in m.handles:
    c.close()
m.close()

The Scrapy framework will solve your problem fast and professionally. It will also cache all the requests so that later you can re-run only the failed ones.

Save this script as quotes_spider.py

# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field


class TextCleaningPipeline(object):
    def _clean_text(self, text):
        text = text.replace('“', '').replace('”', '')
        table = str.maketrans({key: None for key in string.punctuation})
        clean_text = text.translate(table)
        return clean_text.lower()

    def process_item(self, item, spider):
        item['text'] = self._clean_text(item['text'])
        return item


class JsonWriterPipeline(object):
    def open_spider(self, spider):
        self.file = open(spider.settings['JSON_FILE'], 'a')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item


class QuoteItem(Item):
    text = Field()
    author = Field()
    tags = Field()
    spider = Field()


class QuoteSpider(scrapy.Spider):
    name = "quotes"

    def start_requests(self):
        urls = [
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        for quote in response.css('div.quote'):
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            item['spider'] = self.name
            yield item


if __name__ == '__main__':
    settings = dict()
    settings['USER_AGENT'] = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)'
    settings['HTTPCACHE_ENABLED'] = True
    settings['CONCURRENT_REQUESTS'] = 20
    settings['CONCURRENT_REQUESTS_PER_DOMAIN'] = 20
    settings['JSON_FILE'] = 'items.jl'
    settings['ITEM_PIPELINES'] = dict()
    settings['ITEM_PIPELINES']['__main__.TextCleaningPipeline'] = 800
    settings['ITEM_PIPELINES']['__main__.JsonWriterPipeline'] = 801

    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()


Followed by:

$ pip install Scrapy
$ python quote_spiders.py

To fine-tune the scraper, adjust the CONCURRENT_REQUESTS and CONCURRENT_REQUESTS_PER_DOMAIN settings accordingly.