# =============================================================================
# Part 1: Concurrent HTTP requests with concurrent.futures
# =============================================================================
# A single process with a single thread makes every request wait for the
# previous one; thread and process pools let the requests overlap.
#
# Every example below was originally a standalone script. They are wrapped in
# functions so that importing this file does not immediately fire off network
# requests; call the example you want to run. The process-pool examples must
# be called from under an ``if __name__ == '__main__':`` guard, because on
# spawn-based platforms (Windows, macOS) the worker processes re-import this
# module.

from concurrent.futures import (
    FIRST_COMPLETED,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    wait,
)

import requests

# Seconds to wait for a server before giving up; without a timeout
# requests.get() can block a worker forever.
REQUEST_TIMEOUT = 10

url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']


def fetch_async(url):
    """Download *url* with a blocking GET and return the ``requests.Response``.

    Despite its name this is an ordinary blocking call; the concurrency comes
    from the executor it is submitted to. It lives at module level so that
    ``ProcessPoolExecutor`` can pickle it and send it to worker processes.
    """
    return requests.get(url, timeout=REQUEST_TIMEOUT)


def callback(future):
    """Done-callback: print the finished future's result, or its exception.

    An exception raised inside a done-callback is only logged and then
    ignored by the executor, so the failure case is reported explicitly
    instead of letting ``future.result()`` raise here.
    """
    exc = future.exception()
    if exc is not None:
        print('request failed:', exc)
    else:
        print(future.result())


def load_url(url):
    """GET *url* and print the size of the response body in bytes."""
    res = requests.get(url, timeout=REQUEST_TIMEOUT)
    # res.content is the raw body (bytes); res.text would count decoded
    # characters, which does not match the "bytes" in the message.
    print('{0} page is {1} bytes'.format(url, len(res.content)))


def _report_failures(futures):
    """Print the exception of every future in *futures* that failed.

    A future stores its task's exception instead of raising it, so without
    this check a failed request would disappear silently.
    """
    for future in futures:
        exc = future.exception()
        if exc is not None:
            print('request failed:', exc)


def sequential_example():
    """Single process, single thread: fetch each URL after the previous one."""
    for url in url_list:
        fetch_async(url)


def thread_pool_example():
    """Multiple threads: fetch the URLs concurrently on a pool of 5 threads."""
    # Leaving the ``with`` block calls pool.shutdown(wait=True).
    with ThreadPoolExecutor(5) as pool:
        futures = [pool.submit(fetch_async, url) for url in url_list]
    _report_failures(futures)


def thread_pool_callback_example():
    """Multiple threads + callback: print each response as it completes."""
    with ThreadPoolExecutor(5) as pool:
        for url in url_list:
            pool.submit(fetch_async, url).add_done_callback(callback)


def process_pool_example():
    """Multiple processes: fetch the URLs on a pool of 5 worker processes.

    Call from under ``if __name__ == '__main__':``.
    """
    with ProcessPoolExecutor(5) as pool:
        futures = [pool.submit(fetch_async, url) for url in url_list]
    _report_failures(futures)


def submit_example():
    """Example 2: submit() returns a Future immediately.

    submit() adds a task to the thread pool and returns a Future object,
    which can be thought of as an operation that will complete in the future.
    Because the pool runs the tasks asynchronously, the main thread does not
    wait for the worker threads. future.done() is checked right after each
    submission, while the download is still in progress, so it prints False.
    print('主线程') ("main thread") then runs before any page has been fetched.

    Sample output (page sizes vary from run to run):
        False
        False
        False
        主线程
        http://www.163.com page is 646107 bytes
        https://www.baidu.com/ page is 2443 bytes
        https://github.com/ page is 51203 bytes
    """
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = []
        for url in urls:
            future = executor.submit(load_url, url)
            futures.append(future)
            print(future.done())
        print('主线程')
    _report_failures(futures)


def map_example():
    """Example 3: executor.map, used like the built-in map().

    The calls run on the pool and are submitted immediately, so '主线程' is
    printed while the downloads are still running.
    """
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(load_url, urls)
        print('主线程')
        # Iterating the results waits for each call and re-raises its
        # exception; discarding the iterator would hide failures.
        list(results)


def wait_example():
    """Example 4: block until every download has finished.

    wait() returns a named tuple of two sets: ``done`` (completed futures)
    and ``not_done`` (uncompleted futures). Its ``return_when`` argument,
    which gives finer control over when to resume, accepts one of three
    constants: FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED (the
    default). With ALL_COMPLETED the call blocks until every task in the
    pool has finished, and only then does the main thread continue.
    """
    with ThreadPoolExecutor(max_workers=3) as executor:
        f_list = [executor.submit(load_url, url) for url in urls]
        print(wait(f_list))
        print('主线程')


def wait_first_completed_example():
    """Example 5: resume as soon as the first download finishes.

    With FIRST_COMPLETED the program does not wait for every task in the
    pool, so '主线程' may be printed while other downloads are still running.
    """
    with ThreadPoolExecutor(max_workers=3) as executor:
        f_list = [executor.submit(load_url, url) for url in urls]
        print(wait(f_list, return_when=FIRST_COMPLETED))
        print('主线程')


def process_pool_callback_example():
    """Multiple processes + callback: print each response as it completes.

    The callback runs in the parent process, so each Response is pickled back
    from its worker first. Call from under ``if __name__ == '__main__':``.
    """
    with ProcessPoolExecutor(5) as pool:
        for url in url_list:
            pool.submit(fetch_async, url).add_done_callback(callback)
# =============================================================================
# Part 2: Asynchronous I/O with asyncio and third-party libraries
# =============================================================================
# asyncio's programming model is an event loop. Coroutines are handed to the
# loop, which runs them and switches between them whenever one is waiting on
# I/O; that is what makes the I/O asynchronous. asyncio.run() creates a loop,
# runs the given coroutine to completion and closes the loop.
#
# Updated for current Python:
#   * ``@asyncio.coroutine`` / ``yield from`` were removed in Python 3.11 and
#     are replaced by ``async def`` / ``await``.
#   * asyncio.wait() no longer accepts bare coroutines, so gather() is used.
#   * Each example gets a fresh loop from asyncio.run() instead of sharing,
#     and then closing, the default loop. With a shared loop, every example
#     after the first failed with "Event loop is closed".
# As in Part 1, each example is wrapped in a function so importing this file
# performs no network I/O.

import asyncio
import threading
import time


async def _gather(*coros):
    """Run *coros* concurrently on the current loop; return their results in order."""
    return await asyncio.gather(*coros)


def asyncio_sleep_example():
    """Example 1: two coroutines sleep concurrently, finishing in ~5 s, not 10."""

    async def func1():
        print('before...func1......')
        await asyncio.sleep(5)
        print('end...func1......')

    async def func2():
        print('before...func2......')
        await asyncio.sleep(5)
        print('end...func2......')

    asyncio.run(_gather(func1(), func2()))


def asyncio_raw_http_example():
    """Example 2: send hand-written HTTP/1.0 GET requests over asyncio streams."""

    async def fetch_async(host, url='/'):
        print(host, url)
        reader, writer = await asyncio.open_connection(host, 80)
        try:
            request_header_content = 'GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n'.format(url, host)
            writer.write(bytes(request_header_content, encoding='utf-8'))
            await writer.drain()
            # HTTP/1.0 without keep-alive: the server closes the connection
            # after the response, so read() returns everything up to EOF.
            text = await reader.read()
            print(host, url, text)
        finally:
            # Always release the connection, even if a step above failed.
            writer.close()
            await writer.wait_closed()

    return asyncio.run(_gather(
        fetch_async('www.cnblogs.com', '/wupeiqi/'),
        fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091'),
    ))


def asyncio_thread_example():
    """Example 3: both coroutines print the same thread; the loop is single-threaded."""

    async def hello():
        print('Hello world! (%s)' % threading.current_thread())
        await asyncio.sleep(5)
        print('Hello again! (%s)' % threading.current_thread())

    asyncio.run(_gather(hello(), hello()))


def asyncio_callback_example():
    """Example 4: wrap an ``async def`` coroutine in a Task with a done-callback."""

    async def do_some_work(x):  # ``async def`` defines a coroutine
        print('Waiting: ', x)
        return 'Done after {0}'.format(x)

    def callback(future):
        print('Callback: ', future.result())

    async def main():
        task = asyncio.ensure_future(do_some_work(2))
        task.add_done_callback(callback)
        await task

    start = time.time()
    asyncio.run(main())
    print('TIME: ', time.time() - start)


def aiohttp_example():
    """aiohttp + asyncio: issue the GET requests concurrently."""
    import aiohttp

    async def fetch_async(url):
        print(url)
        # aiohttp.request() is an async context manager; leaving it releases
        # the response, which replaces the manual response.close().
        async with aiohttp.request('GET', url) as response:
            print(url, response)

    return asyncio.run(_gather(
        fetch_async('http://www.baidu.com'),
        fetch_async('http://www.chouti.com'),
    ))


def asyncio_requests_example():
    """asyncio + requests: run the blocking requests.get calls on worker threads."""
    import requests

    async def fetch_async(func, *args):
        loop = asyncio.get_running_loop()
        # None selects the loop's default executor; awaiting the future
        # suspends this coroutine while the blocking call runs elsewhere.
        response = await loop.run_in_executor(None, func, *args)
        print(response.url, response.content)

    return asyncio.run(_gather(
        fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
        fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
    ))


def gevent_example():
    """gevent + requests: monkey-patching makes blocking socket calls cooperative.

    NOTE: monkey.patch_all() should run before ``ssl``/``requests`` are
    imported anywhere in the process (gevent warns otherwise), so run this
    example in a fresh interpreter rather than after the other examples.
    """
    from gevent import monkey
    monkey.patch_all()
    import gevent
    import requests

    def fetch_async(method, url, req_kwargs):
        print(method, url, req_kwargs)
        response = requests.request(method=method, url=url, **req_kwargs)
        print(response.url, response.content)

    gevent.joinall([
        gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://www.sina.com/', req_kwargs={}),
    ])


def grequests_example():
    """grequests: build the requests, then send them all concurrently with map().

    Building the list alone sends nothing; grequests.map() performs the
    requests. The first two requests are expected to fail (0.001 s timeout,
    unresolvable host). exception_handler reports them, and its return value
    (None) fills their slots in the result list.
    """
    import grequests  # importing grequests monkey-patches the process via gevent

    def exception_handler(request, exception):
        print('Request failed:', request.url, exception)

    request_list = [
        grequests.get('http://httpbin.org/delay/1', timeout=0.001),
        grequests.get('http://fakedomain/'),
        grequests.get('http://httpbin.org/status/500'),
    ]
    responses = grequests.map(request_list, exception_handler=exception_handler)
    print(responses)
    return responses


def twisted_example():
    """Twisted: fetch the URLs concurrently and stop the reactor when all are done.

    ``getPage`` has been deprecated since Twisted 16.7 and removed from recent
    releases; Agent + readBody replaces it. RedirectAgent keeps getPage's
    default of following redirects. A Twisted reactor cannot be restarted,
    so only one Twisted example can run per process.
    """
    from twisted.internet import defer, reactor
    from twisted.web.client import Agent, RedirectAgent, readBody

    agent = RedirectAgent(Agent(reactor))

    def all_done(arg):
        reactor.stop()

    def callback(contents):
        print(contents)

    def errback(failure):
        print('Error:', failure.getErrorMessage())

    deferred_list = []
    url_list = ['http://www.bing.com', 'http://www.baidu.com']
    for url in url_list:
        deferred = agent.request(b'GET', bytes(url, encoding='utf8'))
        deferred.addCallback(readBody)
        deferred.addCallbacks(callback, errback)
        deferred_list.append(deferred)
    dlist = defer.DeferredList(deferred_list)
    dlist.addBoth(all_done)
    reactor.run()


def tornado_example():
    """Tornado: fetch the URLs concurrently with AsyncHTTPClient.

    Tornado 6 removed the ``callback`` argument of fetch(), so each response
    is awaited instead. asyncio.run() returns once every fetch has finished;
    the original started the IOLoop and never stopped it.
    """
    from tornado.httpclient import AsyncHTTPClient, HTTPRequest

    def handle_response(response):
        if response.error:
            print("Error:", response.error)
        else:
            print(response.body)

    async def fetch_one(http_client, url):
        print(url)
        try:
            response = await http_client.fetch(HTTPRequest(url), raise_error=False)
        except Exception as exc:  # connection-level errors still raise
            print("Error:", exc)
        else:
            handle_response(response)

    async def func():
        http_client = AsyncHTTPClient()
        url_list = [
            'http://www.baidu.com',
            'http://www.bing.com',
        ]
        await asyncio.gather(*(fetch_one(http_client, url) for url in url_list))

    asyncio.run(func())


def twisted_post_example():
    """More Twisted (mislabeled "tornado更多" in the original): POST a form.

    Agent + FileBodyProducer replaces the removed getPage(). The original's
    empty ``cookies={}`` argument sent no cookies and is dropped.
    """
    import urllib.parse
    from io import BytesIO

    from twisted.internet import reactor
    from twisted.web.client import Agent, FileBodyProducer, readBody
    from twisted.web.http_headers import Headers

    def one_done(arg):
        print(arg)
        reactor.stop()

    post_data = urllib.parse.urlencode({'check_data': 'adf'})
    post_data = bytes(post_data, encoding='utf8')
    headers = Headers({b'Content-Type': [b'application/x-www-form-urlencoded']})
    response = Agent(reactor).request(
        bytes('POST', encoding='utf8'),
        bytes('http://dig.chouti.com/login', encoding='utf8'),
        headers,
        FileBodyProducer(BytesIO(post_data)),
    )
    response.addCallback(readBody)
    response.addBoth(one_done)
    reactor.run()

# All of the above are built-in or third-party Python modules for making
# asynchronous I/O requests. They are simple to use and greatly improve
# efficiency. Under the hood, an asynchronous I/O request is essentially a
# non-blocking socket plus I/O multiplexing.
# =============================================================================
# Part 3: The select module, the I/O multiplexing beneath asynchronous I/O
# =============================================================================

import select
import socket


def select_server(ports=(8001, 8002, 8003), host='0.0.0.0', timeout=1):
    """Serve 'hello' to clients on several ports from a single thread.

    Opens one listening socket per entry in *ports* and uses select() to
    watch all of them at once. Each accepted connection is sent b'hello' and
    closed. Runs until every listener has been dropped via e_list, and closes
    all listening sockets on exit, including on Ctrl+C.

    How ``select.select(inputs, [], inputs, timeout)`` is used here:
      * 1st argument: sockets to watch for readability. A listening socket
        becomes readable when a client connection is waiting, and the ready
        ones are returned in r_list. For example, r_list == [sk1] if someone
        connected to sk1, or [sk1, sk2] if clients connected to both.
      * 2nd argument: sockets to watch for writability; the ready ones are
        returned in w_list. Nothing is watched for writing, so w_list is
        always [].
      * 3rd argument: sockets to watch for "exceptional conditions"; the
        affected ones are returned in e_list and are dropped.
      * 4th argument: timeout in seconds. select() returns after at most
        *timeout* seconds even if nothing happened; in that case no client
        connected and all three lists are [].
    """
    listeners = []
    try:
        for port in ports:
            sk = socket.socket()
            listeners.append(sk)  # tracked first so it is closed even if bind fails
            sk.bind((host, port))
            sk.listen()
            # Non-blocking, so accept() cannot stall the loop if a pending
            # connection disappears between select() and accept().
            sk.setblocking(False)
        inputs = list(listeners)
        # select() with three empty lists fails on Windows, so stop once
        # every listener has been dropped.
        while inputs:
            r_list, w_list, e_list = select.select(inputs, [], inputs, timeout)
            for sk in r_list:
                try:
                    conn, address = sk.accept()
                except BlockingIOError:
                    continue  # the client went away before it was accepted
                with conn:
                    # An accepted socket may inherit non-blocking mode on
                    # some platforms; sendall() is meant for blocking sockets.
                    conn.setblocking(True)
                    try:
                        conn.sendall(bytes('hello', encoding='utf-8'))
                    except OSError as exc:
                        # One misbehaving client must not kill the server.
                        print('send to', address, 'failed:', exc)
            for sk in e_list:
                inputs.remove(sk)
                sk.close()
    finally:
        for sk in listeners:
            sk.close()


def select_client(host='127.0.0.1', port=8001):
    """Connect to select_server, then print and return its greeting.

    The server closes the connection after sending, so the reply is read
    until EOF; a single recv(1024) could return only part of it.
    """
    with socket.socket() as obj:
        obj.connect((host, port))
        content = str(b''.join(iter(lambda: obj.recv(1024), b'')), encoding='utf-8')
    print(content)
    return content