,文章

python 并发 ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=20)#变量ex就是20线程

future = [ex.submit(down_pic,fenxi(toplist(url))) for url in urls]#ex.submit有2个参数,down_pic不要带括号和参数,逗号后边的参数就是一个可迭代对象用来给第一个参数使用的。

wait(future,return_when=ALL_COMPLETED)#多线程处理模块的wait,等待括号内的所有内容结束

看以上三点即可

本文实例讲述了Python线程池模块ThreadPoolExecutor用法。分享给大家供大家参考,具体如下:

python3内置的有Threadingpool和ThreadPoolExecutor模块,两个都可以做线程池,当然ThreadPoolExecutor会更好用一些,而且也有ProcessPoolExecutor进程池模块,使用方法基本一致。

首先导入模块

from concurrent.futures import ThreadPoolExecutor

使用方法很简单,最常用的可能就是map方法和submit+as_completed

注意,一定要使用with,而不要使用for,如果你一定要用for,那么一定要手动进行executor.shutdown,而你使用了with方法的话,再with方法内部已经实现了wait(),在使用完毕之后可以自行关闭线程池,减少资源浪费。

使用map

with ThreadPoolExecutor(max_workers=2) as executor:
  result = executor.map(map_fun, itr_arg)
  '''map_fun:你传入的要执行的map函数
    itr_arg:一个可迭代的参数,可以是列表字典等可迭代的对象
    基本上和python的map函数一样
    注意result并不是你map_fun返回的结果,而是一个生成器,如果要从中去结果,你可以使用列表生成式或者其他你想使用的方法
  '''
  for res in result:
    print(res) #这个res就是你map_fun返回的结果,你可以在这里做进一步处理

使用submit+as_completed也可以很灵活

with ThreadPoolExecutor(max_workers=2) as executor:
  future= executor.submit(fun, args)
  '''
  在这里你可以使用for循环来做,返回的是一个future对象
  future_list=[]
  for i in range(max_workers):
    future= executor.submit(fun, args[i])
    future_list.append(future)
  '''
  for res in ac_completed(futrue_list): #这个futrure_list是你future对象的列表
    print(res.result())        #循环遍历时用.result()来取返回值

两种方式差不多,都可以很好的实现多线程任务,切记一定使用with!

正文:
Executor是一个抽象类,子类:

ThreadPoolExecutor和ProcessPoolExecutor ,一个线程池,一个进程池.

future对象:在未来的某一时刻完成操作的对象.
submit方法可以返回一个future对象,此对象直接返回,等线程函数执行完后把return的数据再set_result到future对象中;

下面实现了submit, map 与 as_completed的差别 , 下面的例子中都没有使用with ,实际使用时需要调用shutdown , 或用with

#线程执行的函数
def add(n1,n2):
    v = n1 + n2
    print('add :', v , ', tid:',threading.currentThread().ident)
    time.sleep(n1)
    return v
#通过submit把需要执行的函数扔进线程池中.
#submit 直接返回一个future对象
ex = ThreadPoolExecutor(max_workers=3)      #制定最多运行N个线程
f1 = ex.submit(add,2,3)
f2 = ex.submit(add,2,2)
print('main thread running')
print(f1.done())                            #done 看看任务结束了没
print(f1.result())                          #获取结果 ,阻塞方法

注意 map 方法,返回是跟你提交序列是一致的. 是有序的

#下面是map 方法的简单使用.  注意:map 返回是一个生成器 ,并且是*有序的*
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    print('thread id:',threading.currentThread().ident,' 访问了:',url)
    return requests.get(url)            #这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)
res_iter = ex.map(get_html,URLS)        #内部迭代中, 每个url 开启一个线程
for res in res_iter:                    #此时将阻塞 , 直到线程完成或异常
    print('url:%s ,len: %d'%(res.url,len(res.text)))

接下来,使用as_completed . 这个函数为submit 而生, 为啥呢?

你总想通过一种办法来解决submit后啥时候完成的吧 , 而不是一次次调用future.done 或者 使用 future.result 吧.

concurrent.futures.as_completed(fs, timeout=None) 返回一个生成器,在迭代过程中会阻塞,

直到线程完成或者异常时,返回一个被set_result的Future对象.

同时注意, map方法返回是有序的, as_completed 是那个线程先完成/失败 就返回

#这是一个简单的 as_completed
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    time.sleep(3)
    print('thread id:',threading.currentThread().ident,' 访问了:',url)
    return requests.get(url)            #这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)
f = ex.submit(get_html,URLS[0])          #提交一个任务,放入线程池中,准备执行
print('main thread running')
for future in as_completed([f]):        #as_completed()接受一个可迭代的Future序列,返回一个生成器,在完成或异常时返回这个Future对象
    print('一个任务完成.')
    print(future.result())
#as_completed 完整的例子
#as_completed 返回一个生成器,用于迭代, 一旦一个线程完成(或失败) 就返回
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    time.sleep(1)
    print('thread id:',threading.currentThread().ident,' 访问了:',url)
    return requests.get(url)            #这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)   #最多3个线程
future_tasks = [ex.submit(get_html,url) for url in URLS]    #创建3个future对象
for future in as_completed(future_tasks):       #迭代生成器
    try:
        resp = future.result()
    except Exception as e:
        print('%s'%e)
    else:
        print('%s has %d bytes!'%(resp.url, len(resp.text)))
"""
thread id: 5160  访问了: http://www.baidu.com
thread id: 7752  访问了: http://www.sina.com.cn
thread id: 5928  访问了: http://www.qq.com
http://www.qq.com/ has 240668 bytes!
http://www.baidu.com/ has 2381 bytes!
https://www.sina.com.cn/ has 577244 bytes!
"""

wait 是阻塞函数,第一个参数和as_completed一样, 一个可迭代的future序列,返回一个元组 ,包含2个set , 一个完成的,一个未完成的

"""
wait 例子
参数:
    FIRST_COMPLETED    当任何未来完成或被取消时,该函数将返回。

    FIRST_EXCEPTION    当任何未来通过提出异常完成时,函数将返回。如果没有未来引发异常,那么它等同于 ALL_COMPLETED。

    ALL_COMPLETED(默认)      当所有future完成或被取消时,函数将返回。
"""
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    time.sleep(1)
    print('thread id:',threading.currentThread().ident,' 访问了:',url)
    return requests.get(url)            #这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)   #最多3个线程
future_tasks = [ex.submit(get_html,url) for url in URLS]    #创建3个future对象
try:
    result = wait(future_tasks,return_when = fu.FIRST_COMPLETED)
    done_set = result[0]
    for future in done_set:
        resp = future.result()
        print('第一个网页任务完成 url:%s , len:%d bytes! ' % (resp.url, len(resp.text)))
except Exception as e:
    print('exception :' , e)

最后说一下回调:add_done_callback(fn) , 回调函数是在调用线程完成后再调用的,在同一个线程中.

import os,sys,time,requests,threading
from concurrent import futures


URLS = [
        'http://baidu.com',
        'http://www.qq.com',
        'http://www.sina.com.cn'
        ]

def load_url(url):
    print('tid:',threading.currentThread().ident,',url:',url)
    with requests.get(url) as resp:
        return resp.content
def call_back(obj):
    print('->>>>>>>>>call_back , tid:',threading.currentThread().ident, ',obj:',obj)

with futures.ThreadPoolExecutor(max_workers=3) as ex:
    # mp = {ex.submit(load_url,url) : url for url in URLS}
    mp = dict()
    for url in URLS:
        f = ex.submit(load_url,url)
        mp[f] = url
        f.add_done_callback(call_back)
    for f in futures.as_completed(mp):
        url = mp[f]
        try:
            data = f.result()
        except Exception as exc:
            print(exc, ',url:',url)
        else:
            print('url:', url, ',len:',len(data),',data[:20]:',data[:20])
"""
tid: 7128 ,url: http://baidu.com
tid: 7892 ,url: http://www.qq.com
tid: 3712 ,url: http://www.sina.com.cn
->>>>>>>>>call_back , tid: 7892 ,obj: <Future at 0x2dd64b0 state=finished returned bytes>
url: http://www.qq.com ,len: 251215 ,data[:20]: b'<!DOCTYPE html>\n<htm'
->>>>>>>>>call_back , tid: 3712 ,obj: <Future at 0x2de07b0 state=finished returned bytes>
url: http://www.sina.com.cn ,len: 577333 ,data[:20]: b'<!DOCTYPE html>\n<!--'
->>>>>>>>>call_back , tid: 7128 ,obj: <Future at 0x2d533d0 state=finished returned bytes>
url: http://baidu.com ,len: 81 ,data[:20]: b'<html>\n<meta http-eq'
"""
原文来自:python 并发 ThreadPoolExecutor,尊重自己,尊重每一个人;转发请注明来源!
0 0

发表评论