【caffeonspark源码分析】【网盘源码 asp】【购票源码php】python协程task源码_协程 python

时间:2024-12-23 23:09:06 编辑:elasticsearch 源码启动 来源:代理ip发包源码

1.Python 协程 asyncio模块底层原理详解
2.python中的协程协程asyncio使用详解与异步协程的处理流程分析
3.Python程序开发系列并发执行协程任务超时的解决方案(案例分析)
4.在Python中使用Asyncio系统(3-4)​Task 和 Future
5.如何用python写一个协程
6.python协程(4):asyncio

python协程task源码_协程 python

Python 协程 asyncio模块底层原理详解

       coroutine function

       定义于async def的function,其执行并不立即启动,而是返回一个coroutine object.

       运行coroutine function需切换至async模式,使event loop启动.此过程将coroutine object转化为task.

       通过asyncio.run(coroutine object)实现.

       多 task

       任务在event loop中以task形式执行.使用await提交多个任务.

       调用coroutine function返回coroutine object,await操作实质是提交一个coroutine object.

       事件循环主动将控制权交予可执行的task,task通过await或执行完毕交还控制权.

       若任务间存在死循环,事件循环会因控制权交接受阻而停滞.

       程序执行时间受任务顺序影响.多个sleep任务依次执行.

       并发多 task

       await task2是否加入取决于任务是否需要返回值.若有多个任务且需要返回值,使用asyncio.gather()实现.

       asyncio.gather()接收coroutine object、task、源码future object,协程协程处理后提交至事件循环,返回顺序列表.

       总结

       定义coroutine function为async def xx().

       通过调用自身xx()生成coroutine object.

       await、asyncio.create_task()、源码asyncio.gather()用于提交任务至事件循环.

       asyncio.run(xx())启动事件循环执行任务.

       并发执行中,协程协程任务间等待时间被有效利用,但本质上仍是顺序执行.

python中的asyncio使用详解与异步协程的处理流程分析

       一些核心概念

       异步函数的定义

       普通函数通过使用 def 关键词定义,而异步函数,源码caffeonspark源码分析即协程函数 (Coroutine) 是协程协程一种特殊类型的函数,可以在代码块中将执行权交予其他协程。源码定义时使用 async def 关键词。协程协程

       如何调用协程并获得结果

       调用普通函数如 result = add2(2) 可直接运行并返回结果4。源码然而,协程协程调用 result = add3(2) 会返回一个协程对象,源码而非结果2+3=5。协程协程网盘源码 asp要获得结果,源码需将协程放置于事件循环中执行。协程协程

       事件循环 Eventloop

       Eventloop 是 asyncio 应用的核心,它注册并循环执行异步函数。在执行过程中,遇到需要等待的 I/O 操作(如网络请求)时会暂停执行,切换到其他函数。这使得多个异步函数可以协同运行。

       得到协程执行结果

       要得到协程函数的结果,必须先创建一个 Eventloop 或者在调用协程时使用 await 关键字。例如,result = await add3(2)。购票源码phpawait 只能在协程函数中使用。

       复杂协程执行示例

       通过将两个协程放在 main 函数中,事件循环会按顺序执行它们,即便两个协程内部有不同等待时间。要使协程并行执行,需要使用 asyncio.gather() 函数。

       使用 asyncio.gather() 和 Task 任务对象

       通过 asyncio.gather() 可以并行执行多个协程函数,返回结果是一个列表。使用 Task 对象如 asyncio.ensure_future() 或 loop.create_task() 可动态添加协程到事件循环。

       异步函数与同步函数的执行

       创建 task 对象并使用 loop.run_until_complete() 可以运行协程函数。同时,可以为 task 对象添加回调方法,博客html模板源码当协程执行结束时调用事件循环的 stop 方法来结束整个循环。

       并行执行多个协程

       使用 asyncio.gather() 函数可以轻松并行执行多个协程,并通过调用 await asyncio.gather(*) 获取所有结果。

       异步与多线程的结合

       在事件循环中动态添加同步函数可以通过子线程运行事件循环并使用 loop.call_soon_threadsafe() 方法添加函数。为了实现并发执行,可以使用 run_in_executor() 方法在执行器中执行同步函数。

       执行器的选择

       使用 concurrent.futures 下的 ThreadPoolExecutor 或 ProcessPoolExecutor 可在多线程或多进程中执行同步函数。注意它们在初始化时的 max_workers 参数决定执行器的工作线程数。

       异步函数的动态添加

       使用 asyncio.run_coroutine_threadsafe() 可以将协程绑定到事件循环上,避免阻塞主线程。通过 gather() 方法结合 run_coroutine_threadsafe() 和 run_in_executor() 获得多个协程的结果。

       获取协程结果

       使用 asyncio.gather() 可以并行获取多个协程的智慧社区 cms源码结果。注意,获取结果时协程所在的线程与主线程可能不同,这取决于事件循环的运行方式。

       异步库与实践

       总结了异步协程的基本概念与流程后,接下来可以进一步学习与实践如 aioplete()。这和你以前见过的不一样。现在循环正在运行,main()协程将开始执行.

        (L)最终,当future的结果被设置时,它就完成了。完成后,可以访问结果。

        当然,你不太可能以这里所示的方式直接使用Future;代码示例仅用于教育目的。你与asynccio的大部分联系都是通过Task实例进行的。

        你可能想知道如果在Task实例上调用set_result()会发生什么。在Python 3.8之前可以这样做,但现在不允许这么做了。任务实例是协程对象的包装器,它们的结果值只能在内部设置为底层协程函数的结果,如 示例 3-所示那样。

        示例 3-. 在task上调用set_result

        (L)唯一的区别是我们创建的是Task实例而不是Future实例。当然,Task API要求我们提供一个协程;这里我们使用sleep()只是因为简单方便。

        (L7)正在传入一个Task实例。它满足函数的类型签名(因为Task是Future的子类),但从Python 3.8开始,我们不再允许在Task上调用set_result():尝试这样做将引发RuntimeError。这个想法是,一个Task代表一个正在运行的协程,所以结果应该总是来自于task自身。

        (L, L)但是,我们仍然可以cancel()一个任务,它将在底层协程中引发CancelledError。

        Create_task? Ensure_Future? 下定决心吧!

        在第页的“快速入门”中,我说过运行协程的方法是使用asyncio.create_task()。在引入该函数之前,有必要获取一个循环实例并使用loop.create_task()完成相同的任务。事实上,这也可以通过一个不同的模块级函数来实现:asyncio.ensure_future()。一些开发人员推荐create_task(),而其他人推荐ensure_future()。

        在我为这本书做研究的过程中,我确信API方法asyncio.ensure_future()是引起对asyncio库广泛误解的罪魁祸首。API的大部分内容都非常清晰,但在学习过程中还存在一些严重的障碍,这就是其中之一。当你遇到ensure_future()时,你的大脑会非常努力地将其集成到关于asyncio应该如何使用的心理模型中——但很可能会失败!

        在Python 3.6 asyncio 文档中,这个现在已经臭名昭著的解释突出了 ensure_future() 的问题:

        asyncio.ensure_future(coro_or_future, *, _loop =None)

        安排执行一个协程对象:把它包装在future中。返回一个Task对象。如果参数是Future,则直接返回。

        什么!? 当我第一次读到这篇文章时,我很困惑。下面希望是对ensure_future()的更清楚的描述:

        这个函数很好地说明了针对终端用户开发人员的asyncio API(高级API)和针对框架设计人员的asyncio API(低级API)之间的区别。让我们在示例 3-中自习看看它是如何工作的。

        示例 3-. 仔细看看ensure_future()在做什么

        (L3)一个简单的什么都不做的协程函数。我们只需要一些能组成协程的东西。

        (L6)我们通过直接调用该函数来创建协程对象。你的代码很少会这样做,但我想在这里明确地表示,我们正在向每个create_task()和ensure_future()传递一个协程对象。

        (L7)获取一个循环。

        (L9)首先,我们使用loop.create_task()在循环中调度协程,并返回一个新的Task实例。

        (L)验证类型。到目前为止,没有什么有趣的。

        (L)我们展示了asyncio.ensure_future()可以被用来执行与create_task()相同的动作:我们传入了一个协程,并返回了一个Task实例(并且协程已经被安排在循环中运行)!如果传入的是协程,那么loop.create_task()和asyncio.ensure_future()之间没有区别。

        (L)如果我们给ensure_future()传递一个Task实例会发生什么呢?注意我们要传递的Task实例是已经在第4步通过loop.create_task()创建好的。

        (L)返回的Task实例与传入的Task实例完全相同:它在被传递时没有被改变。

        直接传递Future实例的意义何在?为什么用同一个函数做两件不同的事情?答案是,ensure_future()的目的是让框架作者向最终用户开发者提供可以处理两种参数的API。不相信我?这是ex-BDFL自己说的:

        ensure_future()的要点是,如果你有一个可能是协程或Future(后者包括一个Task,因为它是Future的子类)的东西,并且你想能够调用一个只在Future上定义的方法(可能唯一有用的例子是cancel())。当它已经是Future(或Task)时,它什么也不做;当它是协程时,它将它包装在Task中。

        如果您知道您有一个协程,并且希望它被调度,那么正确的API是create_task()。唯一应该调用ensure_future()的时候是当你提供一个API(像大多数asyncio自己的API),它接受协程或Future,你需要对它做一些事情,需要你有一个Future。

        —Guido van Rossum

        总而言之,asyncio.sure_future()是一个为框架设计者准备的辅助函数。这一点最容易通过与一种更常见的函数进行类比来解释,所以我们来做这个解释。如果你有几年的编程经验,你可能已经见过类似于例3-中的istify()函数的函数。示例 3-中listify()的函数。

        示例 3-. 一个强制输入列表的工具函数

        这个函数试图将参数转换为一个列表,不管输入的是什么。api和框架中经常使用这类函数将输入强制转换为已知类型,这将简化后续代码——在本例中,您知道参数(来自listify()的输出)将始终是一个列表。

        如果我将listify()函数重命名为ensure_list(),那么您应该开始看到与asyncio.ensure_future()的类似之处:它总是试图将参数强制转换为Future(或子类)类型。这是一个实用函数,它使框架开发人员(而不是像你我这样的终端用户开发人员)的工作变得更容易。

        实际上,asyncio标准库模块本身使用ensure_future()正是出于这个原因。当你下次查看API时,你会发现函数参数被描述为“可等待对象”,很可能内部使用ensure_future()强制转换参数。例如,asyncio.gather()函数就像下面的代码一样:

        aws参数表示“可等待对象”,包括协程、task和future。在内部,gather()使用ensure_future()进行类型强制转换:task和future保持不变,而把协程强制转为task。

        这里的关键是,作为终端用户应用程序开发人员,应该永远不需要使用asyncio.ensure_future()。它更像是框架设计师的工具。如果你需要在事件循环上调度协程,只需直接使用asyncio.create_task()来完成。

        在接下来的几节中,我们将回到语言级别的特性,从异步上下文管理器开始。

如何用python写一个协程

       1 使用纯Python代码开发 (必须是人形,非狗非猫非其他) 

       2 真正的操作系统,不仅仅能调度任务,还提供了许许多多的系统调用,比如说新建一个进程,kill一个进程,这些我们也要实现!!(所以说就算是找个充气的也必须是有多功能的)

       3 可以处理多任务 (洗衣做饭啥都能干)!

python协程(4):asyncio

        asyncio是官方提供的协程的类库,从python3.4开始支持该模块

        async & awiat是python3.5中引入的关键字,使用async关键字可以将一个函数定义为协程函数,使用awiat关键字可以在遇到IO的时候挂起当前协程(也就是任务),去执行其他协程。

        await + 可等待的对象(协程对象、Future对象、Task对象 -> IO等待)

        注意:在python3.4中是通过asyncio装饰器定义协程,在python3.8中已经移除了asyncio装饰器。

        事件循环,可以把他当做是一个while循环,这个while循环在周期性的运行并执行一些协程(任务),在特定条件下终止循环。

        loop = asyncio.get_event_loop():生成一个事件循环

        loop.run_until_complete(任务):将任务放到事件循环

        Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。

        本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

        注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用 asyncio.ensure_future() 函数。

        下面结合async & awiat、事件循环和Task看一个示例

        示例一:

        *注意:python 3.7以后增加了asyncio.run(协程对象),效果等同于loop = asyncio.get_event_loop(),loop.run_until_complete(协程对象)

*

        示例二:

        注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。

        示例三:

python中多进程+协程的使用以及为什么要用它

       å‰é¢è®²äº†ä¸ºä»€ä¹ˆpython里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

       ä¸è¿‡ç‰¹æ®Šæƒ…况(特指IO密集型任务)下,多线程是比多进程好用的。

       ä¸¾ä¸ªä¾‹å­ï¼šç»™ä½ W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

       ä¾‹å¦‚每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

       1、单进程+单线程:需要2秒*W=W秒==.个小时==.3天,这个速度明显是不能接受的

       2、单进程+多线程:例如我们在这个进程中开了个多线程,比1中能够提升倍速度,也就是大约4.天能够完成W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开W个线程肯定是不靠谱的)

       3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开个进程,每个进程里开W个线程,执行的速度理论上是比单进程开W个线程快倍以上的(为什么是倍以上而不是倍,主要是cpu切换W个线程的消耗肯定比切换W个进程大得多,考虑到这部分开销,所以是倍以上)。

       è¿˜æœ‰æ›´å¥½çš„方法吗?答案是肯定的,它就是:

       4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

       what:

       åç¨‹æ˜¯ä¸€ç§ç”¨æˆ·çº§çš„轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

       åç¨‹èƒ½ä¿ç•™ä¸Šä¸€æ¬¡è°ƒç”¨æ—¶çš„状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

       åœ¨å¹¶å‘编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

       why:

       ç›®å‰ä¸»æµè¯­è¨€åŸºæœ¬ä¸Šéƒ½é€‰æ‹©äº†å¤šçº¿ç¨‹ä½œä¸ºå¹¶å‘设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

       ä¸ç®¡æ˜¯è¿›ç¨‹è¿˜æ˜¯çº¿ç¨‹ï¼Œæ¯æ¬¡é˜»å¡žã€åˆ‡æ¢éƒ½éœ€è¦é™·å…¥ç³»ç»Ÿè°ƒç”¨(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。

       è€Œä¸”由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

       å› ä¸ºåç¨‹æ˜¯ç”¨æˆ·è‡ªå·±æ¥ç¼–写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

       how:

       python里面怎么使用协程?答案是使用gevent,使用方法:看这里

       ä½¿ç”¨åç¨‹ï¼Œå¯ä»¥ä¸å—线程开销的限制,我尝试过一次把W条url放在单进程的协程里执行,完全没问题。

       æ‰€ä»¥æœ€æŽ¨èçš„方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)

       å¤šè¿›ç¨‹+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。

       å°ä¾‹å­ï¼š

       [python] view plain copy

       #-*- coding=utf-8 -*-  

       import requests  

       from multiprocessing import Process  

       import gevent  

       from gevent import monkey; monkey.patch_all()  

       import sys  

       reload(sys)  

       sys.setdefaultencoding('utf8')  

       def fetch(url):  

       try:  

       s = requests.Session()  

       r = s.get(url,timeout=1)#在这里抓取页面  

       except Exception,e:  

       print e   

       return ''  

       def process_start(url_list):  

       tasks = []  

       for url in url_list:  

       tasks.append(gevent.spawn(fetch,url))  

       gevent.joinall(tasks)#使用协程来执行  

       def task_start(filepath,flag = ):#每W条url启动一个进程  

       with open(filepath,'r') as reader:#从给定的文件中读取url  

       url = reader.readline().strip()  

       url_list = []#这个list用于存放协程任务  

       i = 0 #计数器,记录添加了多少个url到协程队列  

       while url!='':  

       i += 1  

       url_list.append(url)#每次读取出url,将url添加到队列  

       if i == flag:#一定数量的url就启动一个进程并执行  

       p = Process(target=process_start,args=(url_list,))  

       p.start()  

       url_list = [] #重置url队列  

       i = 0 #重置计数器  

       url = reader.readline().strip()  

       if url_list not []:#若退出循环后任务队列里还有url剩余  

       p = Process(target=process_start,args=(url_list,))#把剩余的url全都放到最后这个进程来执行  

       p.start()  

       if __name__ == '__main__':  

       task_start('./testData.txt')#读取指定文件  

       ç»†å¿ƒçš„同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。

Python 协程-asyncio、async/await

       理解Python协程,尤其是asyncio和async/await机制,对于提升异步编程能力至关重要。这两个特性允许在Python中以一种优雅且高效的方式处理非阻塞性任务。

       在使用asyncio进行异步编程时,可以将一系列任务组合到一个协程中,使用await asyncio.gather()函数来并发执行这些任务。这种方式允许在处理多个任务时提高效率,避免了传统同步编程中可能出现的阻塞问题。

       以一个简单的例子说明,假设我们有两个协程a()和b(),分别代表两个耗时的操作。通过将await asyncio.gather(a(), b())添加到程序中,我们能够在等待这两个任务完成的同时进行其他操作,从而实现更高效的资源利用。

       进一步探索Python中的异步接口和同步实现,Task类提供了一种实现异步生产者消费者模型的途径。在这样的模型中,生产者创建任务并将结果传递给消费者,消费者处理这些结果。通过使用Task类,可以轻松地管理这些任务,并利用await关键字在需要等待任务完成时暂停程序执行。

       最后,学习如何构建和利用Python的生产者消费者模型,可以参考相关资源。这种模型在处理大量数据流时特别有用,例如在网络编程、数据处理和并发计算场景中。