关于协程的认知

前言

在执行IO密集型任务的时候,程序经常会因为等待IO而阻塞。比如平时使用的requests库来进行请求接口,如果响应过慢,程序会一直等待响应,最后导致抓取数据的效率低下。为了解决这一问题,来研究一下异步协程加速的方法。

概念

  • 阻塞。阻塞状态是指程序未得到所需计算资源被挂起的状态。程序在等待某操作期间,自身无法继续干别的事。常见的阻塞形式:网络I/O阻塞、磁盘IO/阻塞、用户输入阻塞等。
  • 非阻塞。程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事,则成该程序在该操作上非阻塞的。 非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时和效率低下,因此我们才要把它变味非阻塞的。
  • 同步。不同的程序为了完成某个任务,在执行过程中需要依靠某种同喜方式协调一致,称这些程序单元是同步执行的,比如商品库存。
  • 异步。为了完成某个任务,不同程序之间过程无需通信协调,也能完成任务的方式,不相关的程序单元之间是可以异步的。比如爬虫网页。
  • 多进程。就是利用CPU多核的优势,在同一时间并行的执行多个任务,可以极大的提高效率。
  • 协程,Coroutine,又称微线程,是一种用户态的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复之前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。协程本质上是个单线程,相对于多进程来说,无需线程的上下文切换的开销,无需原子操作锁定及同步的开销。可以使用的场景,比如在网络爬虫的场景,发出一个请求之后,需要等待一定的实际才能得到响应。但是在等待过程中,程序可以做一些其他的事情,等到响应后再切回来继续处理,这样可以充分利用CPU和其他资源,也就是协程的优势所在。

执行顺序的对比

1658742421514.png
1658742437775.png

协程的用法

协程相关的概念:

  • event_loop 事件循环,相当于一个无限循环,可以把一些函数注册到这个事件循环上,当满足条件时,就会调用对应的处理方法。
  • coroutine 协程,在 Python 中常指代为协程对象类型,可以将协程对象注册到事件循环中,会被事件循环调用。使用 async 关键字来定义一个方法,在调用时不会立即被执行,而是先放回一个协程对象。
  • task 任务,是对协程对象的进一步封装,包含了任务的所有状态。
  • future 代表将来执行或没有执行任务的任务的结果,和task没有本质的区别。

定义协程

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def get_number(x):
print('Number:', x)
return x


if __name__ == '__main__':
coroutine = get_number(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

# 执行结果

Coroutine: <coroutine object get_number at 0x10e049f40>
After calling execute
Number: 1
After calling loop

执行过程的分析:

  1. 引入 asyncio ,这样才可以使用 async 和 await;
  2. 然后使用async定义一个方法,方法接收一个数字的参数,该方法执行后会打印数字;
  3. 随后调用这个方法,但是并没有执行,而是返回一个coroutine 协程对象;
  4. 之后我们使用get_event_loop方法创建一个事件循环loop,并调用了loop对象的run_until_complate方法将协程注册到事件循环loop中,然后启动;
  5. 最后看到了输出结果。

可见,async 定义的方法机会变成一个无法直接运行的coroutine对象,必须注册到事件循环中才可以执行。上文中提到的task,是对coroutine对象的进一步封装,比coroutine对象多了允许状态,比如running、fiished等,可以通过这些状态来获取协程对象的执行情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

import asyncio

async def get_number(x):
print('Number:', x)
return x


if __name__ == '__main__':
coroutine = get_number(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

# 输出
Coroutine: <coroutine object get_number at 0x10e38df40>
After calling execute
Task: <Task pending name='Task-1' coro=<get_number() running at /Users/medivh/tools/test_async.py:45>>
Number: 1
Task: <Task finished name='Task-1' coro=<get_number() done, defined at /Users/medivh/tools/test_async.py:45> result=1>
After calling loop

这里定义loop对象后,接着调用了create_task方法将coroutine对象转化为了task对象,随后的输出发现是pending状态。接着将task对象添加到事件循环中得到执行,随后再输出,就变成了finished。并且同时result变成了1,也就是get_number方法的返回结果。

另外,还有一种定义task对象的方式,直接通过asyncio的ensure_future()方法,返回的也是task对象,就不需要借助loop来定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio


async def get_number(x):
print('Number:', x)
return x


if __name__ == '__main__':
coroutine = get_number(1)
print('Coroutine:', coroutine)
print('After calling execute')

task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

# 输出
Coroutine: <coroutine object get_number at 0x10c19ff40>
After calling execute
Task: <Task pending name='Task-1' coro=<get_number() running at /Users/medivh/tools/test_async.py:48>>
Number: 1
Task: <Task finished name='Task-1' coro=<get_number() done, defined at /Users/medivh/tools/test_async.py:48> result=1>
After calling loop

绑定回调

可以给某个task搬到回调方法,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import requests


async def get_status():
url = 'https://www.baidu.com'
status = requests.get(url).status_code
return status


def callback(task):
print('Status:', task.result())


if __name__ == '__main__':
coroutine_obj = get_status()
task = asyncio.ensure_future(coroutine_obj) # 定义task
task.add_done_callback(callback) # 将callback() 传递给封装好的task对象
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
# 输出
Task: <Task pending name='Task-1' coro=<request() running at /Users/medivh/github/mercury/tools/test_async.py:69> cb=[callback() at /Users/medivh/tools/test_async.py:75]>
Status: 200
Task: <Task finished name='Task-1' coro=<request() done, defined at /Users/medivh/tools/test_async.py:69> result=200>

代码说明:

  1. 调用add_done_callback方法,将callback方法传递给封装好的task对象;
  2. task执行完毕后调用callback方法;
  3. task对象同时作为参数传递给callback方法,调用task对象的result方法就可以获取返回结果。

其实,不用回方法,直接在task运行完毕后也可以直接调用result方法获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

import asyncio
import requests


async def get_status():
url = 'https://www.baidu.com'
status = requests.get(url).status_code
return status

if __name__ == '__main__':
coroutine_obj = get_status()
task = asyncio.ensure_future(coroutine_obj) # 定义task
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())
# 输出
Task: <Task pending name='Task-1' coro=<get_status() running at /Users/medivh/tools/test_async.py:69>>
Task: <Task finished name='Task-1' coro=<get_status() done, defined at /Users/medivh/tools/test_async.py:69> result=200>
Task Result: 200

多任务协程

对于想执行多次请求的方案,可以定义一个task列表,然后使用asyncio的wait方法即可执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import requests


async def get_status():
url = 'https://www.baidu.com'
status = requests.get(url).status_code
return status


if __name__ == '__main__':
tasks = [asyncio.ensure_future(get_status()) for _ in range(5)]
print("Tasks: ", tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
print("Task Result:", task.result())
# 输出
Tasks: [<Task pending name='Task-1' coro=<get_status() running at /Users/medivh/github/mercury/tools/test_async.py:69>>, <Task pending name='Task-2' coro=<get_status() running at /Users/medivh/github/mercury/tools/test_async.py:69>>, <Task pending name='Task-3' coro=<get_status() running at /Users/medivh/github/mercury/tools/test_async.py:69>>, <Task pending name='Task-4' coro=<get_status() running at /Users/medivh/github/mercury/tools/test_async.py:69>>, <Task pending name='Task-5' coro=<get_status() running at /Users/medivh/github/mercury/tools/test_async.py:69>>]
Task Result: 200
Task Result: 200
Task Result: 200
Task Result: 200
Task Result: 200

代码说明:

  1. for循环创建5个task,组成list;
  2. 把list首先传给asyncio的wait方法,然后注册到事件循环中;
  3. 发起5个任务;
  4. 输出任务结果。

协程实现

前面的代码都是以网络请求为例,都是耗时的等待的操作,因为在请求网页后需要等待页面响应并返回结果。耗时的等待操作一般都是IO操作,比如文件读写、网络请求等,而协程对于处理这种操作具有很大优势。往往可以在需要等待的时候,程序可以暂时挂起,转而执行其他的操作,从而避免等待一个程序而耗费过多的时间,达到充分利用资源的目的。

代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import requests
import time


async def get_status():
url = 'http://127.0.0.1/welcome'
print('waiting for {} {}'.format(url,time.time()))
status = requests.get(url).status_code
return status


if __name__ == '__main__':
start = time.time()
tasks = [asyncio.ensure_future(get_status()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
# 输出
waiting for http://127.0.0.1/welcome 1658818769.805218
waiting for http://127.0.0.1/welcome 1658818770.0085208
waiting for http://127.0.0.1/welcome 1658818770.101907
waiting for http://127.0.0.1/welcome 1658818770.1906009
waiting for http://127.0.0.1/welcome 1658818770.262319
Cost time: 0.5311617851257324


事实上和正常的请求耗时相差不大,几乎是依次执行的。其实出现这种情况是因为要实现异步处理,必须得有挂起的操作,当一个任务需要等待IO结果的时候,可以挂起当前任务,转而去执行其他任务。

接下来了解一下await的用法,使用await可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到await,事件循环就会将本协程挂起,转而执行其他的协程,知道其他的协程挂起或执行完毕。然后,将代码改造一下。

如果直接改造上文代码,会出现以下提示Class 'int' does not define '__await__', so the 'await' operator cannot be used on its instances。这是因为await后必须符合以下对象:

  • 一个原生coroutine对象
  • 一个有types.coroutine()修饰的生成器,这个生成器可以返回coroutine对象
  • 一个包含await方法的对象返回的一个迭代器

接下来进行一次尝试,用async把请求的方法改造成coroutine对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import requests
import time


async def get_status(url):
return requests.get(url).status_code


async def get_result():
url = 'http://127.0.0.1/welcome'
print('waiting for {} {}'.format(url, time.time()))
status = await get_status(url)
return status


if __name__ == '__main__':
start = time.time()
tasks = [asyncio.ensure_future(get_result()) for _ in range(50)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
# 输出
waiting for http://127.0.0.1/welcome 1658819455.563387
waiting for http://127.0.0.1/welcome 1658819455.612329
waiting for http://127.0.0.1/welcome 1658819455.662548
Cost time: 3.005825996398926

输出的结果证明这种方式不可行,并没有达到真正的异步。这里使用一个支持异步请求的库——aiohttp,利用它和asyncio配合可以方便的实现异步请求操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
pip install apiohttp

import asyncio
import time
import aiohttp


async def get_status(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
await session.close()
return result


async def get_result():
url = 'http://127.0.0.1/api/afk/pod?current=1&pageSize=100'
status = await get_status(url)
return status


if __name__ == '__main__':
start = time.time()
tasks = [asyncio.ensure_future(get_result()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
# 输出
Cost time: 1.1510028839111328

请求耗时直接缩短到1秒左右。

  1. 使用await,后面跟get方法,在执行10个协程的时候,遇到了await,就会将当前协程挂起,转而执行其他协程,直到其他协程也挂起或执行完毕,再进行下一个协程的执行;
  2. 开始运行的时候,事件循环会运行第一个task,第一个task执行遇到await跟着的get方法后,被挂起。但这个get方法第一步的执行是非阻塞的,挂起后立刻被环行,创建了ClientSession对象,接着遇到第二个await,调用了session.get()请求方法,然后就被挂起。由于请求耗时较久,所以一直没有被唤醒;
  3. 事件循环会寻找当前为被挂起的协程继续执行,于是转而执行第二个task。之后,依次执行了第10个task的session.get()方法后,全部的task被挂起。
  4. 所有task处于挂起状态,等待响应。1秒后,所有请求几乎同时有了响应,然后这10个task都被唤醒继续执行,输出请求结果。

接下来尝试不同量级的task,输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1
Cost time: 1.0380501747131348

# 10
Cost time: 1.143162727355957

# 50
Cost time: 3.379934787750244

# 100
Cost time: 5.655962705612183

# 200
Cost time: 9.78205680847168

# 300
Cost time: 9.797142028808594

最后的运行时间基本都在10秒内,200个task开始就会出现502情况,当然这个完全在于后端服务的问题。但是100个task以内的时候,时间都是很接近的。

和单进程、多进程对比

单进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import requests
import time


def get_page():
url = 'http://127.0.0.1/api/afk/pod?current=1&pageSize=100'
result = requests.get(url).status_code
return result


if __name__ == '__main__':
start = time.time()

for _ in range(100):
get_page()

end = time.time()
print('Cost time:', end - start)
# 输出
Cost time: 106.35312700271606

多进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import requests
import time
import multiprocessing


def get_page(_):
url = 'http://127.0.0.1/api/afk/pod?current=1&pageSize=100'
result = requests.get(url).status_code
return result


if __name__ == '__main__':
start = time.time()

pool = multiprocessing.Pool(8)
pool.map(get_page, range(100))

end = time.time()
print('Cost time:', end - start)

# 输出
Cost time: 16.823662996292114

8核CPU执行时间为16.8秒,远大于协程的5.6秒。

多线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import threading


def get_page():
url = 'http://127.0.0.1/api/afk/pod?current=1&pageSize=100'
result = requests.get(url).status_code
return result


if __name__ == '__main__':
start = time.time()
threads = []
for i in range(200):
t = threading.Thread(target=get_page)
threads.append(t)
for t in threads:
t.setDaemon(True)
t.start()
for t in threads:
t.join()
end = time.time()
print('Cost time:', end - start)
# 输出

Cost time: 10.563114881515503

多线程:

1
2
Cost time: 1.160654067993164

回顾

前面讲了那么多,是不是有些内容值得思考一下呢?现在回过头来再总结一下。

进程

进程是操作系统对一个正在运行的程序的一种抽象,进程是资源分配的最小单位。
那么为什么会有进程这个事物呢?主要目的是为了合理压榨CPU性能和分配运行的时间片。在计算机系统中,其计算核心是CPU,负责所有计算相关的工作和资源。单个CPU一次只能运行一个任务。如果一个进程运行着就完全占用一个CPU,是非常不合理的。

线程

有了多进程,为什么还要线程?原因如下:

  1. 进程直接的信息难以共享,父子进程并未共享内存,需要进程间通信,性能开销大
  2. 创建进程的性能开销较大。

进程由多个线程组成,一个进程可以由多个线程的执行单元组成。每个线程都在运行进程的上下文中,共享着同样的代码和全局数据。多个线程比多进程之间更容易共享数据,在上下文切换中一般比进程更高效,原因如下:

  1. 线程间能够快速、方便共享数据
  2. 创建线程的速度比创建进程快10倍以上

协程

协程是用户态的线程。协程的优势:

  1. 节省CPU。避免系统内核级的线程频繁切换,造成CPU的浪费。协程是用户态的线程,用户可以自行控制协程的创建和销毁,极大程度上避免了系统级线程上下文切换造成的资源浪费。
  2. 节约内存。64位的Linux系统中,一个线程需要分配8MB栈内存和64MB堆内存。系统内存的制约导致无法开启更多线程的并发。而协程只需要KB级别,可以轻松达到几十万。
  3. 稳定性。线程之间通过内存来共享数据,这就会导致一个问题,比如一个线程出错时,进程中的所有线程都会跟着崩溃。
  4. 开发效率。开发过程中,可以方便的把一些耗时的IO操作异步化,比如些写文件、耗时IO请求等。

总之,协程的本质是用户态下的线程.

用一个形象的例子:

  • 进程就像一家餐馆,餐馆有多个服务员,每个餐桌是要完成的任务。单进程,对应一家餐馆;
  • 多进程,增加了N家餐馆,接待的客人多了,成本也上去了;
  • 多线程。一家餐馆,来一桌客人安排一个服务员;
  • 协程。一家餐馆只安排一个服务员,A点菜,就去B服务,A点完了就再回到A这。B还没点完就去C那,依次类推,直到所有人都吃上饭。