requestsはコードを自動的に識別する方法
31157 ワード
そのため、爬虫類の群れはrequestsが結果をつかんで印刷するのが文字化けしてどのように解決するかを尋ねる人がいて、そこで他の人は公式のドキュメントに従って、
しかしrequestはどのように符号化方式を獲得しましたか?Python+Requests符号化問題とPython+Requests符号化識別Bugを検索すると、requestsには3つの符号化を取得する方法があり、
requests githubにおける関連議論
codecs
BOM
These constants define various encodings of the Unicode byte order mark (BOM) used in UTF-16 and UTF-32 data streams to indicate the byte order used in the stream or file and in UTF-8 as a Unicode signature.
ソースコードを表示し、requestsの3つの取得符号化方法の使用を整理しました. requestsが応答すると、
ここで定義された順序ですが、呼び出しの順序は最後から前を見ることをお勧めします.
r.encoding
でデフォルトの復号方式を見て、それからr.encoding = 'utf-8'
を通じて修正して、復号して印刷することに成功しました.しかしrequestはどのように符号化方式を獲得しましたか?Python+Requests符号化問題とPython+Requests符号化識別Bugを検索すると、requestsには3つの符号化を取得する方法があり、
get_encodings_from_content
は応答の内容から符号化を取得し、get_encoding_from_headers
はHTTP応答ヘッダから符号化を取得し、chardet.detect
はツールを使用して応答内容から自動的に検出する.get_encodings_from_content
utils.py
で定義され、正規表現により返信されたコンテンツから符号化方式が取得される.例えばHTML headのmetaから.def get_encodings_from_content(content):
charset_re = re.compile(r']', flags=re.I)
pragma_re = re.compile(r']', flags=re.I)
xml_re = re.compile(r'^]')
return (charset_re.findall(content) +
pragma_re.findall(content) +
xml_re.findall(content))
get_encoding_from_headers
utils.py
に定義され、RFC 2616によれば、HTTPヘッダContent-Type
のMIMEがtext/*であり、charsetが設定されていない場合、符号化方式はISO-8859-1
であると仮定する.charsetがあれば、符号化方式を直接取得します.requests githubにおける関連議論
def get_encoding_from_headers(headers):
"""Returns encodings from given HTTP Header Dict.
:param headers: dictionary to extract encoding from.
"""
content_type = headers.get('content-type')
if not content_type:
return None
content_type, params = cgi.parse_header(content_type)
if 'charset' in params:
return params['charset'].strip("'\"")
if 'text' in content_type:
return 'ISO-8859-1'
chardet.detect
requests.packages
パケット、chardet.__init__.py
、推測符号化def detect(aBuf):
if ((version_info < (3, 0) and isinstance(aBuf, unicode)) or
(version_info >= (3, 0) and not isinstance(aBuf, bytes))):
raise ValueError('Expected a bytes object, not a unicode object')
from . import universaldetector
u = universaldetector.UniversalDetector()
u.reset()
u.feed(aBuf)
u.close()
return u.result
universaldetector.py
、コード自動検出ツールですが、見てみると、検出機能は完全ではありません.ファイルコードを検出したいと思っていましたが、gbk
はサポートされていません.######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Universal charset detector code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 2001
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Pilgrim - port to Python
# Shy Shalom - original C code
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
from . import constants
import sys
import codecs
from .latin1prober import Latin1Prober # windows-1252
from .mbcsgroupprober import MBCSGroupProber # multi-byte character sets
from .sbcsgroupprober import SBCSGroupProber # single-byte character sets
from .escprober import EscCharSetProber # ISO-2122, etc.
import re
MINIMUM_THRESHOLD = 0.20
ePureAscii = 0
eEscAscii = 1
eHighbyte = 2
class UniversalDetector:
def __init__(self):
self._highBitDetector = re.compile(b'[\x80-\xFF]')
self._escDetector = re.compile(b'(\033|~{)')
self._mEscCharSetProber = None
self._mCharSetProbers = []
self.reset()
def reset(self):
self.result = {'encoding': None, 'confidence': 0.0}
self.done = False
self._mStart = True
self._mGotData = False
self._mInputState = ePureAscii
self._mLastChar = b''
if self._mEscCharSetProber:
self._mEscCharSetProber.reset()
for prober in self._mCharSetProbers:
prober.reset()
def feed(self, aBuf):
if self.done:
return
aLen = len(aBuf)
if not aLen:
return
if not self._mGotData:
# If the data starts with BOM, we know it is UTF
if aBuf[:3] == codecs.BOM_UTF8:
# EF BB BF UTF-8 with BOM
self.result = {'encoding': "UTF-8-SIG", 'confidence': 1.0}
elif aBuf[:4] == codecs.BOM_UTF32_LE:
# FF FE 00 00 UTF-32, little-endian BOM
self.result = {'encoding': "UTF-32LE", 'confidence': 1.0}
elif aBuf[:4] == codecs.BOM_UTF32_BE:
# 00 00 FE FF UTF-32, big-endian BOM
self.result = {'encoding': "UTF-32BE", 'confidence': 1.0}
elif aBuf[:4] == b'\xFE\xFF\x00\x00':
# FE FF 00 00 UCS-4, unusual octet order BOM (3412)
self.result = {
'encoding': "X-ISO-10646-UCS-4-3412",
'confidence': 1.0
}
elif aBuf[:4] == b'\x00\x00\xFF\xFE':
# 00 00 FF FE UCS-4, unusual octet order BOM (2143)
self.result = {
'encoding': "X-ISO-10646-UCS-4-2143",
'confidence': 1.0
}
elif aBuf[:2] == codecs.BOM_LE:
# FF FE UTF-16, little endian BOM
self.result = {'encoding': "UTF-16LE", 'confidence': 1.0}
elif aBuf[:2] == codecs.BOM_BE:
# FE FF UTF-16, big endian BOM
self.result = {'encoding': "UTF-16BE", 'confidence': 1.0}
self._mGotData = True
if self.result['encoding'] and (self.result['confidence'] > 0.0):
self.done = True
return
if self._mInputState == ePureAscii:
if self._highBitDetector.search(aBuf):
self._mInputState = eHighbyte
elif ((self._mInputState == ePureAscii) and
self._escDetector.search(self._mLastChar + aBuf)):
self._mInputState = eEscAscii
self._mLastChar = aBuf[-1:]
if self._mInputState == eEscAscii:
if not self._mEscCharSetProber:
self._mEscCharSetProber = EscCharSetProber()
if self._mEscCharSetProber.feed(aBuf) == constants.eFoundIt:
self.result = {'encoding': self._mEscCharSetProber.get_charset_name(),
'confidence': self._mEscCharSetProber.get_confidence()}
self.done = True
elif self._mInputState == eHighbyte:
if not self._mCharSetProbers:
self._mCharSetProbers = [MBCSGroupProber(), SBCSGroupProber(),
Latin1Prober()]
for prober in self._mCharSetProbers:
if prober.feed(aBuf) == constants.eFoundIt:
self.result = {'encoding': prober.get_charset_name(),
'confidence': prober.get_confidence()}
self.done = True
break
def close(self):
if self.done:
return
if not self._mGotData:
if constants._debug:
sys.stderr.write('no data received!
')
return
self.done = True
if self._mInputState == ePureAscii:
self.result = {'encoding': 'ascii', 'confidence': 1.0}
return self.result
if self._mInputState == eHighbyte:
proberConfidence = None
maxProberConfidence = 0.0
maxProber = None
for prober in self._mCharSetProbers:
if not prober:
continue
proberConfidence = prober.get_confidence()
if proberConfidence > maxProberConfidence:
maxProberConfidence = proberConfidence
maxProber = prober
if maxProber and (maxProberConfidence > MINIMUM_THRESHOLD):
self.result = {'encoding': maxProber.get_charset_name(),
'confidence': maxProber.get_confidence()}
return self.result
if constants._debug:
sys.stderr.write('no probers hit minimum threshhold
')
for prober in self._mCharSetProbers[0].mProbers:
if not prober:
continue
sys.stderr.write('%s confidence = %s
' %
(prober.get_charset_name(),
prober.get_confidence()))
codecs
BOM
These constants define various encodings of the Unicode byte order mark (BOM) used in UTF-16 and UTF-32 data streams to indicate the byte order used in the stream or file and in UTF-8 as a Unicode signature.
3つの方式の呼び出し
ソースコードを表示し、requestsの3つの取得符号化方法の使用を整理しました.
utils.py
で定義されたget_encoding_from_headers
を自動的に呼び出して符号化方式を取得する.r.text
メソッドを使用すると、encoding
が空であるかどうかをチェックします.空である場合、charset
を使用して自動的に検出されます.つまり、charset
は使用されません.常に設定されているため、デフォルトはISO-8859-1
です.utils.py
で定義されたget_encodings_from_content
ツールは、応答処理中には表示されません.手動で呼び出す方法
import requests
r = requests.get('https://www.python.org')
print r.encoding
# utf-8
print requests.utils.get_encodings_from_content(r.content)
# ['utf-8']
print r.apparent_encoding
# ISO-8859-2
r.text
を取得する前に、手動呼び出しの結果をr.encoding
に割り当てることができる.または、r.encoding = 'utf-8'
は、復号方式を直接設定する.requests符号化に関連する完全な呼び出しプロセス
ここで定義された順序ですが、呼び出しの順序は最後から前を見ることをお勧めします.
models.py
プライマリ・オブジェクトの定義#
from .compat import (chardet)
# Request
class Request(RequestHooksMixin):
"""A user-created :class:`Request ` object.
Used to prepare a :class:`PreparedRequest `, which is sent to the server.
Usage::
>>> import requests
>>> req = requests.Request('GET', 'http://httpbin.org/get')
>>> req.prepare()
"""
def __init__(self, method=None, url=None, headers=None, files=None,
data=None, params=None, auth=None, cookies=None, hooks=None, json=None):
# Default empty dicts for dict params.
data = [] if data is None else data
files = [] if files is None else files
headers = {} if headers is None else headers
params = {} if params is None else params
hooks = {} if hooks is None else hooks
self.hooks = default_hooks()
for (k, v) in list(hooks.items()):
self.register_hook(event=k, hook=v)
self.method = method
self.url = url
self.headers = headers
self.files = files
self.data = data
self.json = json
self.params = params
self.auth = auth
self.cookies = cookies
def __repr__(self):
return '' % (self.method)
def prepare(self):
"""Constructs a :class:`PreparedRequest ` for transmission and returns it."""
p = PreparedRequest()
p.prepare(
method=self.method,
url=self.url,
headers=self.headers,
files=self.files,
data=self.data,
json=self.json,
params=self.params,
auth=self.auth,
cookies=self.cookies,
hooks=self.hooks,
)
return p
# Response , `text``apparent_encoding`
class Response(object):
"""The :class:`Response ` object, which contains a
server's response to an HTTP request.
"""
__attrs__ = [
'_content', 'status_code', 'headers', 'url', 'history',
'encoding', 'reason', 'cookies', 'elapsed', 'request'
]
def __init__(self):
super(Response, self).__init__()
#: Encoding to decode with when accessing r.text.
self.encoding = None
@property
def apparent_encoding(self):
"""The apparent encoding, provided by the chardet library"""
return chardet.detect(self.content)['encoding']
@property
def text(self):
"""Content of the response, in unicode.
If Response.encoding is None, encoding will be guessed using
``chardet``.
The encoding of the response content is determined based solely on HTTP
headers, following RFC 2616 to the letter. If you can take advantage of
non-HTTP knowledge to make a better guess at the encoding, you should
set ``r.encoding`` appropriately before accessing this property.
"""
# Try charset from content-type
content = None
encoding = self.encoding
if not self.content:
return str('')
# Fallback to auto-detected encoding.
if self.encoding is None:
encoding = self.apparent_encoding
# Decode unicode from given encoding.
try:
content = str(self.content, encoding, errors='replace')
except (LookupError, TypeError):
# A LookupError is raised if the encoding was not found which could
# indicate a misspelling or similar mistake.
#
# A TypeError can be raised if encoding is None
#
# So we try blindly encoding.
content = str(self.content, errors='replace')
return content
utils.py
は、HTMLページから符号化を取得するget_encodings_from_content
、HTTPヘッダから符号化を取得するget_encoding_from_headers
を含むいくつかのツールを定義するdef get_encodings_from_content(content):
"""Returns encodings from given content string.
:param content: bytestring to extract encodings from.
"""
warnings.warn((
'In requests 3.0, get_encodings_from_content will be removed. For '
'more information, please see the discussion on issue #2266. (This'
' warning should only appear once.)'),
DeprecationWarning)
charset_re = re.compile(r']', flags=re.I)
pragma_re = re.compile(r']', flags=re.I)
xml_re = re.compile(r'^]')
return (charset_re.findall(content) +
pragma_re.findall(content) +
xml_re.findall(content))
def get_encoding_from_headers(headers):
"""Returns encodings from given HTTP Header Dict.
:param headers: dictionary to extract encoding from.
"""
content_type = headers.get('content-type')
if not content_type:
return None
content_type, params = cgi.parse_header(content_type)
if 'charset' in params:
return params['charset'].strip("'\"")
if 'text' in content_type:
return 'ISO-8859-1'
adapters.py
、requestを送信しresponseを構築するアダプタを定義するfrom .utils import (get_encoding_from_headers,)
class HTTPAdapter(BaseAdapter):
"""The built-in HTTP Adapter for urllib3.
Provides a general-case interface for Requests sessions to contact HTTP and
HTTPS urls by implementing the Transport Adapter interface. This class will
usually be created by the :class:`Session ` class under the
covers.
Usage::
>>> import requests
>>> s = requests.Session()
>>> a = requests.adapters.HTTPAdapter(max_retries=3)
>>> s.mount('http://', a)
"""
__attrs__ = ['max_retries', 'config', '_pool_connections', '_pool_maxsize',
'_pool_block']
# Response() response , `utils.py` `get_encoding_from_headers`
def build_response(self, req, resp):
"""Builds a :class:`Response ` object from a urllib3
response. This should not be called from user code, and is only exposed
for use when subclassing the
:class:`HTTPAdapter `
:param req: The :class:`PreparedRequest ` used to generate the response.
:param resp: The urllib3 response object.
"""
response = Response()
# Fallback to None if there's no status_code, for whatever reason.
response.status_code = getattr(resp, 'status', None)
# Make headers case-insensitive.
response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))
# Set encoding.
response.encoding = get_encoding_from_headers(response.headers)
response.raw = resp
response.reason = response.raw.reason
if isinstance(req.url, bytes):
response.url = req.url.decode('utf-8')
else:
response.url = req.url
# Add new cookies from the server.
extract_cookies_to_jar(response.cookies, req, resp)
# Give the Response some context.
response.request = req
response.connection = self
return response
# , response
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
"""Sends PreparedRequest object. Returns Response object.
"""
conn = self.get_connection(request.url, proxies)
self.cert_verify(conn, request.url, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(request)
chunked = not (request.body is None or 'Content-Length' in request.headers)
if isinstance(timeout, tuple):
try:
connect, read = timeout
timeout = TimeoutSauce(connect=connect, read=read)
except ValueError as e:
# this may raise a string formatting error.
err = ("Invalid timeout {0}. Pass a (connect, read) "
"timeout tuple, or a single float to set "
"both timeouts to the same value".format(timeout))
raise ValueError(err)
else:
timeout = TimeoutSauce(connect=timeout, read=timeout)
try:
if not chunked:
resp = conn.urlopen(
method=request.method,
url=url,
body=request.body,
headers=request.headers,
redirect=False,
assert_same_host=False,
preload_content=False,
decode_content=False,
retries=self.max_retries,
timeout=timeout
)
# Send the request.
else:
if hasattr(conn, 'proxy_pool'):
conn = conn.proxy_pool
low_conn = conn._get_conn(timeout=DEFAULT_POOL_TIMEOUT)
try:
low_conn.putrequest(request.method,
url,
skip_accept_encoding=True)
for header, value in request.headers.items():
low_conn.putheader(header, value)
low_conn.endheaders()
for i in request.body:
low_conn.send(hex(len(i))[2:].encode('utf-8'))
low_conn.send(b'\r
')
low_conn.send(i)
low_conn.send(b'\r
')
low_conn.send(b'0\r
\r
')
# Receive the response from the server
try:
# For Python 2.7+ versions, use buffering of HTTP
# responses
r = low_conn.getresponse(buffering=True)
except TypeError:
# For compatibility with Python 2.6 versions and back
r = low_conn.getresponse()
resp = HTTPResponse.from_httplib(
r,
pool=conn,
connection=low_conn,
preload_content=False,
decode_content=False
)
except:
# If we hit any problems here, clean up the connection.
# Then, reraise so that we can handle the actual exception.
low_conn.close()
raise
return self.build_response(request, resp)
sessions.py
from .adapters import HTTPAdapter # HTTP
class Session(SessionRedirectMixin):
"""A Requests session.
Provides cookie persistence, connection-pooling, and configuration.
Basic Usage::
>>> import requests
>>> s = requests.Session()
>>> s.get('http://httpbin.org/get')
Or as a context manager::
>>> with requests.Session() as s:
>>> s.get('http://httpbin.org/get')
"""
__attrs__ = [
'headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify',
'cert', 'prefetch', 'adapters', 'stream', 'trust_env',
'max_redirects',
]
def __init__(self):
#: A case-insensitive dictionary of headers to be sent on each
#: :class:`Request ` sent from this
#: :class:`Session `.
self.headers = default_headers()
# Default connection adapters.
self.adapters = OrderedDict()
self.mount('https://', HTTPAdapter()) # https
self.mount('http://', HTTPAdapter()) # http
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def prepare_request(self, request):
"""Constructs a :class:`PreparedRequest ` for
transmission and returns it. The :class:`PreparedRequest` has settings
merged from the :class:`Request ` instance and those of the
:class:`Session`.
:param request: :class:`Request` instance to prepare with this
session's settings.
"""
cookies = request.cookies or {}
# Bootstrap CookieJar.
if not isinstance(cookies, cookielib.CookieJar):
cookies = cookiejar_from_dict(cookies)
# Merge with session cookies
merged_cookies = merge_cookies(
merge_cookies(RequestsCookieJar(), self.cookies), cookies)
# Set environment's basic authentication if not explicitly set.
auth = request.auth
if self.trust_env and not auth and not self.auth:
auth = get_netrc_auth(request.url)
p = PreparedRequest()
p.prepare(
method=request.method.upper(),
url=request.url,
files=request.files,
data=request.data,
json=request.json,
headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict),
params=merge_setting(request.params, self.params),
auth=merge_setting(auth, self.auth),
cookies=merged_cookies,
hooks=merge_hooks(request.hooks, self.hooks),
)
return p
# , self.send,
def request(self, method, url,
params=None,
data=None,
headers=None,
cookies=None,
files=None,
auth=None,
timeout=None,
allow_redirects=True,
proxies=None,
hooks=None,
stream=None,
verify=None,
cert=None,
json=None):
"""Constructs a :class:`Request `, prepares it and sends it.
Returns :class:`Response ` object.
"""
# Create the Request.
req = Request(
method = method.upper(),
url = url,
headers = headers,
files = files,
data = data or {},
json = json,
params = params or {},
auth = auth,
cookies = cookies,
hooks = hooks,
)
prep = self.prepare_request(req)
proxies = proxies or {}
settings = self.merge_environment_settings(
prep.url, proxies, stream, verify, cert
)
# Send the request.
send_kwargs = {
'timeout': timeout,
'allow_redirects': allow_redirects,
}
send_kwargs.update(settings)
resp = self.send(prep, **send_kwargs)
return resp
# , send ,
def send(self, request, **kwargs):
"""Send a given PreparedRequest."""
# Get the appropriate adapter to use
adapter = self.get_adapter(url=request.url)
# Start time (approximately) of the request
start = datetime.utcnow()
# Send the request
r = adapter.send(request, **kwargs)
return r
def get_adapter(self, url):
"""Returns the appropriate connection adapter for the given URL."""
for (prefix, adapter) in self.adapters.items():
if url.lower().startswith(prefix):
return adapter
# Nothing matches :-/
raise InvalidSchema("No connection adapters were found for '%s'" % url)
def close(self):
"""Closes all adapters and as such the session"""
for v in self.adapters.values():
v.close()
def mount(self, prefix, adapter):
"""Registers a connection adapter to a prefix.
Adapters are sorted in descending order by key length."""
self.adapters[prefix] = adapter
keys_to_move = [k for k in self.adapters if len(k) < len(prefix)]
for key in keys_to_move:
self.adapters[key] = self.adapters.pop(key)
api.py
、ユーザによって呼び出されるAPIを提供する# -*- coding: utf-8 -*-
"""
requests.api
~~~~~~~~~~~~
This module implements the Requests API.
:copyright: (c) 2012 by Kenneth Reitz.
:license: Apache2, see LICENSE for more details.
"""
from . import sessions
def request(method, url, **kwargs):
"""Constructs and sends a :class:`Request `.
:param method: method for the new :class:`Request` object.
:param url: URL for the new :class:`Request` object.
:param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:return: :class:`Response ` object
:rtype: requests.Response
Usage::
>>> import requests
>>> req = requests.request('GET', 'http://httpbin.org/get')
"""
# By using the 'with' statement we are sure the session is closed, thus we
# avoid leaving sockets open which can trigger a ResourceWarning in some
# cases, and look like a memory leak in others.
with sessions.Session() as session:
return session.request(method=method, url=url, **kwargs)
def get(url, params=None, **kwargs):
"""Sends a GET request.
:param url: URL for the new :class:`Request` object.
:param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:return: :class:`Response ` object
:rtype: requests.Response
"""
kwargs.setdefault('allow_redirects', True)
return request('get', url, params=params, **kwargs)
requests.__init__.py
"""
Requests HTTP library
~~~~~~~~~~~~~~~~~~~~~
Requests is an HTTP library, written in Python, for human beings. Basic GET
usage:
>>> import requests
>>> r = requests.get('https://www.python.org')
>>> r.status_code
200
>>> 'Python is a programming language' in r.content
True
... or POST:
>>> payload = dict(key1='value1', key2='value2')
>>> r = requests.post('http://httpbin.org/post', data=payload)
>>> print(r.text)
{
...
"form": {
"key2": "value2",
"key1": "value1"
},
...
}
"""
__title__ = 'requests'
__version__ = '2.10.0'
__build__ = 0x021000
__author__ = 'Kenneth Reitz'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2016 Kenneth Reitz'
from . import utils
from .models import Request, Response, PreparedRequest
from .api import request, get, head, post, patch, put, delete, options