Python 请求和持久会话

我正在使用 requests模块。 我已经知道如何将数据提交到网站上的登录表单并检索会话密钥,但是我看不出在后续请求中使用这个会话密钥的明显方法。 有没有人能在下面的代码中填写省略号,或者提出另一种方法?

>>> import requests
>>> login_data =  {'formPosted': '1', 'login_email': 'me@example.com', 'password': 'pw'}
>>> r = requests.post('https://localhost/login.py', login_data)
>>>
>>> r.text
'You are being redirected <a href="profilePage?_ck=1349394964">here</a>'
>>> r.cookies
{'session_id_myapp': '127-0-0-1-825ff22a-6ed1-453b-aebc-5d3cf2987065'}
>>>
>>> r2 = requests.get('https://localhost/profile_data.json', ...)
394199 次浏览

文档说,get接受一个可选的 cookies参数,允许您指定要使用的 cookie:

来自文档:

>>> url = 'http://httpbin.org/cookies'
>>> cookies = dict(cookies_are='working')


>>> r = requests.get(url, cookies=cookies)
>>> r.text
'{"cookies": {"cookies_are": "working"}}'

http://docs.python-requests.org/en/latest/user/quickstart/#cookies

您可以使用以下方法轻松创建持久性会话:

s = requests.Session()

然后,继续你的请求:

s.post('https://localhost/login.py', login_data)
# logged in! cookies saved for future requests.
r2 = s.get('https://localhost/profile_data.json', ...)
# cookies sent automatically!
# do whatever, s will keep your cookies intact :)

了解更多关于 Sessions 的信息: https://requests.readthedocs.io/en/latest/user/advanced/#session-objects

看看我对这个类似问题的回答:

Python: urllib2如何使用 urlopen 请求发送 cookie

# NOTE: this snippet is Python 2 only -- 'urllib2' and 'cookielib' were
# merged into 'urllib.request' / 'http.cookiejar' in Python 3.
import urllib2
import urllib
from cookielib import CookieJar


# CookieJar stores the cookies the server sets; HTTPCookieProcessor makes the
# opener send them back automatically on every subsequent request.
cj = CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
# input-type values from the html form
# NOTE(review): 'username' and 'password' are assumed to be defined earlier
# in the caller's script -- they are not defined in this snippet.
formdata = { "username" : username, "password": password, "form-id" : "1234" }
data_encoded = urllib.urlencode(formdata)
# POSTing the form logs in; the opener keeps the session cookies afterwards.
response = opener.open("https://page.com/login.php", data_encoded)
content = response.read()

编辑:

我知道我的回答得到了一些反对票,但是没有解释性的评论。我猜这是因为我引用的是 urllib库而不是 requests库。我这样做是因为 OP 请求对 requests的帮助,或者请某人提出另一种方法。

其他答案有助于理解如何维护这样的会话。此外,我还想提供一个类,它可以在脚本的不同运行期间(使用缓存文件)维护会话。这意味着只有在需要时才执行适当的“登录”(超时或缓存中不存在会话)。此外,它还支持后续调用‘ get’或‘ post’时的代理设置。

它是用 Python 3测试的。

将它作为您自己代码的基础

import pickle
import datetime
import os
from urllib.parse import urlparse
import requests


class MyLoginSession:
    """
    Handle and save login sessions; also keeps track of proxy settings.

    A cache file is maintained so session data can be restored across script
    executions: a real login is performed only when needed (no cached session,
    cache older than 'maxSessionTimeSeconds', or login forced).
    """

    def __init__(self,
                 loginUrl,
                 loginData,
                 loginTestUrl,
                 loginTestString,
                 sessionFileAppendix='_session.dat',
                 maxSessionTimeSeconds=30 * 60,
                 proxies=None,
                 userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1',
                 debug=True,
                 forceLogin=False,
                 **kwargs):
        """
        Save the information needed to log in, then perform the login.

        'loginTestString' is searched for (case-insensitively) in the HTML
        returned by 'loginTestUrl' to verify the login really succeeded.
        'proxies' has the format {'https': 'https://user:pass@server:port', 'http': ...}.
        'loginData' is sent as POST data (dictionary of id: value).
        'maxSessionTimeSeconds' determines when a cached session is considered
        stale and a re-login is required.
        Extra **kwargs are forwarded to the login POST request.
        """
        urlData = urlparse(loginUrl)

        self.proxies = proxies
        self.loginData = loginData
        self.loginUrl = loginUrl
        self.loginTestUrl = loginTestUrl
        self.maxSessionTime = maxSessionTimeSeconds
        # one cache file per host, e.g. 'example.com_session.dat'
        self.sessionFile = urlData.netloc + sessionFileAppendix
        self.userAgent = userAgent
        self.loginTestString = loginTestString
        self.debug = debug

        self.login(forceLogin, **kwargs)

    def modification_date(self, filename):
        """Return the last modification date of 'filename' as a datetime object."""
        t = os.path.getmtime(filename)
        return datetime.datetime.fromtimestamp(t)

    def login(self, forceLogin=False, **kwargs):
        """
        Log in to a session.

        Try to restore the last saved session from the cache file; if that
        fails, the cache is too old, or 'forceLogin' is set, perform a proper
        login. The session cache file is always updated afterwards.

        Raises Exception if 'loginTestString' is not found in the response of
        'loginTestUrl' after logging in.
        """
        wasReadFromCache = False
        if self.debug:
            print('loading or generating session...')
        if os.path.exists(self.sessionFile) and not forceLogin:
            time = self.modification_date(self.sessionFile)

            # BUGFIX: use total_seconds() -- the original '.seconds' attribute
            # of a timedelta ignores whole days, so a days-old cache file
            # would wrongly be treated as fresh.
            lastModification = (datetime.datetime.now() - time).total_seconds()
            if lastModification < self.maxSessionTime:
                # NOTE(security): unpickling is only safe because this file
                # was written by us in saveSessionToCache(); never load a
                # cache file from an untrusted source.
                with open(self.sessionFile, "rb") as f:
                    self.session = pickle.load(f)
                wasReadFromCache = True
                if self.debug:
                    print("loaded session from cache (last access %ds ago) "
                          % lastModification)
        if not wasReadFromCache:
            self.session = requests.Session()
            self.session.headers.update({'user-agent': self.userAgent})
            res = self.session.post(self.loginUrl, data=self.loginData,
                                    proxies=self.proxies, **kwargs)

            if self.debug:
                print('created new session with login')
            self.saveSessionToCache()

        # verify the login: the marker string must appear in the test page
        res = self.session.get(self.loginTestUrl)
        if res.text.lower().find(self.loginTestString.lower()) < 0:
            raise Exception("could not log into provided site '%s'"
                            " (did not find successful login string)"
                            % self.loginUrl)

    def saveSessionToCache(self):
        """Save the session to the cache file (always, to refresh the timeout)."""
        with open(self.sessionFile, "wb") as f:
            pickle.dump(self.session, f)
        if self.debug:
            print('updated session cache-file %s' % self.sessionFile)

    def retrieveContent(self, url, method="get", postData=None, **kwargs):
        """
        Return the response for 'url' using the logged-in session.

        If 'method' is not 'get', the URL is requested via POST with
        'postData' as the request body. Extra **kwargs are forwarded to the
        request call. The cache file is rewritten after every request because
        the server may have refreshed the session cookies.
        """
        if method == 'get':
            res = self.session.get(url, proxies=self.proxies, **kwargs)
        else:
            res = self.session.post(url, data=postData, proxies=self.proxies, **kwargs)

        # the session has been updated on the server, so also update in cache
        self.saveSessionToCache()

        return res

使用上述类的代码片段如下所示:

if __name__ == "__main__":
    # Example proxy configuration (uncomment and adapt if needed):
    # proxies = {'https' : 'https://user:pass@server:port',
    #           'http' : 'http://user:pass@server:port'}

    loginData = {'user': 'usr',
                 'password': 'pwd'}

    loginUrl = 'https://...'
    loginTestUrl = 'https://...'
    successStr = 'Hello Tom'

    # Constructing the object performs (or restores) the login.
    s = MyLoginSession(loginUrl, loginData, loginTestUrl, successStr,
                       # proxies = proxies
                       )

    # All further requests reuse the persistent session.
    res = s.retrieveContent('https://....')
    print(res.text)

    # if, for instance, login via JSON values required try this:
    s = MyLoginSession(loginUrl, None, loginTestUrl, successStr,
                       # proxies = proxies,
                       json=loginData)

在尝试了以上所有答案之后,我发现在后续请求中使用“ RequestsCookieJar”而不是常规的 CookieJar 解决了我的问题。

import requests
import json


# The Login URL
authUrl = 'https://whatever.com/login'

# The subsequent URL
testUrl = 'https://whatever.com/someEndpoint'

# Logout URL
testlogoutUrl = 'https://whatever.com/logout'

# Whatever you are posting
login_data = {
    'formPosted': '1',
    'login_email': 'me@example.com',
    'password': 'pw',
}

# Will hold the authentication token (or any other data) returned by the
# authentication request.
token = ''

# Post the login Request
loginRequest = requests.post(authUrl, login_data)
print(f"{loginRequest.text}")

# Pull the field we need out of the login response. In this case a field
# called 'token'.
token = str(json.loads(loginRequest.content)['token'])  # or ['access_token']
print(f"{token}")

# Verify Successful login
print(f"{loginRequest.status_code}")

# Build a Requests cookie jar for the subsequent requests and add the cookie.
jar = requests.cookies.RequestsCookieJar()
jar.set('LWSSO_COOKIE_KEY', token)

# Execute the next request(s) with the cookie jar attached.
r = requests.get(testUrl, cookies=jar)
print(f"R.TEXT: {r.text}")
print(f"R.STCD: {r.status_code}")

# Execute the logout request(s) with the same cookie jar.
r = requests.delete(testlogoutUrl, cookies=jar)
print(f"R.TEXT: {r.text}")  # should show "Request Not Authorized"
print(f"R.STCD: {r.status_code}")  # should show 401

检索 json 数据的代码片段,密码受保护

import requests


username = "my_user_name"
password = "my_super_secret"
url = "https://www.my_base_url.com"
the_page_i_want = "/my_json_data_page"

# One Session object carries the CSRF cookie from the first GET into the
# login POST, and the login cookies into any later request.
session = requests.Session()

# Hit the login page first just to receive the CSRF cookie.
login_page = session.get(url + '/login')
csrf_token = login_page.cookies['csrftoken']

# Log in. Django-style CSRF protection also requires a matching Referer
# header alongside the 'csrfmiddlewaretoken' form field.
payload = {
    'username': username,
    'password': password,
    'csrfmiddlewaretoken': csrf_token,
    'next': the_page_i_want,
}
resp = session.post(url + "/login",
                    data=payload,
                    headers=dict(Referer=url + "/login"))
print(resp.json())

这将在 Python 中为您工作:

# Call JIRA API with HTTPBasicAuth
import json
import requests
from requests.auth import HTTPBasicAuth


JIRA_EMAIL = "****"
JIRA_TOKEN = "****"
BASE_URL = "https://****.atlassian.net"
API_URL = "/rest/api/3/serverInfo"

API_URL = BASE_URL + API_URL

# Basic auth credentials are attached to every request, so no session /
# cookie handling is needed for this API.
BASIC_AUTH = HTTPBasicAuth(JIRA_EMAIL, JIRA_TOKEN)
HEADERS = {'Content-Type': 'application/json;charset=iso-8859-1'}

response = requests.get(API_URL, headers=HEADERS, auth=BASIC_AUTH)

# Pretty-print the JSON body of the server-info response.
print(json.dumps(json.loads(response.text),
                 sort_keys=True,
                 indent=4,
                 separators=(",", ": ")))

只保存必需的 cookie 并重用它们。

import os
import pickle
from urllib.parse import urljoin, urlparse

# BUGFIX: the original snippet used requests.Session() and
# requests.cookies.RequestsCookieJar() without ever importing requests.
import requests


login = 'my@email.com'
password = 'secret'
# Assuming two cookies are used for persistent login.
# (Find it by tracing the login process)
persistentCookieNames = ['sessionId', 'profileId']
URL = 'http://example.com'
urlData = urlparse(URL)
# one cookie file per host, e.g. 'example.com.cookie'
cookieFile = urlData.netloc + '.cookie'
signinUrl = urljoin(URL, "/signin")

with requests.Session() as session:
    try:
        # Reuse previously saved cookies if a cookie file exists.
        with open(cookieFile, 'rb') as f:
            print("Loading cookies...")
            session.cookies.update(pickle.load(f))
    except Exception:
        # If could not load cookies from file, get the new ones by login in
        print("Login in...")
        post = session.post(
            signinUrl,
            data={
                'email': login,
                'password': password,
            }
        )
        try:
            # Persist only the cookies needed to stay logged in.
            with open(cookieFile, 'wb') as f:
                jar = requests.cookies.RequestsCookieJar()
                for cookie in session.cookies:
                    if cookie.name in persistentCookieNames:
                        jar.set_cookie(cookie)
                pickle.dump(jar, f)
        except Exception:
            # Remove a possibly half-written cookie file, then re-raise the
            # original exception (bare 'raise' instead of 'raise(e)').
            os.remove(cookieFile)
            raise
    MyPage = urljoin(URL, "/mypage")
    page = session.get(MyPage)