python---aiohttp的使用

目录

1.aiohttp 的简单使用 (配合 asyncio 模块)
2. 发起一个 session 请求
3. 在 url 中传递参数(其实与 requests 模块使用大致相同)
4. 获取响应内容(由于获取响应内容是一个阻塞耗时过程,所以我们使用 await 实现协程切换)
5. 特殊响应内容 json(和上面一样)
6. 字节流形式获取数据(不像 text,read 一次获取所有数据)
7. 自定义请求头(和 requests 一样)
8. 自定义 cookie
9. 获取当前访问网站的 cookie
10. 获取网站的响应状态码
11. 查看响应头
12. 查看重定向的响应头(我们此时已经到了新的网址,向之前的网址查看)
13. 超时处理
14.ClientSession 用于在多个连接之间(同一网站)共享 cookie,请求头等
  总结:
15.cookie 的安全性
16. 控制同时连接的数量(连接池)
17. 自定义域名解析地址
18. 设置代理
19.post 传递数据的方法
  (1)模拟表单
  (2)post json
  (3)post 小文件
  (4)post 大文件
  (5)从一个 url 获取文件后,直接 post 给另一个 url
  (6)post 预压缩数据

1.aiohttp 的简单使用 (配合 asyncio 模块)

import asyncio,aiohttp

async def fetch_async(url):
print(url)
async with aiohttp.request("GET",url) as r:
reponse = await r.text(encoding="utf-8")  #或者直接 await r.read() 不编码,直接读取,适合于图像等无法编码文件
print(reponse)

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()

2. 发起一个 session 请求

import asyncio,aiohttp

async def fetch_async(url):
print(url)
async with aiohttp.ClientSession() as session:  # 协程嵌套,只需要处理最外层协程即可 fetch_async
async with session.get(url) as
resp:
print(resp.status)
print(
await resp.text())  # 因为这里使用到了 await 关键字,实现异步,所有他上面的函数体需要声明为异步 async

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')]

event_loop = asyncio.get_event_loop()
results
= event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()

除了上面的 get 方法外,会话还支持 post,put,delete.... 等

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

不要为每次的连接都创建一次 session, 一般情况下只需要创建一个 session,然后使用这个 session 执行所有的请求。

每个 session 对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。

3. 在 url 中传递参数(其实与 requests 模块使用大致相同)

只需要将参数字典,传入 params 参数中即可

import asyncio,aiohttp
async def func1(url,params):
async with aiohttp.ClientSession() as session:
async with session.get(url,params=params) as r:
print(r.url)
print(await r.read())
tasks = [func1('https://www.ckook.com/forum.php',{"gid":6}),]
event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()

4. 获取响应内容(由于获取响应内容是一个阻塞耗时过程,所以我们使用 await 实现协程切换)

(1)使用 text() 方法

async def func1(url,params):
    async with aiohttp.ClientSession() as session:
        async with session.get(url,params=params) as r:
            print(r.url)
            print(r.charset)  # 查看默认编码为 utf-8
            print(await r.text())  # 不编码,则是使用默认编码  使用 encoding 指定编码

(2)使用 read() 方法,不进行编码,为字节形式

async def func1(url,params):
    async with aiohttp.ClientSession() as session:
        async with session.get(url,params=params) as r:
            print(r.url)
            print(await r.read())

(3)注意:text(),read() 方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用”字节流“(StreamResponse)

5. 特殊响应内容 json(和上面一样)

async def func1(url,params):
    async with aiohttp.ClientSession() as session:
        async with session.get(url,params=params) as r:
            print(r.url)
            print(r.charset)
            print(await r.json())  # 可以设置编码,设置处理函数

6. 字节流形式获取数据(不像 text,read 一次获取所有数据)

注意:我们获取的 session.get() 是 Response 对象,他继承于 StreamResponse

async def func1(url,params):
    async with aiohttp.ClientSession() as session:
        async with session.get(url,params=params) as r:
            print(await r.content.read(10))    #读取前 10 字节

下面字节流形式读取数据,保存文件

async def func1(url,params,filename):
    async with aiohttp.ClientSession() as session:
        async with session.get(url,params=params) as r:
            with open(filename,"wb") as fp:
                while True:
                    chunk = await r.content.read(10)
                    if not chunk:
                        break
                    fp.write(chunk)

tasks = [func1('https://www.ckook.com/forum.php',{"gid":6},"1.html"),]

注意:

async with session.get(url,params=params) as r:  # 异步上下文管理器

with open(filename,"wb") as fp:  #普通上下文管理器

两者的区别:

在于异步上下文管理器中定义了

__aenter__ 和 __aexit__ 方法

异步上下文管理器指的是在enterexit方法处能够暂停执行的上下文管理器

 为了实现这样的功能,需要加入两个新的方法:__aenter__ 和__aexit__。这两个方法都要返回一个 awaitable 类型的值。

推文:异步上下文管理器async with 和异步迭代器 async for

7. 自定义请求头(和 requests 一样)

async def func1(url,params,filename):
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type':'text/html; charset=utf-8'}
        async with session.get(url,params=params,headers=headers) as r:
            with open(filename,"wb") as fp:
                while True:
                    chunk = await r.content.read(10)
                    if not chunk:
                        break
                    fp.write(chunk)

8. 自定义 cookie

注意:对于自定义 cookie,我们需要设置在 ClientSession(cookies= 自定义 cookie 字典), 而不是 session.get() 中

class ClientSession:
def __init__(self, </span>*, connector=None, loop=None, <span style="color: rgba(255, 0, 0, 1)">cookies</span>=<span style="color: rgba(0, 0, 0, 1)">None,
             headers</span>=None, skip_auto_headers=<span style="color: rgba(0, 0, 0, 1)">None,
             auth</span>=None, json_serialize=<span style="color: rgba(0, 0, 0, 1)">json.dumps,
             request_class</span>=ClientRequest, response_class=<span style="color: rgba(0, 0, 0, 1)">ClientResponse,
             ws_response_class</span>=<span style="color: rgba(0, 0, 0, 1)">ClientWebSocketResponse,
             version</span>=<span style="color: rgba(0, 0, 0, 1)">http.HttpVersion11,
             cookie_jar</span>=None, connector_owner=True, raise_for_status=<span style="color: rgba(0, 0, 0, 1)">False,
             read_timeout</span>=sentinel, conn_timeout=<span style="color: rgba(0, 0, 0, 1)">None,
             <strong><span style="color: rgba(255, 0, 0, 1)">timeout</span></strong></span>=<span style="color: rgba(0, 0, 0, 1)">sentinel,
             auto_decompress</span>=True, trust_env=<span style="color: rgba(0, 0, 0, 1)">False,
             trace_configs</span>=None):</pre>

使用:

cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:

9. 获取当前访问网站的 cookie

async with session.get(url) as resp:
    print(resp.cookies)

10. 获取网站的响应状态码

async with session.get(url) as resp:
    print(resp.status)

11. 查看响应头

resp.headers 来查看响应头,得到的值类型是一个dict
resp.raw_headers  查看原生的响应头,字节类型

12. 查看重定向的响应头(我们此时已经到了新的网址,向之前的网址查看)

resp.history  # 查看被重定向之前的响应头

13. 超时处理

默认的 IO 操作都有 5 分钟的响应时间 我们可以通过 timeout 进行重写:

async with session.get('https://github.com', timeout=60) as r:
    ...

如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长。

14.ClientSession 用于在多个连接之间(同一网站)共享 cookie,请求头等

async def func1():
    cookies = {'my_cookie': "my_value"}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get("https://segmentfault.com/q/1010000007987098") as r:
            print(session.cookie_jar.filter_cookies("https://segmentfault.com"))
        async with session.get("https://segmentfault.com/hottest") as rp:
            print(session.cookie_jar.filter_cookies("https://segmentfault.com"))
Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q
Set-Cookie: my_cookie=my_value
Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q
Set-Cookie: my_cookie=my_value

我们最好使用 session.cookie_jar.filter_cookies() 获取网站 cookie,不同于 requests 模块,虽然我们可以使用 rp.cookies 有可能获取到 cookie,但似乎并未获取到所有的 cookies。


async def func1():
    cookies = {'my_cookie': "my_value"}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get("https://segmentfault.com/q/1010000007987098") as rp:
            print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print(rp.cookies)  #Set-Cookie: PHPSESSID=web2~jh3ouqoabvr4e72f87vtherkp6; Domain=segmentfault.com; Path=/  #首次访问会获取网站设置的 cookie
        async with session.get("https://segmentfault.com/hottest") as rp:
            print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print(rp.cookies)  # 为空,服务端未设置 cookie
        async with session.get("https://segmentfault.com/newest") as rp:
            print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print(rp.cookies)  # 为空,服务端未设置 cookie

总结:

当我们使用 rp.cookie 时,只会获取到当前 url 下设置的 cookie, 不会维护整站的 cookie

而 session.cookie_jar.filter_cookies("https://segmentfault.com") 会一直保留这个网站的所有设置 cookies,含有我们在会话时设置的 cookie,并且会根据响应修改更新 cookie。这个才是我们需要的

而我们设置 cookie,也是需要在 aiohttp.ClientSession(cookies=cookies) 中设置

ClientSession 还支持 请求头,keep-alive 连接和连接池 (connection pooling)

15.cookie 的安全性

默认 ClientSession 使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受 url 和 ip 地址产生的 cookie,只能接受 DNS 解析 IP 产生的 cookie。可以通过设置 aiohttp.CookieJar 的 unsafe=True 来配置:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

16. 控制同时连接的数量(连接池)

TCPConnector 维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求

async def func1():
    cookies = {'my_cookie': "my_value"}
    conn = aiohttp.TCPConnector(limit=2)  # 默认 100,0 表示无限
    async with aiohttp.ClientSession(cookies=cookies,connector=conn) as session:
        for i in range(7,35):
            url = "https://www.ckook.com/list-%s-1.html"%i
            async with session.get(url) as rp:
                print('---------------------------------')print(rp.status)

限制同时打开限制同时打开连接到同一端点的数量((host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:

limit_per_host: 同一端点的最大连接数量。同一端点即 (host, port, is_ssl) 完全相同

conn = aiohttp.TCPConnector(limit_per_host=30)# 默认是 0

在协程下测试效果不明显

17. 自定义域名解析地址

我们可以指定域名服务器的 IP 对我们提供的 get 或 post 的 url 进行解析:

from aiohttp.resolver import AsyncResolver

resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn
= aiohttp.TCPConnector(resolver=resolver)

18. 设置代理

aiohttp 支持使用代理来访问网页:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

当然也支持需要授权的页面:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')  # 用户,密码
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

或者通过这种方式来验证授权:

session.get("http://python.org",
            proxy="http://user:pass@some.proxy.com")

19.post 传递数据的方法

(1)模拟表单

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post',
                        data=payload) as resp:
    print(await resp.text())

注意:data=dict 的方式 post 的数据将被转码,和 form 提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。

(2)post json

payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp:

其实 json.dumps(payload) 返回的也是一个字符串,只不过这个字符串可以被识别为 json 格式

(3)post 小文件

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)

如果将文件对象设置为数据参数,aiohttp 将自动以字节流的形式发送给服务器。

(4)post 大文件

aiohttp 支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)

Then you can use file_sender as a data provider:

async with session.post('http://httpbin.org/post',
data
=file_sender(file_name='huge_file')) as resp:
print(
await resp.text())

(5)从一个 url 获取文件后,直接 post 给另一个 url

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post',data=r.content)

(6)post 预压缩数据

在通过 aiohttp 发送前就已经压缩的数据, 调用压缩函数的函数名(通常是 deflate 或 zlib)作为 content-encoding 的值:

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data,
                            headers=headers)
        pass

 

作者:山上有风景
欢迎任何形式的转载,但请务必注明出处。
限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。